Repository: beam Updated Branches: refs/heads/master 9a43da74a -> d4df90790
[BEAM-2290] Fix issue where timestamps weren't set when using CompressedSource Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/504bd6a8 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/504bd6a8 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/504bd6a8 Branch: refs/heads/master Commit: 504bd6a8036946a796a7290853e45bf20d32d07d Parents: 9a43da7 Author: Rune Fevang <[email protected]> Authored: Fri May 12 22:24:32 2017 +0200 Committer: Dan Halperin <[email protected]> Committed: Mon May 15 09:52:00 2017 -0700 ---------------------------------------------------------------------- .../apache/beam/sdk/io/CompressedSource.java | 6 +++++ .../beam/sdk/io/CompressedSourceTest.java | 25 ++++++++++++++++++-- 2 files changed, 29 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/504bd6a8/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java index f2fc37b..6ab8dec 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java @@ -42,6 +42,7 @@ import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream; import org.apache.commons.compress.compressors.deflate.DeflateCompressorInputStream; import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream; +import org.joda.time.Instant; /** * A Source that reads from compressed files. A {@code CompressedSources} wraps a delegate @@ -579,5 +580,10 @@ public class CompressedSource<T> extends FileBasedSource<T> { return channel.getCount(); } } + + @Override + public Instant getCurrentTimestamp() throws NoSuchElementException { + return readerDelegate.getCurrentTimestamp(); + } } } http://git-wip-us.apache.org/repos/asf/beam/blob/504bd6a8/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java index 014e16e..3fff319 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java @@ -64,12 +64,16 @@ import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.SourceTestUtils; import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream; import org.apache.commons.compress.compressors.deflate.DeflateCompressorOutputStream; import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream; import org.hamcrest.Matchers; +import org.joda.time.Instant; import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -552,8 +556,13 @@ public class CompressedSourceTest { if (decompressionFactory != null) { source = source.withDecompression(decompressionFactory); } - PCollection<Byte> output = p.apply(Read.from(source)); - PAssert.that(output).containsInAnyOrder(Bytes.asList(expected)); + PCollection<KV<Long, Byte>> output = p.apply(Read.from(source)) + .apply(ParDo.of(new ExtractIndexFromTimestamp())); + ArrayList<KV<Long, Byte>> expectedOutput = new ArrayList<>(); + for (int i = 0; i < expected.length; i++) { + expectedOutput.add(KV.of((long) i, expected[i])); + } + PAssert.that(output).containsInAnyOrder(expectedOutput); p.run(); } @@ -632,6 +641,18 @@ public class CompressedSourceTest { protected long getCurrentOffset() { return offset; } + + @Override + public Instant getCurrentTimestamp() throws NoSuchElementException { + return new Instant(getCurrentOffset()); + } + } + } + + private static class ExtractIndexFromTimestamp extends DoFn<Byte, KV<Long, Byte>> { + @ProcessElement + public void processElement(ProcessContext context) { + context.output(KV.of(context.timestamp().getMillis(), context.element())); } }
