Repository: sqoop Updated Branches: refs/heads/sqoop2 f43878bc7 -> 21d887636
SQOOP-1665: Sqoop2: Misc Cleanup / rename lingering connection to link (Veena Basavaraj via Jarek Jarcec Cecho) Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/21d88763 Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/21d88763 Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/21d88763 Branch: refs/heads/sqoop2 Commit: 21d887636d21375a71a54b816fb6b795e977bd49 Parents: f43878b Author: Jarek Jarcec Cecho <[email protected]> Authored: Tue Nov 4 08:10:22 2014 -0800 Committer: Jarek Jarcec Cecho <[email protected]> Committed: Tue Nov 4 08:10:22 2014 -0800 ---------------------------------------------------------------------- .../sqoop/job/mr/MRConfigurationUtils.java | 4 +- .../sqoop/job/mr/SqoopDestroyerExecutor.java | 2 +- .../apache/sqoop/job/mr/SqoopInputFormat.java | 2 +- .../org/apache/sqoop/job/mr/SqoopMapper.java | 2 +- .../sqoop/job/mr/SqoopNullOutputFormat.java | 42 ++++++++++---------- .../job/mr/SqoopOutputFormatLoadExecutor.java | 2 +- .../sqoop/job/mr/TestMRConfigurationUtils.java | 4 +- .../org/apache/sqoop/job/etl/Destroyer.java | 6 +-- 8 files changed, 32 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sqoop/blob/21d88763/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/MRConfigurationUtils.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/MRConfigurationUtils.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/MRConfigurationUtils.java index 03a1dec..d5f74f0 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/MRConfigurationUtils.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/MRConfigurationUtils.java @@ -153,11 +153,11 @@ public final class MRConfigurationUtils { } /** - * Retrieve Connector configuration object for connection. + * Retrieve Connector configuration object for link. * @param configuration MapReduce configuration object * @return Configuration object */ - public static Object getConnectorConnectionConfig(Direction type, Configuration configuration) { + public static Object getConnectorLinkConfig(Direction type, Configuration configuration) { switch (type) { case FROM: return loadConfiguration((JobConf) configuration, MR_JOB_CONFIG_CLASS_FROM_CONNECTOR_LINK, MR_JOB_CONFIG_FROM_CONNECTOR_LINK_KEY); http://git-wip-us.apache.org/repos/asf/sqoop/blob/21d88763/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java index 32b5b1d..c6ba749 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopDestroyerExecutor.java @@ -68,7 +68,7 @@ public class SqoopDestroyerExecutor { // Objects that should be pass to the Destroyer execution PrefixContext subContext = new PrefixContext(configuration, prefixPropertyName); - Object configConnection = MRConfigurationUtils.getConnectorConnectionConfig(direction, configuration); + Object configConnection = MRConfigurationUtils.getConnectorLinkConfig(direction, configuration); Object configJob = MRConfigurationUtils.getConnectorJobConfig(direction, configuration); // Propagate connector schema in every case for now http://git-wip-us.apache.org/repos/asf/sqoop/blob/21d88763/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java index d2cf5e4..887b4bb 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopInputFormat.java @@ -63,7 +63,7 @@ public class SqoopInputFormat extends InputFormat<SqoopSplit, NullWritable> { Partitioner partitioner = (Partitioner) ClassUtils.instantiate(partitionerName); PrefixContext connectorContext = new PrefixContext(conf, MRJobConstants.PREFIX_CONNECTOR_FROM_CONTEXT); - Object connectorConnection = MRConfigurationUtils.getConnectorConnectionConfig(Direction.FROM, conf); + Object connectorConnection = MRConfigurationUtils.getConnectorLinkConfig(Direction.FROM, conf); Object connectorJob = MRConfigurationUtils.getConnectorJobConfig(Direction.FROM, conf); Schema schema = MRConfigurationUtils.getConnectorSchema(Direction.FROM, conf); http://git-wip-us.apache.org/repos/asf/sqoop/blob/21d88763/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java index d31aa20..e25f404 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopMapper.java @@ -80,7 +80,7 @@ public class SqoopMapper extends Mapper<SqoopSplit, NullWritable, SqoopWritable, // Objects that should be passed to the Executor execution PrefixContext subContext = new PrefixContext(conf, MRJobConstants.PREFIX_CONNECTOR_FROM_CONTEXT); - Object fromConfig = MRConfigurationUtils.getConnectorConnectionConfig(Direction.FROM, conf); + Object fromConfig = MRConfigurationUtils.getConnectorLinkConfig(Direction.FROM, conf); Object fromJob = MRConfigurationUtils.getConnectorJobConfig(Direction.FROM, conf); SqoopSplit split = context.getCurrentKey(); http://git-wip-us.apache.org/repos/asf/sqoop/blob/21d88763/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java index 1148c4a..6134106 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopNullOutputFormat.java @@ -18,6 +18,8 @@ package org.apache.sqoop.job.mr; +import java.io.IOException; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.JobContext; @@ -28,18 +30,14 @@ import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.log4j.Logger; import org.apache.sqoop.common.Direction; -import org.apache.sqoop.job.MRJobConstants; import org.apache.sqoop.job.io.SqoopWritable; -import java.io.IOException; - /** * An output format for MapReduce job. */ public class SqoopNullOutputFormat extends OutputFormat<SqoopWritable, NullWritable> { - public static final Logger LOG = - Logger.getLogger(SqoopNullOutputFormat.class); + public static final Logger LOG = Logger.getLogger(SqoopNullOutputFormat.class); @Override public void checkOutputSpecs(JobContext context) { @@ -47,48 +45,50 @@ public class SqoopNullOutputFormat extends OutputFormat<SqoopWritable, NullWrita } @Override - public RecordWriter<SqoopWritable, NullWritable> getRecordWriter( - TaskAttemptContext context) { - SqoopOutputFormatLoadExecutor executor = - new SqoopOutputFormatLoadExecutor(context); + public RecordWriter<SqoopWritable, NullWritable> getRecordWriter(TaskAttemptContext context) { + SqoopOutputFormatLoadExecutor executor = new SqoopOutputFormatLoadExecutor(context); return executor.getRecordWriter(); } @Override public OutputCommitter getOutputCommitter(TaskAttemptContext context) { - return new DestroyerOutputCommitter(); + return new SqoopDestroyerOutputCommitter(); } - class DestroyerOutputCommitter extends OutputCommitter { + class SqoopDestroyerOutputCommitter extends OutputCommitter { @Override - public void setupJob(JobContext jobContext) { } + public void setupJob(JobContext jobContext) { + } @Override public void commitJob(JobContext jobContext) throws IOException { super.commitJob(jobContext); - - Configuration config = jobContext.getConfiguration(); - SqoopDestroyerExecutor.executeDestroyer(true, config, Direction.FROM); - SqoopDestroyerExecutor.executeDestroyer(true, config, Direction.TO); + invokeDestroyerExecutor(jobContext, true); } @Override public void abortJob(JobContext jobContext, JobStatus.State state) throws IOException { super.abortJob(jobContext, state); + invokeDestroyerExecutor(jobContext, false); + } + private void invokeDestroyerExecutor(JobContext jobContext, boolean success) { Configuration config = jobContext.getConfiguration(); - SqoopDestroyerExecutor.executeDestroyer(false, config, Direction.FROM); - SqoopDestroyerExecutor.executeDestroyer(false, config, Direction.TO); + SqoopDestroyerExecutor.executeDestroyer(success, config, Direction.FROM); + SqoopDestroyerExecutor.executeDestroyer(success, config, Direction.TO); } @Override - public void setupTask(TaskAttemptContext taskContext) { } + public void setupTask(TaskAttemptContext taskContext) { + } @Override - public void commitTask(TaskAttemptContext taskContext) { } + public void commitTask(TaskAttemptContext taskContext) { + } @Override - public void abortTask(TaskAttemptContext taskContext) { } + public void abortTask(TaskAttemptContext taskContext) { + } @Override public boolean needsTaskCommit(TaskAttemptContext taskContext) { http://git-wip-us.apache.org/repos/asf/sqoop/blob/21d88763/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java index 8aad936..579101e 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java @@ -236,7 +236,7 @@ public class SqoopOutputFormatLoadExecutor { schema = matcher.getToSchema(); subContext = new PrefixContext(conf, MRJobConstants.PREFIX_CONNECTOR_TO_CONTEXT); - configConnection = MRConfigurationUtils.getConnectorConnectionConfig(Direction.TO, conf); + configConnection = MRConfigurationUtils.getConnectorLinkConfig(Direction.TO, conf); configJob = MRConfigurationUtils.getConnectorJobConfig(Direction.TO, conf); } http://git-wip-us.apache.org/repos/asf/sqoop/blob/21d88763/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestMRConfigurationUtils.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestMRConfigurationUtils.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestMRConfigurationUtils.java index 972b555..fbe3e7b 100644 --- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestMRConfigurationUtils.java +++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestMRConfigurationUtils.java @@ -63,11 +63,11 @@ public class TestMRConfigurationUtils { public void testLinkConfiguration() throws Exception { MRConfigurationUtils.setConnectorLinkConfig(Direction.FROM, job, getConfig()); setUpHadoopJobConf(); - assertEquals(getConfig(), MRConfigurationUtils.getConnectorConnectionConfig(Direction.FROM, jobConfSpy)); + assertEquals(getConfig(), MRConfigurationUtils.getConnectorLinkConfig(Direction.FROM, jobConfSpy)); MRConfigurationUtils.setConnectorLinkConfig(Direction.TO, job, getConfig()); setUpHadoopJobConf(); - assertEquals(getConfig(), MRConfigurationUtils.getConnectorConnectionConfig(Direction.TO, jobConfSpy)); + assertEquals(getConfig(), MRConfigurationUtils.getConnectorLinkConfig(Direction.TO, jobConfSpy)); } @Test http://git-wip-us.apache.org/repos/asf/sqoop/blob/21d88763/spi/src/main/java/org/apache/sqoop/job/etl/Destroyer.java ---------------------------------------------------------------------- diff --git a/spi/src/main/java/org/apache/sqoop/job/etl/Destroyer.java b/spi/src/main/java/org/apache/sqoop/job/etl/Destroyer.java index e2d98ca..8486154 100644 --- a/spi/src/main/java/org/apache/sqoop/job/etl/Destroyer.java +++ b/spi/src/main/java/org/apache/sqoop/job/etl/Destroyer.java @@ -24,13 +24,13 @@ package org.apache.sqoop.job.etl; public abstract class Destroyer<LinkConfiguration, JobConfiguration> { /** - * Callback to clean up after job execution. + * Callback to clean up after job execution * * @param context Destroyer context * @param linkConfiguration link configuration object * @param jobConfiguration job configuration object for the FROM and TO - * In case of the FROM initializer this will represent the FROM job configuration - * In case of the TO initializer this will represent the TO job configuration + * In case of the FROM destroyer this will represent the FROM job configuration + * In case of the TO destroyer this will represent the TO job configuration */ public abstract void destroy(DestroyerContext context, LinkConfiguration linkConfiguration,
