Repository: sqoop
Updated Branches:
  refs/heads/sqoop2 df6d5cb9a -> f241f82c3


SQOOP-2464: Initializer object is not reused when calling getSchema

(Abraham Fine via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/f241f82c
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/f241f82c
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/f241f82c

Branch: refs/heads/sqoop2
Commit: f241f82c3824dc6e0ef23968712569f81c1d10ee
Parents: df6d5cb
Author: Jarek Jarcec Cecho <[email protected]>
Authored: Wed Sep 16 09:46:22 2015 -0700
Committer: Jarek Jarcec Cecho <[email protected]>
Committed: Wed Sep 16 09:46:22 2015 -0700

----------------------------------------------------------------------
 .../org/apache/sqoop/job/etl/Initializer.java   |  3 ++
 .../org/apache/sqoop/driver/JobManager.java     | 34 +++++++++-----------
 docs/src/site/sphinx/ConnectorDevelopment.rst   |  2 +-
 3 files changed, 19 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/f241f82c/connector/connector-sdk/src/main/java/org/apache/sqoop/job/etl/Initializer.java
----------------------------------------------------------------------
diff --git 
a/connector/connector-sdk/src/main/java/org/apache/sqoop/job/etl/Initializer.java
 
b/connector/connector-sdk/src/main/java/org/apache/sqoop/job/etl/Initializer.java
index fddd162..83ab2e8 100644
--- 
a/connector/connector-sdk/src/main/java/org/apache/sqoop/job/etl/Initializer.java
+++ 
b/connector/connector-sdk/src/main/java/org/apache/sqoop/job/etl/Initializer.java
@@ -28,6 +28,9 @@ import org.apache.sqoop.schema.Schema;
 /**
  * This allows connector to define initialization work for execution,
  * for example, context configuration.
+ *
+ * All method invocations on an instance of Initializer can be assumed
+ * to come from the same process.
  */
 @InterfaceAudience.Public
 @InterfaceStability.Evolving

http://git-wip-us.apache.org/repos/asf/sqoop/blob/f241f82c/core/src/main/java/org/apache/sqoop/driver/JobManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/sqoop/driver/JobManager.java 
b/core/src/main/java/org/apache/sqoop/driver/JobManager.java
index 87c6c74..dc90a0e 100644
--- a/core/src/main/java/org/apache/sqoop/driver/JobManager.java
+++ b/core/src/main/java/org/apache/sqoop/driver/JobManager.java
@@ -375,17 +375,23 @@ public class JobManager implements Reconfigurable {
     addConnectorIDFClass(jobRequest, 
fromConnector.getIntermediateDataFormat());
     addConnectorIDFClass(jobRequest, toConnector.getIntermediateDataFormat());
 
-    addConnectorInitializerJars(jobRequest, Direction.FROM);
-    addConnectorInitializerJars(jobRequest, Direction.TO);
+    Initializer fromInitializer = getConnectorInitializer(jobRequest, 
Direction.FROM);
+    Initializer toInitializer = getConnectorInitializer(jobRequest, 
Direction.TO);
+
+    InitializerContext fromInitializerContext = 
getConnectorInitializerContext(jobRequest, Direction.FROM);
+    InitializerContext toInitializerContext = 
getConnectorInitializerContext(jobRequest, Direction.TO);
+
+    addConnectorInitializerJars(jobRequest, Direction.FROM, fromInitializer, 
fromInitializerContext);
+    addConnectorInitializerJars(jobRequest, Direction.TO, toInitializer, 
toInitializerContext);
     addIDFDependentJars(jobRequest, Direction.FROM);
     addIDFDependentJars(jobRequest, Direction.TO);
 
     // call the initialize method
-    initializeConnector(jobRequest, Direction.FROM);
-    initializeConnector(jobRequest, Direction.TO);
+    initializeConnector(jobRequest, Direction.FROM, fromInitializer, 
fromInitializerContext);
+    initializeConnector(jobRequest, Direction.TO, toInitializer, 
toInitializerContext);
 
-    
jobRequest.getJobSubmission().setFromSchema(getSchemaForConnector(jobRequest, 
Direction.FROM));
-    
jobRequest.getJobSubmission().setToSchema(getSchemaForConnector(jobRequest, 
Direction.TO));
+    
jobRequest.getJobSubmission().setFromSchema(getSchemaForConnector(jobRequest, 
Direction.FROM, fromInitializer, fromInitializerContext));
+    
jobRequest.getJobSubmission().setToSchema(getSchemaForConnector(jobRequest, 
Direction.TO, toInitializer, toInitializerContext));
 
     LOG.debug("Using entities: " + jobRequest.getFrom() + ", " + 
jobRequest.getTo());
     return jobRequest;
@@ -453,21 +459,14 @@ public class JobManager implements Reconfigurable {
   }
 
   @SuppressWarnings({ "unchecked", "rawtypes" })
-  private void initializeConnector(JobRequest jobRequest, Direction direction) 
{
-    Initializer initializer = getConnectorInitializer(jobRequest, direction);
-    InitializerContext initializerContext = 
getConnectorInitializerContext(jobRequest, direction);
-
+  private void initializeConnector(JobRequest jobRequest, Direction direction, 
Initializer initializer, InitializerContext initializerContext) {
     // Initialize submission from the connector perspective
     initializer.initialize(initializerContext, 
jobRequest.getConnectorLinkConfig(direction),
         jobRequest.getJobConfig(direction));
   }
 
   @SuppressWarnings({ "unchecked", "rawtypes" })
-  private Schema getSchemaForConnector(JobRequest jobRequest, Direction 
direction) {
-
-    Initializer initializer = getConnectorInitializer(jobRequest, direction);
-    InitializerContext initializerContext = 
getConnectorInitializerContext(jobRequest, direction);
-
+  private Schema getSchemaForConnector(JobRequest jobRequest, Direction 
direction, Initializer initializer, InitializerContext initializerContext) {
     return initializer.getSchema(initializerContext, 
jobRequest.getConnectorLinkConfig(direction),
         jobRequest.getJobConfig(direction));
   }
@@ -480,10 +479,7 @@ public class JobManager implements Reconfigurable {
   }
 
   @SuppressWarnings({ "unchecked", "rawtypes" })
-  private void addConnectorInitializerJars(JobRequest jobRequest, Direction 
direction) {
-
-    Initializer initializer = getConnectorInitializer(jobRequest, direction);
-    InitializerContext initializerContext = 
getConnectorInitializerContext(jobRequest, direction);
+  private void addConnectorInitializerJars(JobRequest jobRequest, Direction 
direction, Initializer initializer, InitializerContext initializerContext) {
     // Add job specific jars to the job request
     jobRequest.addJars(initializer.getJars(initializerContext,
         jobRequest.getConnectorLinkConfig(direction), 
jobRequest.getJobConfig(direction)));

http://git-wip-us.apache.org/repos/asf/sqoop/blob/f241f82c/docs/src/site/sphinx/ConnectorDevelopment.rst
----------------------------------------------------------------------
diff --git a/docs/src/site/sphinx/ConnectorDevelopment.rst 
b/docs/src/site/sphinx/ConnectorDevelopment.rst
index 1ea1881..b35c521 100644
--- a/docs/src/site/sphinx/ConnectorDevelopment.rst
+++ b/docs/src/site/sphinx/ConnectorDevelopment.rst
@@ -121,7 +121,7 @@ Initializer and Destroyer
 .. _Initializer:
 .. _Destroyer:
 
-Initializer is instantiated before the submission of sqoop job to the 
execution engine and doing preparations such as connecting to the data source, 
creating temporary tables or adding dependent jar files. Initializers are 
executed as the first step in the sqoop job lifecyle. Here is the 
``Initializer`` API.
+Initializer is instantiated before the submission of sqoop job to the 
execution engine and performs preparations such as connecting to the data source, 
creating temporary tables or adding dependent jar files. Initializers are 
executed as the first step in the sqoop job lifecycle. All interactions within 
an initializer are assumed to occur within a single thread, so state (such as 
database connections) can be maintained between method calls. Here is the 
``Initializer`` API.
 ::
 
   public abstract void initialize(InitializerContext context, 
LinkConfiguration linkConfiguration,

Reply via email to