Repository: sqoop Updated Branches: refs/heads/sqoop2 df6d5cb9a -> f241f82c3
SQOOP-2464: Initializer object is not reused when calling getSchema (Abraham Fine via Jarek Jarcec Cecho) Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/f241f82c Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/f241f82c Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/f241f82c Branch: refs/heads/sqoop2 Commit: f241f82c3824dc6e0ef23968712569f81c1d10ee Parents: df6d5cb Author: Jarek Jarcec Cecho <[email protected]> Authored: Wed Sep 16 09:46:22 2015 -0700 Committer: Jarek Jarcec Cecho <[email protected]> Committed: Wed Sep 16 09:46:22 2015 -0700 ---------------------------------------------------------------------- .../org/apache/sqoop/job/etl/Initializer.java | 3 ++ .../org/apache/sqoop/driver/JobManager.java | 34 +++++++++----------- docs/src/site/sphinx/ConnectorDevelopment.rst | 2 +- 3 files changed, 19 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sqoop/blob/f241f82c/connector/connector-sdk/src/main/java/org/apache/sqoop/job/etl/Initializer.java ---------------------------------------------------------------------- diff --git a/connector/connector-sdk/src/main/java/org/apache/sqoop/job/etl/Initializer.java b/connector/connector-sdk/src/main/java/org/apache/sqoop/job/etl/Initializer.java index fddd162..83ab2e8 100644 --- a/connector/connector-sdk/src/main/java/org/apache/sqoop/job/etl/Initializer.java +++ b/connector/connector-sdk/src/main/java/org/apache/sqoop/job/etl/Initializer.java @@ -28,6 +28,9 @@ import org.apache.sqoop.schema.Schema; /** * This allows connector to define initialization work for execution, * for example, context configuration. + * + * All method invocations on an instance of Initializer can be assumed + * to come from the same process. */ @InterfaceAudience.Public @InterfaceStability.Evolving http://git-wip-us.apache.org/repos/asf/sqoop/blob/f241f82c/core/src/main/java/org/apache/sqoop/driver/JobManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/driver/JobManager.java b/core/src/main/java/org/apache/sqoop/driver/JobManager.java index 87c6c74..dc90a0e 100644 --- a/core/src/main/java/org/apache/sqoop/driver/JobManager.java +++ b/core/src/main/java/org/apache/sqoop/driver/JobManager.java @@ -375,17 +375,23 @@ public class JobManager implements Reconfigurable { addConnectorIDFClass(jobRequest, fromConnector.getIntermediateDataFormat()); addConnectorIDFClass(jobRequest, toConnector.getIntermediateDataFormat()); - addConnectorInitializerJars(jobRequest, Direction.FROM); - addConnectorInitializerJars(jobRequest, Direction.TO); + Initializer fromInitializer = getConnectorInitializer(jobRequest, Direction.FROM); + Initializer toInitializer = getConnectorInitializer(jobRequest, Direction.TO); + + InitializerContext fromInitializerContext = getConnectorInitializerContext(jobRequest, Direction.FROM); + InitializerContext toInitializerContext = getConnectorInitializerContext(jobRequest, Direction.TO); + + addConnectorInitializerJars(jobRequest, Direction.FROM, fromInitializer, fromInitializerContext); + addConnectorInitializerJars(jobRequest, Direction.TO, toInitializer, toInitializerContext); addIDFDependentJars(jobRequest, Direction.FROM); addIDFDependentJars(jobRequest, Direction.TO); // call the intialize method - initializeConnector(jobRequest, Direction.FROM); - initializeConnector(jobRequest, Direction.TO); + initializeConnector(jobRequest, Direction.FROM, fromInitializer, fromInitializerContext); + initializeConnector(jobRequest, Direction.TO, toInitializer, toInitializerContext); - jobRequest.getJobSubmission().setFromSchema(getSchemaForConnector(jobRequest, Direction.FROM)); - jobRequest.getJobSubmission().setToSchema(getSchemaForConnector(jobRequest, Direction.TO)); + jobRequest.getJobSubmission().setFromSchema(getSchemaForConnector(jobRequest, Direction.FROM, fromInitializer, fromInitializerContext)); + jobRequest.getJobSubmission().setToSchema(getSchemaForConnector(jobRequest, Direction.TO, toInitializer, toInitializerContext)); LOG.debug("Using entities: " + jobRequest.getFrom() + ", " + jobRequest.getTo()); return jobRequest; @@ -453,21 +459,14 @@ public class JobManager implements Reconfigurable { } @SuppressWarnings({ "unchecked", "rawtypes" }) - private void initializeConnector(JobRequest jobRequest, Direction direction) { - Initializer initializer = getConnectorInitializer(jobRequest, direction); - InitializerContext initializerContext = getConnectorInitializerContext(jobRequest, direction); - + private void initializeConnector(JobRequest jobRequest, Direction direction, Initializer initializer, InitializerContext initializerContext) { // Initialize submission from the connector perspective initializer.initialize(initializerContext, jobRequest.getConnectorLinkConfig(direction), jobRequest.getJobConfig(direction)); } @SuppressWarnings({ "unchecked", "rawtypes" }) - private Schema getSchemaForConnector(JobRequest jobRequest, Direction direction) { - - Initializer initializer = getConnectorInitializer(jobRequest, direction); - InitializerContext initializerContext = getConnectorInitializerContext(jobRequest, direction); - + private Schema getSchemaForConnector(JobRequest jobRequest, Direction direction, Initializer initializer, InitializerContext initializerContext) { return initializer.getSchema(initializerContext, jobRequest.getConnectorLinkConfig(direction), jobRequest.getJobConfig(direction)); } @@ -480,10 +479,7 @@ public class JobManager implements Reconfigurable { } @SuppressWarnings({ "unchecked", "rawtypes" }) - private void addConnectorInitializerJars(JobRequest jobRequest, Direction direction) { - - Initializer initializer = getConnectorInitializer(jobRequest, direction); - InitializerContext initializerContext = getConnectorInitializerContext(jobRequest, direction); + private void addConnectorInitializerJars(JobRequest jobRequest, Direction direction, Initializer initializer, InitializerContext initializerContext) { // Add job specific jars to jobRequest.addJars(initializer.getJars(initializerContext, jobRequest.getConnectorLinkConfig(direction), jobRequest.getJobConfig(direction))); http://git-wip-us.apache.org/repos/asf/sqoop/blob/f241f82c/docs/src/site/sphinx/ConnectorDevelopment.rst ---------------------------------------------------------------------- diff --git a/docs/src/site/sphinx/ConnectorDevelopment.rst b/docs/src/site/sphinx/ConnectorDevelopment.rst index 1ea1881..b35c521 100644 --- a/docs/src/site/sphinx/ConnectorDevelopment.rst +++ b/docs/src/site/sphinx/ConnectorDevelopment.rst @@ -121,7 +121,7 @@ Initializer and Destroyer .. _Initializer: .. _Destroyer: -Initializer is instantiated before the submission of sqoop job to the execution engine and doing preparations such as connecting to the data source, creating temporary tables or adding dependent jar files. Initializers are executed as the first step in the sqoop job lifecyle. Here is the ``Initializer`` API. +Initializer is instantiated before the submission of sqoop job to the execution engine and doing preparations such as connecting to the data source, creating temporary tables or adding dependent jar files. Initializers are executed as the first step in the sqoop job lifecyle. All interactions within an initializer are assumed to occur within a single thread, so state can be maintained between method calls (such as database connections). Here is the ``Initializer`` API. :: public abstract void initialize(InitializerContext context, LinkConfiguration linkConfiguration,
