SQOOP-761: HDFSTextExportExtractor loses lines around partition boundaries (Hari Shreedharan via Jarek Jarcec Cecho)
Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/bd53c33f Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/bd53c33f Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/bd53c33f Branch: refs/heads/branch-1.99.1 Commit: bd53c33f792c9938edb789f39420403432d905bf Parents: 75559f5 Author: Jarek Jarcec Cecho <[email protected]> Authored: Sat Dec 15 12:43:05 2012 -0800 Committer: Jarek Jarcec Cecho <[email protected]> Committed: Sat Dec 15 23:08:47 2012 -0800 ---------------------------------------------------------------------- .../sqoop/job/etl/HdfsSequenceExportExtractor.java | 3 + .../sqoop/job/etl/HdfsTextExportExtractor.java | 33 ++++++-------- .../java/org/apache/sqoop/job/TestHdfsExtract.java | 10 ++-- pom.xml | 16 +++++++ 4 files changed, 38 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sqoop/blob/bd53c33f/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceExportExtractor.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceExportExtractor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceExportExtractor.java index 2261a7c..16afcdb 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceExportExtractor.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsSequenceExportExtractor.java @@ -85,6 +85,9 @@ public class HdfsSequenceExportExtractor extends Extractor { while (hasNext) { datawriter.writeCsvRecord(line.toString()); hasNext = filereader.next(line); + if(filereader.getPosition() >= end && filereader.syncSeen()) { + break; + } } } http://git-wip-us.apache.org/repos/asf/sqoop/blob/bd53c33f/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextExportExtractor.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextExportExtractor.java b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextExportExtractor.java index fdc7d67..8055140 100644 --- a/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextExportExtractor.java +++ b/execution/mapreduce/src/main/java/org/apache/sqoop/job/etl/HdfsTextExportExtractor.java @@ -84,25 +84,16 @@ public class HdfsTextExportExtractor extends Extractor { FSDataInputStream filestream = fs.open(file); CompressionCodec codec = (new CompressionCodecFactory(conf)).getCodec(file); LineReader filereader; - Seekable fileseeker; + Seekable fileseeker = filestream; - if (codec == null) { - filestream.seek(start); - byte[] recordDelimiterBytes = String.valueOf( - Data.DEFAULT_RECORD_DELIMITER).getBytes( - Charset.forName(Data.CHARSET_NAME)); - // Hadoop 1.0 do not have support for custom record delimiter and thus we - // are supporting only default one. - filereader = new LineReader(filestream, conf); - fileseeker = filestream; + // Hadoop 1.0 does not have support for custom record delimiter and thus we + // are supporting only default one. // We might add another "else if" case for SplittableCompressionCodec once // we drop support for Hadoop 1.0. + if (codec == null) { + filestream.seek(start); + filereader = new LineReader(filestream); } else { - byte[] recordDelimiterBytes = String.valueOf( - Data.DEFAULT_RECORD_DELIMITER).getBytes( - Charset.forName(Data.CHARSET_NAME)); - // Hadoop 1.0 do not have support for custom record delimiter and thus we - // are supporting only default one. filereader = new LineReader( codec.createInputStream(filestream, codec.createDecompressor()), conf); fileseeker = filestream; @@ -113,15 +104,20 @@ public class HdfsTextExportExtractor extends Extractor { // one extra line is read in previous split start += filereader.readLine(new Text(), 0); } - Text line = new Text(); int size; - while (fileseeker.getPos() <= end) { + LOG.info("Start position: " + String.valueOf(start)); + long next = start; + while (next <= end) { size = filereader.readLine(line, Integer.MAX_VALUE); if (size == 0) { break; } - + if (codec == null) { + next += size; + } else { + next = fileseeker.getPos(); + } datawriter.writeCsvRecord(line.toString()); } LOG.info("Extracting ended on position: " + fileseeker.getPos()); @@ -132,5 +128,4 @@ public class HdfsTextExportExtractor extends Extractor { // TODO need to return the rows read return 0; } - } http://git-wip-us.apache.org/repos/asf/sqoop/blob/bd53c33f/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java ---------------------------------------------------------------------- diff --git a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java index 484eb20..95cfe85 100644 --- a/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java +++ b/execution/mapreduce/src/test/java/org/apache/sqoop/job/TestHdfsExtract.java @@ -107,7 +107,7 @@ public class TestHdfsExtract extends TestCase { } @Test - public void testUncompressedSequence() throws Exception { + public void testCompressedSequence() throws Exception { FileUtils.delete(indir); FileUtils.mkdirs(indir); createSequenceInput(SqoopFileOutputFormat.DEFAULT_CODEC); @@ -125,7 +125,7 @@ public class TestHdfsExtract extends TestCase { } @Test - public void testCompressedSequence() throws Exception { + public void testUncompressedSequence() throws Exception { FileUtils.delete(indir); FileUtils.mkdirs(indir); createSequenceInput(null); @@ -241,9 +241,9 @@ public class TestHdfsExtract extends TestCase { int numbers = NUMBER_OF_FILES*NUMBER_OF_ROWS_PER_FILE; // This test is not currently working due to bug in HdfsExtractor. // Check SQOOP-761 for more details. -// assertEquals((1+numbers)*numbers/2, sum); -// -// assertEquals(NUMBER_OF_FILES*NUMBER_OF_ROWS_PER_FILE, index-1); + assertEquals((1+numbers)*numbers/2, sum); + + assertEquals(NUMBER_OF_FILES*NUMBER_OF_ROWS_PER_FILE, index-1); } } http://git-wip-us.apache.org/repos/asf/sqoop/blob/bd53c33f/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 4ab133f..13b0b10 100644 --- a/pom.xml +++ b/pom.xml @@ -353,8 +353,24 @@ limitations under the License. <artifactId>maven-jar-plugin</artifactId> <version>2.3.2</version> </plugin> + + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <version>2.12</version> + <configuration> + <forkMode>always</forkMode> + <forkedProcessTimeoutInSeconds>900</forkedProcessTimeoutInSeconds> + <redirectTestOutputToFile>true</redirectTestOutputToFile> + <argLine>-Xms256m -Xmx1g</argLine> + </configuration> + </plugin> + </plugins> </pluginManagement> + + </build> <!-- All reports might be generated using mvn site command -->
