Updated Branches:
  refs/heads/sqoop2 1c87cb762 -> 66a328aef
SQOOP-768 Compilation on hadoop profile 100 will fail (Jarek Jarcec Cecho)

Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/66a328ae
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/66a328ae
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/66a328ae

Branch: refs/heads/sqoop2
Commit: 66a328aef87ebed4d22ca7cc22d655360eedc44f
Parents: 1c87cb7
Author: Bilung Lee <[email protected]>
Authored: Fri Dec 14 11:52:14 2012 -0800
Committer: Bilung Lee <[email protected]>
Committed: Fri Dec 14 11:52:14 2012 -0800

----------------------------------------------------------------------
 execution/mapreduce/pom.xml                         |    2 -
 .../mapreduce/MapreduceExecutionEngine.java         |   22 ++++++++++-
 .../java/org/apache/sqoop/job/JobConstants.java     |    2 +
 .../sqoop/job/etl/HdfsExportPartitioner.java        |   16 ++++----
 .../sqoop/job/etl/HdfsSequenceExportExtractor.java  |    3 +-
 .../sqoop/job/etl/HdfsTextExportExtractor.java      |   30 ++++-----------
 .../java/org/apache/sqoop/job/TestHdfsExtract.java  |   25 +++++-------
 pom.xml                                             |    1 -
 8 files changed, 49 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/66a328ae/execution/mapreduce/pom.xml
----------------------------------------------------------------------
diff --git a/execution/mapreduce/pom.xml b/execution/mapreduce/pom.xml
index 9e1d2ec..a6299e1 100644
--- a/execution/mapreduce/pom.xml
+++ b/execution/mapreduce/pom.xml
@@ -83,8 +83,6 @@ limitations under the License.
     <dependency>
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
-      <version>${guava.version}</version>
-      <scope>provided</scope>
     </dependency>
 
     <dependency>


http://git-wip-us.apache.org/repos/asf/sqoop/blob/66a328ae/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
index 06872ca..b201a8d 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
@@ -17,8 +17,8 @@
  */
 package org.apache.sqoop.execution.mapreduce;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.sqoop.common.MutableMapContext;
 import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.framework.ExecutionEngine;
@@ -61,6 +61,9 @@ public class MapreduceExecutionEngine extends ExecutionEngine {
     MRSubmissionRequest request = (MRSubmissionRequest) gRequest;
     ImportJobConfiguration jobConf = (ImportJobConfiguration) request.getConfigFrameworkJob();
 
+    // Add jar dependencies
+    addDependencies(request);
+
     // Configure map-reduce classes for import
     request.setInputFormatClass(SqoopInputFormat.class);
 
@@ -103,6 +106,9 @@ public class MapreduceExecutionEngine extends ExecutionEngine {
     MRSubmissionRequest request = (MRSubmissionRequest) gRequest;
     ExportJobConfiguration jobConf = (ExportJobConfiguration) request.getConfigFrameworkJob();
 
+    // Add jar dependencies
+    addDependencies(request);
+
     // Configure map-reduce classes for import
     request.setInputFormatClass(SqoopInputFormat.class);
 
@@ -124,10 +130,22 @@ public class MapreduceExecutionEngine extends ExecutionEngine {
     // We should make one extractor that will be able to read all supported file types
     context.setString(JobConstants.JOB_ETL_EXTRACTOR,
       HdfsTextExportExtractor.class.getName());
-    context.setString(FileInputFormat.INPUT_DIR, jobConf.input.inputDirectory);
+    context.setString(JobConstants.HADOOP_INPUTDIR, jobConf.input.inputDirectory);
 
     if(request.getExtractors() != null) {
       context.setInteger(JobConstants.JOB_ETL_EXTRACTOR_NUM, request.getExtractors());
     }
   }
+
+  /**
+   * Our execution engine have additional dependencies that needs to be available
+   * at mapreduce job time. This method will register all dependencies in the request
+   * object.
+   *
+   * @param request Active request object.
+   */
+  protected void addDependencies(MRSubmissionRequest request) {
+    // Guava
+    request.addJarForClass(ThreadFactoryBuilder.class);
+  }
 }


http://git-wip-us.apache.org/repos/asf/sqoop/blob/66a328ae/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java
index f5123a2..e16a2c4 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/JobConstants.java
@@ -83,6 +83,8 @@ public final class JobConstants extends Constants {
   // We're using constants from Hadoop 1. Hadoop 2 has different names, but
   // provides backward compatibility layer for those names as well.
 
+  public static final String HADOOP_INPUTDIR = "mapred.input.dir";
+
   public static final String HADOOP_OUTDIR = "mapred.output.dir";
 
   public static final String HADOOP_COMPRESS = "mapred.output.compress";


http://git-wip-us.apache.org/repos/asf/sqoop/blob/66a328ae/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportPartitioner.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportPartitioner.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportPartitioner.java
index 7ffd97c..71e0060 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportPartitioner.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportPartitioner.java
@@ -36,7 +36,6 @@ import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
-import org.apache.hadoop.io.compress.SplittableCompressionCodec;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.net.NetworkTopology;
@@ -113,12 +112,12 @@ public class HdfsExportPartitioner extends Partitioner {
     }
 
     // all the files in input set
-    String indir = conf.get(FileInputFormat.INPUT_DIR);
+    String indir = conf.get(JobConstants.HADOOP_INPUTDIR);
     FileSystem fs = FileSystem.get(conf);
 
     List<Path> paths = new LinkedList<Path>();
     for(FileStatus status : fs.listStatus(new Path(indir))) {
-      if(!status.isDirectory()) {
+      if(!status.isDir()) {
         paths.add(status.getPath());
       }
     }
@@ -143,7 +142,7 @@ public class HdfsExportPartitioner extends Partitioner {
   }
 
   private long getInputSize(Configuration conf)
      throws IOException {
-    String indir = conf.get(FileInputFormat.INPUT_DIR);
+    String indir = conf.get(JobConstants.HADOOP_INPUTDIR);
     FileSystem fs = FileSystem.get(conf);
     FileStatus[] files = fs.listStatus(new Path(indir));
     long count = 0;
@@ -345,10 +344,11 @@ public class HdfsExportPartitioner extends Partitioner {
   private boolean isSplitable(Configuration conf, Path file) {
     final CompressionCodec codec =
       new CompressionCodecFactory(conf).getCodec(file);
-    if (null == codec) {
-      return true;
-    }
-    return codec instanceof SplittableCompressionCodec;
+
+    // This method might be improved for SplittableCompression codec when we
+    // drop support for Hadoop 1.0
+    return null == codec;
+
   }
 
   /**


http://git-wip-us.apache.org/repos/asf/sqoop/blob/66a328ae/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceExportExtractor.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceExportExtractor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceExportExtractor.java
index 0693a09..2261a7c 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceExportExtractor.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceExportExtractor.java
@@ -74,8 +74,7 @@ public class HdfsSequenceExportExtractor extends Extractor {
     LOG.info("\t to offset " + end);
     LOG.info("\t of length " + length);
 
-    SequenceFile.Reader filereader = new SequenceFile.Reader(conf,
-        SequenceFile.Reader.file(file));
+    SequenceFile.Reader filereader = new SequenceFile.Reader(file.getFileSystem(conf), file, conf);
 
     if (start > filereader.getPosition()) {
       filereader.sync(start); // sync to start


http://git-wip-us.apache.org/repos/asf/sqoop/blob/66a328ae/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextExportExtractor.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextExportExtractor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextExportExtractor.java
index c412c81..fdc7d67 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextExportExtractor.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextExportExtractor.java
@@ -30,8 +30,6 @@ import org.apache.hadoop.fs.Seekable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
-import org.apache.hadoop.io.compress.SplitCompressionInputStream;
-import org.apache.hadoop.io.compress.SplittableCompressionCodec;
 import org.apache.hadoop.util.LineReader;
 import org.apache.sqoop.common.ImmutableContext;
 import org.apache.sqoop.common.SqoopException;
@@ -93,32 +91,20 @@ public class HdfsTextExportExtractor extends Extractor {
       byte[] recordDelimiterBytes = String.valueOf(
           Data.DEFAULT_RECORD_DELIMITER).getBytes(
           Charset.forName(Data.CHARSET_NAME));
-      filereader = new LineReader(filestream, conf,
-          recordDelimiterBytes);
+      // Hadoop 1.0 do not have support for custom record delimiter and thus we
+      // are supporting only default one.
+      filereader = new LineReader(filestream, conf);
       fileseeker = filestream;
-
-    } else if (codec instanceof SplittableCompressionCodec) {
-      SplitCompressionInputStream compressionstream =
-          ((SplittableCompressionCodec)codec).createInputStream(
-              filestream, codec.createDecompressor(), start, end,
-              SplittableCompressionCodec.READ_MODE.BYBLOCK);
-      byte[] recordDelimiterBytes = String.valueOf(
-          Data.DEFAULT_RECORD_DELIMITER).getBytes(
-          Charset.forName(Data.CHARSET_NAME));
-      filereader = new LineReader(compressionstream,
-          conf, recordDelimiterBytes);
-      fileseeker = compressionstream;
-
-      start = compressionstream.getAdjustedStart();
-      end = compressionstream.getAdjustedEnd();
-
+      // We might add another "else if" case for SplittableCompressionCodec once
+      // we drop support for Hadoop 1.0.
     } else {
       byte[] recordDelimiterBytes = String.valueOf(
          Data.DEFAULT_RECORD_DELIMITER).getBytes(
          Charset.forName(Data.CHARSET_NAME));
+      // Hadoop 1.0 do not have support for custom record delimiter and thus we
+      // are supporting only default one.
       filereader = new LineReader(
-          codec.createInputStream(filestream, codec.createDecompressor()),
-          conf, recordDelimiterBytes);
+          codec.createInputStream(filestream, codec.createDecompressor()), conf);
       fileseeker = filestream;
     }


http://git-wip-us.apache.org/repos/asf/sqoop/blob/66a328ae/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java
index ba44de9..484eb20 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java
@@ -71,7 +71,7 @@ public class TestHdfsExtract extends TestCase {
         HdfsTextExportExtractor.class.getName());
     conf.set(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName());
     conf.set(Constants.JOB_ETL_NUMBER_PARTITIONS, "4");
-    conf.set(FileInputFormat.INPUT_DIR, indir);
+    conf.set(JobConstants.HADOOP_INPUTDIR, indir);
     JobUtils.runJob(conf);
   }
 
@@ -89,7 +89,7 @@ public class TestHdfsExtract extends TestCase {
         HdfsTextExportExtractor.class.getName());
     conf.set(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName());
     conf.set(Constants.JOB_ETL_NUMBER_PARTITIONS, "4");
-    conf.set(FileInputFormat.INPUT_DIR, indir);
+    conf.set(JobConstants.HADOOP_INPUTDIR, indir);
     JobUtils.runJob(conf);
 
     FileUtils.delete(indir);
@@ -102,7 +102,7 @@ public class TestHdfsExtract extends TestCase {
         HdfsTextExportExtractor.class.getName());
     conf.set(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName());
     conf.set(Constants.JOB_ETL_NUMBER_PARTITIONS, "4");
-    conf.set(FileInputFormat.INPUT_DIR, indir);
+    conf.set(JobConstants.HADOOP_INPUTDIR, indir);
     JobUtils.runJob(conf);
   }
 
@@ -120,7 +120,7 @@ public class TestHdfsExtract extends TestCase {
         HdfsSequenceExportExtractor.class.getName());
     conf.set(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName());
     conf.set(Constants.JOB_ETL_NUMBER_PARTITIONS, "4");
-    conf.set(FileInputFormat.INPUT_DIR, indir);
+    conf.set(JobConstants.HADOOP_INPUTDIR, indir);
     JobUtils.runJob(conf);
   }
 
@@ -138,7 +138,7 @@ public class TestHdfsExtract extends TestCase {
         HdfsSequenceExportExtractor.class.getName());
     conf.set(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName());
     conf.set(Constants.JOB_ETL_NUMBER_PARTITIONS, "4");
-    conf.set(FileInputFormat.INPUT_DIR, indir);
+    conf.set(JobConstants.HADOOP_INPUTDIR, indir);
     JobUtils.runJob(conf);
   }
 
@@ -198,17 +198,12 @@ public class TestHdfsExtract extends TestCase {
           "part-r-" + padZeros(fi, 5) + HdfsSequenceImportLoader.EXTENSION);
       SequenceFile.Writer filewriter;
       if (codec != null) {
-        filewriter = SequenceFile.createWriter(conf,
-            SequenceFile.Writer.file(filepath),
-            SequenceFile.Writer.keyClass(Text.class),
-            SequenceFile.Writer.valueClass(NullWritable.class),
-            SequenceFile.Writer.compression(CompressionType.BLOCK, codec));
+        filewriter = SequenceFile.createWriter(filepath.getFileSystem(conf),
+            conf, filepath, Text.class, NullWritable.class,
+            CompressionType.BLOCK, codec);
       } else {
-        filewriter = SequenceFile.createWriter(conf,
-            SequenceFile.Writer.file(filepath),
-            SequenceFile.Writer.keyClass(Text.class),
-            SequenceFile.Writer.valueClass(NullWritable.class),
-            SequenceFile.Writer.compression(CompressionType.NONE));
+        filewriter = SequenceFile.createWriter(filepath.getFileSystem(conf),
+            conf, filepath, Text.class, NullWritable.class, CompressionType.NONE);
       }
       Text text = new Text();


http://git-wip-us.apache.org/repos/asf/sqoop/blob/66a328ae/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 2311934..be4f1b6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -130,7 +130,6 @@ limitations under the License.
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
       <version>${guava.version}</version>
-      <scope>provided</scope>
     </dependency>
 
     <dependency>
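
For readers tracking why the SequenceFile calls above changed: the option-based Reader/Writer factories resolve only against Hadoop 2, while the patch switches to the older FileSystem-based forms that compile on both profiles. The following is a minimal illustrative sketch, not part of the commit; the path and record contents are placeholders.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Text;

public class SequenceFileCompatSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path file = new Path("/tmp/compat-sketch.seq"); // placeholder path, not from the commit
    FileSystem fs = file.getFileSystem(conf);

    // Writer created through the FileSystem-based factory, the form the patch
    // switches TestHdfsExtract to; it compiles on both Hadoop 1 and Hadoop 2.
    SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, file,
        Text.class, NullWritable.class, CompressionType.NONE);
    writer.append(new Text("sample record"), NullWritable.get());
    writer.close();

    // Reader created through the FileSystem-based constructor, the form the
    // patch switches HdfsSequenceExportExtractor to.
    SequenceFile.Reader reader = new SequenceFile.Reader(fs, file, conf);
    Text key = new Text();
    while (reader.next(key, NullWritable.get())) {
      System.out.println(key);
    }
    reader.close();

    // The option-based forms removed by the patch, e.g.
    //   new SequenceFile.Reader(conf, SequenceFile.Reader.file(file))
    //   SequenceFile.createWriter(conf, SequenceFile.Writer.file(file), ...)
    // exist only in Hadoop 2, which is what broke the hadoop profile 100 build.
  }
}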

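Similarly, the LineReader changes above drop the custom record delimiter argument because, as the new comments in the patch note, Hadoop 1.0 supports only the default newline delimiter. Below is a small sketch of the two-argument constructor the patch falls back to; the in-memory input stream is a placeholder for the HDFS stream used in the extractor.

import java.io.ByteArrayInputStream;
import java.nio.charset.Charset;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.util.LineReader;

public class LineReaderCompatSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    byte[] data = "first\nsecond\n".getBytes(Charset.forName("UTF-8"));

    // Two-argument constructor available on both Hadoop 1 and Hadoop 2;
    // records are split on the default newline delimiter.
    LineReader filereader = new LineReader(new ByteArrayInputStream(data), conf);
    Text line = new Text();
    while (filereader.readLine(line) > 0) {
      System.out.println(line);
    }
    filereader.close();

    // The three-argument form removed by the patch,
    //   new LineReader(in, conf, recordDelimiterBytes),
    // is the one the commit comments describe as unavailable in Hadoop 1.0.
  }
}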