Updated Branches: refs/heads/sqoop2 3b8e8d15d -> 3d9aaa0d0
SQOOP-783: Sqoop2: Merge HdfsSequenceExportExtractor and HdfsTextExportExtractor to one Extractor
(Vasanth kumar RJ via Jarek Jarcec Cecho)

Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/3d9aaa0d
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/3d9aaa0d
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/3d9aaa0d

Branch: refs/heads/sqoop2
Commit: 3d9aaa0d0d8f559798f667749ebd406d9a20af91
Parents: 3b8e8d1
Author: Jarek Jarcec Cecho <[email protected]>
Authored: Sun Mar 17 12:27:21 2013 -0700
Committer: Jarek Jarcec Cecho <[email protected]>
Committed: Sun Mar 17 12:27:21 2013 -0700

----------------------------------------------------------------------
 .../mapreduce/MapreduceExecutionEngine.java        |    6 +-
 .../apache/sqoop/job/etl/HdfsExportExtractor.java  |  203 +++++++++++++++
 .../sqoop/job/etl/HdfsSequenceExportExtractor.java |  101 -------
 .../sqoop/job/etl/HdfsTextExportExtractor.java     |  131 ----------
 .../java/org/apache/sqoop/job/TestHdfsExtract.java |   13 +-
 5 files changed, 212 insertions(+), 242 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/3d9aaa0d/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
index b201a8d..767080c 100644
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/execution/mapreduce/MapreduceExecutionEngine.java
@@ -29,9 +29,9 @@ import org.apache.sqoop.framework.configuration.OutputFormat;
 import org.apache.sqoop.job.JobConstants;
 import org.apache.sqoop.job.MapreduceExecutionError;
 import org.apache.sqoop.job.etl.Exporter;
+import org.apache.sqoop.job.etl.HdfsExportExtractor;
 import org.apache.sqoop.job.etl.HdfsExportPartitioner;
 import org.apache.sqoop.job.etl.HdfsSequenceImportLoader;
-import org.apache.sqoop.job.etl.HdfsTextExportExtractor;
 import org.apache.sqoop.job.etl.HdfsTextImportLoader;
 import org.apache.sqoop.job.etl.Importer;
 import org.apache.sqoop.job.io.Data;
@@ -128,8 +128,8 @@ public class MapreduceExecutionEngine extends ExecutionEngine {
     context.setString(JobConstants.JOB_ETL_LOADER, exporter.getLoader().getName());
     context.setString(JobConstants.JOB_ETL_DESTROYER, exporter.getDestroyer().getName());

-    // We should make one extractor that will be able to read all supported file types
-    context.setString(JobConstants.JOB_ETL_EXTRACTOR, HdfsTextExportExtractor.class.getName());
+    // Extractor that will be able to read all supported file types
+    context.setString(JobConstants.JOB_ETL_EXTRACTOR, HdfsExportExtractor.class.getName());
     context.setString(JobConstants.HADOOP_INPUTDIR, jobConf.input.inputDirectory);

     if(request.getExtractors() != null) {
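For readers following along: the hunk above only stores a fully qualified class name in the job
context, which is why swapping extractors needs no other wiring. A minimal sketch of how such a
class-name setting is typically consumed at job runtime — a simplified stand-in, not Sqoop's
actual framework code; the class ExtractorLoaderSketch and its method are invented for
illustration:

import java.lang.reflect.Constructor;

public class ExtractorLoaderSketch {
  // Resolve and instantiate an extractor from its class name, the way a
  // framework can act on a string like JOB_ETL_EXTRACTOR. Assumes the named
  // class is on the classpath and has a public no-arg constructor.
  public static Object instantiateExtractor(String className) throws Exception {
    Class<?> klass = Class.forName(className);
    Constructor<?> ctor = klass.getDeclaredConstructor();
    return ctor.newInstance();
  }

  public static void main(String[] args) throws Exception {
    // After this commit, a single class name covers both text and sequence files.
    // Running this requires the Sqoop execution jars on the classpath.
    Object extractor = instantiateExtractor("org.apache.sqoop.job.etl.HdfsExportExtractor");
    System.out.println("Loaded: " + extractor.getClass().getName());
  }
}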
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3d9aaa0d/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportExtractor.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportExtractor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportExtractor.java
new file mode 100644
index 0000000..9281bb4
--- /dev/null
+++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsExportExtractor.java
@@ -0,0 +1,203 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.job.etl;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.util.LineReader;
+import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.etl.io.DataWriter;
+import org.apache.sqoop.framework.configuration.ConnectionConfiguration;
+import org.apache.sqoop.framework.configuration.ExportJobConfiguration;
+import org.apache.sqoop.job.MapreduceExecutionError;
+import org.apache.sqoop.job.PrefixContext;
+import org.apache.sqoop.job.io.Data;
+
+/**
+ * Extract from HDFS.
+ * Default field delimiter of a record is comma.
+ */
+public class HdfsExportExtractor extends Extractor<ConnectionConfiguration, ExportJobConfiguration, HdfsExportPartition> {
+
+  public static final Log LOG = LogFactory.getLog(HdfsExportExtractor.class.getName());
+
+  private Configuration conf;
+  private DataWriter dataWriter;
+  private long rowRead = 0;
+
+  private final char fieldDelimiter;
+
+  public HdfsExportExtractor() {
+    fieldDelimiter = Data.DEFAULT_FIELD_DELIMITER;
+  }
+
+  @Override
+  public void extract(ExtractorContext context,
+      ConnectionConfiguration connectionConfiguration,
+      ExportJobConfiguration jobConfiguration, HdfsExportPartition partition) {
+
+    conf = ((PrefixContext) context.getContext()).getConfiguration();
+    dataWriter = context.getDataWriter();
+    dataWriter.setFieldDelimiter(fieldDelimiter);
+
+    try {
+      HdfsExportPartition p = partition;
+      LOG.info("Working on partition: " + p);
+      int numFiles = p.getNumberOfFiles();
+      for (int i = 0; i < numFiles; i++) {
+        extractFile(p.getFile(i), p.getOffset(i), p.getLength(i));
+      }
+    } catch (IOException e) {
+      throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0017, e);
+    }
+  }
+
+  private void extractFile(Path file, long start, long length)
+      throws IOException {
+    long end = start + length;
+    LOG.info("Extracting file " + file);
+    LOG.info("\t from offset " + start);
+    LOG.info("\t to offset " + end);
+    LOG.info("\t of length " + length);
+    if (isSequenceFile(file)) {
+      extractSequenceFile(file, start, length);
+    } else {
+      extractTextFile(file, start, length);
+    }
+  }
+
+  /**
+   * Extracts Sequence file
+   * @param file
+   * @param start
+   * @param length
+   * @throws IOException
+   */
+  private void extractSequenceFile(Path file, long start, long length)
+      throws IOException {
+    LOG.info("Extracting sequence file");
+    long end = start + length;
+    SequenceFile.Reader filereader = new SequenceFile.Reader(
+        file.getFileSystem(conf), file, conf);
+
+    if (start > filereader.getPosition()) {
+      filereader.sync(start); // sync to start
+    }
+
+    Text line = new Text();
+    boolean hasNext = filereader.next(line);
+    while (hasNext) {
+      rowRead++;
+      dataWriter.writeCsvRecord(line.toString());
+      line = new Text();
+      hasNext = filereader.next(line);
+      if (filereader.getPosition() >= end && filereader.syncSeen()) {
+        break;
+      }
+    }
+    filereader.close();
+  }
+
+  /**
+   * Extracts Text file
+   * @param file
+   * @param start
+   * @param length
+   * @throws IOException
+   */
+  private void extractTextFile(Path file, long start, long length)
+      throws IOException {
+    LOG.info("Extracting text file");
+    long end = start + length;
+    FileSystem fs = file.getFileSystem(conf);
+    FSDataInputStream filestream = fs.open(file);
+    CompressionCodec codec = (new CompressionCodecFactory(conf)).getCodec(file);
+    LineReader filereader;
+    Seekable fileseeker = filestream;
+
+    // Hadoop 1.0 does not have support for custom record delimiter and thus
+    // we are supporting only default one.
+    // We might add another "else if" case for SplittableCompressionCodec once
+    // we drop support for Hadoop 1.0.
+    if (codec == null) {
+      filestream.seek(start);
+      filereader = new LineReader(filestream);
+    } else {
+      filereader = new LineReader(codec.createInputStream(filestream,
+          codec.createDecompressor()), conf);
+      fileseeker = filestream;
+    }
+    if (start != 0) {
+      // always throw away first record because
+      // one extra line is read in previous split
+      start += filereader.readLine(new Text(), 0);
+    }
+    int size;
+    LOG.info("Start position: " + String.valueOf(start));
+    long next = start;
+    while (next <= end) {
+      Text line = new Text();
+      size = filereader.readLine(line, Integer.MAX_VALUE);
+      if (size == 0) {
+        break;
+      }
+      if (codec == null) {
+        next += size;
+      } else {
+        next = fileseeker.getPos();
+      }
+      rowRead++;
+      dataWriter.writeCsvRecord(line.toString());
+    }
+    LOG.info("Extracting ended on position: " + fileseeker.getPos());
+    filestream.close();
+  }
+
+  @Override
+  public long getRowsRead() {
+    return rowRead;
+  }
+
+  /**
+   * Returns true if given file is sequence
+   * @param file
+   * @return boolean
+   */
+  private boolean isSequenceFile(Path file) {
+    SequenceFile.Reader filereader = null;
+    try {
+      filereader = new SequenceFile.Reader(file.getFileSystem(conf), file, conf);
+      filereader.close();
+    } catch (IOException e) {
+      return false;
+    }
+    return true;
+  }
+}
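The heart of the merge is isSequenceFile() above: the unified extractor dispatches by probing the
file rather than by being told its type. A standalone sketch of the same probe outside Sqoop — try
to open the file as a SequenceFile and treat an IOException as "plain text". The class name
FileTypeProbe and the path in main() are invented for the example; only the probe itself mirrors
the committed code:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;

public class FileTypeProbe {
  public static boolean isSequenceFile(Configuration conf, Path file) {
    try {
      // SequenceFile.Reader validates the file's magic header on open,
      // so a successful open-and-close is enough evidence for dispatching.
      new SequenceFile.Reader(file.getFileSystem(conf), file, conf).close();
      return true;
    } catch (IOException e) {
      return false;
    }
  }

  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    // Hypothetical input path; pass a real one as the first argument.
    Path file = new Path(args.length > 0 ? args[0] : "/tmp/part-00000");
    System.out.println(file + " is sequence file: " + isSequenceFile(conf, file));
  }
}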
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3d9aaa0d/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceExportExtractor.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceExportExtractor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceExportExtractor.java
deleted file mode 100644
index 2280828..0000000
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceExportExtractor.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.job.etl;
-
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
-import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.framework.configuration.ConnectionConfiguration;
-import org.apache.sqoop.framework.configuration.ExportJobConfiguration;
-import org.apache.sqoop.job.MapreduceExecutionError;
-import org.apache.sqoop.job.PrefixContext;
-import org.apache.sqoop.job.io.Data;
-import org.apache.sqoop.etl.io.DataWriter;
-
-public class HdfsSequenceExportExtractor extends Extractor<ConnectionConfiguration, ExportJobConfiguration, HdfsExportPartition> {
-
-  public static final Log LOG =
-    LogFactory.getLog(HdfsSequenceExportExtractor.class.getName());
-
-  private Configuration conf;
-  private DataWriter dataWriter;
-
-  private final char fieldDelimiter;
-
-  public HdfsSequenceExportExtractor() {
-    fieldDelimiter = Data.DEFAULT_FIELD_DELIMITER;
-  }
-
-  @Override
-  public void extract(ExtractorContext context, ConnectionConfiguration connectionConfiguration,
-      ExportJobConfiguration jobConfiguration, HdfsExportPartition partition) {
-
-    conf = ((PrefixContext)context.getContext()).getConfiguration();
-    dataWriter = context.getDataWriter();
-    dataWriter.setFieldDelimiter(fieldDelimiter);
-
-    try {
-      LOG.info("Working on partition: " + partition);
-      int numFiles = partition.getNumberOfFiles();
-      for (int i=0; i<numFiles; i++) {
-        extractFile(partition.getFile(i), partition.getOffset(i), partition.getLength(i));
-      }
-    } catch (IOException e) {
-      throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0017, e);
-    }
-  }
-
-  private void extractFile(Path file, long start, long length)
-      throws IOException {
-    long end = start + length;
-    LOG.info("Extracting file " + file);
-    LOG.info("\t from offset " + start);
-    LOG.info("\t to offset " + end);
-    LOG.info("\t of length " + length);
-
-    SequenceFile.Reader filereader = new SequenceFile.Reader(file.getFileSystem(conf), file, conf);
-
-    if (start > filereader.getPosition()) {
-      filereader.sync(start); // sync to start
-    }
-
-    Text line = new Text();
-    boolean hasNext = filereader.next(line);
-    while (hasNext) {
-      dataWriter.writeCsvRecord(line.toString());
-      line = new Text();
-      hasNext = filereader.next(line);
-      if(filereader.getPosition() >= end && filereader.syncSeen()) {
-        break;
-      }
-    }
-  }
-
-  @Override
-  public long getRowsRead() {
-    // TODO need to return the rows read
-    return 0;
-  }
-
-}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/3d9aaa0d/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextExportExtractor.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextExportExtractor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextExportExtractor.java
deleted file mode 100644
index ae419ff..0000000
--- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextExportExtractor.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.job.etl;
-
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.Seekable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.CompressionCodecFactory;
-import org.apache.hadoop.util.LineReader;
-import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.framework.configuration.ConnectionConfiguration;
-import org.apache.sqoop.framework.configuration.ExportJobConfiguration;
-import org.apache.sqoop.job.MapreduceExecutionError;
-import org.apache.sqoop.job.PrefixContext;
-import org.apache.sqoop.job.io.Data;
-import org.apache.sqoop.etl.io.DataWriter;
-
-public class HdfsTextExportExtractor extends Extractor<ConnectionConfiguration, ExportJobConfiguration, HdfsExportPartition> {
-
-  public static final Log LOG =
-    LogFactory.getLog(HdfsTextExportExtractor.class.getName());
-
-  private Configuration conf;
-  private DataWriter dataWriter;
-
-  private final char fieldDelimiter;
-
-  public HdfsTextExportExtractor() {
-    fieldDelimiter = Data.DEFAULT_FIELD_DELIMITER;
-  }
-
-  @Override
-  public void extract(ExtractorContext context, ConnectionConfiguration connectionConfiguration,
-      ExportJobConfiguration jobConfiguration, HdfsExportPartition partition) {
-
-    conf = ((PrefixContext)context.getContext()).getConfiguration();
-    dataWriter = context.getDataWriter();
-    dataWriter.setFieldDelimiter(fieldDelimiter);
-
-    try {
-      HdfsExportPartition p = partition;
-      LOG.info("Working on partition: " + p);
-      int numFiles = p.getNumberOfFiles();
-      for (int i=0; i<numFiles; i++) {
-        extractFile(p.getFile(i), p.getOffset(i), p.getLength(i));
-      }
-    } catch (IOException e) {
-      throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0017, e);
-    }
-  }
-
-  private void extractFile(Path file, long start, long length)
-      throws IOException {
-    long end = start + length;
-    LOG.info("Extracting file " + file);
-    LOG.info("\t from offset " + start);
-    LOG.info("\t to offset " + end);
-    LOG.info("\t of length " + length);
-
-    FileSystem fs = file.getFileSystem(conf);
-    FSDataInputStream filestream = fs.open(file);
-    CompressionCodec codec = (new CompressionCodecFactory(conf)).getCodec(file);
-    LineReader filereader;
-    Seekable fileseeker = filestream;
-
-    // Hadoop 1.0 does not have support for custom record delimiter and thus we
-    // are supporting only default one.
-    // We might add another "else if" case for SplittableCompressionCodec once
-    // we drop support for Hadoop 1.0.
-    if (codec == null) {
-      filestream.seek(start);
-      filereader = new LineReader(filestream);
-    } else {
-      filereader = new LineReader(
-          codec.createInputStream(filestream, codec.createDecompressor()), conf);
-      fileseeker = filestream;
-    }
-
-    if (start != 0) {
-      // always throw away first record because
-      // one extra line is read in previous split
-      start += filereader.readLine(new Text(), 0);
-    }
-    int size;
-    LOG.info("Start position: " + String.valueOf(start));
-    long next = start;
-    while (next <= end) {
-      Text line = new Text();
-      size = filereader.readLine(line, Integer.MAX_VALUE);
-      if (size == 0) {
-        break;
-      }
-      if (codec == null) {
-        next += size;
-      } else {
-        next = fileseeker.getPos();
-      }
-      dataWriter.writeCsvRecord(line.toString());
-    }
-    LOG.info("Extracting ended on position: " + fileseeker.getPos());
-  }
-
-  @Override
-  public long getRowsRead() {
-    // TODO need to return the rows read
-    return 0;
-  }
-}


http://git-wip-us.apache.org/repos/asf/sqoop/blob/3d9aaa0d/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java
----------------------------------------------------------------------
diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java
index fae6573..62f3a03 100644
--- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java
+++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java
@@ -33,10 +33,9 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.compress.BZip2Codec;
 import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.sqoop.job.etl.HdfsExportExtractor;
 import org.apache.sqoop.job.etl.HdfsExportPartitioner;
-import org.apache.sqoop.job.etl.HdfsSequenceExportExtractor;
 import org.apache.sqoop.job.etl.HdfsSequenceImportLoader;
-import org.apache.sqoop.job.etl.HdfsTextExportExtractor;
 import org.apache.sqoop.job.etl.Loader;
 import org.apache.sqoop.job.etl.LoaderContext;
 import org.apache.sqoop.job.io.Data;
@@ -66,7 +65,7 @@ public class TestHdfsExtract extends TestCase {
     conf.set(JobConstants.JOB_ETL_PARTITIONER,
         HdfsExportPartitioner.class.getName());
     conf.set(JobConstants.JOB_ETL_EXTRACTOR,
-        HdfsTextExportExtractor.class.getName());
+        HdfsExportExtractor.class.getName());
     conf.set(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName());
     conf.set(Constants.JOB_ETL_NUMBER_PARTITIONS, "4");
     conf.set(JobConstants.HADOOP_INPUTDIR, indir);
@@ -84,7 +83,7 @@
     conf.set(JobConstants.JOB_ETL_PARTITIONER,
         HdfsExportPartitioner.class.getName());
     conf.set(JobConstants.JOB_ETL_EXTRACTOR,
-        HdfsTextExportExtractor.class.getName());
+        HdfsExportExtractor.class.getName());
     conf.set(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName());
     conf.set(Constants.JOB_ETL_NUMBER_PARTITIONS, "4");
     conf.set(JobConstants.HADOOP_INPUTDIR, indir);
@@ -97,7 +96,7 @@
     conf.set(JobConstants.JOB_ETL_PARTITIONER,
         HdfsExportPartitioner.class.getName());
     conf.set(JobConstants.JOB_ETL_EXTRACTOR,
-        HdfsTextExportExtractor.class.getName());
+        HdfsExportExtractor.class.getName());
     conf.set(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName());
     conf.set(Constants.JOB_ETL_NUMBER_PARTITIONS, "4");
     conf.set(JobConstants.HADOOP_INPUTDIR, indir);
@@ -115,7 +114,7 @@ public class TestHdfsExtract extends TestCase {
     conf.set(JobConstants.JOB_ETL_PARTITIONER,
         HdfsExportPartitioner.class.getName());
     conf.set(JobConstants.JOB_ETL_EXTRACTOR,
-        HdfsSequenceExportExtractor.class.getName());
+        HdfsExportExtractor.class.getName());
     conf.set(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName());
     conf.set(Constants.JOB_ETL_NUMBER_PARTITIONS, "4");
     conf.set(JobConstants.HADOOP_INPUTDIR, indir);
@@ -133,7 +132,7 @@ public class TestHdfsExtract extends TestCase {
     conf.set(JobConstants.JOB_ETL_PARTITIONER,
         HdfsExportPartitioner.class.getName());
     conf.set(JobConstants.JOB_ETL_EXTRACTOR,
-        HdfsSequenceExportExtractor.class.getName());
+        HdfsExportExtractor.class.getName());
     conf.set(JobConstants.JOB_ETL_LOADER, DummyLoader.class.getName());
     conf.set(Constants.JOB_ETL_NUMBER_PARTITIONS, "4");
     conf.set(JobConstants.HADOOP_INPUTDIR, indir);
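A closing note on the text path that both the new extractor and the deleted HdfsTextExportExtractor
share: every split except the first discards its first (possibly partial) line, and every split
reads one line past its end offset, so each record is emitted by exactly one split. A minimal,
self-contained sketch of that convention — not Sqoop code; the class SplitConventionSketch, the
sample data, and the split sizes are invented for illustration:

import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;

public class SplitConventionSketch {
  static void readSplit(String data, int start, int length) throws IOException {
    int end = start + length;
    BufferedReader reader = new BufferedReader(new StringReader(data.substring(start)));
    int pos = start;
    if (start != 0) {
      // Discard the first line: the previous split already read past its own
      // end offset and consumed it (mirrors "throw away first record" above).
      String skipped = reader.readLine();
      pos += skipped.length() + 1;
    }
    String line;
    // Read while the line START is at or before end, i.e. read one line past
    // the boundary (mirrors the "while (next <= end)" loop above).
    while (pos <= end && (line = reader.readLine()) != null) {
      pos += line.length() + 1; // +1 for the '\n' record delimiter
      System.out.println("split@" + start + " -> " + line);
    }
  }

  public static void main(String[] args) throws IOException {
    String data = "row1\nrow2\nrow3\nrow4\n"; // 20 bytes, 5 bytes per record
    readSplit(data, 0, 10);  // emits row1, row2, row3 (reads past its end)
    readSplit(data, 10, 10); // skips the already-consumed row3, emits row4
  }
}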
