Updated Branches: refs/heads/sqoop2 878a7b8ae -> 28a3c007a
SQOOP-714 Make map reduce code compatible with Hadoop 1
(Jarek Jarcec Cecho)

Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/28a3c007
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/28a3c007
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/28a3c007

Branch: refs/heads/sqoop2
Commit: 28a3c007a7fb66414aea7e611486a3dd1db0f24b
Parents: 5fcf712
Author: Bilung Lee <[email protected]>
Authored: Mon Nov 19 15:34:22 2012 -0800
Committer: Bilung Lee <[email protected]>
Committed: Mon Nov 19 15:34:22 2012 -0800

----------------------------------------------------------------------
 .../java/org/apache/sqoop/job/JobConstants.java    |   10 +++++++
 .../sqoop/job/etl/HdfsSequenceImportLoader.java    |   15 +++-------
 .../apache/sqoop/job/mr/SqoopFileOutputFormat.java |    2 +-
 .../test/java/org/apache/sqoop/job/JobUtils.java   |    5 +--
 .../java/org/apache/sqoop/job/TestHdfsLoad.java    |   22 +++++++--------
 .../java/org/apache/sqoop/job/TestMapReduce.java   |    2 +-
 .../mapreduce/MapreduceSubmissionEngine.java       |    5 ++-
 7 files changed, 32 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/28a3c007/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java
index 58b2a42..d899fce 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java
@@ -76,6 +76,16 @@ public final class JobConstants extends Constants {
   public static final String PREFIX_CONNECTOR_CONTEXT =
       PREFIX_JOB_CONFIG + "connector.context.";
 
+  // Hadoop specific constants
+  // We're using constants from Hadoop 1. Hadoop 2 has different names, but
+  // provides backward compatibility layer for those names as well.
+
+  public static final String HADOOP_OUTDIR = "mapred.output.dir";
+
+  public static final String HADOOP_COMPRESS = "mapred.output.compress";
+
+  public static final String HADOOP_COMPRESS_CODEC =
+    "mapred.output.compression.codec";
 
   private JobConstants() {
     // Disable explicit object creation

http://git-wip-us.apache.org/repos/asf/sqoop/blob/28a3c007/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java
index 7c0ef08..a706ea8 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceImportLoader.java
@@ -78,17 +78,12 @@ public class HdfsSequenceImportLoader extends Loader {
     Path filepath = new Path(filename);
     SequenceFile.Writer filewriter;
     if (codec != null) {
-      filewriter = SequenceFile.createWriter(conf,
-          SequenceFile.Writer.file(filepath),
-          SequenceFile.Writer.keyClass(Text.class),
-          SequenceFile.Writer.valueClass(NullWritable.class),
-          SequenceFile.Writer.compression(CompressionType.BLOCK, codec));
+      filewriter = SequenceFile.createWriter(filepath.getFileSystem(conf),
+          conf, filepath, Text.class, NullWritable.class,
+          CompressionType.BLOCK, codec);
     } else {
-      filewriter = SequenceFile.createWriter(conf,
-          SequenceFile.Writer.file(filepath),
-          SequenceFile.Writer.keyClass(Text.class),
-          SequenceFile.Writer.valueClass(NullWritable.class),
-          SequenceFile.Writer.compression(CompressionType.NONE));
+      filewriter = SequenceFile.createWriter(filepath.getFileSystem(conf),
+          conf, filepath, Text.class, NullWritable.class, CompressionType.NONE);
     }
 
     String csv;

http://git-wip-us.apache.org/repos/asf/sqoop/blob/28a3c007/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java
index c465f10..c221cbf 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/mr/SqoopFileOutputFormat.java
@@ -57,7 +57,7 @@ public class SqoopFileOutputFormat
     boolean isCompressed = getCompressOutput(context);
     if (isCompressed) {
       String codecname =
-          conf.get(FileOutputFormat.COMPRESS_CODEC, DEFAULT_CODEC.getName());
+          conf.get(JobConstants.HADOOP_COMPRESS_CODEC, DEFAULT_CODEC.getName());
       conf.set(JobConstants.JOB_MR_OUTPUT_CODEC, codecname);
     }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/28a3c007/execution/mapreduce/src/test/java/org/apache/sqoop/job/JobUtils.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/JobUtils.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/JobUtils.java
index e6ead3f..e21f15b 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/JobUtils.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/JobUtils.java
@@ -27,7 +27,6 @@ import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.sqoop.job.io.Data;
 import org.apache.sqoop.job.mr.SqoopFileOutputFormat;
 import org.apache.sqoop.job.mr.SqoopInputFormat;
@@ -40,7 +39,7 @@ public class JobUtils {
   public static void runJob(Configuration conf)
       throws IOException, InterruptedException, ClassNotFoundException {
     runJob(conf, SqoopInputFormat.class, SqoopMapper.class,
-        (conf.get(FileOutputFormat.OUTDIR) != null) ?
+        (conf.get(JobConstants.HADOOP_OUTDIR) != null) ?
         SqoopFileOutputFormat.class : SqoopNullOutputFormat.class);
   }
 
@@ -49,7 +48,7 @@ public class JobUtils {
       Class<? extends Mapper<SqoopSplit, NullWritable, Data, NullWritable>> mapper,
       Class<? extends OutputFormat<Data, NullWritable>> output)
       throws IOException, InterruptedException, ClassNotFoundException {
-    Job job = Job.getInstance(conf);
+    Job job = new Job(conf);
     job.setInputFormatClass(input);
     job.setMapperClass(mapper);
     job.setMapOutputKeyClass(Data.class);

http://git-wip-us.apache.org/repos/asf/sqoop/blob/28a3c007/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java
index 2287b06..21a2be9 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsLoad.java
@@ -33,7 +33,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.sqoop.common.ImmutableContext;
 import org.apache.sqoop.job.etl.Extractor;
@@ -67,7 +66,7 @@ public class TestHdfsLoad extends TestCase {
     conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
     conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
     conf.set(JobConstants.JOB_ETL_LOADER, HdfsTextImportLoader.class.getName());
-    conf.set(FileOutputFormat.OUTDIR, outdir);
+    conf.set(JobConstants.HADOOP_OUTDIR, outdir);
     JobUtils.runJob(conf);
 
     String fileName = outdir + "/" + OUTPUT_FILE;
@@ -85,12 +84,12 @@ public class TestHdfsLoad extends TestCase {
     conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
     conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
     conf.set(JobConstants.JOB_ETL_LOADER, HdfsTextImportLoader.class.getName());
-    conf.set(FileOutputFormat.OUTDIR, outdir);
-    conf.setBoolean(FileOutputFormat.COMPRESS, true);
+    conf.set(JobConstants.HADOOP_OUTDIR, outdir);
+    conf.setBoolean(JobConstants.HADOOP_COMPRESS, true);
     JobUtils.runJob(conf);
 
     Class<? extends CompressionCodec> codecClass = conf.getClass(
-        FileOutputFormat.COMPRESS_CODEC, SqoopFileOutputFormat.DEFAULT_CODEC)
+        JobConstants.HADOOP_COMPRESS_CODEC, SqoopFileOutputFormat.DEFAULT_CODEC)
         .asSubclass(CompressionCodec.class);
     CompressionCodec codec = ReflectionUtils.newInstance(codecClass, conf);
     String fileName = outdir + "/" + OUTPUT_FILE + codec.getDefaultExtension();
@@ -128,13 +127,13 @@ public class TestHdfsLoad extends TestCase {
     conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
     conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
     conf.set(JobConstants.JOB_ETL_LOADER, HdfsSequenceImportLoader.class.getName());
-    conf.set(FileOutputFormat.OUTDIR, outdir);
+    conf.set(JobConstants.HADOOP_OUTDIR, outdir);
     JobUtils.runJob(conf);
 
     Path filepath = new Path(outdir,
         OUTPUT_FILE + HdfsSequenceImportLoader.EXTENSION);
-    SequenceFile.Reader filereader = new SequenceFile.Reader(conf,
-        SequenceFile.Reader.file(filepath));
+    SequenceFile.Reader filereader = new SequenceFile.Reader(
+        filepath.getFileSystem(conf), filepath, conf);
     verifyOutputSequence(filereader);
   }
 
@@ -146,14 +145,13 @@ public class TestHdfsLoad extends TestCase {
     conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
     conf.set(JobConstants.JOB_ETL_EXTRACTOR, DummyExtractor.class.getName());
     conf.set(JobConstants.JOB_ETL_LOADER, HdfsSequenceImportLoader.class.getName());
-    conf.set(FileOutputFormat.OUTDIR, outdir);
-    conf.setBoolean(FileOutputFormat.COMPRESS, true);
+    conf.set(JobConstants.HADOOP_OUTDIR, outdir);
+    conf.setBoolean(JobConstants.HADOOP_COMPRESS, true);
     JobUtils.runJob(conf);
 
     Path filepath = new Path(outdir,
         OUTPUT_FILE + HdfsSequenceImportLoader.EXTENSION);
-    SequenceFile.Reader filereader = new SequenceFile.Reader(conf,
-        SequenceFile.Reader.file(filepath));
+    SequenceFile.Reader filereader = new SequenceFile.Reader(filepath.getFileSystem(conf), filepath, conf);
     verifyOutputSequence(filereader);
   }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/28a3c007/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
index 3e498ec..745a3a4 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestMapReduce.java
@@ -57,7 +57,7 @@ public class TestMapReduce extends TestCase {
     Configuration conf = new Configuration();
     conf.set(JobConstants.JOB_TYPE, "IMPORT");
     conf.set(JobConstants.JOB_ETL_PARTITIONER, DummyPartitioner.class.getName());
-    Job job = Job.getInstance(conf);
+    Job job = new Job(conf);
 
     SqoopInputFormat inputformat = new SqoopInputFormat();
     List<InputSplit> splits = inputformat.getSplits(job);

http://git-wip-us.apache.org/repos/asf/sqoop/blob/28a3c007/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
----------------------------------------------------------------------
diff --git a/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
index 48dc073..68f21fd 100644
--- a/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
+++ b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java
@@ -20,6 +20,7 @@ package org.apache.sqoop.submission.mapreduce;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobID;
 import org.apache.hadoop.mapred.JobStatus;
 import org.apache.hadoop.mapred.RunningJob;
@@ -100,7 +101,7 @@ public class MapreduceSubmissionEngine extends SubmissionEngine {
     // Create job client
     try {
-      jobClient = new JobClient(new Configuration(globalConfiguration));
+      jobClient = new JobClient(new JobConf(globalConfiguration));
     } catch (IOException e) {
       throw new SqoopException(MapreduceSubmissionError.MAPREDUCE_0002, e);
     }
@@ -183,7 +184,7 @@ public class MapreduceSubmissionEngine extends SubmissionEngine {
     configuration.set("tmpjars", sb.toString());
 
     try {
-      Job job = Job.getInstance(configuration);
+      Job job = new Job(configuration);
 
       if(request.getJobName() != null) {
         job.setJobName("Sqoop: " + request.getJobName());
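
For reference, here is a minimal self-contained sketch of the Hadoop 1
compatible API usage this commit converges on: the old-style "mapred.*"
configuration keys, the Job(Configuration) and JobClient(JobConf)
constructors, and the FileSystem-based SequenceFile factory methods. The
class name, paths, and sample values are illustrative only and are not part
of the commit.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;

public class Hadoop1CompatSketch {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();

    // Hadoop 1 property names; Hadoop 2 still honors them through its
    // configuration deprecation (backward compatibility) layer.
    conf.set("mapred.output.dir", "/tmp/sqoop-out");
    conf.setBoolean("mapred.output.compress", false);

    // Job.getInstance(Configuration) is not available in Hadoop 1; the
    // Job(Configuration) constructor works on both lines (it is merely
    // deprecated in Hadoop 2).
    Job job = new Job(conf);
    job.setJobName("hadoop1-compat-sketch");

    // Hadoop 1's JobClient constructor takes a JobConf rather than a plain
    // Configuration, so the Configuration is wrapped first.
    JobClient jobClient = new JobClient(new JobConf(conf));

    // The SequenceFile.Writer.file()/keyClass()/... option builders and the
    // SequenceFile.Reader.file() option are Hadoop 2 additions; the
    // FileSystem-based factory and constructor below exist in both versions.
    Path filepath = new Path("/tmp/sqoop-out/example.seq");
    SequenceFile.Writer writer = SequenceFile.createWriter(
        filepath.getFileSystem(conf), conf, filepath,
        Text.class, NullWritable.class, CompressionType.NONE);
    writer.append(new Text("example record"), NullWritable.get());
    writer.close();

    SequenceFile.Reader reader = new SequenceFile.Reader(
        filepath.getFileSystem(conf), filepath, conf);
    Text key = new Text();
    while (reader.next(key, NullWritable.get())) {
      System.out.println(key);
    }
    reader.close();
  }
}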
