Repository: sqoop
Updated Branches:
  refs/heads/sqoop2 9fc123830 -> 9f05b4a5c


SQOOP-1803: Sqoop2: Update and save configuration upon job success

(Jarek Jarcec Cecho via Abraham Elmahrek)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/9f05b4a5
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/9f05b4a5
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/9f05b4a5

Branch: refs/heads/sqoop2
Commit: 9f05b4a5cf3ee92e5922107988ac13c6f2098f25
Parents: 9fc1238
Author: Abraham Elmahrek <[email protected]>
Authored: Mon Apr 20 17:32:12 2015 -0700
Committer: Abraham Elmahrek <[email protected]>
Committed: Mon Apr 20 17:32:12 2015 -0700

----------------------------------------------------------------------
 .../jdbc/GenericJdbcFromDestroyer.java          |  4 ++
 .../org/apache/sqoop/job/etl/Destroyer.java     | 19 ++++++
 .../org/apache/sqoop/driver/JobManager.java     | 68 +++++++++++++++++++-
 .../jdbc/generic/IncrementalReadTest.java       | 11 +++-
 4 files changed, 97 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/9f05b4a5/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcFromDestroyer.java
----------------------------------------------------------------------
diff --git 
a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcFromDestroyer.java
 
b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcFromDestroyer.java
index 3a783be..93388c6 100644
--- 
a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcFromDestroyer.java
+++ 
b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcFromDestroyer.java
@@ -33,4 +33,8 @@ public class GenericJdbcFromDestroyer extends 
Destroyer<LinkConfiguration, FromJ
     LOG.info("Running generic JDBC connector destroyer");
   }
 
+  @Override
+  public void updateConfiguration(DestroyerContext context, LinkConfiguration 
linkConfiguration, FromJobConfiguration fromJobConfiguration) {
+    fromJobConfiguration.incrementalRead.lastValue = 
context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_LAST_INCREMENTAL_VALUE);
+  }
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/9f05b4a5/connector/connector-sdk/src/main/java/org/apache/sqoop/job/etl/Destroyer.java
----------------------------------------------------------------------
diff --git 
a/connector/connector-sdk/src/main/java/org/apache/sqoop/job/etl/Destroyer.java 
b/connector/connector-sdk/src/main/java/org/apache/sqoop/job/etl/Destroyer.java
index b4ab6d7..68b5e13 100644
--- 
a/connector/connector-sdk/src/main/java/org/apache/sqoop/job/etl/Destroyer.java
+++ 
b/connector/connector-sdk/src/main/java/org/apache/sqoop/job/etl/Destroyer.java
@@ -41,4 +41,23 @@ public abstract class Destroyer<LinkConfiguration, 
JobConfiguration> {
                                LinkConfiguration linkConfiguration,
                                JobConfiguration jobConfiguration);
 
+  /** Callback to update configuration objects given values in context.
+   *
+   * This callback will be called only after successful execution of the job. 
It
+   * might be executed on a different machine than other callbacks and hence it 
should not
+   * use any state stored within the instance.
+   *
+   * This method is designed to update configuration objects for the next job run, 
so that
+   * the user can move data in an incremental fashion. The Sqoop framework will update 
the configuration
+   * objects in the repository after calling this method.
+   *
+   * @param context Destroyer context
+   * @param linkConfiguration Link configuration object
+   * @param jobConfiguration Job configuration object
+   */
+  public void updateConfiguration(DestroyerContext context,
+                                  LinkConfiguration linkConfiguration,
+                                  JobConfiguration jobConfiguration) {
+    // Default implementation does nothing
+  }
 }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/9f05b4a5/core/src/main/java/org/apache/sqoop/driver/JobManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/driver/JobManager.java 
b/core/src/main/java/org/apache/sqoop/driver/JobManager.java
index 4044510..e8ca17c 100644
--- a/core/src/main/java/org/apache/sqoop/driver/JobManager.java
+++ b/core/src/main/java/org/apache/sqoop/driver/JobManager.java
@@ -38,7 +38,9 @@ import org.apache.sqoop.job.etl.Initializer;
 import org.apache.sqoop.job.etl.InitializerContext;
 import org.apache.sqoop.job.etl.Transferable;
 import org.apache.sqoop.model.ConfigUtils;
-import org.apache.sqoop.model.SubmissionError;
+import org.apache.sqoop.model.MConfig;
+import org.apache.sqoop.model.MConfigList;
+import org.apache.sqoop.model.MInput;
 import org.apache.sqoop.model.MJob;
 import org.apache.sqoop.model.MLink;
 import org.apache.sqoop.model.MSubmission;
@@ -47,7 +49,6 @@ import org.apache.sqoop.repository.RepositoryManager;
 import org.apache.sqoop.request.HttpEventContext;
 import org.apache.sqoop.schema.Schema;
 import org.apache.sqoop.submission.SubmissionStatus;
