SQOOP-842: Make Partition a generic (template) type parameter of Extractor as well (Jarcec Cecho via Cheolsoo Park)
Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/92062d53 Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/92062d53 Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/92062d53 Branch: refs/heads/sqoop2 Commit: 92062d5343e6318fe703b06e30c52920938818b0 Parents: 03408d5 Author: Cheolsoo Park <[email protected]> Authored: Wed Jan 30 15:13:18 2013 -0800 Committer: Cheolsoo Park <[email protected]> Committed: Wed Jan 30 15:13:18 2013 -0800 ---------------------------------------------------------------------- .../connector/jdbc/GenericJdbcImportExtractor.java | 6 +++--- .../sqoop/job/etl/HdfsSequenceExportExtractor.java | 15 ++++++++------- .../sqoop/job/etl/HdfsTextExportExtractor.java | 8 +++++--- .../java/org/apache/sqoop/job/TestHdfsLoad.java | 2 +- .../java/org/apache/sqoop/job/TestMapReduce.java | 2 +- .../java/org/apache/sqoop/job/etl/Extractor.java | 6 +++--- 6 files changed, 21 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sqoop/blob/92062d53/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportExtractor.java ---------------------------------------------------------------------- diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportExtractor.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportExtractor.java index f4389a3..9db3328 100644 --- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportExtractor.java +++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcImportExtractor.java @@ -30,13 +30,13 @@ import org.apache.sqoop.job.etl.Partition; import org.apache.sqoop.job.etl.Extractor; import org.apache.sqoop.job.io.DataWriter; -public class 
GenericJdbcImportExtractor extends Extractor<ConnectionConfiguration, ImportJobConfiguration> { +public class GenericJdbcImportExtractor extends Extractor<ConnectionConfiguration, ImportJobConfiguration, GenericJdbcImportPartition> { public static final Logger LOG = Logger.getLogger(GenericJdbcImportExtractor.class); private long rowsRead = 0; @Override - public void run(ImmutableContext context, ConnectionConfiguration connection, ImportJobConfiguration job, Partition partition, DataWriter writer) { + public void run(ImmutableContext context, ConnectionConfiguration connection, ImportJobConfiguration job, GenericJdbcImportPartition partition, DataWriter writer) { String driver = connection.connection.jdbcDriver; String url = connection.connection.connectionString; String username = connection.connection.username; @@ -44,7 +44,7 @@ public class GenericJdbcImportExtractor extends Extractor<ConnectionConfiguratio GenericJdbcExecutor executor = new GenericJdbcExecutor(driver, url, username, password); String query = context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_DATA_SQL); - String conditions = ((GenericJdbcImportPartition)partition).getConditions(); + String conditions = partition.getConditions(); query = query.replace(GenericJdbcConnectorConstants.SQL_CONDITIONS_TOKEN, conditions); LOG.info("Using query: " + query); http://git-wip-us.apache.org/repos/asf/sqoop/blob/92062d53/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceExportExtractor.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceExportExtractor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceExportExtractor.java index 3a04e59..45b6166 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceExportExtractor.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceExportExtractor.java 
@@ -27,12 +27,14 @@ import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Text; import org.apache.sqoop.common.ImmutableContext; import org.apache.sqoop.common.SqoopException; +import org.apache.sqoop.framework.configuration.ConnectionConfiguration; +import org.apache.sqoop.framework.configuration.ExportJobConfiguration; import org.apache.sqoop.job.MapreduceExecutionError; import org.apache.sqoop.job.PrefixContext; import org.apache.sqoop.job.io.Data; import org.apache.sqoop.job.io.DataWriter; -public class HdfsSequenceExportExtractor extends Extractor { +public class HdfsSequenceExportExtractor extends Extractor<ConnectionConfiguration, ExportJobConfiguration, HdfsExportPartition> { public static final Log LOG = LogFactory.getLog(HdfsSequenceExportExtractor.class.getName()); @@ -47,19 +49,18 @@ public class HdfsSequenceExportExtractor extends Extractor { } @Override - public void run(ImmutableContext context, Object connectionConfiguration, - Object jobConfiguration, Partition partition, DataWriter writer) { + public void run(ImmutableContext context, ConnectionConfiguration connectionConfiguration, + ExportJobConfiguration jobConfiguration, HdfsExportPartition partition, DataWriter writer) { writer.setFieldDelimiter(fieldDelimiter); conf = ((PrefixContext)context).getConfiguration(); datawriter = writer; try { - HdfsExportPartition p = (HdfsExportPartition)partition; - LOG.info("Working on partition: " + p); - int numFiles = p.getNumberOfFiles(); + LOG.info("Working on partition: " + partition); + int numFiles = partition.getNumberOfFiles(); for (int i=0; i<numFiles; i++) { - extractFile(p.getFile(i), p.getOffset(i), p.getLength(i)); + extractFile(partition.getFile(i), partition.getOffset(i), partition.getLength(i)); } } catch (IOException e) { throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0017, e); 
http://git-wip-us.apache.org/repos/asf/sqoop/blob/92062d53/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextExportExtractor.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextExportExtractor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextExportExtractor.java index e00d428..ed30c91 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextExportExtractor.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextExportExtractor.java @@ -33,12 +33,14 @@ import org.apache.hadoop.io.compress.CompressionCodecFactory; import org.apache.hadoop.util.LineReader; import org.apache.sqoop.common.ImmutableContext; import org.apache.sqoop.common.SqoopException; +import org.apache.sqoop.framework.configuration.ConnectionConfiguration; +import org.apache.sqoop.framework.configuration.ExportJobConfiguration; import org.apache.sqoop.job.MapreduceExecutionError; import org.apache.sqoop.job.PrefixContext; import org.apache.sqoop.job.io.Data; import org.apache.sqoop.job.io.DataWriter; -public class HdfsTextExportExtractor extends Extractor { +public class HdfsTextExportExtractor extends Extractor<ConnectionConfiguration, ExportJobConfiguration, HdfsExportPartition> { public static final Log LOG = LogFactory.getLog(HdfsTextExportExtractor.class.getName()); @@ -53,8 +55,8 @@ public class HdfsTextExportExtractor extends Extractor { } @Override - public void run(ImmutableContext context, Object connectionConfiguration, - Object jobConfiguration, Partition partition, DataWriter writer) { + public void run(ImmutableContext context, ConnectionConfiguration connectionConfiguration, + ExportJobConfiguration jobConfiguration, HdfsExportPartition partition, DataWriter writer) { writer.setFieldDelimiter(fieldDelimiter); conf = ((PrefixContext)context).getConfiguration(); 
http://git-wip-us.apache.org/repos/asf/sqoop/blob/92062d53/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java index 4e6209d..6e1c958 100644 --- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java +++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java @@ -217,7 +217,7 @@ public class TestHdfsLoad extends TestCase { public static class DummyExtractor extends Extractor { @Override - public void run(ImmutableContext context, Object oc, Object oj, Partition partition, DataWriter writer) { + public void run(ImmutableContext context, Object oc, Object oj, Object partition, DataWriter writer) { int id = ((DummyPartition)partition).getId(); for (int row = 0; row < NUMBER_OF_ROWS_PER_ID; row++) { Object[] array = new Object[] { http://git-wip-us.apache.org/repos/asf/sqoop/blob/92062d53/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java index 8590065..427132e 100644 --- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java +++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java @@ -133,7 +133,7 @@ public class TestMapReduce extends TestCase { public static class DummyExtractor extends Extractor { @Override - public void run(ImmutableContext context, Object oc, Object oj, Partition partition, DataWriter writer) { + public void run(ImmutableContext context, Object oc, Object oj, Object partition, DataWriter writer) { int id = ((DummyPartition)partition).getId(); for (int row = 0; row < 
NUMBER_OF_ROWS_PER_PARTITION; row++) { writer.writeArrayRecord(new Object[] { http://git-wip-us.apache.org/repos/asf/sqoop/blob/92062d53/spi/src/main/java/org/apache/sqoop/job/etl/Extractor.java ---------------------------------------------------------------------- diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/Extractor.java b/spi/src/main/java/org/apache/sqoop/job/etl/Extractor.java index fac6f05..300cf4e 100644 --- a/spi/src/main/java/org/apache/sqoop/job/etl/Extractor.java +++ b/spi/src/main/java/org/apache/sqoop/job/etl/Extractor.java @@ -24,7 +24,7 @@ import org.apache.sqoop.job.io.DataWriter; * This allows connector to extract data from a source system * based on each partition. */ -public abstract class Extractor<ConnectionConfiguration, JobConfiguration> { +public abstract class Extractor<ConnectionConfiguration, JobConfiguration, Partition> { public abstract void run(ImmutableContext context, ConnectionConfiguration connectionConfiguration, @@ -34,14 +34,14 @@ public abstract class Extractor<ConnectionConfiguration, JobConfiguration> { /** * Return the number of rows read by the last call to - * {@linkplain Extractor#run(org.apache.sqoop.common.ImmutableContext, java.lang.Object, java.lang.Object, org.apache.sqoop.job.etl.Partition, org.apache.sqoop.job.io.DataWriter) } + * {@linkplain Extractor#run(org.apache.sqoop.common.ImmutableContext, java.lang.Object, java.lang.Object, Partition, org.apache.sqoop.job.io.DataWriter) } * method. This method returns only the number of rows read in the last call, * and not a cumulative total of the number of rows read by this Extractor * since its creation. If no calls were made to the run method, this method's * behavior is undefined. 
* * @return the number of rows read by the last call to - * {@linkplain Extractor#run(org.apache.sqoop.common.ImmutableContext, java.lang.Object, java.lang.Object, org.apache.sqoop.job.etl.Partition, org.apache.sqoop.job.io.DataWriter) } + * {@linkplain Extractor#run(org.apache.sqoop.common.ImmutableContext, java.lang.Object, java.lang.Object, Partition, org.apache.sqoop.job.io.DataWriter) } */ public abstract long getRowsRead();
