Repository: incubator-beam Updated Branches: refs/heads/master 6511ba28e -> 5bdea1e2b
Make IdentityWindowFn and NeverTrigger available. This will be used as part of the new PAssert. IdentityWindowFn remains package-private to restrict usage. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c9a1efae Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c9a1efae Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c9a1efae Branch: refs/heads/master Commit: c9a1efae25455af1e47675938e296c716ec0fa0f Parents: 6511ba2 Author: Thomas Groh <tg...@google.com> Authored: Thu Mar 31 14:34:06 2016 -0700 Committer: bchambers <bchamb...@google.com> Committed: Thu Apr 14 15:09:47 2016 -0700 ---------------------------------------------------------------------- .../transforms/windowing/AfterWatermark.java | 46 +------- .../beam/sdk/transforms/windowing/Never.java | 76 ++++++++++++ .../apache/beam/sdk/util/IdentityWindowFn.java | 116 +++++++++++++++++++ .../org/apache/beam/sdk/util/Reshuffle.java | 80 ++----------- .../sdk/transforms/windowing/NeverTest.java | 56 +++++++++ 5 files changed, 259 insertions(+), 115 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c9a1efae/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java index 5aca093..05c6eb8 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java @@ -24,7 +24,6 @@ import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger; import
org.apache.beam.sdk.util.ExecutableTrigger; import org.apache.beam.sdk.util.TimeDomain; -import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import org.joda.time.Instant; @@ -96,43 +95,6 @@ public class AfterWatermark { TriggerBuilder withEarlyFirings(OnceTrigger earlyTrigger); } - /** - * A trigger which never fires. Used for the "early" trigger when only a late trigger was - * specified. - */ - private static class NeverTrigger extends OnceTrigger { - - protected NeverTrigger() { - super(null); - } - - @Override - public void onElement(OnElementContext c) throws Exception { } - - @Override - public void onMerge(OnMergeContext c) throws Exception { } - - @Override - protected Trigger getContinuationTrigger(List<Trigger> continuationTriggers) { - return this; - } - - @Override - public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) { - return BoundedWindow.TIMESTAMP_MAX_VALUE; - } - - @Override - public boolean shouldFire(Trigger.TriggerContext context) throws Exception { - return false; - } - - @Override - protected void onOnlyFiring(Trigger.TriggerContext context) throws Exception { - throw new UnsupportedOperationException( - String.format("%s should never fire", getClass().getSimpleName())); - } - } private static class AfterWatermarkEarlyAndLate extends Trigger @@ -314,8 +276,7 @@ public class AfterWatermark { * the given {@code Trigger} fires before the watermark has passed the end of the window. */ public AfterWatermarkEarly withEarlyFirings(OnceTrigger earlyFirings) { - Preconditions.checkNotNull(earlyFirings, - "Must specify the trigger to use for early firings"); + checkNotNull(earlyFirings, "Must specify the trigger to use for early firings"); return new AfterWatermarkEarlyAndLate(earlyFirings, null); } @@ -324,9 +285,8 @@ public class AfterWatermark { * the given {@code Trigger} fires after the watermark has passed the end of the window. 
*/ public AfterWatermarkLate withLateFirings(OnceTrigger lateFirings) { - Preconditions.checkNotNull(lateFirings, - "Must specify the trigger to use for late firings"); - return new AfterWatermarkEarlyAndLate(new NeverTrigger(), lateFirings); + checkNotNull(lateFirings, "Must specify the trigger to use for late firings"); + return new AfterWatermarkEarlyAndLate(Never.ever(), lateFirings); } @Override http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c9a1efae/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java new file mode 100644 index 0000000..809e841 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.transforms.windowing; + +import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger; +import org.joda.time.Instant; + +import java.util.List; + +/** + * A trigger which never fires. + * + * <p> + * Using this trigger will only produce output when the watermark passes the end of the + * {@link BoundedWindow window} plus the {@link Window#withAllowedLateness() allowed + * lateness}. + */ +public final class Never { + /** + * Returns a trigger which never fires. Output will be produced from the using {@link GroupByKey} + * when the {@link BoundedWindow} closes. + */ + public static OnceTrigger ever() { + // NeverTrigger ignores all inputs and is Window-type independent. + return new NeverTrigger(); + } + + private static class NeverTrigger extends OnceTrigger { + protected NeverTrigger() { + super(null); + } + + @Override + public void onElement(OnElementContext c) {} + + @Override + public void onMerge(OnMergeContext c) {} + + @Override + protected Trigger getContinuationTrigger(List<Trigger> continuationTriggers) { + return this; + } + + @Override + public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) { + return BoundedWindow.TIMESTAMP_MAX_VALUE; + } + + @Override + public boolean shouldFire(Trigger.TriggerContext context) { + return false; + } + + @Override + protected void onOnlyFiring(Trigger.TriggerContext context) { + throw new UnsupportedOperationException( + String.format("%s should never fire", getClass().getSimpleName())); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c9a1efae/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IdentityWindowFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IdentityWindowFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IdentityWindowFn.java new file mode 100644 index 
0000000..91e5609 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IdentityWindowFn.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util; + +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.InvalidWindows; +import org.apache.beam.sdk.transforms.windowing.NonMergingWindowFn; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.transforms.windowing.WindowFn; +import org.apache.beam.sdk.values.PCollection; + +import org.joda.time.Instant; + +import java.util.Collection; + +/** + * A {@link WindowFn} that leaves all associations between elements and windows unchanged. + * + * <p>This {@link WindowFn} is applied when elements must be passed through a {@link GroupByKey}, + * but should maintain their existing {@link Window} assignments. Because windows may have been + * merged, the earlier {@link WindowFn} may not appropriately maintain the existing window + * assignments. 
For example, if the earlier {@link WindowFn} merges windows, after a + * {@link GroupByKey} the {@link WindowingStrategy} uses {@link InvalidWindows}, and no further + * {@link GroupByKey} can be applied without applying a new {@link WindowFn}. This {@link WindowFn} + * allows existing window assignments to be maintained across a single group by key, at which point + * the earlier {@link WindowingStrategy} should be restored. + * + * <p>This {@link WindowFn} is an internal implementation detail of sdk-provided utilities, and + * should not be used by {@link Pipeline} writers. + */ +class IdentityWindowFn<T> extends NonMergingWindowFn<T, BoundedWindow> { + + /** + * The coder of the type of windows of the input {@link PCollection}. This is not an arbitrary + * {@link BoundedWindow} {@link Coder}, but is safe to use for all windows assigned by this + * transform, as it should be the same coder used by the {@link WindowFn} that initially assigned + * these windows. + */ + private final Coder<BoundedWindow> coder; + private final boolean assignsToSingleWindow; + + public IdentityWindowFn(Coder<? extends BoundedWindow> coder, boolean assignsToSingleWindow) { + // Safe because it is only used privately here. + // At every point where a window is returned or accepted, it has been provided + // by priorWindowFn, so it is of the expected type. 
+ @SuppressWarnings("unchecked") + Coder<BoundedWindow> windowCoder = (Coder<BoundedWindow>) coder; + this.coder = windowCoder; + this.assignsToSingleWindow = assignsToSingleWindow; + } + + @Override + public Collection<BoundedWindow> assignWindows(WindowFn<T, BoundedWindow>.AssignContext c) + throws Exception { + // The windows are provided by priorWindowFn, which also provides the coder for them + @SuppressWarnings("unchecked") + Collection<BoundedWindow> priorWindows = (Collection<BoundedWindow>) c.windows(); + return priorWindows; + } + + @Override + public boolean isCompatible(WindowFn<?, ?> other) { + throw new UnsupportedOperationException( + String.format( + "%s.isCompatible() should never be called." + + " It is a private implementation detail of sdk utilities." + + " This message indicates a bug in the Beam SDK.", + getClass().getCanonicalName())); + } + + @Override + public Coder<BoundedWindow> windowCoder() { + // Safe because the previous WindowFn provides both the windows and the coder. + // The Coder is _not_ actually a coder for an arbitrary BoundedWindow. + return coder; + } + + @Override + public boolean assignsToSingleWindow() { + return assignsToSingleWindow; + } + + @Override + public BoundedWindow getSideInputWindow(BoundedWindow window) { + throw new UnsupportedOperationException( + String.format( + "%s.getSideInputWindow() should never be called." + + " It is a private implementation detail of sdk utilities." 
+ + " This message indicates a bug in the Beam SDK.", + getClass().getCanonicalName())); + } + + @Deprecated + @Override + public Instant getOutputTime(Instant inputTimestamp, BoundedWindow window) { + return inputTimestamp; + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c9a1efae/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java index 09b2222..5c91326 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java @@ -17,22 +17,15 @@ */ package org.apache.beam.sdk.util; -import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.NonMergingWindowFn; import org.apache.beam.sdk.transforms.windowing.Window; -import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; - import org.joda.time.Duration; -import org.joda.time.Instant; - -import java.util.Collection; /** * A {@link PTransform} that returns a {@link PCollection} equivalent to its input but operationally @@ -62,11 +55,14 @@ public class Reshuffle<K, V> extends PTransform<PCollection<KV<K, V>>, PCollecti // If the input has already had its windows merged, then the GBK that performed the merge // will have set originalStrategy.getWindowFn() to InvalidWindows, causing the GBK contained // here to fail. Instead, we install a valid WindowFn that leaves all windows unchanged. 
- Window.Bound<KV<K, V>> rewindow = Window - .<KV<K, V>>into(new PassThroughWindowFn<>(originalStrategy.getWindowFn())) - .triggering(new ReshuffleTrigger<>()) - .discardingFiredPanes() - .withAllowedLateness(Duration.millis(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis())); + Window.Bound<KV<K, V>> rewindow = + Window.<KV<K, V>>into( + new IdentityWindowFn<>( + originalStrategy.getWindowFn().windowCoder(), + originalStrategy.getWindowFn().assignsToSingleWindow())) + .triggering(new ReshuffleTrigger<>()) + .discardingFiredPanes() + .withAllowedLateness(Duration.millis(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis())); return input.apply(rewindow) .apply(GroupByKey.<K, V>create()) @@ -84,64 +80,4 @@ public class Reshuffle<K, V> extends PTransform<PCollection<KV<K, V>>, PCollecti } })); } - - /** - * A {@link WindowFn} that leaves all associations between elements and windows unchanged. - * - * <p>In order to implement all the abstract methods of {@link WindowFn}, this requires the - * prior {@link WindowFn}, to which all auxiliary functionality is delegated. - */ - private static class PassThroughWindowFn<T> extends NonMergingWindowFn<T, BoundedWindow> { - - /** The WindowFn prior to this. Used for its windowCoder, etc. */ - private final WindowFn<?, BoundedWindow> priorWindowFn; - - public PassThroughWindowFn(WindowFn<?, ?> priorWindowFn) { - // Safe because it is only used privately here. - // At every point where a window is returned or accepted, it has been provided - // by priorWindowFn, so it is of the type expected. 
- @SuppressWarnings("unchecked") - WindowFn<?, BoundedWindow> internalWindowFn = (WindowFn<?, BoundedWindow>) priorWindowFn; - this.priorWindowFn = internalWindowFn; - } - - @Override - public Collection<BoundedWindow> assignWindows(WindowFn<T, BoundedWindow>.AssignContext c) - throws Exception { - // The windows are provided by priorWindowFn, which also provides the coder for them - @SuppressWarnings("unchecked") - Collection<BoundedWindow> priorWindows = (Collection<BoundedWindow>) c.windows(); - return priorWindows; - } - - @Override - public boolean isCompatible(WindowFn<?, ?> other) { - throw new UnsupportedOperationException( - String.format("%s.isCompatible() should never be called." - + " It is a private implementation detail of Reshuffle." - + " This message indicates a bug in the Dataflow SDK.", - getClass().getCanonicalName())); - } - - @Override - public Coder<BoundedWindow> windowCoder() { - // Safe because priorWindowFn provides the windows also. - // The Coder is _not_ actually a coder for an arbitrary BoundedWindow. - return priorWindowFn.windowCoder(); - } - - @Override - public BoundedWindow getSideInputWindow(BoundedWindow window) { - throw new UnsupportedOperationException( - String.format("%s.getSideInputWindow() should never be called." - + " It is a private implementation detail of Reshuffle." 
- + " This message indicates a bug in the Dataflow SDK.", - getClass().getCanonicalName())); - } - - @Override - public Instant getOutputTime(Instant inputTimestamp, BoundedWindow window) { - return inputTimestamp; - } - } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c9a1efae/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/NeverTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/NeverTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/NeverTest.java new file mode 100644 index 0000000..222fe4e --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/NeverTest.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.transforms.windowing; + +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; + +import org.apache.beam.sdk.util.TriggerTester; +import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester; +import org.apache.beam.sdk.values.TimestampedValue; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link Never}. + */ +@RunWith(JUnit4.class) +public class NeverTest { + private SimpleTriggerTester<IntervalWindow> triggerTester; + + @Before + public void setup() throws Exception { + triggerTester = + TriggerTester.forTrigger( + Never.<IntervalWindow>ever(), FixedWindows.of(Duration.standardMinutes(5))); + } + + @Test + public void falseAfterEndOfWindow() throws Exception { + triggerTester.injectElements(TimestampedValue.of(1, new Instant(1))); + IntervalWindow window = + new IntervalWindow(new Instant(0), new Instant(0).plus(Duration.standardMinutes(5))); + assertThat(triggerTester.shouldFire(window), is(false)); + triggerTester.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE); + assertThat(triggerTester.shouldFire(window), is(false)); + } +}