Repository: beam Updated Branches: refs/heads/master 53c9bf4cd -> 9cdae6caf
Allow the Distinct transform to deduplicate elements across panes Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1bc84fb5 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1bc84fb5 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1bc84fb5 Branch: refs/heads/master Commit: 1bc84fb5ff4ca087c97da45247f1e445eadc48de Parents: 53c9bf4 Author: Reuven Lax <[email protected]> Authored: Tue May 16 12:12:01 2017 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Fri Jun 2 15:42:53 2017 -0700 ---------------------------------------------------------------------- .../runners/spark/SparkRunnerDebuggerTest.java | 2 +- .../apache/beam/sdk/transforms/Distinct.java | 80 +++++++++--- .../beam/sdk/transforms/DistinctTest.java | 130 ++++++++++++++++++- 3 files changed, 188 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/1bc84fb5/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java index 9009751..64ff98c 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java @@ -142,7 +142,7 @@ public class SparkRunnerDebuggerTest { + "_.mapPartitions(new org.apache.beam.sdk.transforms.Distinct$2())\n" + "_.groupByKey()\n" + "_.map(new org.apache.beam.sdk.transforms.Combine$IterableCombineFn())\n" - + "_.mapPartitions(new org.apache.beam.sdk.transforms.Keys$1())\n" + + "_.mapPartitions(new org.apache.beam.sdk.transforms.Distinct$3())\n" + "_.mapPartitions(new 
org.apache.beam.sdk.transforms.WithKeys$2())\n" + "_.<org.apache.beam.sdk.io.kafka.AutoValue_KafkaIO_Write>"; http://git-wip-us.apache.org/repos/asf/beam/blob/1bc84fb5/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Distinct.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Distinct.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Distinct.java index 2d08cee..d751dbe 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Distinct.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Distinct.java @@ -17,9 +17,15 @@ */ package org.apache.beam.sdk.transforms; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.DefaultTrigger; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.beam.sdk.values.WindowingStrategy; +import org.joda.time.Duration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * {@code Distinct<T>} takes a {@code PCollection<T>} and @@ -59,6 +65,8 @@ import org.apache.beam.sdk.values.TypeDescriptor; */ public class Distinct<T> extends PTransform<PCollection<T>, PCollection<T>> { + private static final Logger LOG = LoggerFactory.getLogger(Distinct.class); + /** * Returns a {@code Distinct<T>} {@code PTransform}. 
* @@ -66,7 +74,7 @@ public class Distinct<T> extends PTransform<PCollection<T>, * {@code PCollection}s */ public static <T> Distinct<T> create() { - return new Distinct<T>(); + return new Distinct<>(); } /** @@ -78,26 +86,48 @@ public class Distinct<T> extends PTransform<PCollection<T>, */ public static <T, IdT> WithRepresentativeValues<T, IdT> withRepresentativeValueFn( SerializableFunction<T, IdT> fn) { - return new WithRepresentativeValues<T, IdT>(fn, null); + return new WithRepresentativeValues<>(fn, null); + } + + private static <T, W extends BoundedWindow> void validateWindowStrategy( + WindowingStrategy<T, W> strategy) { + if (!strategy.getWindowFn().isNonMerging() + && (!strategy.getTrigger().getClass().equals(DefaultTrigger.class) + || strategy.getAllowedLateness().isLongerThan(Duration.ZERO))) { + throw new UnsupportedOperationException(String.format( + "%s does not support merging windowing strategies, except when using the default " + + "trigger and zero allowed lateness.", Distinct.class.getSimpleName())); + } } @Override public PCollection<T> expand(PCollection<T> in) { - return in - .apply("CreateIndex", MapElements.via(new SimpleFunction<T, KV<T, Void>>() { - @Override - public KV<T, Void> apply(T element) { - return KV.of(element, (Void) null); - } - })) - .apply(Combine.<T, Void>perKey( - new SerializableFunction<Iterable<Void>, Void>() { + validateWindowStrategy(in.getWindowingStrategy()); + PCollection<KV<T, Void>> combined = + in.apply("KeyByElement", MapElements.via( + new SimpleFunction<T, KV<T, Void>>() { @Override - public Void apply(Iterable<Void> iter) { - return null; // ignore input - } + public KV<T, Void> apply(T element) { + return KV.of(element, (Void) null); + } })) - .apply(Keys.<T>create()); + .apply("DropValues", + Combine.<T, Void>perKey( + new SerializableFunction<Iterable<Void>, Void>() { + @Override + public Void apply(Iterable<Void> iter) { + return null; // ignore input + } + })); + return 
combined.apply("ExtractFirstKey", ParDo.of(new DoFn<KV<T, Void>, T>() { + @ProcessElement + public void processElement(ProcessContext c) { + if (c.pane().isFirst()) { + // Only output the key if it's the first time it's been seen. + c.output(c.element().getKey()); + } + } + })); } /** @@ -120,22 +150,32 @@ public class Distinct<T> extends PTransform<PCollection<T>, this.representativeType = representativeType; } + @Override public PCollection<T> expand(PCollection<T> in) { + validateWindowStrategy(in.getWindowingStrategy()); WithKeys<IdT, T> withKeys = WithKeys.of(fn); if (representativeType != null) { withKeys = withKeys.withKeyType(representativeType); } - return in - .apply(withKeys) - .apply(Combine.<IdT, T, T>perKey( + PCollection<KV<IdT, T>> combined = in + .apply("KeyByRepresentativeValue", withKeys) + .apply("OneValuePerKey", Combine.<IdT, T, T>perKey( new Combine.BinaryCombineFn<T>() { @Override public T apply(T left, T right) { return left; } - })) - .apply(Values.<T>create()); + })); + return combined.apply("KeepFirstPane", ParDo.of(new DoFn<KV<IdT, T>, T>() { + @ProcessElement + public void processElement(ProcessContext c) { + // Only output the value if it's the first time it's been seen. 
+ if (c.pane().isFirst()) { + c.output(c.element().getValue()); + } + } + })); } /** http://git-wip-us.apache.org/repos/asf/beam/blob/1bc84fb5/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DistinctTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DistinctTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DistinctTest.java index 17bbed6..b9810c1 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DistinctTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DistinctTest.java @@ -24,12 +24,25 @@ import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.testing.TestStream; +import org.apache.beam.sdk.testing.UsesTestStream; import org.apache.beam.sdk.testing.ValidatesRunner; +import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; +import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.transforms.windowing.Repeatedly; +import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TimestampedValue; +import org.apache.beam.sdk.values.TypeDescriptor; +import org.joda.time.Duration; +import org.joda.time.Instant; import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -85,9 +98,9 @@ public class DistinctTest { p.run(); } - private static class Keys implements SerializableFunction<KV<String, String>, String> { + private static class 
Keys<T> implements SerializableFunction<KV<T, String>, T> { @Override - public String apply(KV<String, String> input) { + public T apply(KV<T, String> input) { return input.getKey(); } } @@ -118,11 +131,122 @@ public class DistinctTest { PCollection<KV<String, String>> input = p.apply(Create.of(strings)); PCollection<KV<String, String>> output = - input.apply(Distinct.withRepresentativeValueFn(new Keys())); + input.apply(Distinct.withRepresentativeValueFn(new Keys<String>()) + .withRepresentativeType(TypeDescriptor.of(String.class))); PAssert.that(output).satisfies(new Checker()); p.run(); } + + @Rule + public TestPipeline windowedDistinctPipeline = TestPipeline.create(); + + @Test + @Category({ValidatesRunner.class, UsesTestStream.class}) + public void testWindowedDistinct() { + Instant base = new Instant(0); + TestStream<String> values = TestStream.create(StringUtf8Coder.of()) + .advanceWatermarkTo(base) + .addElements( + TimestampedValue.of("k1", base), + TimestampedValue.of("k2", base.plus(Duration.standardSeconds(10))), + TimestampedValue.of("k3", base.plus(Duration.standardSeconds(20))), + TimestampedValue.of("k1", base.plus(Duration.standardSeconds(30))), + TimestampedValue.of("k2", base.plus(Duration.standardSeconds(40))), + TimestampedValue.of("k3", base.plus(Duration.standardSeconds(50))), + TimestampedValue.of("k4", base.plus(Duration.standardSeconds(60))), + TimestampedValue.of("k5", base.plus(Duration.standardSeconds(70))), + TimestampedValue.of("k6", base.plus(Duration.standardSeconds(80)))) + .advanceWatermarkToInfinity(); + + PCollection<String> distinctValues = windowedDistinctPipeline + .apply(values) + .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(30)))) + .apply(Distinct.<String>create()); + PAssert.that(distinctValues) + .inWindow(new IntervalWindow(base, base.plus(Duration.standardSeconds(30)))) + .containsInAnyOrder("k1", "k2", "k3"); + PAssert.that(distinctValues) + .inWindow(new IntervalWindow(base.plus( + 
Duration.standardSeconds(30)), base.plus(Duration.standardSeconds(60)))) + .containsInAnyOrder("k1", "k2", "k3"); + PAssert.that(distinctValues) + .inWindow(new IntervalWindow(base.plus( + Duration.standardSeconds(60)), base.plus(Duration.standardSeconds(90)))) + .containsInAnyOrder("k4", "k5", "k6"); + windowedDistinctPipeline.run(); + } + + @Rule + public TestPipeline triggeredDistinctPipeline = TestPipeline.create(); + + @Test + @Category({ValidatesRunner.class, UsesTestStream.class}) + public void testTriggeredDistinct() { + Instant base = new Instant(0); + TestStream<String> values = TestStream.create(StringUtf8Coder.of()) + .advanceWatermarkTo(base) + .addElements( + TimestampedValue.of("k1", base), + TimestampedValue.of("k2", base.plus(Duration.standardSeconds(10))), + TimestampedValue.of("k3", base.plus(Duration.standardSeconds(20)))) + .advanceProcessingTime(Duration.standardMinutes(1)) + .addElements( + TimestampedValue.of("k1", base.plus(Duration.standardSeconds(30))), + TimestampedValue.of("k2", base.plus(Duration.standardSeconds(40))), + TimestampedValue.of("k3", base.plus(Duration.standardSeconds(50)))) + .advanceWatermarkToInfinity(); + + PCollection<String> distinctValues = triggeredDistinctPipeline + .apply(values) + .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))) + .triggering(Repeatedly.forever( + AfterProcessingTime.pastFirstElementInPane().plusDelayOf( + Duration.standardSeconds(30)))) + .withAllowedLateness(Duration.ZERO) + .accumulatingFiredPanes()) + .apply(Distinct.<String>create()); + PAssert.that(distinctValues).containsInAnyOrder("k1", "k2", "k3"); + triggeredDistinctPipeline.run(); + } + + @Rule + public TestPipeline triggeredDistinctRepresentativePipeline = TestPipeline.create(); + + @Test + @Category({ValidatesRunner.class, UsesTestStream.class}) + public void testTriggeredDistinctRepresentativeValues() { + Instant base = new Instant(0); + TestStream<KV<Integer, String>> values = TestStream.create( + 
KvCoder.of(VarIntCoder.of(), StringUtf8Coder.of())) + .advanceWatermarkTo(base) + .addElements( + TimestampedValue.of(KV.of(1, "k1"), base), + TimestampedValue.of(KV.of(2, "k2"), base.plus(Duration.standardSeconds(10))), + TimestampedValue.of(KV.of(3, "k3"), base.plus(Duration.standardSeconds(20)))) + .advanceProcessingTime(Duration.standardMinutes(1)) + .addElements( + TimestampedValue.of(KV.of(1, "k1"), base.plus(Duration.standardSeconds(30))), + TimestampedValue.of(KV.of(2, "k2"), base.plus(Duration.standardSeconds(40))), + TimestampedValue.of(KV.of(3, "k3"), base.plus(Duration.standardSeconds(50)))) + .advanceWatermarkToInfinity(); + + PCollection<KV<Integer, String>> distinctValues = triggeredDistinctRepresentativePipeline + .apply(values) + .apply(Window.<KV<Integer, String>>into(FixedWindows.of(Duration.standardMinutes(1))) + .triggering(Repeatedly.forever( + AfterProcessingTime.pastFirstElementInPane().plusDelayOf( + Duration.standardSeconds(30)))) + .withAllowedLateness(Duration.ZERO) + .accumulatingFiredPanes()) + .apply(Distinct.withRepresentativeValueFn(new Keys<Integer>()) + .withRepresentativeType(TypeDescriptor.of(Integer.class))); + + + PAssert.that(distinctValues).containsInAnyOrder( + KV.of(1, "k1"), KV.of(2, "k2"), KV.of(3, "k3")); + triggeredDistinctRepresentativePipeline.run(); + } }
