Updated Branches: refs/heads/sqoop2 dd3bfa398 -> 66dd617da
SQOOP-998: Sqoop2: Upgrade: Add framework upgrader (Hari Shreedharan via Jarek Jarcec Cecho) Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/66dd617d Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/66dd617d Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/66dd617d Branch: refs/heads/sqoop2 Commit: 66dd617da2d866df02f430923d82d76091e10aa7 Parents: dd3bfa3 Author: Jarek Jarcec Cecho <[email protected]> Authored: Fri May 3 12:26:58 2013 -0700 Committer: Jarek Jarcec Cecho <[email protected]> Committed: Fri May 3 12:27:33 2013 -0700 ---------------------------------------------------------------------- .../apache/sqoop/framework/FrameworkManager.java | 13 ++ .../sqoop/framework/FrameworkMetadataUpgrader.java | 63 ++++++++++ .../apache/sqoop/repository/JdbcRepository.java | 17 ++- .../sqoop/repository/JdbcRepositoryHandler.java | 17 +++ .../org/apache/sqoop/repository/Repository.java | 93 +++++++++++++- .../repository/derby/DerbyRepositoryHandler.java | 57 +++++++++ .../sqoop/repository/derby/DerbySchemaQuery.java | 18 +++ 7 files changed, 267 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sqoop/blob/66dd617d/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java b/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java index 1a6d427..145a2c1 100644 --- a/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java +++ b/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java @@ -21,6 +21,7 @@ import org.apache.log4j.Logger; import org.apache.sqoop.common.MapContext; import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.connector.ConnectorManager; +import 
org.apache.sqoop.connector.spi.MetadataUpgrader; import org.apache.sqoop.connector.spi.SqoopConnector; import org.apache.sqoop.core.SqoopConfiguration; import org.apache.sqoop.framework.configuration.ConnectionConfiguration; @@ -134,6 +135,11 @@ public class FrameworkManager { private final Validator validator; /** + * Upgrader instance + */ + private final MetadataUpgrader upgrader; + + /** * Configured submission engine instance */ private SubmissionEngine submissionEngine; @@ -218,6 +224,9 @@ public class FrameworkManager { // Build validator validator = new FrameworkValidator(); + + // Build upgrader + upgrader = new FrameworkMetadataUpgrader(); } public synchronized void initialize() { @@ -315,6 +324,10 @@ public class FrameworkManager { return validator; } + public MetadataUpgrader getMetadataUpgrader() { + return upgrader; + } + public Class getConnectionConfigurationClass() { return ConnectionConfiguration.class; } http://git-wip-us.apache.org/repos/asf/sqoop/blob/66dd617d/core/src/main/java/org/apache/sqoop/framework/FrameworkMetadataUpgrader.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/framework/FrameworkMetadataUpgrader.java b/core/src/main/java/org/apache/sqoop/framework/FrameworkMetadataUpgrader.java new file mode 100644 index 0000000..ef00780 --- /dev/null +++ b/core/src/main/java/org/apache/sqoop/framework/FrameworkMetadataUpgrader.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sqoop.framework; + +import org.apache.sqoop.connector.spi.MetadataUpgrader; +import org.apache.sqoop.model.MConnectionForms; +import org.apache.sqoop.model.MForm; +import org.apache.sqoop.model.MInput; +import org.apache.sqoop.model.MJobForms; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class FrameworkMetadataUpgrader extends MetadataUpgrader{ + @Override + public void upgrade(MConnectionForms original, + MConnectionForms upgradeTarget) { + doUpgrade(original.getForms(), upgradeTarget.getForms()); + } + + @Override + public void upgrade(MJobForms original, MJobForms upgradeTarget) { + doUpgrade(original.getForms(), upgradeTarget.getForms()); + + } + + @SuppressWarnings("unchecked") + private void doUpgrade(List<MForm> original, List<MForm> target) { + // Easier to find the form in the original forms list if we use a map. 
+ // Since the constructor of MJobForms takes a list, + // index is not guaranteed to be the same, so we need to look for + // equivalence + Map<String, MForm> formMap = new HashMap<String, MForm>(); + for (MForm form : original) { + formMap.put(form.getName(), form); + } + for (MForm form : target) { + List<MInput<?>> inputs = form.getInputs(); + MForm originalForm = formMap.get(form.getName()); + for (MInput input : inputs) { + MInput originalInput = originalForm.getInput(input.getName()); + input.setValue(originalInput.getValue()); + } + } + } +} http://git-wip-us.apache.org/repos/asf/sqoop/blob/66dd617d/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java b/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java index b2259ce..bc6af37 100644 --- a/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java +++ b/core/src/main/java/org/apache/sqoop/repository/JdbcRepository.java @@ -183,10 +183,7 @@ public class JdbcRepository extends Repository { handler.registerFramework(mFramework, conn); return mFramework; } else { - if (!result.equals(mFramework)) { - throw new SqoopException(RepositoryError.JDBCREPO_0014, - "Framework: given: " + mFramework + " found:" + result); - } + upgradeFramework(mFramework); return result; } } @@ -541,4 +538,16 @@ public class JdbcRepository extends Repository { } }, (JdbcRepositoryTransaction) tx); } + + + protected void updateFramework(final MFramework mFramework, + RepositoryTransaction tx) { + doWithConnection(new DoWithConnection() { + @Override + public Object doIt(Connection conn) throws Exception { + handler.updateFramework(mFramework, conn); + return null; + } + }, (JdbcRepositoryTransaction) tx); + } } http://git-wip-us.apache.org/repos/asf/sqoop/blob/66dd617d/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java 
---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java b/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java index 1f88b6d..3d29ab5 100644 --- a/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java +++ b/core/src/main/java/org/apache/sqoop/repository/JdbcRepositoryHandler.java @@ -97,6 +97,23 @@ public abstract class JdbcRepositoryHandler { public abstract void updateConnector(MConnector mConnector, Connection conn); + + /** + * Update the framework with the new data supplied in the + * <tt>mFramework</tt>. + * Also update all forms in the repository + * with the forms specified in <tt>mFramework</tt>. <tt>mFramework</tt> must + * minimally have all required forms (including ones + * which may not have changed). After this operation the repository is + * guaranteed to only have the new forms specified in this object. + * + * @param mFramework The new data to be inserted into the repository for + * the framework. + * @param conn JDBC connection for querying repository + */ + public abstract void updateFramework(MFramework mFramework, Connection conn); + + /** * Search for framework metadata in the repository.
* http://git-wip-us.apache.org/repos/asf/sqoop/blob/66dd617d/core/src/main/java/org/apache/sqoop/repository/Repository.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/repository/Repository.java b/core/src/main/java/org/apache/sqoop/repository/Repository.java index 7a7e884..3e34ccb 100644 --- a/core/src/main/java/org/apache/sqoop/repository/Repository.java +++ b/core/src/main/java/org/apache/sqoop/repository/Repository.java @@ -17,11 +17,11 @@ */ package org.apache.sqoop.repository; -import org.apache.sqoop.common.ErrorCode; import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.connector.ConnectorManager; import org.apache.sqoop.connector.spi.MetadataUpgrader; import org.apache.sqoop.connector.spi.SqoopConnector; +import org.apache.sqoop.framework.FrameworkManager; import org.apache.sqoop.model.MConnection; import org.apache.sqoop.model.MConnectionForms; import org.apache.sqoop.model.MConnector; @@ -258,6 +258,27 @@ public abstract class Repository { protected abstract void updateConnector(MConnector newConnector, RepositoryTransaction tx); + + /** + * Update the framework with the new data supplied in the + * <tt>mFramework</tt>. Also update all forms associated with the framework + * in the repository with the forms specified in + * <tt>mFramework</tt>. <tt>mFramework</tt> must + * minimally have all required forms (including ones + * which may not have changed). After this operation the repository is + * guaranteed to only have the new forms specified in this object. + * + * @param mFramework The new data to be inserted into the repository for + * the framework. + * @param tx The repository transaction to use to push the data to the + * repository. If this is null, a new transaction will be created. + * This method will not call begin, commit, + * rollback or close on this transaction.
+ */ + protected abstract void updateFramework(MFramework mFramework, + RepositoryTransaction tx); + + /** * Delete all inputs for a job * @param jobId The id of the job whose inputs are to be deleted. @@ -279,6 +300,16 @@ public abstract class Repository { protected abstract void deleteConnectionInputs(long connectionID, RepositoryTransaction tx); + private void deleteConnectionsAndJobs(List<MConnection> connections, + List<MJob> jobs, RepositoryTransaction tx) { + for (MJob job : jobs) { + deleteJobInputs(job.getPersistenceId(), tx); + } + for (MConnection connection : connections) { + deleteConnectionInputs(connection.getPersistenceId(), tx); + } + } + /** * Upgrade the connector with the same {@linkplain MConnector#uniqueName} * in the repository with values from <code>newConnector</code>. @@ -318,12 +349,7 @@ public abstract class Repository { // -- BEGIN TXN -- tx = getTransaction(); tx.begin(); - for (MJob job : jobs) { - deleteJobInputs(job.getPersistenceId(), tx); - } - for (MConnection connection : connections) { - deleteConnectionInputs(connection.getPersistenceId(), tx); - } + deleteConnectionsAndJobs(connections, jobs, tx); updateConnector(newConnector, tx); for (MConnection connection : connections) { long connectionID = connection.getPersistenceId(); @@ -365,6 +391,59 @@ public abstract class Repository { } } + public final void upgradeFramework(MFramework framework) { + RepositoryTransaction tx = null; + try { + MetadataUpgrader upgrader = FrameworkManager.getInstance() + .getMetadataUpgrader(); + List<MConnection> connections = findConnections(); + List<MJob> jobs = findJobs(); + + // -- BEGIN TXN -- + tx = getTransaction(); + tx.begin(); + deleteConnectionsAndJobs(connections, jobs, tx); + updateFramework(framework, tx); + for (MConnection connection : connections) { + long connectionID = connection.getPersistenceId(); + // Make a new copy of the forms from the connector, + // else the values will get set in the forms in the connector for + // 
each connection. + List<MForm> forms = cloneForms(framework.getConnectionForms() + .getForms()); + MConnectionForms newConnectionForms = new MConnectionForms(forms); + upgrader.upgrade(connection.getFrameworkPart(), newConnectionForms); + MConnection newConnection = new MConnection(connection.getConnectorId(), + connection.getConnectorPart(), newConnectionForms); + newConnection.setPersistenceId(connectionID); + updateConnection(newConnection, tx); + } + for (MJob job : jobs) { + // Make a new copy of the forms from the framework, + // else the values will get set in the forms in the connector for + // each connection. + List<MForm> forms = cloneForms(framework.getJobForms(job.getType()) + .getForms()); + MJobForms newJobForms = new MJobForms(job.getType(), forms); + upgrader.upgrade(job.getFrameworkPart(), newJobForms); + MJob newJob = new MJob(job.getConnectorId(), job.getConnectionId(), + job.getType(), job.getConnectorPart(), newJobForms); + newJob.setPersistenceId(job.getPersistenceId()); + updateJob(newJob, tx); + } + tx.commit(); + } catch (Exception ex) { + if(tx != null) { + tx.rollback(); + } + throw new SqoopException(RepositoryError.JDBCREPO_0000, ex); + } finally { + if(tx != null) { + tx.close(); + } + } + } + /** * Clones the forms, but does not set the actual data, * validation message etc in the inputs, but only the persistence id of the http://git-wip-us.apache.org/repos/asf/sqoop/blob/66dd617d/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java ---------------------------------------------------------------------- diff --git a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java index 556241e..e4f6562 100644 --- a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java +++ 
b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbyRepositoryHandler.java @@ -93,6 +93,40 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler { } /** + * Helper method to insert the forms from the MFramework into the + * repository. The job and connection forms within <code>mf</code> will get + * updated with the id of the forms when this function returns. + * @param mf The MFramework instance to use to upgrade. + * @param conn JDBC connection to use for updating the forms + */ + private void insertFormsForFramework(MFramework mf, Connection conn) { + PreparedStatement baseFormStmt = null; + PreparedStatement baseInputStmt = null; + try{ + baseFormStmt = conn.prepareStatement(STMT_INSERT_FORM_BASE, + Statement.RETURN_GENERATED_KEYS); + + baseInputStmt = conn.prepareStatement(STMT_INSERT_INPUT_BASE, + Statement.RETURN_GENERATED_KEYS); + + // Register connection forms + registerForms(null, null, mf.getConnectionForms().getForms(), + MFormType.CONNECTION.name(), baseFormStmt, baseInputStmt); + + // Register all jobs + for (MJobForms jobForms : mf.getAllJobsForms().values()) { + registerForms(null, jobForms.getType(), jobForms.getForms(), + MFormType.JOB.name(), baseFormStmt, baseInputStmt); + } + + } catch (SQLException ex) { + throw new SqoopException(DerbyRepoError.DERBYREPO_0014, mf.toString(), ex); + } finally { + closeStatements(baseFormStmt, baseInputStmt); + } + } + + /** * Helper method to insert the forms from the MConnector into the * repository. The job and connector forms within <code>mc</code> will get * updated with the id of the forms when this function returns.
@@ -705,6 +739,29 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler { * {@inheritDoc} */ @Override + public void updateFramework(MFramework mFramework, Connection conn) { + PreparedStatement deleteForm = null; + PreparedStatement deleteInput = null; + try { + deleteInput = conn.prepareStatement(STMT_DELETE_FRAMEWORK_INPUTS); + deleteForm = conn.prepareStatement(STMT_DELETE_FRAMEWORK_FORMS); + + deleteInput.executeUpdate(); + deleteForm.executeUpdate(); + + } catch (SQLException e) { + throw new SqoopException(DerbyRepoError.DERBYREPO_0038, e); + } finally { + closeStatements(deleteForm, deleteInput); + } + insertFormsForFramework(mFramework, conn); + + } + + /** + * {@inheritDoc} + */ + @Override public void createJob(MJob job, Connection conn) { PreparedStatement stmt = null; int result; http://git-wip-us.apache.org/repos/asf/sqoop/blob/66dd617d/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaQuery.java ---------------------------------------------------------------------- diff --git a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaQuery.java b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaQuery.java index 2e5abb8..24f86ee 100644 --- a/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaQuery.java +++ b/repository/repository-derby/src/main/java/org/apache/sqoop/repository/derby/DerbySchemaQuery.java @@ -463,6 +463,24 @@ public final class DerbySchemaQuery { + " WHERE " + COLUMN_SQF_CONNECTOR + " = ?)"; + // Delete all framework inputs + public static final String STMT_DELETE_FRAMEWORK_INPUTS = + "DELETE FROM " + TABLE_SQ_INPUT + + " WHERE " + + COLUMN_SQI_FORM + + " IN (SELECT " + + COLUMN_SQF_ID + + " FROM " + TABLE_SQ_FORM + + " WHERE " + + COLUMN_SQF_CONNECTOR + " IS NULL)"; + + // Delete all framework forms + public static final String STMT_DELETE_FRAMEWORK_FORMS = + "DELETE FROM " + 
TABLE_SQ_FORM + + " WHERE " + COLUMN_SQF_CONNECTOR + " IS NULL"; + + + // Update the connector public static final String STMT_UPDATE_CONNECTOR = "UPDATE " + TABLE_SQ_CONNECTOR
