http://git-wip-us.apache.org/repos/asf/beam/blob/d1395dce/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategy.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategy.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategy.java index 268718a..14f818a 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategy.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategy.java @@ -20,20 +20,17 @@ package org.apache.beam.sdk.util; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.MoreObjects; import java.io.Serializable; -import java.util.Collections; import java.util.Objects; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.DefaultTrigger; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindows; -import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; -import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; +import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; import org.apache.beam.sdk.transforms.windowing.Trigger; import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior; import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.joda.time.Duration; -import org.joda.time.Instant; /** * A {@code WindowingStrategy} describes the windowing behavior for a specific collection of values. @@ -58,22 +55,22 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab private static final WindowingStrategy<Object, GlobalWindow> DEFAULT = of(new GlobalWindows()); private final WindowFn<T, W> windowFn; - private final OutputTimeFn<? super W> outputTimeFn; private final Trigger trigger; private final AccumulationMode mode; private final Duration allowedLateness; private final ClosingBehavior closingBehavior; + private final TimestampCombiner timestampCombiner; private final boolean triggerSpecified; private final boolean modeSpecified; private final boolean allowedLatenessSpecified; - private final boolean outputTimeFnSpecified; + private final boolean timestampCombinerSpecified; private WindowingStrategy( WindowFn<T, W> windowFn, Trigger trigger, boolean triggerSpecified, AccumulationMode mode, boolean modeSpecified, Duration allowedLateness, boolean allowedLatenessSpecified, - OutputTimeFn<? super W> outputTimeFn, boolean outputTimeFnSpecified, + TimestampCombiner timestampCombiner, boolean timestampCombinerSpecified, ClosingBehavior closingBehavior) { this.windowFn = windowFn; this.trigger = trigger; @@ -83,8 +80,8 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab this.allowedLateness = allowedLateness; this.allowedLatenessSpecified = allowedLatenessSpecified; this.closingBehavior = closingBehavior; - this.outputTimeFn = outputTimeFn; - this.outputTimeFnSpecified = outputTimeFnSpecified; + this.timestampCombiner = timestampCombiner; + this.timestampCombinerSpecified = timestampCombinerSpecified; } /** @@ -100,7 +97,7 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab DefaultTrigger.of(), false, AccumulationMode.DISCARDING_FIRED_PANES, false, DEFAULT_ALLOWED_LATENESS, false, - new CombineWindowFnOutputTimes(OutputTimeFns.outputAtEndOfWindow(), windowFn), false, + TimestampCombiner.END_OF_WINDOW, false, ClosingBehavior.FIRE_IF_NON_EMPTY); } @@ -136,12 +133,12 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab return closingBehavior; } - public OutputTimeFn<? super W> getOutputTimeFn() { - return outputTimeFn; + public TimestampCombiner getTimestampCombiner() { + return timestampCombiner; } - public boolean isOutputTimeFnSpecified() { - return outputTimeFnSpecified; + public boolean isTimestampCombinerSpecified() { + return timestampCombinerSpecified; } /** @@ -154,7 +151,7 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab trigger, true, mode, modeSpecified, allowedLateness, allowedLatenessSpecified, - outputTimeFn, outputTimeFnSpecified, + timestampCombiner, timestampCombinerSpecified, closingBehavior); } @@ -168,7 +165,7 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab trigger, triggerSpecified, mode, true, allowedLateness, allowedLatenessSpecified, - outputTimeFn, outputTimeFnSpecified, + timestampCombiner, timestampCombinerSpecified, closingBehavior); } @@ -180,17 +177,12 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab @SuppressWarnings("unchecked") WindowFn<T, W> typedWindowFn = (WindowFn<T, W>) wildcardWindowFn; - // The onus of type correctness falls on the callee. - @SuppressWarnings("unchecked") - OutputTimeFn<? super W> newOutputTimeFn = (OutputTimeFn<? super W>) - new CombineWindowFnOutputTimes<W>(outputTimeFn, typedWindowFn); - return new WindowingStrategy<T, W>( typedWindowFn, trigger, triggerSpecified, mode, modeSpecified, allowedLateness, allowedLatenessSpecified, - newOutputTimeFn, outputTimeFnSpecified, + timestampCombiner, timestampCombinerSpecified, closingBehavior); } @@ -204,7 +196,7 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab trigger, triggerSpecified, mode, modeSpecified, allowedLateness, true, - outputTimeFn, outputTimeFnSpecified, + timestampCombiner, timestampCombinerSpecified, closingBehavior); } @@ -214,40 +206,19 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab trigger, triggerSpecified, mode, modeSpecified, allowedLateness, allowedLatenessSpecified, - outputTimeFn, outputTimeFnSpecified, + timestampCombiner, timestampCombinerSpecified, closingBehavior); } @Experimental(Experimental.Kind.OUTPUT_TIME) - public WindowingStrategy<T, W> withOutputTimeFn(OutputTimeFn<?> outputTimeFn) { - - @SuppressWarnings("unchecked") - OutputTimeFn<? super W> typedOutputTimeFn = (OutputTimeFn<? super W>) outputTimeFn; - - OutputTimeFn<? super W> newOutputTimeFn = - new CombineWindowFnOutputTimes<W>(typedOutputTimeFn, windowFn); + public WindowingStrategy<T, W> withTimestampCombiner(TimestampCombiner timestampCombiner) { return new WindowingStrategy<T, W>( windowFn, trigger, triggerSpecified, mode, modeSpecified, allowedLateness, allowedLatenessSpecified, - newOutputTimeFn, true, - closingBehavior); - } - - /** - * Fixes all the defaults so that equals can be used to check that two strategies are the same, - * regardless of the state of "defaulted-ness". - */ - @VisibleForTesting - public WindowingStrategy<T, W> fixDefaults() { - return new WindowingStrategy<>( - windowFn, - trigger, true, - mode, true, - allowedLateness, true, - outputTimeFn, true, + timestampCombiner, true, closingBehavior); } @@ -258,7 +229,7 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab .add("allowedLateness", allowedLateness) .add("trigger", trigger) .add("accumulationMode", mode) - .add("outputTimeFn", outputTimeFn) + .add("timestampCombiner", timestampCombiner) .toString(); } @@ -268,104 +239,45 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab return false; } WindowingStrategy<?, ?> other = (WindowingStrategy<?, ?>) object; - return - isTriggerSpecified() == other.isTriggerSpecified() + return isTriggerSpecified() == other.isTriggerSpecified() && isAllowedLatenessSpecified() == other.isAllowedLatenessSpecified() && isModeSpecified() == other.isModeSpecified() + && isTimestampCombinerSpecified() == other.isTimestampCombinerSpecified() && getMode().equals(other.getMode()) && getAllowedLateness().equals(other.getAllowedLateness()) && getClosingBehavior().equals(other.getClosingBehavior()) && getTrigger().equals(other.getTrigger()) + && getTimestampCombiner().equals(other.getTimestampCombiner()) && getWindowFn().equals(other.getWindowFn()); } @Override public int hashCode() { - return Objects.hash(triggerSpecified, allowedLatenessSpecified, modeSpecified, - windowFn, trigger, mode, allowedLateness, closingBehavior); + return Objects.hash( + triggerSpecified, + allowedLatenessSpecified, + modeSpecified, + timestampCombinerSpecified, + mode, + allowedLateness, + closingBehavior, + trigger, + timestampCombiner, + windowFn); } /** - * An {@link OutputTimeFn} that uses {@link WindowFn#getOutputTime} to assign initial timestamps - * but then combines and merges according to a given {@link OutputTimeFn}. - * - * <ul> - * <li>The {@link WindowFn#getOutputTime} allows adjustments such as that whereby - * {@link org.apache.beam.sdk.transforms.windowing.SlidingWindows#getOutputTime} - * moves elements later in time to avoid holding up progress downstream.</li> - * <li>Then, when multiple elements are buffered for output, the output timestamp of the - * result is calculated using {@link OutputTimeFn#combine}.</li> - * <li>In the case of a merging {@link WindowFn}, the output timestamp when windows merge - * is calculated using {@link OutputTimeFn#merge}.</li> - * </ul> + * Fixes all the defaults so that equals can be used to check that two strategies are the same, + * regardless of the state of "defaulted-ness". */ - public static class CombineWindowFnOutputTimes<W extends BoundedWindow> - extends OutputTimeFn<W> { - - private final OutputTimeFn<? super W> outputTimeFn; - private final WindowFn<?, W> windowFn; - - public CombineWindowFnOutputTimes( - OutputTimeFn<? super W> outputTimeFn, WindowFn<?, W> windowFn) { - this.outputTimeFn = outputTimeFn; - this.windowFn = windowFn; - } - - public OutputTimeFn<? super W> getOutputTimeFn() { - return outputTimeFn; - } - - @Override - public Instant assignOutputTime(Instant inputTimestamp, W window) { - return outputTimeFn.merge( - window, Collections.singleton(windowFn.getOutputTime(inputTimestamp, window))); - } - - @Override - public Instant combine(Instant timestamp, Instant otherTimestamp) { - return outputTimeFn.combine(timestamp, otherTimestamp); - } - - @Override - public Instant merge(W newWindow, Iterable<? extends Instant> timestamps) { - return outputTimeFn.merge(newWindow, timestamps); - } - - @Override - public final boolean dependsOnlyOnWindow() { - return outputTimeFn.dependsOnlyOnWindow(); - } - - @Override - public boolean dependsOnlyOnEarliestInputTimestamp() { - return outputTimeFn.dependsOnlyOnEarliestInputTimestamp(); - } - - @Override - public boolean equals(Object obj) { - if (obj == this) { - return true; - } - - if (!(obj instanceof CombineWindowFnOutputTimes)) { - return false; - } - - CombineWindowFnOutputTimes<?> that = (CombineWindowFnOutputTimes<?>) obj; - return outputTimeFn.equals(that.outputTimeFn) && windowFn.equals(that.windowFn); - } - - @Override - public int hashCode() { - return Objects.hash(outputTimeFn, windowFn); - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(this) - .add("outputTimeFn", outputTimeFn) - .add("windowFn", windowFn) - .toString(); - } + @VisibleForTesting + public WindowingStrategy<T, W> fixDefaults() { + return new WindowingStrategy<>( + windowFn, + trigger, true, + mode, true, + allowedLateness, true, + timestampCombiner, true, + closingBehavior); } }
http://git-wip-us.apache.org/repos/asf/beam/blob/d1395dce/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java index 64841fb..f9ab115 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java @@ -21,7 +21,7 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.CombineWithContext; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; +import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; /** * Visitor for binding a {@link StateSpec} and to the associated {@link State}. @@ -63,11 +63,11 @@ public interface StateBinder<K> { /** * Bind to a watermark {@link StateSpec}. * - * <p>This accepts the {@link OutputTimeFn} that dictates how watermark hold timestamps added to - * the returned {@link WatermarkHoldState} are to be combined. + * <p>This accepts the {@link TimestampCombiner} that dictates how watermark hold timestamps added + * to the returned {@link WatermarkHoldState} are to be combined. */ - <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark( + <W extends BoundedWindow> WatermarkHoldState bindWatermark( String id, - StateSpec<? super K, WatermarkHoldState<W>> spec, - OutputTimeFn<? super W> outputTimeFn); + StateSpec<? super K, WatermarkHoldState> spec, + TimestampCombiner timestampCombiner); } http://git-wip-us.apache.org/repos/asf/beam/blob/d1395dce/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java index dc647da..8fa5bb0 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java @@ -30,7 +30,7 @@ import org.apache.beam.sdk.transforms.Combine.CombineFn; import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn; import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; +import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; /** * Static utility methods for creating {@link StateSpec} instances. @@ -208,9 +208,9 @@ public class StateSpecs { /** Create a state spec for holding the watermark. */ public static <W extends BoundedWindow> - StateSpec<Object, WatermarkHoldState<W>> watermarkStateInternal( - OutputTimeFn<? super W> outputTimeFn) { - return new WatermarkStateSpecInternal<W>(outputTimeFn); + StateSpec<Object, WatermarkHoldState> watermarkStateInternal( + TimestampCombiner timestampCombiner) { + return new WatermarkStateSpecInternal<W>(timestampCombiner); } public static <K, InputT, AccumT, OutputT> @@ -656,26 +656,26 @@ public class StateSpecs { /** * A specification for a state cell tracking a combined watermark hold. * - * <p>Includes the {@link OutputTimeFn} according to which the output times + * <p>Includes the {@link TimestampCombiner} according to which the output times * are combined. */ private static class WatermarkStateSpecInternal<W extends BoundedWindow> - implements StateSpec<Object, WatermarkHoldState<W>> { + implements StateSpec<Object, WatermarkHoldState> { /** * When multiple output times are added to hold the watermark, this determines how they are * combined, and also the behavior when merging windows. Does not contribute to equality/hash * since we have at most one watermark hold spec per computation. */ - private final OutputTimeFn<? super W> outputTimeFn; + private final TimestampCombiner timestampCombiner; - private WatermarkStateSpecInternal(OutputTimeFn<? super W> outputTimeFn) { - this.outputTimeFn = outputTimeFn; + private WatermarkStateSpecInternal(TimestampCombiner timestampCombiner) { + this.timestampCombiner = timestampCombiner; } @Override - public WatermarkHoldState<W> bind(String id, StateBinder<?> visitor) { - return visitor.bindWatermark(id, this, outputTimeFn); + public WatermarkHoldState bind(String id, StateBinder<?> visitor) { + return visitor.bindWatermark(id, this, timestampCombiner); } @Override @@ -701,5 +701,4 @@ public class StateSpecs { return Objects.hash(getClass()); } } - } http://git-wip-us.apache.org/repos/asf/beam/blob/d1395dce/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/WatermarkHoldState.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/WatermarkHoldState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/WatermarkHoldState.java index 20fa05f..ae9b700 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/WatermarkHoldState.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/WatermarkHoldState.java @@ -19,25 +19,24 @@ package org.apache.beam.sdk.util.state; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; +import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; import org.joda.time.Instant; /** - * A {@link State} accepting and aggregating output timestamps, which determines - * the time to which the output watermark must be held. + * A {@link State} accepting and aggregating output timestamps, which determines the time to which + * the output watermark must be held. * * <p><b><i>For internal use only. This API may change at any time.</i></b> */ @Experimental(Kind.STATE) -public interface WatermarkHoldState<W extends BoundedWindow> - extends GroupingState<Instant, Instant> { +public interface WatermarkHoldState extends GroupingState<Instant, Instant> { /** - * Return the {@link OutputTimeFn} which will be used to determine a watermark hold time given - * an element timestamp, and to combine watermarks from windows which are about to be merged. + * Return the {@link TimestampCombiner} which will be used to determine a watermark hold time + * given an element timestamp, and to combine watermarks from windows which are about to be + * merged. */ - OutputTimeFn<? super W> getOutputTimeFn(); + TimestampCombiner getTimestampCombiner(); @Override - WatermarkHoldState<W> readLater(); + WatermarkHoldState readLater(); } http://git-wip-us.apache.org/repos/asf/beam/blob/d1395dce/sdks/java/core/src/test/java/org/apache/beam/SdkCoreApiSurfaceTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/SdkCoreApiSurfaceTest.java b/sdks/java/core/src/test/java/org/apache/beam/SdkCoreApiSurfaceTest.java index 153bd84..26dd9f9 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/SdkCoreApiSurfaceTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/SdkCoreApiSurfaceTest.java @@ -39,7 +39,6 @@ public class SdkCoreApiSurfaceTest { ImmutableSet.of( "org.apache.beam", "com.google.api.client", - "com.google.protobuf", "com.fasterxml.jackson.annotation", "com.fasterxml.jackson.core", "com.fasterxml.jackson.databind", http://git-wip-us.apache.org/repos/asf/beam/blob/d1395dce/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java index 939261f..0556199 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java @@ -51,8 +51,8 @@ import org.apache.beam.sdk.testing.ValidatesRunner; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.InvalidWindows; -import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; import org.apache.beam.sdk.transforms.windowing.Sessions; +import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.util.Reshuffle; import org.apache.beam.sdk.util.WindowingStrategy; @@ -318,14 +318,14 @@ public class GroupByKeyTest { */ @Test @Category(ValidatesRunner.class) - public void testOutputTimeFnEarliest() { + public void testTimestampCombinerEarliest() { p.apply( Create.timestamped( TimestampedValue.of(KV.of(0, "hello"), new Instant(0)), TimestampedValue.of(KV.of(0, "goodbye"), new Instant(10)))) .apply(Window.<KV<Integer, String>>into(FixedWindows.of(Duration.standardMinutes(10))) - .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())) + .withTimestampCombiner(TimestampCombiner.EARLIEST)) .apply(GroupByKey.<Integer, String>create()) .apply(ParDo.of(new AssertTimestamp(new Instant(0)))); @@ -339,13 +339,13 @@ public class GroupByKeyTest { */ @Test @Category(ValidatesRunner.class) - public void testOutputTimeFnLatest() { + public void testTimestampCombinerLatest() { p.apply( Create.timestamped( TimestampedValue.of(KV.of(0, "hello"), new Instant(0)), TimestampedValue.of(KV.of(0, "goodbye"), new Instant(10)))) .apply(Window.<KV<Integer, String>>into(FixedWindows.of(Duration.standardMinutes(10))) - .withOutputTimeFn(OutputTimeFns.outputAtLatestInputTimestamp())) + .withTimestampCombiner(TimestampCombiner.LATEST)) .apply(GroupByKey.<Integer, String>create()) .apply(ParDo.of(new AssertTimestamp(new Instant(10)))); http://git-wip-us.apache.org/repos/asf/beam/blob/d1395dce/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java index 4e61f4e..9a17bc7 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java @@ -41,7 +41,7 @@ import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.FixedWindows; -import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; +import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; @@ -241,7 +241,7 @@ public class CoGroupByKeyTest implements Serializable { Arrays.asList(0L, 2L, 4L, 6L, 8L)) .apply("WindowClicks", Window.<KV<Integer, String>>into( FixedWindows.of(new Duration(4))) - .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())); + .withTimestampCombiner(TimestampCombiner.EARLIEST)); PCollection<KV<Integer, String>> purchasesTable = createInput("CreatePurchases", @@ -250,7 +250,7 @@ public class CoGroupByKeyTest implements Serializable { Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L)) .apply("WindowPurchases", Window.<KV<Integer, String>>into( FixedWindows.of(new Duration(4))) - .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())); + .withTimestampCombiner(TimestampCombiner.EARLIEST)); PCollection<KV<Integer, CoGbkResult>> coGbkResults = KeyedPCollectionTuple.of(clicksTag, clicksTable) http://git-wip-us.apache.org/repos/asf/beam/blob/d1395dce/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFnsTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFnsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFnsTest.java deleted file mode 100644 index 78d7a2f..0000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFnsTest.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.transforms.windowing; - -import static org.hamcrest.Matchers.equalTo; -import static org.junit.Assert.assertThat; - -import com.google.common.collect.ImmutableList; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -import org.junit.runners.Parameterized.Parameter; -import org.junit.runners.Parameterized.Parameters; - -/** Tests for {@link OutputTimeFns}. */ -@RunWith(Parameterized.class) -public class OutputTimeFnsTest { - - @Parameters(name = "{index}: {0}") - public static Iterable<OutputTimeFn<BoundedWindow>> data() { - return ImmutableList.of( - OutputTimeFns.outputAtEarliestInputTimestamp(), - OutputTimeFns.outputAtLatestInputTimestamp(), - OutputTimeFns.outputAtEndOfWindow()); - } - - @Parameter(0) - public OutputTimeFn<?> outputTimeFn; - - @Test - public void testToProtoAndBack() throws Exception { - OutputTimeFn<?> result = OutputTimeFns.fromProto(OutputTimeFns.toProto(outputTimeFn)); - - assertThat(result, equalTo((OutputTimeFn) outputTimeFn)); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/d1395dce/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/SessionsTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/SessionsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/SessionsTest.java index b131688..9d94928 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/SessionsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/SessionsTest.java @@ -118,7 +118,7 @@ public class SessionsTest { } /** - * Test to confirm that {@link Sessions} with the default {@link OutputTimeFn} holds up the + * Test to confirm that {@link Sessions} with the default {@link TimestampCombiner} holds up the * watermark potentially indefinitely. */ @Test @@ -126,7 +126,7 @@ public class SessionsTest { try { WindowFnTestUtils.<Object, IntervalWindow>validateGetOutputTimestamps( Sessions.withGapDuration(Duration.millis(10)), - OutputTimeFns.outputAtEarliestInputTimestamp(), + TimestampCombiner.EARLIEST, ImmutableList.of( (List<Long>) ImmutableList.of(1L, 3L), (List<Long>) ImmutableList.of(0L, 5L, 10L, 15L, 20L))); @@ -148,7 +148,7 @@ public class SessionsTest { public void testValidOutputAtEndTimes() throws Exception { WindowFnTestUtils.<Object, IntervalWindow>validateGetOutputTimestamps( Sessions.withGapDuration(Duration.millis(10)), - OutputTimeFns.outputAtEndOfWindow(), + TimestampCombiner.END_OF_WINDOW, ImmutableList.of( (List<Long>) ImmutableList.of(1L, 3L), (List<Long>) ImmutableList.of(0L, 5L, 10L, 15L, 20L))); http://git-wip-us.apache.org/repos/asf/beam/blob/d1395dce/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java index e1ed66a..534e230 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java @@ -366,7 +366,7 @@ public class WindowTest implements Serializable { */ @Test @Category(ValidatesRunner.class) - public void testOutputTimeFnDefault() { + public void testTimestampCombinerDefault() { pipeline.enableAbandonedNodeEnforcement(true); pipeline @@ -400,7 +400,7 @@ public class WindowTest implements Serializable { */ @Test @Category(ValidatesRunner.class) - public void testOutputTimeFnEndOfWindow() { + public void testTimestampCombinerEndOfWindow() { pipeline.enableAbandonedNodeEnforcement(true); pipeline.apply( @@ -408,7 +408,7 @@ public class WindowTest implements Serializable { TimestampedValue.of(KV.of(0, "hello"), new Instant(0)), TimestampedValue.of(KV.of(0, "goodbye"), new Instant(10)))) .apply(Window.<KV<Integer, String>>into(FixedWindows.of(Duration.standardMinutes(10))) - .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow())) + .withTimestampCombiner(TimestampCombiner.END_OF_WINDOW)) .apply(GroupByKey.<Integer, String>create()) .apply(ParDo.of(new DoFn<KV<Integer, Iterable<String>>, Void>() { @ProcessElement @@ -426,14 +426,14 @@ public class WindowTest implements Serializable { AfterWatermark.FromEndOfWindow triggerBuilder = AfterWatermark.pastEndOfWindow(); Duration allowedLateness = Duration.standardMinutes(10); Window.ClosingBehavior closingBehavior = Window.ClosingBehavior.FIRE_IF_NON_EMPTY; - OutputTimeFn<BoundedWindow> outputTimeFn = OutputTimeFns.outputAtEndOfWindow(); + TimestampCombiner timestampCombiner = TimestampCombiner.END_OF_WINDOW; Window<?> window = Window .into(windowFn) .triggering(triggerBuilder) .accumulatingFiredPanes() .withAllowedLateness(allowedLateness, closingBehavior) - .withOutputTimeFn(outputTimeFn); + .withTimestampCombiner(timestampCombiner); DisplayData displayData = DisplayData.from(window); @@ -446,7 +446,7 @@ public class WindowTest implements Serializable { assertThat(displayData, hasDisplayItem("allowedLateness", allowedLateness)); assertThat(displayData, hasDisplayItem("closingBehavior", closingBehavior.toString())); - assertThat(displayData, hasDisplayItem("outputTimeFn", outputTimeFn.getClass())); + assertThat(displayData, hasDisplayItem("timestampCombiner", timestampCombiner.toString())); } @Test @@ -456,14 +456,14 @@ public class WindowTest implements Serializable { AfterWatermark.FromEndOfWindow triggerBuilder = AfterWatermark.pastEndOfWindow(); Duration allowedLateness = Duration.standardMinutes(10); Window.ClosingBehavior closingBehavior = Window.ClosingBehavior.FIRE_IF_NON_EMPTY; - OutputTimeFn<BoundedWindow> outputTimeFn = OutputTimeFns.outputAtEndOfWindow(); + TimestampCombiner timestampCombiner = TimestampCombiner.END_OF_WINDOW; Window<?> window = Window .into(windowFn) .triggering(triggerBuilder) .accumulatingFiredPanes() .withAllowedLateness(allowedLateness, closingBehavior) - .withOutputTimeFn(outputTimeFn); + .withTimestampCombiner(timestampCombiner); DisplayData primitiveDisplayData = Iterables.getOnlyElement( @@ -478,7 +478,8 @@ public class WindowTest implements Serializable { assertThat(primitiveDisplayData, hasDisplayItem("allowedLateness", allowedLateness)); assertThat(primitiveDisplayData, hasDisplayItem("closingBehavior", closingBehavior.toString())); - assertThat(primitiveDisplayData, hasDisplayItem("outputTimeFn", outputTimeFn.getClass())); + assertThat( + primitiveDisplayData, hasDisplayItem("timestampCombiner", timestampCombiner.toString())); } @Test @@ -497,7 +498,7 @@ public class WindowTest implements Serializable { assertThat(displayData, not(hasDisplayItem("accumulationMode"))); assertThat(displayData, not(hasDisplayItem("allowedLateness"))); assertThat(displayData, not(hasDisplayItem("closingBehavior"))); - assertThat(displayData, not(hasDisplayItem("outputTimeFn"))); + assertThat(displayData, not(hasDisplayItem("timestampCombiner"))); } @Test @@ -506,7 +507,7 @@ public class WindowTest implements Serializable { assertThat(DisplayData.from(onlyHasAccumulationMode), not(hasDisplayItem(hasKey(isOneOf( "windowFn", "trigger", - "outputTimeFn", + "timestampCombiner", "allowedLateness", "closingBehavior"))))); http://git-wip-us.apache.org/repos/asf/beam/blob/d1395dce/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java index a3f5352..30b0311 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java @@ -76,7 +76,7 @@ public class WindowingTest implements Serializable { public PCollection<String> expand(PCollection<String> in) { return in.apply("Window", Window.<String>into(windowFn) - .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())) + .withTimestampCombiner(TimestampCombiner.EARLIEST)) .apply(Count.<String>perElement()) .apply("FormatCounts", ParDo.of(new FormatCountsDoFn())) .setCoder(StringUtf8Coder.of()); http://git-wip-us.apache.org/repos/asf/beam/blob/d1395dce/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/extensions/gcp/GcpCoreApiSurfaceTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/extensions/gcp/GcpCoreApiSurfaceTest.java b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/extensions/gcp/GcpCoreApiSurfaceTest.java index 50edd83..215b0f4 100644 --- a/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/extensions/gcp/GcpCoreApiSurfaceTest.java +++ b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/sdk/extensions/gcp/GcpCoreApiSurfaceTest.java @@ -42,7 +42,6 @@ public class GcpCoreApiSurfaceTest { "com.google.api.services.cloudresourcemanager", "com.google.api.services.storage", "com.google.auth", - "com.google.protobuf", "com.fasterxml.jackson.annotation", "com.fasterxml.jackson.core", "com.fasterxml.jackson.databind",
