SQOOP-789: Fix Hadoop-1 build (Hari Shreedharan via Jarek Jarcec Cecho)
Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/72f6c2fa Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/72f6c2fa Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/72f6c2fa Branch: refs/heads/branch-1.99.1 Commit: 72f6c2fa64f4b9eca3b787a8b8db6a48f0b7810e Parents: cf5b608 Author: Jarek Jarcec Cecho <[email protected]> Authored: Tue Dec 18 00:38:58 2012 -0800 Committer: Jarek Jarcec Cecho <[email protected]> Committed: Tue Dec 18 18:02:34 2012 -0800 ---------------------------------------------------------------------- .../job/mr/SqoopOutputFormatLoadExecutor.java | 52 ++-- .../job/mr/TestSqoopOutputFormatLoadExecutor.java | 242 +-------------- 2 files changed, 41 insertions(+), 253 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sqoop/blob/72f6c2fa/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java index 9714167..d158327 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopOutputFormatLoadExecutor.java @@ -53,6 +53,15 @@ public class SqoopOutputFormatLoadExecutor { private Future<?> consumerFuture; private Semaphore filled = new Semaphore(0, true); private Semaphore free = new Semaphore(1, true); + private volatile boolean isTest = false; + private String loaderName; + + SqoopOutputFormatLoadExecutor(boolean isTest, String loaderName){ + this.isTest = isTest; + this.loaderName = loaderName; + data = new Data(); + producer = new SqoopRecordWriter(); + } public SqoopOutputFormatLoadExecutor(JobContext jobctx) { data = new Data(); @@ -168,10 +177,13 @@ public class SqoopOutputFormatLoadExecutor { public void run() { DataReader reader = new OutputFormatDataReader(); - Configuration conf = context.getConfiguration(); + Configuration conf = null; + if (!isTest) { + conf = context.getConfiguration(); - String loaderName = conf.get(JobConstants.JOB_ETL_LOADER); + loaderName = conf.get(JobConstants.JOB_ETL_LOADER); + } Loader loader = (Loader) ClassUtils.instantiate(loaderName); // Objects that should be pass to the Executor execution @@ -179,23 +191,25 @@ public class SqoopOutputFormatLoadExecutor { Object configConnection = null; Object configJob = null; - switch (ConfigurationUtils.getJobType(conf)) { - case EXPORT: - subContext = new PrefixContext(conf, JobConstants.PREFIX_CONNECTOR_CONTEXT); - configConnection = ConfigurationUtils.getConnectorConnection(conf); - configJob = ConfigurationUtils.getConnectorJob(conf); - break; - case IMPORT: - subContext = new PrefixContext(conf, ""); - configConnection = ConfigurationUtils.getFrameworkConnection(conf); - configJob = ConfigurationUtils.getFrameworkJob(conf); - break; - default: - readerFinished = true; - // Release so that the writer can tell the framework something went - // wrong. - free.release(); - throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0023); + if (!isTest) { + switch (ConfigurationUtils.getJobType(conf)) { + case EXPORT: + subContext = new PrefixContext(conf, JobConstants.PREFIX_CONNECTOR_CONTEXT); + configConnection = ConfigurationUtils.getConnectorConnection(conf); + configJob = ConfigurationUtils.getConnectorJob(conf); + break; + case IMPORT: + subContext = new PrefixContext(conf, ""); + configConnection = ConfigurationUtils.getFrameworkConnection(conf); + configJob = ConfigurationUtils.getFrameworkJob(conf); + break; + default: + readerFinished = true; + // Release so that the writer can tell the framework something went + // wrong. + free.release(); + throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0023); + } } try { http://git-wip-us.apache.org/repos/asf/sqoop/blob/72f6c2fa/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java index 4234adf..c99b2c0 100644 --- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java +++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/mr/TestSqoopOutputFormatLoadExecutor.java @@ -20,11 +20,8 @@ package org.apache.sqoop.job.mr; import junit.framework.Assert; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.RawComparator; -import org.apache.hadoop.mapreduce.*; -import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.sqoop.common.ImmutableContext; import org.apache.sqoop.common.SqoopException; import org.apache.sqoop.job.JobConstants; @@ -34,8 +31,6 @@ import org.apache.sqoop.job.io.DataReader; import org.junit.Before; import org.junit.Test; -import java.io.IOException; -import java.net.URI; import java.util.ConcurrentModificationException; import java.util.concurrent.BrokenBarrierException; @@ -135,7 +130,7 @@ public class TestSqoopOutputFormatLoadExecutor { conf.set(JobConstants.JOB_TYPE, "EXPORT"); conf.set(JobConstants.JOB_ETL_LOADER, ThrowingLoader.class.getName()); SqoopOutputFormatLoadExecutor executor = new - SqoopOutputFormatLoadExecutor(getJobContext()); + SqoopOutputFormatLoadExecutor(true, ThrowingLoader.class.getName()); RecordWriter<Data, NullWritable> writer = executor.getRecordWriter(); Data data = new Data(); try { @@ -153,7 +148,7 @@ public class TestSqoopOutputFormatLoadExecutor { conf.set(JobConstants.JOB_TYPE, "EXPORT"); conf.set(JobConstants.JOB_ETL_LOADER, GoodContinuousLoader.class.getName()); SqoopOutputFormatLoadExecutor executor = new - SqoopOutputFormatLoadExecutor(getJobContext()); + SqoopOutputFormatLoadExecutor(true, GoodContinuousLoader.class.getName()); RecordWriter<Data, NullWritable> writer = executor.getRecordWriter(); Data data = new Data(); for (int i = 0; i < 10; i++) { @@ -167,7 +162,7 @@ public class TestSqoopOutputFormatLoadExecutor { data.setContent(builder.toString(), Data.CSV_RECORD); writer.write(data, null); } - writer.close(getJobContext()); + writer.close(null); } @Test @@ -175,7 +170,7 @@ public class TestSqoopOutputFormatLoadExecutor { conf.set(JobConstants.JOB_TYPE, "EXPORT"); conf.set(JobConstants.JOB_ETL_LOADER, GoodLoader.class.getName()); SqoopOutputFormatLoadExecutor executor = new - SqoopOutputFormatLoadExecutor(getJobContext()); + SqoopOutputFormatLoadExecutor(true, GoodLoader.class.getName()); RecordWriter<Data, NullWritable> writer = executor.getRecordWriter(); Data data = new Data(); StringBuilder builder = new StringBuilder(); @@ -187,7 +182,7 @@ public class TestSqoopOutputFormatLoadExecutor { } data.setContent(builder.toString(), Data.CSV_RECORD); writer.write(data, null); - writer.close(getJobContext()); + writer.close(null); } @@ -196,7 +191,7 @@ public class TestSqoopOutputFormatLoadExecutor { conf.set(JobConstants.JOB_TYPE, "EXPORT"); conf.set(JobConstants.JOB_ETL_LOADER, ThrowingContinuousLoader.class.getName()); SqoopOutputFormatLoadExecutor executor = new - SqoopOutputFormatLoadExecutor(getJobContext()); + SqoopOutputFormatLoadExecutor(true, ThrowingContinuousLoader.class.getName()); RecordWriter<Data, NullWritable> writer = executor.getRecordWriter(); Data data = new Data(); try { @@ -211,230 +206,9 @@ public class TestSqoopOutputFormatLoadExecutor { data.setContent(builder.toString(), Data.CSV_RECORD); writer.write(data, null); } - writer.close(getJobContext()); + writer.close(null); } catch (SqoopException ex) { throw ex.getCause(); } } - - - private TaskAttemptContext getJobContext() { - TaskAttemptContext context = new TaskAttemptContext() { - @Override - public Configuration getConfiguration() { - return conf; - } - - @Override - public Credentials getCredentials() { - return null; - } - - @Override - public JobID getJobID() { - return null; - } - - @Override - public int getNumReduceTasks() { - return 0; - } - - @Override - public Path getWorkingDirectory() throws IOException { - return null; - } - - @Override - public Class<?> getOutputKeyClass() { - return null; - } - - @Override - public Class<?> getOutputValueClass() { - return null; - } - - @Override - public Class<?> getMapOutputKeyClass() { - return null; - } - - @Override - public Class<?> getMapOutputValueClass() { - return null; - } - - @Override - public String getJobName() { - return null; - } - - @Override - public Class<? extends InputFormat<?, ?>> getInputFormatClass() throws ClassNotFoundException { - return null; - } - - @Override - public Class<? extends Mapper<?, ?, ?, ?>> getMapperClass() throws ClassNotFoundException { - return null; - } - - @Override - public Class<? extends Reducer<?, ?, ?, ?>> getCombinerClass() throws ClassNotFoundException { - return null; - } - - @Override - public Class<? extends Reducer<?, ?, ?, ?>> getReducerClass() throws ClassNotFoundException { - return null; - } - - @Override - public Class<? extends OutputFormat<?, ?>> getOutputFormatClass() throws ClassNotFoundException { - return null; - } - - @Override - public Class<? extends Partitioner<?, ?>> getPartitionerClass() throws ClassNotFoundException { - return null; - } - - @Override - public RawComparator<?> getSortComparator() { - return null; - } - - @Override - public String getJar() { - return null; - } - - @Override - public RawComparator<?> getGroupingComparator() { - return null; - } - - @Override - public boolean getJobSetupCleanupNeeded() { - return false; - } - - @Override - public boolean getTaskCleanupNeeded() { - return false; - } - - @Override - public boolean getProfileEnabled() { - return false; - } - - @Override - public String getProfileParams() { - return null; - } - - @Override - public Configuration.IntegerRanges getProfileTaskRange(boolean isMap) { - return null; - } - - @Override - public String getUser() { - return null; - } - - @Override - public boolean getSymlink() { - return false; - } - - @Override - public Path[] getArchiveClassPaths() { - return new Path[0]; - } - - @Override - public URI[] getCacheArchives() throws IOException { - return new URI[0]; - } - - @Override - public URI[] getCacheFiles() throws IOException { - return new URI[0]; - } - - @Override - public Path[] getLocalCacheArchives() throws IOException { - return new Path[0]; - } - - @Override - public Path[] getLocalCacheFiles() throws IOException { - return new Path[0]; - } - - @Override - public Path[] getFileClassPaths() { - return new Path[0]; - } - - @Override - public String[] getArchiveTimestamps() { - return new String[0]; - } - - @Override - public String[] getFileTimestamps() { - return new String[0]; - } - - @Override - public int getMaxMapAttempts() { - return 0; - } - - @Override - public int getMaxReduceAttempts() { - return 0; - } - - @Override - public TaskAttemptID getTaskAttemptID() { - return null; - } - - @Override - public void setStatus(String msg) { - - } - - @Override - public String getStatus() { - return null; - } - - @Override - public float getProgress() { - return 0; - } - - @Override - public Counter getCounter(Enum<?> counterName) { - return null; - } - - @Override - public Counter getCounter(String groupName, String counterName) { - return null; - } - - @Override - public void progress() { - - } - }; - return context; - } }
