[BEAM-167] Fix custom source gzip input to read concatenated gzip files. This applies the patch from kirpic...@google.com, available at https://gist.github.com/jkff/d8d984a33a41ec607328cee8e418c174
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/abc397f9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/abc397f9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/abc397f9 Branch: refs/heads/master Commit: abc397f9b9851599eff7f3e1ec5b5343005a0a94 Parents: fd049b5 Author: kirpichov <kirpic...@google.com> Authored: Mon Apr 4 13:31:23 2016 -0700 Committer: Luke Cwik <lc...@google.com> Committed: Mon Apr 4 14:40:07 2016 -0700 ---------------------------------------------------------------------- .../cloud/dataflow/sdk/io/CompressedSource.java | 2 +- .../dataflow/sdk/io/CompressedSourceTest.java | 40 ++++++++++++++++++++ 2 files changed, 41 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/abc397f9/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/CompressedSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/CompressedSource.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/CompressedSource.java index 4e3e9ca..15e6e29 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/CompressedSource.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/CompressedSource.java @@ -122,7 +122,7 @@ public class CompressedSource<T> extends FileBasedSource<T> { byte zero = 0x00; int header = Ints.fromBytes(zero, zero, headerBytes[1], headerBytes[0]); if (header == GZIPInputStream.GZIP_MAGIC) { - return Channels.newChannel(new GzipCompressorInputStream(stream)); + return Channels.newChannel(new GzipCompressorInputStream(stream, true)); } } return Channels.newChannel(stream); 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/abc397f9/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/CompressedSourceTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/CompressedSourceTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/CompressedSourceTest.java index 2dcddb4..f63a128 100644 --- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/CompressedSourceTest.java +++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/CompressedSourceTest.java @@ -47,16 +47,19 @@ import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import java.io.ByteArrayOutputStream; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.io.OutputStream; import java.nio.ByteBuffer; import java.nio.channels.ReadableByteChannel; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; import java.util.NoSuchElementException; import java.util.Random; +import java.util.zip.GZIPOutputStream; import javax.annotation.Nullable; @@ -98,6 +101,43 @@ public class CompressedSourceTest { runReadTest(input, CompressionMode.GZIP); } + private static byte[] compressGzip(byte[] input) throws IOException { + ByteArrayOutputStream res = new ByteArrayOutputStream(); + try (GZIPOutputStream gzipStream = new GZIPOutputStream(res)) { + gzipStream.write(input); + } + return res.toByteArray(); + } + + private static byte[] concat(byte[] first, byte[] second) { + byte[] res = new byte[first.length + second.length]; + System.arraycopy(first, 0, res, 0, first.length); + System.arraycopy(second, 0, res, first.length, second.length); + return res; + } + + @Test + public void testReadConcatenatedGzip() throws IOException { + byte[] header = "a,b,c\n".getBytes(StandardCharsets.UTF_8); + byte[] body = 
"1,2,3\n4,5,6\n7,8,9\n".getBytes(StandardCharsets.UTF_8); + byte[] expected = concat(header, body); + byte[] totalGz = concat(compressGzip(header), compressGzip(body)); + File tmpFile = tmpFolder.newFile(); + try (FileOutputStream os = new FileOutputStream(tmpFile)) { + os.write(totalGz); + } + + Pipeline p = TestPipeline.create(); + + CompressedSource<Byte> source = + CompressedSource.from(new ByteSource(tmpFile.getAbsolutePath(), 1)) + .withDecompression(CompressionMode.GZIP); + PCollection<Byte> output = p.apply(Read.from(source)); + + DataflowAssert.that(output).containsInAnyOrder(Bytes.asList(expected)); + p.run(); + } + /** * Test reading empty input with bzip2. */