Updated Branches: refs/heads/sqoop2 4c2a343d9 -> 53688b54b
SQOOP-682: Use templating in job.etl classes (Jarek Jarcec Cecho via Cheolsoo Park) Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/53688b54 Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/53688b54 Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/53688b54 Branch: refs/heads/sqoop2 Commit: 53688b54b53ced0b7e56774969ad14f0e80f23d8 Parents: 4c2a343 Author: Cheolsoo Park <[email protected]> Authored: Fri Jan 4 02:58:23 2013 -0800 Committer: Cheolsoo Park <[email protected]> Committed: Fri Jan 4 02:58:23 2013 -0800 ---------------------------------------------------------------------- .../connector/jdbc/GenericJdbcExportDestroyer.java | 6 +++- .../jdbc/GenericJdbcExportInitializer.java | 15 ++++-------- .../connector/jdbc/GenericJdbcExportLoader.java | 6 +++- .../connector/jdbc/GenericJdbcImportDestroyer.java | 6 +++- .../connector/jdbc/GenericJdbcImportExtractor.java | 6 +++- .../jdbc/GenericJdbcImportInitializer.java | 18 +++++---------- .../jdbc/GenericJdbcImportPartitioner.java | 6 +++- .../java/org/apache/sqoop/job/etl/Destroyer.java | 6 ++-- .../java/org/apache/sqoop/job/etl/Extractor.java | 6 ++-- .../java/org/apache/sqoop/job/etl/Initializer.java | 10 ++++---- .../main/java/org/apache/sqoop/job/etl/Loader.java | 6 ++-- .../java/org/apache/sqoop/job/etl/Partitioner.java | 6 ++-- 12 files changed, 48 insertions(+), 49 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sqoop/blob/53688b54/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportDestroyer.java ---------------------------------------------------------------------- diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportDestroyer.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportDestroyer.java index 37149de..2d53bdd 100644 --- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportDestroyer.java +++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportDestroyer.java @@ -19,14 +19,16 @@ package org.apache.sqoop.connector.jdbc; import org.apache.log4j.Logger; import org.apache.sqoop.common.ImmutableContext; +import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration; +import org.apache.sqoop.connector.jdbc.configuration.ExportJobConfiguration; import org.apache.sqoop.job.etl.Destroyer; -public class GenericJdbcExportDestroyer extends Destroyer { +public class GenericJdbcExportDestroyer extends Destroyer<ConnectionConfiguration, ExportJobConfiguration> { private static final Logger LOG = Logger.getLogger(GenericJdbcExportDestroyer.class); @Override - public void destroy(boolean success, ImmutableContext context, Object connectionConfig, Object jobConfig) { + public void destroy(boolean success, ImmutableContext context, ConnectionConfiguration connection, ExportJobConfiguration job) { LOG.info("Running generic JDBC connector destroyer"); } } http://git-wip-us.apache.org/repos/asf/sqoop/blob/53688b54/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportInitializer.java ---------------------------------------------------------------------- diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportInitializer.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportInitializer.java index b56ca10..2b0ed00 100644 --- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportInitializer.java +++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportInitializer.java @@ -30,29 +30,24 @@ import org.apache.sqoop.job.Constants; import org.apache.sqoop.job.etl.Initializer; import org.apache.sqoop.utils.ClassUtils; -public class GenericJdbcExportInitializer extends Initializer { +public class GenericJdbcExportInitializer extends Initializer<ConnectionConfiguration, ExportJobConfiguration> { private GenericJdbcExecutor executor; @Override - public void initialize(MutableContext context, Object connectionConfiguration, Object jobConfiguration) { - ConnectionConfiguration connectionConfig = (ConnectionConfiguration)connectionConfiguration; - ExportJobConfiguration jobConfig = (ExportJobConfiguration)jobConfiguration; - - configureJdbcProperties(context, connectionConfig, jobConfig); + public void initialize(MutableContext context, ConnectionConfiguration connection, ExportJobConfiguration job) { + configureJdbcProperties(context, connection, job); try { - configureTableProperties(context, connectionConfig, jobConfig); - + configureTableProperties(context, connection, job); } finally { executor.close(); } } @Override - public List<String> getJars(ImmutableContext context, Object connectionConfiguration, Object jobConfiguration) { + public List<String> getJars(ImmutableContext context, ConnectionConfiguration connection, ExportJobConfiguration job) { List<String> jars = new LinkedList<String>(); - ConnectionConfiguration connection = (ConnectionConfiguration) connectionConfiguration; jars.add(ClassUtils.jarForClass(connection.connection.jdbcDriver)); return jars; http://git-wip-us.apache.org/repos/asf/sqoop/blob/53688b54/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportLoader.java ---------------------------------------------------------------------- diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportLoader.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportLoader.java index b2e59f7..ec047b4 100644 --- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportLoader.java +++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcExportLoader.java @@ -18,10 +18,12 @@ package org.apache.sqoop.connector.jdbc; import org.apache.sqoop.common.ImmutableContext; +import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration; +import org.apache.sqoop.connector.jdbc.configuration.ExportJobConfiguration; import org.apache.sqoop.job.etl.Loader; import org.apache.sqoop.job.io.DataReader; -public class GenericJdbcExportLoader extends Loader { +public class GenericJdbcExportLoader extends Loader<ConnectionConfiguration, ExportJobConfiguration> { public static final int DEFAULT_ROWS_PER_BATCH = 100; public static final int DEFAULT_BATCHES_PER_TRANSACTION = 100; @@ -29,7 +31,7 @@ public class GenericJdbcExportLoader extends Loader { private int batchesPerTransaction = DEFAULT_BATCHES_PER_TRANSACTION; @Override - public void load(ImmutableContext context, Object oc, Object oj, DataReader reader) throws Exception{ + public void load(ImmutableContext context, ConnectionConfiguration connection, ExportJobConfiguration job, DataReader reader) throws Exception{ String driver = context.getString( GenericJdbcConnectorConstants.CONNECTOR_JDBC_DRIVER); String url = context.getString( http://git-wip-us.apache.org/repos/asf/sqoop/blob/53688b54/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportDestroyer.java ---------------------------------------------------------------------- diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportDestroyer.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportDestroyer.java index e09b0c3..f7043ea 100644 --- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportDestroyer.java +++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportDestroyer.java @@ -18,12 +18,14 @@ package org.apache.sqoop.connector.jdbc; import org.apache.sqoop.common.ImmutableContext; +import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration; +import org.apache.sqoop.connector.jdbc.configuration.ImportJobConfiguration; import org.apache.sqoop.job.etl.Destroyer; -public class GenericJdbcImportDestroyer extends Destroyer { +public class GenericJdbcImportDestroyer extends Destroyer<ConnectionConfiguration, ImportJobConfiguration> { @Override - public void destroy(boolean success, ImmutableContext context, Object connectionConfig, Object jobConfig) { + public void destroy(boolean success, ImmutableContext context, ConnectionConfiguration connection, ImportJobConfiguration job) { // No explicit action at the moment } http://git-wip-us.apache.org/repos/asf/sqoop/blob/53688b54/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportExtractor.java ---------------------------------------------------------------------- diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportExtractor.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportExtractor.java index df78755..c3ecdda 100644 --- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportExtractor.java +++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportExtractor.java @@ -24,17 +24,19 @@ import java.sql.SQLException; import org.apache.log4j.Logger; import org.apache.sqoop.common.ImmutableContext; import org.apache.sqoop.common.SqoopException; +import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration; +import org.apache.sqoop.connector.jdbc.configuration.ImportJobConfiguration; import org.apache.sqoop.job.etl.Partition; import org.apache.sqoop.job.etl.Extractor; import org.apache.sqoop.job.io.DataWriter; -public class GenericJdbcImportExtractor extends Extractor { +public class GenericJdbcImportExtractor extends Extractor<ConnectionConfiguration, ImportJobConfiguration> { public static final Logger LOG = Logger.getLogger(GenericJdbcImportExtractor.class); private long rowsRead = 0; @Override - public void run(ImmutableContext context, Object connectionC, Object jobC, Partition partition, DataWriter writer) { + public void run(ImmutableContext context, ConnectionConfiguration connection, ImportJobConfiguration job, Partition partition, DataWriter writer) { String driver = context.getString( GenericJdbcConnectorConstants.CONNECTOR_JDBC_DRIVER); String url = context.getString( http://git-wip-us.apache.org/repos/asf/sqoop/blob/53688b54/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportInitializer.java ---------------------------------------------------------------------- diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportInitializer.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportInitializer.java index 3b697b6..781c7f2 100644 --- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportInitializer.java +++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportInitializer.java @@ -34,7 +34,7 @@ import org.apache.sqoop.job.Constants; import org.apache.sqoop.job.etl.Initializer; import org.apache.sqoop.utils.ClassUtils; -public class GenericJdbcImportInitializer extends Initializer { +public class GenericJdbcImportInitializer extends Initializer<ConnectionConfiguration, ImportJobConfiguration> { private static final Logger LOG = Logger.getLogger(GenericJdbcImportInitializer.class); @@ -42,26 +42,20 @@ public class GenericJdbcImportInitializer extends Initializer { private GenericJdbcExecutor executor; @Override - public void initialize(MutableContext context, Object oConnectionConfig, Object oJobConfig) { - ConnectionConfiguration connectionConfig = (ConnectionConfiguration)oConnectionConfig; - ImportJobConfiguration jobConfig = (ImportJobConfiguration)oJobConfig; - - configureJdbcProperties(context, connectionConfig, jobConfig); - + public void initialize(MutableContext context, ConnectionConfiguration connection, ImportJobConfiguration job) { + configureJdbcProperties(context, connection, job); try { - configurePartitionProperties(context, connectionConfig, jobConfig); - configureTableProperties(context, connectionConfig, jobConfig); - + configurePartitionProperties(context, connection, job); + configureTableProperties(context, connection, job); } finally { executor.close(); } } @Override - public List<String> getJars(ImmutableContext context, Object connectionConfiguration, Object jobConfiguration) { + public List<String> getJars(ImmutableContext context, ConnectionConfiguration connection, ImportJobConfiguration job) { List<String> jars = new LinkedList<String>(); - ConnectionConfiguration connection = (ConnectionConfiguration) connectionConfiguration; jars.add(ClassUtils.jarForClass(connection.connection.jdbcDriver)); return jars; http://git-wip-us.apache.org/repos/asf/sqoop/blob/53688b54/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartitioner.java ---------------------------------------------------------------------- diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartitioner.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartitioner.java index 0d9f0c0..2313e65 100644 --- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartitioner.java +++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportPartitioner.java @@ -23,10 +23,12 @@ import java.util.List; import org.apache.sqoop.common.ImmutableContext; import org.apache.sqoop.common.SqoopException; +import org.apache.sqoop.connector.jdbc.configuration.ConnectionConfiguration; +import org.apache.sqoop.connector.jdbc.configuration.ImportJobConfiguration; import org.apache.sqoop.job.etl.Partition; import org.apache.sqoop.job.etl.Partitioner; -public class GenericJdbcImportPartitioner extends Partitioner { +public class GenericJdbcImportPartitioner extends Partitioner<ConnectionConfiguration, ImportJobConfiguration> { private long numberPartitions; private String partitionColumnName; @@ -35,7 +37,7 @@ public class GenericJdbcImportPartitioner extends Partitioner { private String partitionMaxValue; @Override - public List<Partition> getPartitions(ImmutableContext context, long maxPartitions, Object connectionC, Object jobC) { + public List<Partition> getPartitions(ImmutableContext context, long maxPartitions, ConnectionConfiguration connection, ImportJobConfiguration job) { numberPartitions = maxPartitions; partitionColumnName = context.getString( GenericJdbcConnectorConstants.CONNECTOR_JDBC_PARTITION_COLUMNNAME); http://git-wip-us.apache.org/repos/asf/sqoop/blob/53688b54/spi/src/main/java/org/apache/sqoop/job/etl/Destroyer.java ---------------------------------------------------------------------- diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/Destroyer.java b/spi/src/main/java/org/apache/sqoop/job/etl/Destroyer.java index 528d550..cf2ed9a 100644 --- a/spi/src/main/java/org/apache/sqoop/job/etl/Destroyer.java +++ b/spi/src/main/java/org/apache/sqoop/job/etl/Destroyer.java @@ -23,7 +23,7 @@ import org.apache.sqoop.common.ImmutableContext; * This allows connector to define work to complete execution, for example, * resource cleaning. */ -public abstract class Destroyer { +public abstract class Destroyer<ConnectionConfiguration, JobConfiguration> { /** * Callback to clean up after job execution. @@ -35,7 +35,7 @@ public abstract class Destroyer { */ public abstract void destroy(boolean success, ImmutableContext context, - Object connectionConfiguration, - Object jobConfiguration); + ConnectionConfiguration connectionConfiguration, + JobConfiguration jobConfiguration); } http://git-wip-us.apache.org/repos/asf/sqoop/blob/53688b54/spi/src/main/java/org/apache/sqoop/job/etl/Extractor.java ---------------------------------------------------------------------- diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/Extractor.java b/spi/src/main/java/org/apache/sqoop/job/etl/Extractor.java index e824b98..fac6f05 100644 --- a/spi/src/main/java/org/apache/sqoop/job/etl/Extractor.java +++ b/spi/src/main/java/org/apache/sqoop/job/etl/Extractor.java @@ -24,11 +24,11 @@ import org.apache.sqoop.job.io.DataWriter; * This allows connector to extract data from a source system * based on each partition. */ -public abstract class Extractor { +public abstract class Extractor<ConnectionConfiguration, JobConfiguration> { public abstract void run(ImmutableContext context, - Object connectionConfiguration, - Object jobConfiguration, + ConnectionConfiguration connectionConfiguration, + JobConfiguration jobConfiguration, Partition partition, DataWriter writer); http://git-wip-us.apache.org/repos/asf/sqoop/blob/53688b54/spi/src/main/java/org/apache/sqoop/job/etl/Initializer.java ---------------------------------------------------------------------- diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/Initializer.java b/spi/src/main/java/org/apache/sqoop/job/etl/Initializer.java index 685378f..3fb6be0 100644 --- a/spi/src/main/java/org/apache/sqoop/job/etl/Initializer.java +++ b/spi/src/main/java/org/apache/sqoop/job/etl/Initializer.java @@ -27,7 +27,7 @@ import java.util.List; * This allows connector to define initialization work for execution, * for example, context configuration. */ -public abstract class Initializer { +public abstract class Initializer<ConnectionConfiguration, JobConfiguration> { /** * Initialize new submission based on given configuration properties. Any @@ -39,8 +39,8 @@ public abstract class Initializer { * @param jobConfiguration Connector's job configuration object */ public abstract void initialize(MutableContext context, - Object connectionConfiguration, - Object jobConfiguration); + ConnectionConfiguration connectionConfiguration, + JobConfiguration jobConfiguration); /** * Return list of all jars that this particular connector needs to operate @@ -50,8 +50,8 @@ public abstract class Initializer { * @return */ public List<String> getJars(ImmutableContext context, - Object connectionConfiguration, - Object jobConfiguration) { + ConnectionConfiguration connectionConfiguration, + JobConfiguration jobConfiguration) { return new LinkedList<String>(); } http://git-wip-us.apache.org/repos/asf/sqoop/blob/53688b54/spi/src/main/java/org/apache/sqoop/job/etl/Loader.java ---------------------------------------------------------------------- diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/Loader.java b/spi/src/main/java/org/apache/sqoop/job/etl/Loader.java index 3148e49..024be94 100644 --- a/spi/src/main/java/org/apache/sqoop/job/etl/Loader.java +++ b/spi/src/main/java/org/apache/sqoop/job/etl/Loader.java @@ -23,7 +23,7 @@ import org.apache.sqoop.job.io.DataReader; /** * This allows connector to load data into a target system. */ -public abstract class Loader { +public abstract class Loader<ConnectionConfiguration, JobConfiguration> { /** * Load data to target. @@ -35,8 +35,8 @@ public abstract class Loader { * @throws Exception */ public abstract void load(ImmutableContext context, - Object connectionConfiguration, - Object jobConfiguration, + ConnectionConfiguration connectionConfiguration, + JobConfiguration jobConfiguration, DataReader reader) throws Exception; } http://git-wip-us.apache.org/repos/asf/sqoop/blob/53688b54/spi/src/main/java/org/apache/sqoop/job/etl/Partitioner.java ---------------------------------------------------------------------- diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/Partitioner.java b/spi/src/main/java/org/apache/sqoop/job/etl/Partitioner.java index 9cd000c..bfb4bf2 100644 --- a/spi/src/main/java/org/apache/sqoop/job/etl/Partitioner.java +++ b/spi/src/main/java/org/apache/sqoop/job/etl/Partitioner.java @@ -25,11 +25,11 @@ import java.util.List; * This allows connector to define how input data to be partitioned. * The number of data partitions also determines the degree of parallelism. */ -public abstract class Partitioner { +public abstract class Partitioner<ConnectionConfiguration, JobConfiguration> { public abstract List<Partition> getPartitions(ImmutableContext context, long maxPartitions, - Object connectionConfiguration, - Object jobConfiguration); + ConnectionConfiguration connectionConfiguration, + JobConfiguration jobConfiguration); }
