TextIO/CompressedSource: split AUTO-mode files into bundles
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/370d5924 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/370d5924 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/370d5924 Branch: refs/heads/master Commit: 370d5924f393346115a22c23e5487f094847a783 Parents: 1ceb12a Author: Dan Halperin <dhalp...@google.com> Authored: Mon Sep 19 22:46:26 2016 -0700 Committer: Dan Halperin <dhalp...@google.com> Committed: Wed Sep 21 11:09:50 2016 -0700 ---------------------------------------------------------------------- .../apache/beam/sdk/io/CompressedSource.java | 10 +-- .../java/org/apache/beam/sdk/io/TextIO.java | 44 +++++----- .../java/org/apache/beam/sdk/io/TextIOTest.java | 85 +++++++++++++++++++- 3 files changed, 108 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/370d5924/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java index 3cd097c..8a5fedd 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java @@ -314,11 +314,11 @@ public class CompressedSource<T> extends FileBasedSource<T> { DecompressingChannelFactory channelFactory, String filePatternOrSpec, long minBundleSize, long startOffset, long endOffset) { super(filePatternOrSpec, minBundleSize, startOffset, endOffset); - checkArgument( - startOffset == 0, - "CompressedSources must start reading at offset 0. 
Requested offset: " + startOffset); this.sourceDelegate = sourceDelegate; this.channelFactory = channelFactory; + checkArgument( + isSplittable() || startOffset == 0, + "CompressedSources must start reading at offset 0. Requested offset: " + startOffset); } /** @@ -339,7 +339,7 @@ public class CompressedSource<T> extends FileBasedSource<T> { @Override protected FileBasedSource<T> createForSubrangeOfFile(String fileName, long start, long end) { return new CompressedSource<>(sourceDelegate.createForSubrangeOfFile(fileName, start, end), - channelFactory, fileName, Long.MAX_VALUE, start, end); + channelFactory, fileName, sourceDelegate.getMinBundleSize(), start, end); } /** @@ -348,7 +348,7 @@ public class CompressedSource<T> extends FileBasedSource<T> { * from the requested file name that the file is not compressed. */ @Override - protected final boolean isSplittable() throws Exception { + protected final boolean isSplittable() { if (channelFactory instanceof FileNameBasedDecompressingChannelFactory) { FileNameBasedDecompressingChannelFactory fileNameBasedChannelFactory = (FileNameBasedDecompressingChannelFactory) channelFactory; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/370d5924/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java index 79967d1..62d3ae8 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java @@ -19,6 +19,7 @@ package org.apache.beam.sdk.io; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; +import static org.apache.beam.sdk.io.TextIO.CompressionType.UNCOMPRESSED; import com.google.common.annotations.VisibleForTesting; import 
com.google.protobuf.ByteString; @@ -286,40 +287,35 @@ public class TextIO { } } - // Create a source specific to the requested compression type. - final Bounded<T> read; - switch(compressionType) { + final Bounded<T> read = org.apache.beam.sdk.io.Read.from(getSource()); + PCollection<T> pcol = input.getPipeline().apply("Read", read); + // Honor the default output coder that would have been used by this PTransform. + pcol.setCoder(getDefaultOutputCoder()); + return pcol; + } + + // Helper to create a source specific to the requested compression type. + protected FileBasedSource<T> getSource() { + switch (compressionType) { case UNCOMPRESSED: - read = org.apache.beam.sdk.io.Read.from( - new TextSource<T>(filepattern, coder)); - break; + return new TextSource<T>(filepattern, coder); case AUTO: - read = org.apache.beam.sdk.io.Read.from( - CompressedSource.from(new TextSource<T>(filepattern, coder))); - break; + return CompressedSource.from(new TextSource<T>(filepattern, coder)); case BZIP2: - read = org.apache.beam.sdk.io.Read.from( + return CompressedSource.from(new TextSource<T>(filepattern, coder)) - .withDecompression(CompressedSource.CompressionMode.BZIP2)); - break; + .withDecompression(CompressedSource.CompressionMode.BZIP2); case GZIP: - read = org.apache.beam.sdk.io.Read.from( + return CompressedSource.from(new TextSource<T>(filepattern, coder)) - .withDecompression(CompressedSource.CompressionMode.GZIP)); - break; + .withDecompression(CompressedSource.CompressionMode.GZIP); case ZIP: - read = org.apache.beam.sdk.io.Read.from( + return CompressedSource.from(new TextSource<T>(filepattern, coder)) - .withDecompression(CompressedSource.CompressionMode.ZIP)); - break; + .withDecompression(CompressedSource.CompressionMode.ZIP); default: - throw new IllegalArgumentException("Unknown compression mode: " + compressionType); + throw new IllegalArgumentException("Unknown compression type: " + compressionType); } - - PCollection<T> pcol = 
input.getPipeline().apply("Read", read); - // Honor the default output coder that would have been used by this PTransform. - pcol.setCoder(getDefaultOutputCoder()); - return pcol; } @Override http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/370d5924/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java index 859602a..49f5b16 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java @@ -24,8 +24,10 @@ import static org.apache.beam.sdk.TestUtils.NO_LINES_ARRAY; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasValue; import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.startsWith; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -1118,10 +1120,89 @@ public class TextIOTest { private TextSource<String> prepareSource(byte[] data) throws IOException { File file = tmpFolder.newFile(); Files.write(file.toPath(), data); + return new TextSource<>(file.toPath().toString(), StringUtf8Coder.of()); + } + + @Test + public void testInitialSplitIntoBundlesAutoModeTxt() throws Exception { + String[] lines = makeLines(5000); + File file = tmpFolder.newFile("to_be_split_auto.txt"); + writeToStreamAndClose(lines, new FileOutputStream(file)); + PipelineOptions options = TestPipeline.testingPipelineOptions(); + long desiredBundleSize = 1000; + + // Sanity check: file is at 
least 2 bundles long. + assertThat(file.length(), greaterThan(2 * desiredBundleSize)); + + FileBasedSource<String> source = TextIO.Read.from(file.getPath()).getSource(); + List<? extends FileBasedSource<String>> splits = + source.splitIntoBundles(desiredBundleSize, options); + + // At least 2 splits and they are equal to reading the whole file. + assertThat(splits, hasSize(greaterThan(1))); + SourceTestUtils.assertSourcesEqualReferenceSource(source, splits, options); + } + + @Test + public void testInitialSplitIntoBundlesAutoModeGz() throws Exception { + String[] lines = makeLines(5000); + File file = tmpFolder.newFile("to_be_split_auto.gz"); + writeToStreamAndClose(lines, new GZIPOutputStream(new FileOutputStream(file))); + PipelineOptions options = TestPipeline.testingPipelineOptions(); + long desiredBundleSize = 1000; + + // Sanity check: file is at least 2 bundles long. + assertThat(file.length(), greaterThan(2 * desiredBundleSize)); - TextSource<String> source = new TextSource<>(file.toPath().toString(), StringUtf8Coder.of()); + FileBasedSource<String> source = TextIO.Read.from(file.getPath()).getSource(); + List<? extends FileBasedSource<String>> splits = + source.splitIntoBundles(desiredBundleSize, options); - return source; + // Exactly 1 split, even in AUTO mode, since it is a gzip file. + assertThat(splits, hasSize(equalTo(1))); + SourceTestUtils.assertSourcesEqualReferenceSource(source, splits, options); + } + + @Test + public void testInitialSplitIntoBundlesGzipModeTxt() throws Exception { + String[] lines = makeLines(5000); + File file = tmpFolder.newFile("to_be_split_gzip.txt"); + writeToStreamAndClose(lines, new FileOutputStream(file)); + PipelineOptions options = TestPipeline.testingPipelineOptions(); + long desiredBundleSize = 1000; + + // Sanity check: file is at least 2 bundles long. 
+ assertThat(file.length(), greaterThan(2 * desiredBundleSize)); + + FileBasedSource<String> source = + TextIO.Read.from(file.getPath()).withCompressionType(CompressionType.GZIP).getSource(); + List<? extends FileBasedSource<String>> splits = + source.splitIntoBundles(desiredBundleSize, options); + + // Exactly 1 split, even though .txt extension, since using GZIP mode. + assertThat(splits, hasSize(equalTo(1))); + SourceTestUtils.assertSourcesEqualReferenceSource(source, splits, options); + } + + @Test + public void testInitialSplitIntoBundlesGzipModeGz() throws Exception { + String[] lines = makeLines(5000); + File file = tmpFolder.newFile("to_be_split_gzip.gz"); + writeToStreamAndClose(lines, new GZIPOutputStream(new FileOutputStream(file))); + PipelineOptions options = TestPipeline.testingPipelineOptions(); + long desiredBundleSize = 1000; + + // Sanity check: file is at least 2 bundles long. + assertThat(file.length(), greaterThan(2 * desiredBundleSize)); + + FileBasedSource<String> source = + TextIO.Read.from(file.getPath()).withCompressionType(CompressionType.GZIP).getSource(); + List<? extends FileBasedSource<String>> splits = + source.splitIntoBundles(desiredBundleSize, options); + + // Exactly 1 split using .gz extension and using GZIP mode. + assertThat(splits, hasSize(equalTo(1))); + SourceTestUtils.assertSourcesEqualReferenceSource(source, splits, options); } ///////////////////////////////////////////////////////////////////////////////////////////////