Repository: beam Updated Branches: refs/heads/master 3bcbba121 -> b82cd2446
http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategy.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategy.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategy.java index 14f818a..268718a 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategy.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategy.java @@ -20,17 +20,20 @@ package org.apache.beam.sdk.util; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.MoreObjects; import java.io.Serializable; +import java.util.Collections; import java.util.Objects; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.DefaultTrigger; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindows; -import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; +import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; +import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; import org.apache.beam.sdk.transforms.windowing.Trigger; import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior; import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.joda.time.Duration; +import org.joda.time.Instant; /** * A {@code WindowingStrategy} describes the windowing behavior for a specific collection of values. @@ -55,22 +58,22 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab private static final WindowingStrategy<Object, GlobalWindow> DEFAULT = of(new GlobalWindows()); private final WindowFn<T, W> windowFn; + private final OutputTimeFn<? super W> outputTimeFn; private final Trigger trigger; private final AccumulationMode mode; private final Duration allowedLateness; private final ClosingBehavior closingBehavior; - private final TimestampCombiner timestampCombiner; private final boolean triggerSpecified; private final boolean modeSpecified; private final boolean allowedLatenessSpecified; - private final boolean timestampCombinerSpecified; + private final boolean outputTimeFnSpecified; private WindowingStrategy( WindowFn<T, W> windowFn, Trigger trigger, boolean triggerSpecified, AccumulationMode mode, boolean modeSpecified, Duration allowedLateness, boolean allowedLatenessSpecified, - TimestampCombiner timestampCombiner, boolean timestampCombinerSpecified, + OutputTimeFn<? super W> outputTimeFn, boolean outputTimeFnSpecified, ClosingBehavior closingBehavior) { this.windowFn = windowFn; this.trigger = trigger; @@ -80,8 +83,8 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab this.allowedLateness = allowedLateness; this.allowedLatenessSpecified = allowedLatenessSpecified; this.closingBehavior = closingBehavior; - this.timestampCombiner = timestampCombiner; - this.timestampCombinerSpecified = timestampCombinerSpecified; + this.outputTimeFn = outputTimeFn; + this.outputTimeFnSpecified = outputTimeFnSpecified; } /** @@ -97,7 +100,7 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab DefaultTrigger.of(), false, AccumulationMode.DISCARDING_FIRED_PANES, false, DEFAULT_ALLOWED_LATENESS, false, - TimestampCombiner.END_OF_WINDOW, false, + new CombineWindowFnOutputTimes(OutputTimeFns.outputAtEndOfWindow(), windowFn), false, ClosingBehavior.FIRE_IF_NON_EMPTY); } @@ -133,12 +136,12 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab return closingBehavior; } - public TimestampCombiner getTimestampCombiner() { - return timestampCombiner; + public OutputTimeFn<? super W> getOutputTimeFn() { + return outputTimeFn; } - public boolean isTimestampCombinerSpecified() { - return timestampCombinerSpecified; + public boolean isOutputTimeFnSpecified() { + return outputTimeFnSpecified; } /** @@ -151,7 +154,7 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab trigger, true, mode, modeSpecified, allowedLateness, allowedLatenessSpecified, - timestampCombiner, timestampCombinerSpecified, + outputTimeFn, outputTimeFnSpecified, closingBehavior); } @@ -165,7 +168,7 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab trigger, triggerSpecified, mode, true, allowedLateness, allowedLatenessSpecified, - timestampCombiner, timestampCombinerSpecified, + outputTimeFn, outputTimeFnSpecified, closingBehavior); } @@ -177,12 +180,17 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab @SuppressWarnings("unchecked") WindowFn<T, W> typedWindowFn = (WindowFn<T, W>) wildcardWindowFn; + // The onus of type correctness falls on the callee. + @SuppressWarnings("unchecked") + OutputTimeFn<? super W> newOutputTimeFn = (OutputTimeFn<? super W>) + new CombineWindowFnOutputTimes<W>(outputTimeFn, typedWindowFn); + return new WindowingStrategy<T, W>( typedWindowFn, trigger, triggerSpecified, mode, modeSpecified, allowedLateness, allowedLatenessSpecified, - timestampCombiner, timestampCombinerSpecified, + newOutputTimeFn, outputTimeFnSpecified, closingBehavior); } @@ -196,7 +204,7 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab trigger, triggerSpecified, mode, modeSpecified, allowedLateness, true, - timestampCombiner, timestampCombinerSpecified, + outputTimeFn, outputTimeFnSpecified, closingBehavior); } @@ -206,19 +214,40 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab trigger, triggerSpecified, mode, modeSpecified, allowedLateness, allowedLatenessSpecified, - timestampCombiner, timestampCombinerSpecified, + outputTimeFn, outputTimeFnSpecified, closingBehavior); } @Experimental(Experimental.Kind.OUTPUT_TIME) - public WindowingStrategy<T, W> withTimestampCombiner(TimestampCombiner timestampCombiner) { + public WindowingStrategy<T, W> withOutputTimeFn(OutputTimeFn<?> outputTimeFn) { + + @SuppressWarnings("unchecked") + OutputTimeFn<? super W> typedOutputTimeFn = (OutputTimeFn<? super W>) outputTimeFn; + + OutputTimeFn<? super W> newOutputTimeFn = + new CombineWindowFnOutputTimes<W>(typedOutputTimeFn, windowFn); return new WindowingStrategy<T, W>( windowFn, trigger, triggerSpecified, mode, modeSpecified, allowedLateness, allowedLatenessSpecified, - timestampCombiner, true, + newOutputTimeFn, true, + closingBehavior); + } + + /** + * Fixes all the defaults so that equals can be used to check that two strategies are the same, + * regardless of the state of "defaulted-ness". + */ + @VisibleForTesting + public WindowingStrategy<T, W> fixDefaults() { + return new WindowingStrategy<>( + windowFn, + trigger, true, + mode, true, + allowedLateness, true, + outputTimeFn, true, closingBehavior); } @@ -229,7 +258,7 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab .add("allowedLateness", allowedLateness) .add("trigger", trigger) .add("accumulationMode", mode) - .add("timestampCombiner", timestampCombiner) + .add("outputTimeFn", outputTimeFn) .toString(); } @@ -239,45 +268,104 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab return false; } WindowingStrategy<?, ?> other = (WindowingStrategy<?, ?>) object; - return isTriggerSpecified() == other.isTriggerSpecified() + return + isTriggerSpecified() == other.isTriggerSpecified() && isAllowedLatenessSpecified() == other.isAllowedLatenessSpecified() && isModeSpecified() == other.isModeSpecified() - && isTimestampCombinerSpecified() == other.isTimestampCombinerSpecified() && getMode().equals(other.getMode()) && getAllowedLateness().equals(other.getAllowedLateness()) && getClosingBehavior().equals(other.getClosingBehavior()) && getTrigger().equals(other.getTrigger()) - && getTimestampCombiner().equals(other.getTimestampCombiner()) && getWindowFn().equals(other.getWindowFn()); } @Override public int hashCode() { - return Objects.hash( - triggerSpecified, - allowedLatenessSpecified, - modeSpecified, - timestampCombinerSpecified, - mode, - allowedLateness, - closingBehavior, - trigger, - timestampCombiner, - windowFn); + return Objects.hash(triggerSpecified, allowedLatenessSpecified, modeSpecified, + windowFn, trigger, mode, allowedLateness, closingBehavior); } /** - * Fixes all the defaults so that equals can be used to check that two strategies are the same, - * regardless of the state of "defaulted-ness". + * An {@link OutputTimeFn} that uses {@link WindowFn#getOutputTime} to assign initial timestamps + * but then combines and merges according to a given {@link OutputTimeFn}. + * + * <ul> + * <li>The {@link WindowFn#getOutputTime} allows adjustments such as that whereby + * {@link org.apache.beam.sdk.transforms.windowing.SlidingWindows#getOutputTime} + * moves elements later in time to avoid holding up progress downstream.</li> + * <li>Then, when multiple elements are buffered for output, the output timestamp of the + * result is calculated using {@link OutputTimeFn#combine}.</li> + * <li>In the case of a merging {@link WindowFn}, the output timestamp when windows merge + * is calculated using {@link OutputTimeFn#merge}.</li> + * </ul> */ - @VisibleForTesting - public WindowingStrategy<T, W> fixDefaults() { - return new WindowingStrategy<>( - windowFn, - trigger, true, - mode, true, - allowedLateness, true, - timestampCombiner, true, - closingBehavior); + public static class CombineWindowFnOutputTimes<W extends BoundedWindow> + extends OutputTimeFn<W> { + + private final OutputTimeFn<? super W> outputTimeFn; + private final WindowFn<?, W> windowFn; + + public CombineWindowFnOutputTimes( + OutputTimeFn<? super W> outputTimeFn, WindowFn<?, W> windowFn) { + this.outputTimeFn = outputTimeFn; + this.windowFn = windowFn; + } + + public OutputTimeFn<? super W> getOutputTimeFn() { + return outputTimeFn; + } + + @Override + public Instant assignOutputTime(Instant inputTimestamp, W window) { + return outputTimeFn.merge( + window, Collections.singleton(windowFn.getOutputTime(inputTimestamp, window))); + } + + @Override + public Instant combine(Instant timestamp, Instant otherTimestamp) { + return outputTimeFn.combine(timestamp, otherTimestamp); + } + + @Override + public Instant merge(W newWindow, Iterable<? extends Instant> timestamps) { + return outputTimeFn.merge(newWindow, timestamps); + } + + @Override + public final boolean dependsOnlyOnWindow() { + return outputTimeFn.dependsOnlyOnWindow(); + } + + @Override + public boolean dependsOnlyOnEarliestInputTimestamp() { + return outputTimeFn.dependsOnlyOnEarliestInputTimestamp(); + } + + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } + + if (!(obj instanceof CombineWindowFnOutputTimes)) { + return false; + } + + CombineWindowFnOutputTimes<?> that = (CombineWindowFnOutputTimes<?>) obj; + return outputTimeFn.equals(that.outputTimeFn) && windowFn.equals(that.windowFn); + } + + @Override + public int hashCode() { + return Objects.hash(outputTimeFn, windowFn); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("outputTimeFn", outputTimeFn) + .add("windowFn", windowFn) + .toString(); + } } } http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java index f9ab115..64841fb 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java @@ -21,7 +21,7 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.CombineWithContext; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; +import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; /** * Visitor for binding a {@link StateSpec} and to the associated {@link State}. @@ -63,11 +63,11 @@ public interface StateBinder<K> { /** * Bind to a watermark {@link StateSpec}. * - * <p>This accepts the {@link TimestampCombiner} that dictates how watermark hold timestamps added - * to the returned {@link WatermarkHoldState} are to be combined. + * <p>This accepts the {@link OutputTimeFn} that dictates how watermark hold timestamps added to + * the returned {@link WatermarkHoldState} are to be combined. */ - <W extends BoundedWindow> WatermarkHoldState bindWatermark( + <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark( String id, - StateSpec<? super K, WatermarkHoldState> spec, - TimestampCombiner timestampCombiner); + StateSpec<? super K, WatermarkHoldState<W>> spec, + OutputTimeFn<? super W> outputTimeFn); } http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java index 8fa5bb0..dc647da 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java @@ -30,7 +30,7 @@ import org.apache.beam.sdk.transforms.Combine.CombineFn; import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn; import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; +import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; /** * Static utility methods for creating {@link StateSpec} instances. @@ -208,9 +208,9 @@ public class StateSpecs { /** Create a state spec for holding the watermark. */ public static <W extends BoundedWindow> - StateSpec<Object, WatermarkHoldState> watermarkStateInternal( - TimestampCombiner timestampCombiner) { - return new WatermarkStateSpecInternal<W>(timestampCombiner); + StateSpec<Object, WatermarkHoldState<W>> watermarkStateInternal( + OutputTimeFn<? super W> outputTimeFn) { + return new WatermarkStateSpecInternal<W>(outputTimeFn); } public static <K, InputT, AccumT, OutputT> @@ -656,26 +656,26 @@ public class StateSpecs { /** * A specification for a state cell tracking a combined watermark hold. * - * <p>Includes the {@link TimestampCombiner} according to which the output times + * <p>Includes the {@link OutputTimeFn} according to which the output times * are combined. */ private static class WatermarkStateSpecInternal<W extends BoundedWindow> - implements StateSpec<Object, WatermarkHoldState> { + implements StateSpec<Object, WatermarkHoldState<W>> { /** * When multiple output times are added to hold the watermark, this determines how they are * combined, and also the behavior when merging windows. Does not contribute to equality/hash * since we have at most one watermark hold spec per computation. */ - private final TimestampCombiner timestampCombiner; + private final OutputTimeFn<? super W> outputTimeFn; - private WatermarkStateSpecInternal(TimestampCombiner timestampCombiner) { - this.timestampCombiner = timestampCombiner; + private WatermarkStateSpecInternal(OutputTimeFn<? super W> outputTimeFn) { + this.outputTimeFn = outputTimeFn; } @Override - public WatermarkHoldState bind(String id, StateBinder<?> visitor) { - return visitor.bindWatermark(id, this, timestampCombiner); + public WatermarkHoldState<W> bind(String id, StateBinder<?> visitor) { + return visitor.bindWatermark(id, this, outputTimeFn); } @Override @@ -701,4 +701,5 @@ public class StateSpecs { return Objects.hash(getClass()); } } + } http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/WatermarkHoldState.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/WatermarkHoldState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/WatermarkHoldState.java index ae9b700..20fa05f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/WatermarkHoldState.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/WatermarkHoldState.java @@ -19,24 +19,25 @@ package org.apache.beam.sdk.util.state; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; -import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; import org.joda.time.Instant; /** - * A {@link State} accepting and aggregating output timestamps, which determines the time to which - * the output watermark must be held. + * A {@link State} accepting and aggregating output timestamps, which determines + * the time to which the output watermark must be held. * * <p><b><i>For internal use only. This API may change at any time.</i></b> */ @Experimental(Kind.STATE) -public interface WatermarkHoldState extends GroupingState<Instant, Instant> { +public interface WatermarkHoldState<W extends BoundedWindow> + extends GroupingState<Instant, Instant> { /** - * Return the {@link TimestampCombiner} which will be used to determine a watermark hold time - * given an element timestamp, and to combine watermarks from windows which are about to be - * merged. + * Return the {@link OutputTimeFn} which will be used to determine a watermark hold time given + * an element timestamp, and to combine watermarks from windows which are about to be merged. */ - TimestampCombiner getTimestampCombiner(); + OutputTimeFn<? super W> getOutputTimeFn(); @Override - WatermarkHoldState readLater(); + WatermarkHoldState<W> readLater(); } http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/sdks/java/core/src/test/java/org/apache/beam/SdkCoreApiSurfaceTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/SdkCoreApiSurfaceTest.java b/sdks/java/core/src/test/java/org/apache/beam/SdkCoreApiSurfaceTest.java index 26dd9f9..153bd84 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/SdkCoreApiSurfaceTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/SdkCoreApiSurfaceTest.java @@ -39,6 +39,7 @@ public class SdkCoreApiSurfaceTest { ImmutableSet.of( "org.apache.beam", "com.google.api.client", + "com.google.protobuf", "com.fasterxml.jackson.annotation", "com.fasterxml.jackson.core", "com.fasterxml.jackson.databind", http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java index 0556199..939261f 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java @@ -51,8 +51,8 @@ import org.apache.beam.sdk.testing.ValidatesRunner; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.InvalidWindows; +import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; import org.apache.beam.sdk.transforms.windowing.Sessions; -import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.util.Reshuffle; import org.apache.beam.sdk.util.WindowingStrategy; @@ -318,14 +318,14 @@ public class GroupByKeyTest { */ @Test @Category(ValidatesRunner.class) - public void testTimestampCombinerEarliest() { + public void testOutputTimeFnEarliest() { p.apply( Create.timestamped( TimestampedValue.of(KV.of(0, "hello"), new Instant(0)), TimestampedValue.of(KV.of(0, "goodbye"), new Instant(10)))) .apply(Window.<KV<Integer, String>>into(FixedWindows.of(Duration.standardMinutes(10))) - .withTimestampCombiner(TimestampCombiner.EARLIEST)) + .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())) .apply(GroupByKey.<Integer, String>create()) .apply(ParDo.of(new AssertTimestamp(new Instant(0)))); @@ -339,13 +339,13 @@ public class GroupByKeyTest { */ @Test @Category(ValidatesRunner.class) - public void testTimestampCombinerLatest() { + public void testOutputTimeFnLatest() { p.apply( Create.timestamped( TimestampedValue.of(KV.of(0, "hello"), new Instant(0)), TimestampedValue.of(KV.of(0, "goodbye"), new Instant(10)))) .apply(Window.<KV<Integer, String>>into(FixedWindows.of(Duration.standardMinutes(10))) - .withTimestampCombiner(TimestampCombiner.LATEST)) + .withOutputTimeFn(OutputTimeFns.outputAtLatestInputTimestamp())) .apply(GroupByKey.<Integer, String>create()) .apply(ParDo.of(new AssertTimestamp(new Instant(10)))); http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java index 9a17bc7..4e61f4e 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java @@ -41,7 +41,7 @@ import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.FixedWindows; -import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; +import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; @@ -241,7 +241,7 @@ public class CoGroupByKeyTest implements Serializable { Arrays.asList(0L, 2L, 4L, 6L, 8L)) .apply("WindowClicks", Window.<KV<Integer, String>>into( FixedWindows.of(new Duration(4))) - .withTimestampCombiner(TimestampCombiner.EARLIEST)); + .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())); PCollection<KV<Integer, String>> purchasesTable = createInput("CreatePurchases", @@ -250,7 +250,7 @@ public class CoGroupByKeyTest implements Serializable { Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L)) .apply("WindowPurchases", Window.<KV<Integer, String>>into( FixedWindows.of(new Duration(4))) - .withTimestampCombiner(TimestampCombiner.EARLIEST)); + .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())); PCollection<KV<Integer, CoGbkResult>> coGbkResults = KeyedPCollectionTuple.of(clicksTag, clicksTable) http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFnsTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFnsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFnsTest.java new file mode 100644 index 0000000..78d7a2f --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFnsTest.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms.windowing; + +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertThat; + +import com.google.common.collect.ImmutableList; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; + +/** Tests for {@link OutputTimeFns}. */ +@RunWith(Parameterized.class) +public class OutputTimeFnsTest { + + @Parameters(name = "{index}: {0}") + public static Iterable<OutputTimeFn<BoundedWindow>> data() { + return ImmutableList.of( + OutputTimeFns.outputAtEarliestInputTimestamp(), + OutputTimeFns.outputAtLatestInputTimestamp(), + OutputTimeFns.outputAtEndOfWindow()); + } + + @Parameter(0) + public OutputTimeFn<?> outputTimeFn; + + @Test + public void testToProtoAndBack() throws Exception { + OutputTimeFn<?> result = OutputTimeFns.fromProto(OutputTimeFns.toProto(outputTimeFn)); + + assertThat(result, equalTo((OutputTimeFn) outputTimeFn)); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/SessionsTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/SessionsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/SessionsTest.java index 9d94928..b131688 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/SessionsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/SessionsTest.java @@ -118,7 +118,7 @@ public class SessionsTest { } /** - * Test to confirm that {@link Sessions} with the default {@link TimestampCombiner} holds up the + * Test to confirm that {@link Sessions} with the default {@link OutputTimeFn} holds up the * watermark potentially indefinitely. */ @Test @@ -126,7 +126,7 @@ public class SessionsTest { try { WindowFnTestUtils.<Object, IntervalWindow>validateGetOutputTimestamps( Sessions.withGapDuration(Duration.millis(10)), - TimestampCombiner.EARLIEST, + OutputTimeFns.outputAtEarliestInputTimestamp(), ImmutableList.of( (List<Long>) ImmutableList.of(1L, 3L), (List<Long>) ImmutableList.of(0L, 5L, 10L, 15L, 20L))); @@ -148,7 +148,7 @@ public class SessionsTest { public void testValidOutputAtEndTimes() throws Exception { WindowFnTestUtils.<Object, IntervalWindow>validateGetOutputTimestamps( Sessions.withGapDuration(Duration.millis(10)), - TimestampCombiner.END_OF_WINDOW, + OutputTimeFns.outputAtEndOfWindow(), ImmutableList.of( (List<Long>) ImmutableList.of(1L, 3L), (List<Long>) ImmutableList.of(0L, 5L, 10L, 15L, 20L))); http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java index 534e230..e1ed66a 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java @@ -366,7 +366,7 @@ public class WindowTest implements Serializable { */ @Test @Category(ValidatesRunner.class) - public void testTimestampCombinerDefault() { + public void testOutputTimeFnDefault() { pipeline.enableAbandonedNodeEnforcement(true); pipeline @@ -400,7 +400,7 @@ public class WindowTest implements Serializable { */ @Test @Category(ValidatesRunner.class) - public void testTimestampCombinerEndOfWindow() { + public void testOutputTimeFnEndOfWindow() { pipeline.enableAbandonedNodeEnforcement(true); pipeline.apply( @@ -408,7 +408,7 @@ public class WindowTest implements Serializable { TimestampedValue.of(KV.of(0, "hello"), new Instant(0)), TimestampedValue.of(KV.of(0, "goodbye"), new Instant(10)))) .apply(Window.<KV<Integer, String>>into(FixedWindows.of(Duration.standardMinutes(10))) - .withTimestampCombiner(TimestampCombiner.END_OF_WINDOW)) + .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow())) .apply(GroupByKey.<Integer, String>create()) .apply(ParDo.of(new DoFn<KV<Integer, Iterable<String>>, Void>() { @ProcessElement @@ -426,14 +426,14 @@ public class WindowTest implements Serializable { AfterWatermark.FromEndOfWindow triggerBuilder = AfterWatermark.pastEndOfWindow(); Duration allowedLateness = Duration.standardMinutes(10); Window.ClosingBehavior closingBehavior = Window.ClosingBehavior.FIRE_IF_NON_EMPTY; - TimestampCombiner timestampCombiner = TimestampCombiner.END_OF_WINDOW; + OutputTimeFn<BoundedWindow> outputTimeFn = OutputTimeFns.outputAtEndOfWindow(); Window<?> window = Window .into(windowFn) .triggering(triggerBuilder) .accumulatingFiredPanes() .withAllowedLateness(allowedLateness, closingBehavior) - .withTimestampCombiner(timestampCombiner); + .withOutputTimeFn(outputTimeFn); DisplayData displayData = DisplayData.from(window); @@ -446,7 +446,7 @@ public class WindowTest implements Serializable { assertThat(displayData, hasDisplayItem("allowedLateness", allowedLateness)); assertThat(displayData, hasDisplayItem("closingBehavior", closingBehavior.toString())); - assertThat(displayData, hasDisplayItem("timestampCombiner", timestampCombiner.toString())); + assertThat(displayData, hasDisplayItem("outputTimeFn", outputTimeFn.getClass())); } @Test @@ -456,14 +456,14 @@ public class WindowTest implements Serializable { AfterWatermark.FromEndOfWindow triggerBuilder = AfterWatermark.pastEndOfWindow(); Duration allowedLateness = Duration.standardMinutes(10); Window.ClosingBehavior closingBehavior = Window.ClosingBehavior.FIRE_IF_NON_EMPTY; - TimestampCombiner timestampCombiner = TimestampCombiner.END_OF_WINDOW; + OutputTimeFn<BoundedWindow> outputTimeFn = OutputTimeFns.outputAtEndOfWindow(); Window<?> window = Window .into(windowFn) .triggering(triggerBuilder) .accumulatingFiredPanes() .withAllowedLateness(allowedLateness, closingBehavior) - .withTimestampCombiner(timestampCombiner); + .withOutputTimeFn(outputTimeFn); DisplayData primitiveDisplayData = Iterables.getOnlyElement( @@ -478,8 +478,7 @@ public class WindowTest implements Serializable { assertThat(primitiveDisplayData, hasDisplayItem("allowedLateness", allowedLateness)); assertThat(primitiveDisplayData, hasDisplayItem("closingBehavior", closingBehavior.toString())); - assertThat( - primitiveDisplayData, hasDisplayItem("timestampCombiner", timestampCombiner.toString())); + assertThat(primitiveDisplayData, hasDisplayItem("outputTimeFn", outputTimeFn.getClass())); } @Test @@ -498,7 +497,7 @@ public class WindowTest implements Serializable { assertThat(displayData, not(hasDisplayItem("accumulationMode"))); assertThat(displayData, not(hasDisplayItem("allowedLateness"))); assertThat(displayData, not(hasDisplayItem("closingBehavior"))); - assertThat(displayData, not(hasDisplayItem("timestampCombiner"))); + assertThat(displayData, not(hasDisplayItem("outputTimeFn"))); } @Test @@ -507,7 +506,7 @@ public class WindowTest implements Serializable { assertThat(DisplayData.from(onlyHasAccumulationMode), not(hasDisplayItem(hasKey(isOneOf( "windowFn", "trigger", - "timestampCombiner", + "outputTimeFn", "allowedLateness", "closingBehavior"))))); http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java index 30b0311..a3f5352 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java @@ -76,7 +76,7 @@ public class WindowingTest implements Serializable { public PCollection<String> expand(PCollection<String> in) { return in.apply("Window", Window.<String>into(windowFn) - .withTimestampCombiner(TimestampCombiner.EARLIEST)) + .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())) .apply(Count.<String>perElement()) .apply("FormatCounts", ParDo.of(new FormatCountsDoFn())) .setCoder(StringUtf8Coder.of()); http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/GcpCoreApiSurfaceTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/GcpCoreApiSurfaceTest.java b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/GcpCoreApiSurfaceTest.java index 215b0f4..50edd83 100644 --- a/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/GcpCoreApiSurfaceTest.java +++ b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/GcpCoreApiSurfaceTest.java @@ -42,6 +42,7 @@ public class GcpCoreApiSurfaceTest { "com.google.api.services.cloudresourcemanager", "com.google.api.services.storage", "com.google.auth", + "com.google.protobuf", "com.fasterxml.jackson.annotation", "com.fasterxml.jackson.core", "com.fasterxml.jackson.databind",