-import org.apache.sqoop.submission.counter.Counters;
 import org.apache.sqoop.utils.ClassUtils;
 
 public class JobManager implements Reconfigurable {
@@ -511,6 +512,61 @@ public class JobManager implements Reconfigurable {
     executionEngine.prepareJob(request);
   }
 
+  void invokeDestroyerOnJobSuccess(MSubmission submission) {
+    try {
+      MJob job = getJob(submission.getJobId());
+
+      SqoopConnector fromConnector = 
getSqoopConnector(job.getFromConnectorId());
+      SqoopConnector toConnector = getSqoopConnector(job.getToConnectorId());
+
+      MLink fromConnection = getLink(job.getFromLinkId());
+      MLink toConnection = getLink(job.getToLinkId());
+
+      Object fromLinkConfig = 
ClassUtils.instantiate(fromConnector.getLinkConfigurationClass());
+      
ConfigUtils.fromConfigs(fromConnection.getConnectorLinkConfig().getConfigs(), 
fromLinkConfig);
+
+      Object toLinkConfig = 
ClassUtils.instantiate(toConnector.getLinkConfigurationClass());
+      
ConfigUtils.fromConfigs(toConnection.getConnectorLinkConfig().getConfigs(), 
toLinkConfig);
+
+      Object fromJob = 
ClassUtils.instantiate(fromConnector.getJobConfigurationClass(Direction.FROM));
+      ConfigUtils.fromConfigs(job.getFromJobConfig().getConfigs(), fromJob);
+
+      Object toJob = 
ClassUtils.instantiate(toConnector.getJobConfigurationClass(Direction.TO));
+      ConfigUtils.fromConfigs(job.getToJobConfig().getConfigs(), toJob);
+
+      Destroyer fromDestroyer = (Destroyer) 
ClassUtils.instantiate(fromConnector.getFrom().getDestroyer());
+      Destroyer toDestroyer = (Destroyer) 
ClassUtils.instantiate(toConnector.getTo().getDestroyer());
+
+      DestroyerContext fromDestroyerContext = new 
DestroyerContext(submission.getFromConnectorContext(), true, 
submission.getFromSchema());
+      DestroyerContext toDestroyerContext = new 
DestroyerContext(submission.getToConnectorContext(), false, 
submission.getToSchema());
+
+      fromDestroyer.updateConfiguration(fromDestroyerContext, fromLinkConfig, 
fromJob);
+      toDestroyer.updateConfiguration(toDestroyerContext, toLinkConfig, toJob);
+
+      List<MConfig> fromJobUpdated = ConfigUtils.toConfigs(fromJob);
+      List<MConfig> toJobUpdated = ConfigUtils.toConfigs(toJob);
+
+      for (MConfig config : fromJobUpdated) {
+        MConfigList originalInput = job.getFromJobConfig();
+        for (MInput input : config.getInputs()) {
+          originalInput.getInput(input.getName()).setValue(input.getValue());
+        }
+      }
+      for (MConfig config : toJobUpdated) {
+        MConfigList originalInput = job.getToJobConfig();
+        for (MInput input : config.getInputs()) {
+          Object value = input.getValue();
+          originalInput.getInput(input.getName()).setValue(value);
+        }
+      }
+
+      RepositoryManager.getInstance().getRepository().updateJob(job);
+    } catch(Exception ex) {
+      LOG.error("Exception when invoking destroyer on job success", ex);
+      submission.setStatus(SubmissionStatus.FAILED);
+    }
+  }
+
   /**
    * Callback that will be called only if we failed to submit the job to the
    * remote cluster.
@@ -590,7 +646,15 @@ public class JobManager implements Reconfigurable {
    * @param submission Submission to update
    */
   public void updateSubmission(MSubmission submission) {
+    // We're expecting that this method will be called only if we think that 
the submission is still running
+    assert submission.getStatus().isRunning();
+
     submissionEngine.update(submission);
+
+    if (!submission.getStatus().isRunning() && 
!submission.getStatus().isFailure()) {
+      invokeDestroyerOnJobSuccess(submission);
+    }
+
     
RepositoryManager.getInstance().getRepository().updateSubmission(submission);
   }
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/9f05b4a5/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/IncrementalReadTest.java
----------------------------------------------------------------------
diff --git 
a/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/IncrementalReadTest.java
 
b/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/IncrementalReadTest.java
index 5bde35c..0355a36 100644
--- 
a/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/IncrementalReadTest.java
+++ 
b/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/IncrementalReadTest.java
@@ -18,7 +18,6 @@
 package org.apache.sqoop.integration.connector.jdbc.generic;
 
 import com.google.common.collect.Iterables;
-import org.apache.sqoop.common.Direction;
 import org.apache.sqoop.connector.hdfs.configuration.ToFormat;
 import org.apache.sqoop.model.MConfigList;
 import org.apache.sqoop.model.MJob;
@@ -33,6 +32,8 @@ import org.testng.annotations.Test;
 
 import java.lang.reflect.Method;
 
+import static org.testng.Assert.assertEquals;
+
 /**
  */
 public class IncrementalReadTest extends ConnectorTestCase implements ITest {
@@ -104,7 +105,9 @@ public class IncrementalReadTest extends ConnectorTestCase 
implements ITest {
         "19,'Saucy Salamander',13.10,'2013-10-17',false"
       );
 
-    // TODO: After Sqoop will be properly updating configuration objects we 
need to verify new max value
+    // Verify new last value
+    MJob updatedJob = getClient().getJob(job.getPersistenceId());
+    
assertEquals(updatedJob.getFromJobConfig().getStringInput("incrementalRead.lastValue").getValue(),
 newMaxValue);
 
     // Clean up testing table
     dropTable();
@@ -157,7 +160,9 @@ public class IncrementalReadTest extends ConnectorTestCase 
implements ITest {
         "19,'Saucy Salamander',13.10,'2013-10-17',false"
       );
 
-    // TODO: After Sqoop will be properly updating configuration objects we 
need to verify new max value
+    // Verify new last value
+    MJob updatedJob = getClient().getJob(job.getPersistenceId());
+    
assertEquals(updatedJob.getFromJobConfig().getStringInput("incrementalRead.lastValue").getValue(),
 newMaxValue);
 
     // Clean up testing table
     dropTable();

Reply via email to