Repository: sqoop Updated Branches: refs/heads/sqoop2 9fc123830 -> 9f05b4a5c
SQOOP-1803: Sqoop2: Update and save configuration upon job success (Jarek Jarcec Cecho via Abraham Elmahrek) Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/9f05b4a5 Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/9f05b4a5 Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/9f05b4a5 Branch: refs/heads/sqoop2 Commit: 9f05b4a5cf3ee92e5922107988ac13c6f2098f25 Parents: 9fc1238 Author: Abraham Elmahrek <[email protected]> Authored: Mon Apr 20 17:32:12 2015 -0700 Committer: Abraham Elmahrek <[email protected]> Committed: Mon Apr 20 17:32:12 2015 -0700 ---------------------------------------------------------------------- .../jdbc/GenericJdbcFromDestroyer.java | 4 ++ .../org/apache/sqoop/job/etl/Destroyer.java | 19 ++++++ .../org/apache/sqoop/driver/JobManager.java | 68 +++++++++++++++++++- .../jdbc/generic/IncrementalReadTest.java | 11 +++- 4 files changed, 97 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sqoop/blob/9f05b4a5/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcFromDestroyer.java ---------------------------------------------------------------------- diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcFromDestroyer.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcFromDestroyer.java index 3a783be..93388c6 100644 --- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcFromDestroyer.java +++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcFromDestroyer.java @@ -33,4 +33,8 @@ public class GenericJdbcFromDestroyer extends Destroyer<LinkConfiguration, FromJ LOG.info("Running generic JDBC connector destroyer"); } + @Override + public void 
updateConfiguration(DestroyerContext context, LinkConfiguration linkConfiguration, FromJobConfiguration fromJobConfiguration) { + fromJobConfiguration.incrementalRead.lastValue = context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_LAST_INCREMENTAL_VALUE); + } } http://git-wip-us.apache.org/repos/asf/sqoop/blob/9f05b4a5/connector/connector-sdk/src/main/java/org/apache/sqoop/job/etl/Destroyer.java ---------------------------------------------------------------------- diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/job/etl/Destroyer.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/job/etl/Destroyer.java index b4ab6d7..68b5e13 100644 --- a/connector/connector-sdk/src/main/java/org/apache/sqoop/job/etl/Destroyer.java +++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/job/etl/Destroyer.java @@ -41,4 +41,23 @@ public abstract class Destroyer<LinkConfiguration, JobConfiguration> { LinkConfiguration linkConfiguration, JobConfiguration jobConfiguration); + /** Callback to update configuration objects given values in context. + * + * This callback will be called only after successful execution of the job. It + * might be executed on a different machine than other callbacks and hence it should not + * use any state stored within the instance. + * + * This method is designed to update configuration objects for the next job run, so that + * the user can move data in an incremental fashion. The Sqoop framework will update the configuration + * objects in the repository after calling this method. 
+ * + * @param context Destroyer context + * @param linkConfiguration Link configuration object + * @param jobConfiguration Job configuration object + */ + public void updateConfiguration(DestroyerContext context, + LinkConfiguration linkConfiguration, + JobConfiguration jobConfiguration) { + // Default implementation does nothing + } } http://git-wip-us.apache.org/repos/asf/sqoop/blob/9f05b4a5/core/src/main/java/org/apache/sqoop/driver/JobManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/driver/JobManager.java b/core/src/main/java/org/apache/sqoop/driver/JobManager.java index 4044510..e8ca17c 100644 --- a/core/src/main/java/org/apache/sqoop/driver/JobManager.java +++ b/core/src/main/java/org/apache/sqoop/driver/JobManager.java @@ -38,7 +38,9 @@ import org.apache.sqoop.job.etl.Initializer; import org.apache.sqoop.job.etl.InitializerContext; import org.apache.sqoop.job.etl.Transferable; import org.apache.sqoop.model.ConfigUtils; -import org.apache.sqoop.model.SubmissionError; +import org.apache.sqoop.model.MConfig; +import org.apache.sqoop.model.MConfigList; +import org.apache.sqoop.model.MInput; import org.apache.sqoop.model.MJob; import org.apache.sqoop.model.MLink; import org.apache.sqoop.model.MSubmission; @@ -47,7 +49,6 @@ import org.apache.sqoop.repository.RepositoryManager; import org.apache.sqoop.request.HttpEventContext; import org.apache.sqoop.schema.Schema; import org.apache.sqoop.submission.SubmissionStatus; -import org.apache.sqoop.submission.counter.Counters; import org.apache.sqoop.utils.ClassUtils; public class JobManager implements Reconfigurable { @@ -511,6 +512,61 @@ public class JobManager implements Reconfigurable { executionEngine.prepareJob(request); } + void invokeDestroyerOnJobSuccess(MSubmission submission) { + try { + MJob job = getJob(submission.getJobId()); + + SqoopConnector fromConnector = getSqoopConnector(job.getFromConnectorId()); + SqoopConnector 
toConnector = getSqoopConnector(job.getToConnectorId()); + + MLink fromConnection = getLink(job.getFromLinkId()); + MLink toConnection = getLink(job.getToLinkId()); + + Object fromLinkConfig = ClassUtils.instantiate(fromConnector.getLinkConfigurationClass()); + ConfigUtils.fromConfigs(fromConnection.getConnectorLinkConfig().getConfigs(), fromLinkConfig); + + Object toLinkConfig = ClassUtils.instantiate(toConnector.getLinkConfigurationClass()); + ConfigUtils.fromConfigs(toConnection.getConnectorLinkConfig().getConfigs(), toLinkConfig); + + Object fromJob = ClassUtils.instantiate(fromConnector.getJobConfigurationClass(Direction.FROM)); + ConfigUtils.fromConfigs(job.getFromJobConfig().getConfigs(), fromJob); + + Object toJob = ClassUtils.instantiate(toConnector.getJobConfigurationClass(Direction.TO)); + ConfigUtils.fromConfigs(job.getToJobConfig().getConfigs(), toJob); + + Destroyer fromDestroyer = (Destroyer) ClassUtils.instantiate(fromConnector.getFrom().getDestroyer()); + Destroyer toDestroyer = (Destroyer) ClassUtils.instantiate(toConnector.getTo().getDestroyer()); + + DestroyerContext fromDestroyerContext = new DestroyerContext(submission.getFromConnectorContext(), true, submission.getFromSchema()); + DestroyerContext toDestroyerContext = new DestroyerContext(submission.getToConnectorContext(), false, submission.getToSchema()); + + fromDestroyer.updateConfiguration(fromDestroyerContext, fromLinkConfig, fromJob); + toDestroyer.updateConfiguration(toDestroyerContext, toLinkConfig, toJob); + + List<MConfig> fromJobUpdated = ConfigUtils.toConfigs(fromJob); + List<MConfig> toJobUpdated = ConfigUtils.toConfigs(toJob); + + for (MConfig config : fromJobUpdated) { + MConfigList originalInput = job.getFromJobConfig(); + for (MInput input : config.getInputs()) { + originalInput.getInput(input.getName()).setValue(input.getValue()); + } + } + for (MConfig config : toJobUpdated) { + MConfigList originalInput = job.getToJobConfig(); + for (MInput input : config.getInputs()) { + 
Object value = input.getValue(); + originalInput.getInput(input.getName()).setValue(value); + } + } + + RepositoryManager.getInstance().getRepository().updateJob(job); + } catch(Exception ex) { + LOG.error("Exception when invoking destroyer on job success", ex); + submission.setStatus(SubmissionStatus.FAILED); + } + } + /** * Callback that will be called only if we failed to submit the job to the * remote cluster. @@ -590,7 +646,15 @@ public class JobManager implements Reconfigurable { * @param submission Submission to update */ public void updateSubmission(MSubmission submission) { + // We're expecting that this method will be called only if we think that the submission is still running + assert submission.getStatus().isRunning(); + submissionEngine.update(submission); + + if (!submission.getStatus().isRunning() && !submission.getStatus().isFailure()) { + invokeDestroyerOnJobSuccess(submission); + } + RepositoryManager.getInstance().getRepository().updateSubmission(submission); } http://git-wip-us.apache.org/repos/asf/sqoop/blob/9f05b4a5/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/IncrementalReadTest.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/IncrementalReadTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/IncrementalReadTest.java index 5bde35c..0355a36 100644 --- a/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/IncrementalReadTest.java +++ b/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/IncrementalReadTest.java @@ -18,7 +18,6 @@ package org.apache.sqoop.integration.connector.jdbc.generic; import com.google.common.collect.Iterables; -import org.apache.sqoop.common.Direction; import org.apache.sqoop.connector.hdfs.configuration.ToFormat; import org.apache.sqoop.model.MConfigList; import org.apache.sqoop.model.MJob; @@ -33,6 +32,8 @@ import 
org.testng.annotations.Test; import java.lang.reflect.Method; +import static org.testng.Assert.assertEquals; + /** */ public class IncrementalReadTest extends ConnectorTestCase implements ITest { @@ -104,7 +105,9 @@ public class IncrementalReadTest extends ConnectorTestCase implements ITest { "19,'Saucy Salamander',13.10,'2013-10-17',false" ); - // TODO: After Sqoop will be properly updating configuration objects we need to verify new max value + // Verify new last value + MJob updatedJob = getClient().getJob(job.getPersistenceId()); + assertEquals(updatedJob.getFromJobConfig().getStringInput("incrementalRead.lastValue").getValue(), newMaxValue); // Clean up testing table dropTable(); @@ -157,7 +160,9 @@ public class IncrementalReadTest extends ConnectorTestCase implements ITest { "19,'Saucy Salamander',13.10,'2013-10-17',false" ); - // TODO: After Sqoop will be properly updating configuration objects we need to verify new max value + // Verify new last value + MJob updatedJob = getClient().getJob(job.getPersistenceId()); + assertEquals(updatedJob.getFromJobConfig().getStringInput("incrementalRead.lastValue").getValue(), newMaxValue); // Clean up testing table dropTable();
