Move Towards removing WindowedValue from SDK

- Introduces ValueInSingleWindow for purposes of PAssert
- Uses ValueInSingleWindow inside DoFnTester
- Moves WindowMatchers{,Test} to runners-core

After this commit, WindowedValue does not appear in any SDK APIs used by Pipeline authors. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d9891234 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d9891234 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d9891234 Branch: refs/heads/gearpump-runner Commit: d989123424a54699ecb47ba6c0a4e437316cabce Parents: 0fb5610 Author: Eugene Kirpichov <kirpic...@google.com> Authored: Mon Oct 31 15:46:25 2016 -0700 Committer: Thomas Groh <tg...@google.com> Committed: Fri Dec 2 13:16:04 2016 -0800 ---------------------------------------------------------------------- .../beam/runners/core/ReduceFnRunnerTest.java | 5 +- .../beam/runners/core/SplittableParDoTest.java | 38 ++-- .../beam/runners/core/WindowMatchers.java | 204 +++++++++++++++++++ .../beam/runners/core/WindowMatchersTest.java | 82 ++++++++ .../direct/WindowEvaluatorFactoryTest.java | 4 +- .../apache/beam/sdk/testing/GatherAllPanes.java | 88 ++++++++ .../org/apache/beam/sdk/testing/PAssert.java | 77 +++---- .../apache/beam/sdk/testing/PaneExtractors.java | 55 +++-- .../beam/sdk/testing/ValueInSingleWindow.java | 134 ++++++++++++ .../apache/beam/sdk/transforms/DoFnTester.java | 58 +++--- .../apache/beam/sdk/util/GatherAllPanes.java | 86 -------- .../apache/beam/sdk/util/IdentityWindowFn.java | 2 +- .../org/apache/beam/sdk/WindowMatchers.java | 204 ------------------- .../org/apache/beam/sdk/WindowMatchersTest.java | 82 -------- .../beam/sdk/testing/GatherAllPanesTest.java | 140 +++++++++++++ .../beam/sdk/testing/PaneExtractorsTest.java | 133 ++++++------ .../testing/ValueInSingleWindowCoderTest.java | 51 +++++ .../beam/sdk/util/GatherAllPanesTest.java | 143 ------------- 18 files changed, 893 insertions(+), 693 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d9891234/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java index 20eb08b..ba57567 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java @@ -18,8 +18,8 @@ package org.apache.beam.runners.core; import static com.google.common.base.Preconditions.checkArgument; -import static org.apache.beam.sdk.WindowMatchers.isSingleWindowedValue; -import static org.apache.beam.sdk.WindowMatchers.isWindowedValue; +import static org.apache.beam.runners.core.WindowMatchers.isSingleWindowedValue; +import static org.apache.beam.runners.core.WindowMatchers.isWindowedValue; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.emptyIterable; @@ -39,7 +39,6 @@ import com.google.common.collect.Iterables; import java.util.Iterator; import java.util.List; import org.apache.beam.runners.core.triggers.TriggerStateMachine; -import org.apache.beam.sdk.WindowMatchers; import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d9891234/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java index 
990d892..b13d839 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java @@ -36,6 +36,7 @@ import org.apache.beam.sdk.coders.BigEndianIntegerCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.SerializableCoder; import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.testing.ValueInSingleWindow; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.DoFnTester; @@ -142,14 +143,15 @@ public class SplittableParDoTest { PCollection.IsBounded.BOUNDED, makeBoundedCollection(pipeline) .apply("bounded to bounded", new SplittableParDo<>(makeParDo(boundedFn))) - .get(MAIN_OUTPUT_TAG).isBounded()); + .get(MAIN_OUTPUT_TAG) + .isBounded()); assertEquals( "Applying a bounded SDF to an unbounded collection produces an unbounded collection", PCollection.IsBounded.UNBOUNDED, makeUnboundedCollection(pipeline) - .apply( - "bounded to unbounded", new SplittableParDo<>(makeParDo(boundedFn))) - .get(MAIN_OUTPUT_TAG).isBounded()); + .apply("bounded to unbounded", new SplittableParDo<>(makeParDo(boundedFn))) + .get(MAIN_OUTPUT_TAG) + .isBounded()); } @Test @@ -160,18 +162,16 @@ public class SplittableParDoTest { "Applying an unbounded SDF to a bounded collection produces a bounded collection", PCollection.IsBounded.UNBOUNDED, makeBoundedCollection(pipeline) - .apply( - "unbounded to bounded", - new SplittableParDo<>(makeParDo(unboundedFn))) - .get(MAIN_OUTPUT_TAG).isBounded()); + .apply("unbounded to bounded", new SplittableParDo<>(makeParDo(unboundedFn))) + .get(MAIN_OUTPUT_TAG) + .isBounded()); assertEquals( "Applying an unbounded SDF to an unbounded collection produces an unbounded collection", PCollection.IsBounded.UNBOUNDED, makeUnboundedCollection(pipeline) - .apply( - "unbounded to unbounded", - new 
SplittableParDo<>(makeParDo(unboundedFn))) - .get(MAIN_OUTPUT_TAG).isBounded()); + .apply("unbounded to unbounded", new SplittableParDo<>(makeParDo(unboundedFn))) + .get(MAIN_OUTPUT_TAG) + .isBounded()); } // ------------------------------- Tests for ProcessFn --------------------------------- @@ -224,9 +224,11 @@ public class SplittableParDoTest { Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) { - tester - .getMutableOutput(tester.getMainOutputTag()) - .add(WindowedValue.of(output, timestamp, windows, pane)); + for (BoundedWindow window : windows) { + tester + .getMutableOutput(tester.getMainOutputTag()) + .add(ValueInSingleWindow.of(output, timestamp, window, pane)); + } } @Override @@ -236,7 +238,11 @@ public class SplittableParDoTest { Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) { - tester.getMutableOutput(tag).add(WindowedValue.of(output, timestamp, windows, pane)); + for (BoundedWindow window : windows) { + tester + .getMutableOutput(tag) + .add(ValueInSingleWindow.of(output, timestamp, window, pane)); + } } }); // Do not clone since ProcessFn references non-serializable DoFnTester itself http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d9891234/runners/core-java/src/test/java/org/apache/beam/runners/core/WindowMatchers.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/WindowMatchers.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/WindowMatchers.java new file mode 100644 index 0000000..6c3a7e2 --- /dev/null +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/WindowMatchers.java @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core; + +import com.google.common.collect.Lists; +import java.util.Collection; +import java.util.Objects; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.WindowedValue; +import org.hamcrest.Description; +import org.hamcrest.Matcher; +import org.hamcrest.Matchers; +import org.hamcrest.TypeSafeMatcher; +import org.joda.time.Instant; + +/** + * Matchers that are useful for working with Windowing, Timestamps, etc. + */ +public class WindowMatchers { + + public static <T> Matcher<WindowedValue<? extends T>> isWindowedValue( + T value, + Instant timestamp, + Collection<? extends BoundedWindow> windows, + PaneInfo paneInfo) { + + Collection<Matcher<? super BoundedWindow>> windowMatchers = + Lists.newArrayListWithCapacity(windows.size()); + for (BoundedWindow window : windows) { + windowMatchers.add(Matchers.equalTo(window)); + } + + return isWindowedValue( + Matchers.equalTo(value), + Matchers.equalTo(timestamp), + Matchers.containsInAnyOrder(windowMatchers), + Matchers.equalTo(paneInfo)); + } + + public static <T> Matcher<WindowedValue<? extends T>> isWindowedValue( + Matcher<? super T> valueMatcher, + Matcher<? super Instant> timestampMatcher, + Matcher<? super Collection<? 
extends BoundedWindow>> windowsMatcher, + Matcher<? super PaneInfo> paneInfoMatcher) { + return new WindowedValueMatcher<>( + valueMatcher, timestampMatcher, windowsMatcher, paneInfoMatcher); + } + + public static <T> Matcher<WindowedValue<? extends T>> isWindowedValue( + Matcher<? super T> valueMatcher, + Matcher<? super Instant> timestampMatcher, + Matcher<? super Collection<? extends BoundedWindow>> windowsMatcher) { + return new WindowedValueMatcher<>( + valueMatcher, timestampMatcher, windowsMatcher, Matchers.anything()); + } + + public static <T> Matcher<WindowedValue<? extends T>> isWindowedValue( + Matcher<? super T> valueMatcher, Matcher<? super Instant> timestampMatcher) { + return new WindowedValueMatcher<>( + valueMatcher, timestampMatcher, Matchers.anything(), Matchers.anything()); + } + + public static <T> Matcher<WindowedValue<? extends T>> isWindowedValue( + Matcher<? super T> valueMatcher) { + return new WindowedValueMatcher<>( + valueMatcher, Matchers.anything(), Matchers.anything(), Matchers.anything()); + } + + public static <T> Matcher<WindowedValue<? extends T>> isSingleWindowedValue( + T value, long timestamp, long windowStart, long windowEnd) { + return WindowMatchers.<T>isSingleWindowedValue( + Matchers.equalTo(value), timestamp, windowStart, windowEnd); + } + + public static <T> Matcher<WindowedValue<? extends T>> isSingleWindowedValue( + T value, Instant timestamp, BoundedWindow window, PaneInfo paneInfo) { + return WindowMatchers.<T>isSingleWindowedValue( + Matchers.equalTo(value), + Matchers.equalTo(timestamp), + Matchers.equalTo(window), + Matchers.equalTo(paneInfo)); + } + + public static <T> Matcher<WindowedValue<? extends T>> isSingleWindowedValue( + T value, Instant timestamp, BoundedWindow window) { + return WindowMatchers.<T>isSingleWindowedValue( + Matchers.equalTo(value), Matchers.equalTo(timestamp), Matchers.equalTo(window)); + } + + public static <T> Matcher<WindowedValue<? 
extends T>> isSingleWindowedValue( + Matcher<T> valueMatcher, long timestamp, long windowStart, long windowEnd) { + IntervalWindow intervalWindow = + new IntervalWindow(new Instant(windowStart), new Instant(windowEnd)); + return WindowMatchers.<T>isSingleWindowedValue( + valueMatcher, + Matchers.describedAs("%0", Matchers.equalTo(new Instant(timestamp)), timestamp), + Matchers.<BoundedWindow>equalTo(intervalWindow), + Matchers.anything()); + } + + public static <T> Matcher<WindowedValue<? extends T>> isSingleWindowedValue( + Matcher<? super T> valueMatcher, + Matcher<? super Instant> timestampMatcher, + Matcher<? super BoundedWindow> windowMatcher) { + return new WindowedValueMatcher<T>( + valueMatcher, timestampMatcher, Matchers.contains(windowMatcher), Matchers.anything()); + } + + public static <T> Matcher<WindowedValue<? extends T>> isSingleWindowedValue( + Matcher<? super T> valueMatcher, + Matcher<? super Instant> timestampMatcher, + Matcher<? super BoundedWindow> windowMatcher, + Matcher<? super PaneInfo> paneInfoMatcher) { + return new WindowedValueMatcher<T>( + valueMatcher, timestampMatcher, Matchers.contains(windowMatcher), paneInfoMatcher); + } + + public static Matcher<IntervalWindow> intervalWindow(long start, long end) { + return Matchers.equalTo(new IntervalWindow(new Instant(start), new Instant(end))); + } + + public static <T> Matcher<WindowedValue<? extends T>> valueWithPaneInfo(final PaneInfo paneInfo) { + return new TypeSafeMatcher<WindowedValue<? extends T>>() { + @Override + public void describeTo(Description description) { + description + .appendText("WindowedValue(paneInfo = ").appendValue(paneInfo).appendText(")"); + } + + @Override + protected boolean matchesSafely(WindowedValue<? extends T> item) { + return Objects.equals(item.getPane(), paneInfo); + } + + @Override + protected void describeMismatchSafely( + WindowedValue<? 
extends T> item, Description mismatchDescription) { + mismatchDescription.appendValue(item.getPane()); + } + }; + } + + @SuppressWarnings({"unchecked", "rawtypes"}) + @SafeVarargs + public static final <W extends BoundedWindow> Matcher<Iterable<W>> ofWindows( + Matcher<W>... windows) { + return (Matcher) Matchers.<W>containsInAnyOrder(windows); + } + + private WindowMatchers() {} + + private static class WindowedValueMatcher<T> extends TypeSafeMatcher<WindowedValue<? extends T>> { + + private Matcher<? super T> valueMatcher; + private Matcher<? super Instant> timestampMatcher; + private Matcher<? super Collection<? extends BoundedWindow>> windowsMatcher; + private Matcher<? super PaneInfo> paneInfoMatcher; + + private WindowedValueMatcher( + Matcher<? super T> valueMatcher, + Matcher<? super Instant> timestampMatcher, + Matcher<? super Collection<? extends BoundedWindow>> windowsMatcher, + Matcher<? super PaneInfo> paneInfoMatcher) { + this.valueMatcher = valueMatcher; + this.timestampMatcher = timestampMatcher; + this.windowsMatcher = windowsMatcher; + this.paneInfoMatcher = paneInfoMatcher; + } + + @Override + public void describeTo(Description description) { + description + .appendText("a WindowedValue(").appendValue(valueMatcher) + .appendText(", ").appendValue(timestampMatcher) + .appendText(", ").appendValue(windowsMatcher) + .appendText(", ").appendValue(paneInfoMatcher) + .appendText(")"); + } + + @Override + protected boolean matchesSafely(WindowedValue<? 
extends T> windowedValue) { + return valueMatcher.matches(windowedValue.getValue()) + && timestampMatcher.matches(windowedValue.getTimestamp()) + && windowsMatcher.matches(windowedValue.getWindows()); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d9891234/runners/core-java/src/test/java/org/apache/beam/runners/core/WindowMatchersTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/WindowMatchersTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/WindowMatchersTest.java new file mode 100644 index 0000000..6f4741a --- /dev/null +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/WindowMatchersTest.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.core; + +import static org.junit.Assert.assertThat; + +import com.google.common.collect.ImmutableList; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.WindowedValue; +import org.joda.time.Instant; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link WindowMatchers}. + */ +@RunWith(JUnit4.class) +public class WindowMatchersTest { + + @Test + public void testIsWindowedValueExact() { + long timestamp = 100; + long windowStart = 0; + long windowEnd = 200; + + assertThat( + WindowedValue.of( + "hello", + new Instant(timestamp), + new IntervalWindow(new Instant(windowStart), new Instant(windowEnd)), + PaneInfo.NO_FIRING), + WindowMatchers.isWindowedValue( + "hello", + new Instant(timestamp), + ImmutableList.of(new IntervalWindow(new Instant(windowStart), new Instant(windowEnd))), + PaneInfo.NO_FIRING)); + } + + @Test + public void testIsWindowedValueReorderedWindows() { + long timestamp = 100; + long windowStart = 0; + long windowEnd = 200; + long windowStart2 = 50; + long windowEnd2 = 150; + + assertThat( + WindowedValue.of( + "hello", + new Instant(timestamp), + ImmutableList.of( + new IntervalWindow(new Instant(windowStart), new Instant(windowEnd)), + new IntervalWindow(new Instant(windowStart2), new Instant(windowEnd2))), + PaneInfo.NO_FIRING), + WindowMatchers.isWindowedValue( + "hello", + new Instant(timestamp), + ImmutableList.of( + new IntervalWindow(new Instant(windowStart), new Instant(windowEnd)), + new IntervalWindow(new Instant(windowStart2), new Instant(windowEnd2))), + PaneInfo.NO_FIRING)); + } +} + + http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d9891234/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java index e2f987c..66c28ce 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java @@ -17,8 +17,8 @@ */ package org.apache.beam.runners.direct; -import static org.apache.beam.sdk.WindowMatchers.isSingleWindowedValue; -import static org.apache.beam.sdk.WindowMatchers.isWindowedValue; +import static org.apache.beam.runners.core.WindowMatchers.isSingleWindowedValue; +import static org.apache.beam.runners.core.WindowMatchers.isWindowedValue; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.junit.Assert.assertThat; import static org.mockito.Mockito.when; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d9891234/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/GatherAllPanes.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/GatherAllPanes.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/GatherAllPanes.java new file mode 100644 index 0000000..2b311b7 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/GatherAllPanes.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.testing; + +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.Values; +import org.apache.beam.sdk.transforms.WithKeys; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.Never; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.transforms.windowing.WindowFn; +import org.apache.beam.sdk.util.IdentityWindowFn; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TypeDescriptor; + +/** + * Gathers all panes of each window into exactly one output. + * + * <p>Note that this will delay the output of a window until the garbage collection time (when the + * watermark passes the end of the window plus allowed lateness) even if the upstream triggers + * closed the window earlier. + */ +class GatherAllPanes<T> + extends PTransform<PCollection<T>, PCollection<Iterable<ValueInSingleWindow<T>>>> { + /** + * Gathers all panes of each window into a single output element. + * + * <p>This will gather all output panes into a single element, which causes them to be colocated + * on a single worker. As a result, this is only suitable for {@link PCollection PCollections} + * where all of the output elements for each pane fit in memory, such as in tests. 
+ */ + public static <T> GatherAllPanes<T> globally() { + return new GatherAllPanes<>(); + } + + private GatherAllPanes() {} + + @Override + public PCollection<Iterable<ValueInSingleWindow<T>>> apply(PCollection<T> input) { + WindowFn<?, ?> originalWindowFn = input.getWindowingStrategy().getWindowFn(); + + return input + .apply(ParDo.of(new ReifyTimestampsAndWindowsFn<T>())) + .setCoder( + ValueInSingleWindow.Coder.of( + input.getCoder(), input.getWindowingStrategy().getWindowFn().windowCoder())) + .apply( + WithKeys.<Integer, ValueInSingleWindow<T>>of(0) + .withKeyType(new TypeDescriptor<Integer>() {})) + .apply( + Window.into( + new IdentityWindowFn<KV<Integer, ValueInSingleWindow<T>>>( + originalWindowFn.windowCoder())) + .triggering(Never.ever()) + .withAllowedLateness(input.getWindowingStrategy().getAllowedLateness()) + .discardingFiredPanes()) + // all values have the same key so they all appear as a single output element + .apply(GroupByKey.<Integer, ValueInSingleWindow<T>>create()) + .apply(Values.<Iterable<ValueInSingleWindow<T>>>create()) + .setWindowingStrategyInternal(input.getWindowingStrategy()); + } + + private static class ReifyTimestampsAndWindowsFn<T> extends DoFn<T, ValueInSingleWindow<T>> { + @DoFn.ProcessElement + public void processElement(ProcessContext c, BoundedWindow window) { + c.output(ValueInSingleWindow.of(c.element(), c.timestamp(), window, c.pane())); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d9891234/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java index b3a14aa..7dc78d8 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java @@ -63,8 +63,6 @@ import 
org.apache.beam.sdk.transforms.windowing.Trigger; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior; import org.apache.beam.sdk.util.CoderUtils; -import org.apache.beam.sdk.util.GatherAllPanes; -import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PBegin; @@ -349,7 +347,7 @@ public class PAssert { private static class PCollectionContentsAssert<T> implements IterableAssert<T> { private final PCollection<T> actual; private final AssertionWindows rewindowingStrategy; - private final SimpleFunction<Iterable<WindowedValue<T>>, Iterable<T>> paneExtractor; + private final SimpleFunction<Iterable<ValueInSingleWindow<T>>, Iterable<T>> paneExtractor; public PCollectionContentsAssert(PCollection<T> actual) { this(actual, IntoGlobalWindow.<T>of(), PaneExtractors.<T>allPanes()); @@ -358,7 +356,7 @@ public class PAssert { public PCollectionContentsAssert( PCollection<T> actual, AssertionWindows rewindowingStrategy, - SimpleFunction<Iterable<WindowedValue<T>>, Iterable<T>> paneExtractor) { + SimpleFunction<Iterable<ValueInSingleWindow<T>>, Iterable<T>> paneExtractor) { this.actual = actual; this.rewindowingStrategy = rewindowingStrategy; this.paneExtractor = paneExtractor; @@ -391,7 +389,7 @@ public class PAssert { private PCollectionContentsAssert<T> withPane( BoundedWindow window, - SimpleFunction<Iterable<WindowedValue<T>>, Iterable<T>> paneExtractor) { + SimpleFunction<Iterable<ValueInSingleWindow<T>>, Iterable<T>> paneExtractor) { @SuppressWarnings({"unchecked", "rawtypes"}) Coder<BoundedWindow> windowCoder = (Coder) actual.getWindowingStrategy().getWindowFn().windowCoder(); @@ -523,7 +521,7 @@ public class PAssert { private final PCollection<Iterable<T>> actual; private final Coder<T> elementCoder; private final AssertionWindows rewindowingStrategy; - private final 
SimpleFunction<Iterable<WindowedValue<Iterable<T>>>, Iterable<Iterable<T>>> + private final SimpleFunction<Iterable<ValueInSingleWindow<Iterable<T>>>, Iterable<Iterable<T>>> paneExtractor; public PCollectionSingletonIterableAssert(PCollection<Iterable<T>> actual) { @@ -533,7 +531,8 @@ public class PAssert { public PCollectionSingletonIterableAssert( PCollection<Iterable<T>> actual, AssertionWindows rewindowingStrategy, - SimpleFunction<Iterable<WindowedValue<Iterable<T>>>, Iterable<Iterable<T>>> paneExtractor) { + SimpleFunction<Iterable<ValueInSingleWindow<Iterable<T>>>, Iterable<Iterable<T>>> + paneExtractor) { this.actual = actual; @SuppressWarnings("unchecked") @@ -571,7 +570,8 @@ public class PAssert { private PCollectionSingletonIterableAssert<T> withPanes( BoundedWindow window, - SimpleFunction<Iterable<WindowedValue<Iterable<T>>>, Iterable<Iterable<T>>> paneExtractor) { + SimpleFunction<Iterable<ValueInSingleWindow<Iterable<T>>>, Iterable<Iterable<T>>> + paneExtractor) { @SuppressWarnings({"unchecked", "rawtypes"}) Coder<BoundedWindow> windowCoder = (Coder) actual.getWindowingStrategy().getWindowFn().windowCoder(); @@ -620,7 +620,8 @@ public class PAssert { private final PCollection<ElemT> actual; private final PTransform<PCollection<ElemT>, PCollectionView<ViewT>> view; private final AssertionWindows rewindowActuals; - private final SimpleFunction<Iterable<WindowedValue<ElemT>>, Iterable<ElemT>> paneExtractor; + private final SimpleFunction<Iterable<ValueInSingleWindow<ElemT>>, Iterable<ElemT>> + paneExtractor; private final Coder<ViewT> coder; protected PCollectionViewAssert( @@ -634,7 +635,7 @@ public class PAssert { PCollection<ElemT> actual, PTransform<PCollection<ElemT>, PCollectionView<ViewT>> view, AssertionWindows rewindowActuals, - SimpleFunction<Iterable<WindowedValue<ElemT>>, Iterable<ElemT>> paneExtractor, + SimpleFunction<Iterable<ValueInSingleWindow<ElemT>>, Iterable<ElemT>> paneExtractor, Coder<ViewT> coder) { this.actual = actual; this.view 
= view; @@ -660,7 +661,7 @@ public class PAssert { private PCollectionViewAssert<ElemT, ViewT> inPane( BoundedWindow window, - SimpleFunction<Iterable<WindowedValue<ElemT>>, Iterable<ElemT>> paneExtractor) { + SimpleFunction<Iterable<ValueInSingleWindow<ElemT>>, Iterable<ElemT>> paneExtractor) { return new PCollectionViewAssert<>( actual, view, @@ -738,13 +739,14 @@ public class PAssert { private final transient PCollection<T> actual; private final transient AssertionWindows rewindowActuals; - private final transient SimpleFunction<Iterable<WindowedValue<T>>, Iterable<T>> extractPane; + private final transient SimpleFunction<Iterable<ValueInSingleWindow<T>>, Iterable<T>> + extractPane; private final transient PTransform<PCollection<T>, PCollectionView<ActualT>> actualView; public static <T, ActualT> CreateActual<T, ActualT> from( PCollection<T> actual, AssertionWindows rewindowActuals, - SimpleFunction<Iterable<WindowedValue<T>>, Iterable<T>> extractPane, + SimpleFunction<Iterable<ValueInSingleWindow<T>>, Iterable<T>> extractPane, PTransform<PCollection<T>, PCollectionView<ActualT>> actualView) { return new CreateActual<>(actual, rewindowActuals, extractPane, actualView); } @@ -752,7 +754,7 @@ public class PAssert { private CreateActual( PCollection<T> actual, AssertionWindows rewindowActuals, - SimpleFunction<Iterable<WindowedValue<T>>, Iterable<T>> extractPane, + SimpleFunction<Iterable<ValueInSingleWindow<T>>, Iterable<T>> extractPane, PTransform<PCollection<T>, PCollectionView<ActualT>> actualView) { this.actual = actual; this.rewindowActuals = rewindowActuals; @@ -822,7 +824,7 @@ public class PAssert { * a single empty iterable, even though in practice most runners will not produce any element. 
*/ private static class GroupGlobally<T> - extends PTransform<PCollection<T>, PCollection<Iterable<WindowedValue<T>>>> + extends PTransform<PCollection<T>, PCollection<Iterable<ValueInSingleWindow<T>>>> implements Serializable { private final AssertionWindows rewindowingStrategy; @@ -831,20 +833,20 @@ public class PAssert { } @Override - public PCollection<Iterable<WindowedValue<T>>> apply(PCollection<T> input) { + public PCollection<Iterable<ValueInSingleWindow<T>>> apply(PCollection<T> input) { final int combinedKey = 42; // Remove the triggering on both PTransform< - PCollection<KV<Integer, Iterable<WindowedValue<T>>>>, - PCollection<KV<Integer, Iterable<WindowedValue<T>>>>> + PCollection<KV<Integer, Iterable<ValueInSingleWindow<T>>>>, + PCollection<KV<Integer, Iterable<ValueInSingleWindow<T>>>>> removeTriggering = - Window.<KV<Integer, Iterable<WindowedValue<T>>>>triggering(Never.ever()) + Window.<KV<Integer, Iterable<ValueInSingleWindow<T>>>>triggering(Never.ever()) .discardingFiredPanes() .withAllowedLateness(input.getWindowingStrategy().getAllowedLateness()); // Group the contents by key. If it is empty, this PCollection will be empty, too. // Then key it again with a dummy key. 
- PCollection<KV<Integer, Iterable<WindowedValue<T>>>> groupedContents = + PCollection<KV<Integer, Iterable<ValueInSingleWindow<T>>>> groupedContents = // TODO: Split the filtering from the rewindowing, and apply filtering before the Gather // if the grouping of extra records input @@ -852,45 +854,47 @@ public class PAssert { .apply("GatherAllOutputs", GatherAllPanes.<T>globally()) .apply( "RewindowActuals", - rewindowingStrategy.<Iterable<WindowedValue<T>>>windowActuals()) - .apply("KeyForDummy", WithKeys.<Integer, Iterable<WindowedValue<T>>>of(combinedKey)) + rewindowingStrategy.<Iterable<ValueInSingleWindow<T>>>windowActuals()) + .apply( + "KeyForDummy", + WithKeys.<Integer, Iterable<ValueInSingleWindow<T>>>of(combinedKey)) .apply("RemoveActualsTriggering", removeTriggering); // Create another non-empty PCollection that is keyed with a distinct dummy key - PCollection<KV<Integer, Iterable<WindowedValue<T>>>> keyedDummy = + PCollection<KV<Integer, Iterable<ValueInSingleWindow<T>>>> keyedDummy = input .getPipeline() .apply( Create.of( KV.of( combinedKey, - (Iterable<WindowedValue<T>>) - Collections.<WindowedValue<T>>emptyList())) + (Iterable<ValueInSingleWindow<T>>) + Collections.<ValueInSingleWindow<T>>emptyList())) .withCoder(groupedContents.getCoder())) .apply( "WindowIntoDummy", - rewindowingStrategy.<KV<Integer, Iterable<WindowedValue<T>>>>windowDummy()) + rewindowingStrategy.<KV<Integer, Iterable<ValueInSingleWindow<T>>>>windowDummy()) .apply("RemoveDummyTriggering", removeTriggering); // Flatten them together and group by the combined key to get a single element - PCollection<KV<Integer, Iterable<Iterable<WindowedValue<T>>>>> dummyAndContents = + PCollection<KV<Integer, Iterable<Iterable<ValueInSingleWindow<T>>>>> dummyAndContents = PCollectionList.of(groupedContents) .and(keyedDummy) .apply( "FlattenDummyAndContents", - Flatten.<KV<Integer, Iterable<WindowedValue<T>>>>pCollections()) + Flatten.<KV<Integer, Iterable<ValueInSingleWindow<T>>>>pCollections()) 
.apply( "NeverTrigger", - Window.<KV<Integer, Iterable<WindowedValue<T>>>>triggering(Never.ever()) + Window.<KV<Integer, Iterable<ValueInSingleWindow<T>>>>triggering(Never.ever()) .withAllowedLateness(input.getWindowingStrategy().getAllowedLateness()) .discardingFiredPanes()) .apply( "GroupDummyAndContents", - GroupByKey.<Integer, Iterable<WindowedValue<T>>>create()); + GroupByKey.<Integer, Iterable<ValueInSingleWindow<T>>>create()); return dummyAndContents - .apply(Values.<Iterable<Iterable<WindowedValue<T>>>>create()) - .apply(ParDo.of(new ConcatFn<WindowedValue<T>>())); + .apply(Values.<Iterable<Iterable<ValueInSingleWindow<T>>>>create()) + .apply(ParDo.of(new ConcatFn<ValueInSingleWindow<T>>())); } } @@ -909,12 +913,12 @@ public class PAssert { implements Serializable { private final SerializableFunction<Iterable<T>, Void> checkerFn; private final AssertionWindows rewindowingStrategy; - private final SimpleFunction<Iterable<WindowedValue<T>>, Iterable<T>> paneExtractor; + private final SimpleFunction<Iterable<ValueInSingleWindow<T>>, Iterable<T>> paneExtractor; private GroupThenAssert( SerializableFunction<Iterable<T>, Void> checkerFn, AssertionWindows rewindowingStrategy, - SimpleFunction<Iterable<WindowedValue<T>>, Iterable<T>> paneExtractor) { + SimpleFunction<Iterable<ValueInSingleWindow<T>>, Iterable<T>> paneExtractor) { this.checkerFn = checkerFn; this.rewindowingStrategy = rewindowingStrategy; this.paneExtractor = paneExtractor; @@ -940,13 +944,14 @@ public class PAssert { extends PTransform<PCollection<Iterable<T>>, PDone> implements Serializable { private final SerializableFunction<Iterable<T>, Void> checkerFn; private final AssertionWindows rewindowingStrategy; - private final SimpleFunction<Iterable<WindowedValue<Iterable<T>>>, Iterable<Iterable<T>>> + private final SimpleFunction<Iterable<ValueInSingleWindow<Iterable<T>>>, Iterable<Iterable<T>>> paneExtractor; private GroupThenAssertForSingleton( SerializableFunction<Iterable<T>, Void> checkerFn, 
AssertionWindows rewindowingStrategy, - SimpleFunction<Iterable<WindowedValue<Iterable<T>>>, Iterable<Iterable<T>>> paneExtractor) { + SimpleFunction<Iterable<ValueInSingleWindow<Iterable<T>>>, Iterable<Iterable<T>>> + paneExtractor) { this.checkerFn = checkerFn; this.rewindowingStrategy = rewindowingStrategy; this.paneExtractor = paneExtractor; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d9891234/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PaneExtractors.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PaneExtractors.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PaneExtractors.java index db72a0c..dd1fac9 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PaneExtractors.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PaneExtractors.java @@ -25,14 +25,13 @@ import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.SimpleFunction; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing; -import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.TypeDescriptor; /** - * {@link PTransform PTransforms} which take an {@link Iterable} of {@link WindowedValue - * WindowedValues} and outputs an {@link Iterable} of all values in the specified pane, dropping the - * {@link WindowedValue} metadata. + * {@link PTransform PTransforms} which take an {@link Iterable} of {@link ValueInSingleWindow + * ValueInSingleWindows} and outputs an {@link Iterable} of all values in the specified pane, + * dropping the {@link ValueInSingleWindow} metadata. 
* * <p>Although all of the method signatures return SimpleFunction, users should ensure to set the * coder of any output {@link PCollection}, as appropriate {@link TypeDescriptor TypeDescriptors} @@ -42,36 +41,36 @@ final class PaneExtractors { private PaneExtractors() { } - static <T> SimpleFunction<Iterable<WindowedValue<T>>, Iterable<T>> onlyPane() { + static <T> SimpleFunction<Iterable<ValueInSingleWindow<T>>, Iterable<T>> onlyPane() { return new ExtractOnlyPane<>(); } - static <T> SimpleFunction<Iterable<WindowedValue<T>>, Iterable<T>> onTimePane() { + static <T> SimpleFunction<Iterable<ValueInSingleWindow<T>>, Iterable<T>> onTimePane() { return new ExtractOnTimePane<>(); } - static <T> SimpleFunction<Iterable<WindowedValue<T>>, Iterable<T>> finalPane() { + static <T> SimpleFunction<Iterable<ValueInSingleWindow<T>>, Iterable<T>> finalPane() { return new ExtractFinalPane<>(); } - static <T> SimpleFunction<Iterable<WindowedValue<T>>, Iterable<T>> nonLatePanes() { + static <T> SimpleFunction<Iterable<ValueInSingleWindow<T>>, Iterable<T>> nonLatePanes() { return new ExtractNonLatePanes<>(); } - static <T> SimpleFunction<Iterable<WindowedValue<T>>, Iterable<T>> earlyPanes() { + static <T> SimpleFunction<Iterable<ValueInSingleWindow<T>>, Iterable<T>> earlyPanes() { return new ExtractEarlyPanes<>(); } - static <T> SimpleFunction<Iterable<WindowedValue<T>>, Iterable<T>> allPanes() { + static <T> SimpleFunction<Iterable<ValueInSingleWindow<T>>, Iterable<T>> allPanes() { return new ExtractAllPanes<>(); } private static class ExtractOnlyPane<T> - extends SimpleFunction<Iterable<WindowedValue<T>>, Iterable<T>> { + extends SimpleFunction<Iterable<ValueInSingleWindow<T>>, Iterable<T>> { @Override - public Iterable<T> apply(Iterable<WindowedValue<T>> input) { + public Iterable<T> apply(Iterable<ValueInSingleWindow<T>> input) { List<T> outputs = new ArrayList<>(); - for (WindowedValue<T> value : input) { + for (ValueInSingleWindow<T> value : input) { 
checkState(value.getPane().isFirst() && value.getPane().isLast(), "Expected elements to be produced by a trigger that fires at most once, but got" + "a value in a pane that is %s. Actual Pane Info: %s", @@ -85,11 +84,11 @@ final class PaneExtractors { private static class ExtractOnTimePane<T> - extends SimpleFunction<Iterable<WindowedValue<T>>, Iterable<T>> { + extends SimpleFunction<Iterable<ValueInSingleWindow<T>>, Iterable<T>> { @Override - public Iterable<T> apply(Iterable<WindowedValue<T>> input) { + public Iterable<T> apply(Iterable<ValueInSingleWindow<T>> input) { List<T> outputs = new ArrayList<>(); - for (WindowedValue<T> value : input) { + for (ValueInSingleWindow<T> value : input) { if (value.getPane().getTiming().equals(Timing.ON_TIME)) { outputs.add(value.getValue()); } @@ -100,11 +99,11 @@ final class PaneExtractors { private static class ExtractFinalPane<T> - extends SimpleFunction<Iterable<WindowedValue<T>>, Iterable<T>> { + extends SimpleFunction<Iterable<ValueInSingleWindow<T>>, Iterable<T>> { @Override - public Iterable<T> apply(Iterable<WindowedValue<T>> input) { + public Iterable<T> apply(Iterable<ValueInSingleWindow<T>> input) { List<T> outputs = new ArrayList<>(); - for (WindowedValue<T> value : input) { + for (ValueInSingleWindow<T> value : input) { if (value.getPane().isLast()) { outputs.add(value.getValue()); } @@ -115,11 +114,11 @@ final class PaneExtractors { private static class ExtractAllPanes<T> - extends SimpleFunction<Iterable<WindowedValue<T>>, Iterable<T>> { + extends SimpleFunction<Iterable<ValueInSingleWindow<T>>, Iterable<T>> { @Override - public Iterable<T> apply(Iterable<WindowedValue<T>> input) { + public Iterable<T> apply(Iterable<ValueInSingleWindow<T>> input) { List<T> outputs = new ArrayList<>(); - for (WindowedValue<T> value : input) { + for (ValueInSingleWindow<T> value : input) { outputs.add(value.getValue()); } return outputs; @@ -128,11 +127,11 @@ final class PaneExtractors { private static class 
ExtractNonLatePanes<T> - extends SimpleFunction<Iterable<WindowedValue<T>>, Iterable<T>> { + extends SimpleFunction<Iterable<ValueInSingleWindow<T>>, Iterable<T>> { @Override - public Iterable<T> apply(Iterable<WindowedValue<T>> input) { + public Iterable<T> apply(Iterable<ValueInSingleWindow<T>> input) { List<T> outputs = new ArrayList<>(); - for (WindowedValue<T> value : input) { + for (ValueInSingleWindow<T> value : input) { if (value.getPane().getTiming() != PaneInfo.Timing.LATE) { outputs.add(value.getValue()); } @@ -142,11 +141,11 @@ final class PaneExtractors { } private static class ExtractEarlyPanes<T> - extends SimpleFunction<Iterable<WindowedValue<T>>, Iterable<T>> { + extends SimpleFunction<Iterable<ValueInSingleWindow<T>>, Iterable<T>> { @Override - public Iterable<T> apply(Iterable<WindowedValue<T>> input) { + public Iterable<T> apply(Iterable<ValueInSingleWindow<T>> input) { List<T> outputs = new ArrayList<>(); - for (WindowedValue<T> value : input) { + for (ValueInSingleWindow<T> value : input) { if (value.getPane().getTiming() == PaneInfo.Timing.EARLY) { outputs.add(value.getValue()); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d9891234/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/ValueInSingleWindow.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/ValueInSingleWindow.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/ValueInSingleWindow.java new file mode 100644 index 0000000..9ec030f --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/ValueInSingleWindow.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.testing; + +import static com.google.common.base.Preconditions.checkArgument; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.auto.value.AutoValue; +import com.google.common.collect.ImmutableList; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.List; +import javax.annotation.Nullable; +import org.apache.beam.sdk.coders.InstantCoder; +import org.apache.beam.sdk.coders.StandardCoder; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.PropertyNames; +import org.joda.time.Instant; + +/** + * An immutable tuple of value, timestamp, window, and pane. + * + * @param <T> the type of the value + */ +@AutoValue +public abstract class ValueInSingleWindow<T> { + /** Returns the value of this {@code ValueInSingleWindow}. */ + @Nullable + public abstract T getValue(); + + /** Returns the timestamp of this {@code ValueInSingleWindow}. */ + public abstract Instant getTimestamp(); + + /** Returns the window of this {@code ValueInSingleWindow}. */ + public abstract BoundedWindow getWindow(); + + /** Returns the pane of this {@code ValueInSingleWindow} in its window. 
*/ + public abstract PaneInfo getPane(); + + public static <T> ValueInSingleWindow<T> of( + T value, Instant timestamp, BoundedWindow window, PaneInfo paneInfo) { + return new AutoValue_ValueInSingleWindow<>(value, timestamp, window, paneInfo); + } + + /** A coder for {@link ValueInSingleWindow}. */ + public static class Coder<T> extends StandardCoder<ValueInSingleWindow<T>> { + private final org.apache.beam.sdk.coders.Coder<T> valueCoder; + private final org.apache.beam.sdk.coders.Coder<BoundedWindow> windowCoder; + + public static <T> Coder<T> of( + org.apache.beam.sdk.coders.Coder<T> valueCoder, + org.apache.beam.sdk.coders.Coder<? extends BoundedWindow> windowCoder) { + return new Coder<>(valueCoder, windowCoder); + } + + @JsonCreator + public static <T> Coder<T> of( + @JsonProperty(PropertyNames.COMPONENT_ENCODINGS) + List<org.apache.beam.sdk.coders.Coder<?>> components) { + checkArgument(components.size() == 2, "Expecting 2 components, got %s", components.size()); + @SuppressWarnings("unchecked") + org.apache.beam.sdk.coders.Coder<T> valueCoder = + (org.apache.beam.sdk.coders.Coder<T>) components.get(0); + @SuppressWarnings("unchecked") + org.apache.beam.sdk.coders.Coder<BoundedWindow> windowCoder = + (org.apache.beam.sdk.coders.Coder<BoundedWindow>) components.get(1); + return new Coder<>(valueCoder, windowCoder); + } + + @SuppressWarnings({"unchecked", "rawtypes"}) + Coder( + org.apache.beam.sdk.coders.Coder<T> valueCoder, + org.apache.beam.sdk.coders.Coder<? 
extends BoundedWindow> windowCoder) { + this.valueCoder = valueCoder; + this.windowCoder = (org.apache.beam.sdk.coders.Coder) windowCoder; + } + + @Override + public void encode(ValueInSingleWindow<T> windowedElem, OutputStream outStream, Context context) + throws IOException { + Context nestedContext = context.nested(); + valueCoder.encode(windowedElem.getValue(), outStream, nestedContext); + InstantCoder.of().encode(windowedElem.getTimestamp(), outStream, nestedContext); + windowCoder.encode(windowedElem.getWindow(), outStream, nestedContext); + PaneInfo.PaneInfoCoder.INSTANCE.encode(windowedElem.getPane(), outStream, context); + } + + @Override + public ValueInSingleWindow<T> decode(InputStream inStream, Context context) throws IOException { + Context nestedContext = context.nested(); + T value = valueCoder.decode(inStream, nestedContext); + Instant timestamp = InstantCoder.of().decode(inStream, nestedContext); + BoundedWindow window = windowCoder.decode(inStream, nestedContext); + PaneInfo pane = PaneInfo.PaneInfoCoder.INSTANCE.decode(inStream, nestedContext); + return new AutoValue_ValueInSingleWindow<>(value, timestamp, window, pane); + } + + @Override + public List<? extends org.apache.beam.sdk.coders.Coder<?>> getCoderArguments() { + // Coder arguments are coders for the type parameters of the coder - i.e. only T. + return ImmutableList.of(valueCoder); + } + + @Override + public List<? extends org.apache.beam.sdk.coders.Coder<?>> getComponents() { + // Coder components are all inner coders that it uses - i.e. both T and BoundedWindow. 
+ return ImmutableList.of(valueCoder, windowCoder); + } + + @Override + public void verifyDeterministic() throws NonDeterministicException { + valueCoder.verifyDeterministic(); + windowCoder.verifyDeterministic(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d9891234/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java index 0c6043f..17fa612 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java @@ -23,7 +23,6 @@ import static com.google.common.base.Preconditions.checkState; import com.google.common.base.Function; import com.google.common.base.MoreObjects; import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import java.util.ArrayList; import java.util.Arrays; @@ -35,8 +34,10 @@ import java.util.Map; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.testing.ValueInSingleWindow; import org.apache.beam.sdk.transforms.Combine.CombineFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.SerializableUtils; import org.apache.beam.sdk.util.TimerInternals; @@ -353,10 +354,10 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable { public List<TimestampedValue<OutputT>> peekOutputElementsWithTimestamp() { // TODO: Should we return an unmodifiable list? 
return Lists.transform(getImmutableOutput(mainOutputTag), - new Function<WindowedValue<OutputT>, TimestampedValue<OutputT>>() { + new Function<ValueInSingleWindow<OutputT>, TimestampedValue<OutputT>>() { @Override @SuppressWarnings("unchecked") - public TimestampedValue<OutputT> apply(WindowedValue<OutputT> input) { + public TimestampedValue<OutputT> apply(ValueInSingleWindow<OutputT> input) { return TimestampedValue.of(input.getValue(), input.getTimestamp()); } }); @@ -378,8 +379,8 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable { TupleTag<OutputT> tag, BoundedWindow window) { ImmutableList.Builder<TimestampedValue<OutputT>> valuesBuilder = ImmutableList.builder(); - for (WindowedValue<OutputT> value : getImmutableOutput(tag)) { - if (value.getWindows().contains(window)) { + for (ValueInSingleWindow<OutputT> value : getImmutableOutput(tag)) { + if (value.getWindow().equals(window)) { valuesBuilder.add(TimestampedValue.of(value.getValue(), value.getTimestamp())); } } @@ -434,10 +435,10 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable { public <T> List<T> peekSideOutputElements(TupleTag<T> tag) { // TODO: Should we return an unmodifiable list? 
return Lists.transform(getImmutableOutput(tag), - new Function<WindowedValue<T>, T>() { + new Function<ValueInSingleWindow<T>, T>() { @SuppressWarnings("unchecked") @Override - public T apply(WindowedValue<T> input) { + public T apply(ValueInSingleWindow<T> input) { return input.getValue(); }}); } @@ -510,16 +511,16 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable { return combiner.extractOutput(accumulator); } - private <T> List<WindowedValue<T>> getImmutableOutput(TupleTag<T> tag) { + private <T> List<ValueInSingleWindow<T>> getImmutableOutput(TupleTag<T> tag) { @SuppressWarnings({"unchecked", "rawtypes"}) - List<WindowedValue<T>> elems = (List) outputs.get(tag); + List<ValueInSingleWindow<T>> elems = (List) outputs.get(tag); return ImmutableList.copyOf( - MoreObjects.firstNonNull(elems, Collections.<WindowedValue<T>>emptyList())); + MoreObjects.firstNonNull(elems, Collections.<ValueInSingleWindow<T>>emptyList())); } @SuppressWarnings({"unchecked", "rawtypes"}) - public <T> List<WindowedValue<T>> getMutableOutput(TupleTag<T> tag) { - List<WindowedValue<T>> outputList = (List) outputs.get(tag); + public <T> List<ValueInSingleWindow<T>> getMutableOutput(TupleTag<T> tag) { + List<ValueInSingleWindow<T>> outputList = (List) outputs.get(tag); if (outputList == null) { outputList = new ArrayList<>(); outputs.put(tag, (List) outputList); @@ -612,23 +613,22 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable { sideOutputWithTimestamp(tag, output, BoundedWindow.TIMESTAMP_MIN_VALUE); } - public <T> void noteOutput(TupleTag<T> tag, WindowedValue<T> output) { + public <T> void noteOutput(TupleTag<T> tag, ValueInSingleWindow<T> output) { getMutableOutput(tag).add(output); } } private TestProcessContext createProcessContext(TimestampedValue<InputT> elem) { - WindowedValue<InputT> windowedValue = WindowedValue.timestampedValueInGlobalWindow( - elem.getValue(), elem.getTimestamp()); - - return new TestProcessContext(windowedValue); + return 
new TestProcessContext( + ValueInSingleWindow.of( + elem.getValue(), elem.getTimestamp(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING)); } private class TestProcessContext extends OldDoFn<InputT, OutputT>.ProcessContext { private final TestContext context; - private final WindowedValue<InputT> element; + private final ValueInSingleWindow<InputT> element; - private TestProcessContext(WindowedValue<InputT> element) { + private TestProcessContext(ValueInSingleWindow<InputT> element) { fn.super(); this.context = createContext(fn); this.element = element; @@ -661,7 +661,7 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable { @Override public BoundedWindow window() { - return Iterables.getOnlyElement(element.getWindows()); + return element.getWindow(); } @Override @@ -683,7 +683,10 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable { Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) { - context.noteOutput(mainOutputTag, WindowedValue.of(output, timestamp, windows, pane)); + for (BoundedWindow window : windows) { + context.noteOutput( + mainOutputTag, ValueInSingleWindow.of(output, timestamp, window, pane)); + } } @Override @@ -693,7 +696,10 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable { Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) { - context.noteOutput(tag, WindowedValue.of(output, timestamp, windows, pane)); + for (BoundedWindow window : windows) { + context.noteOutput( + tag, ValueInSingleWindow.of(output, timestamp, window, pane)); + } } @Override @@ -703,7 +709,7 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable { @Override public Collection<? 
extends BoundedWindow> windows() { - return element.getWindows(); + return Collections.singleton(element.getWindow()); } @Override @@ -742,8 +748,8 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable { @Override public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) { - context.noteOutput(tag, - WindowedValue.of(output, timestamp, element.getWindows(), element.getPane())); + context.noteOutput( + tag, ValueInSingleWindow.of(output, timestamp, element.getWindow(), element.getPane())); } @Override @@ -803,7 +809,7 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable { OldDoFn<InputT, OutputT> fn; /** The outputs from the {@link DoFn} under test. */ - private Map<TupleTag<?>, List<WindowedValue<?>>> outputs; + private Map<TupleTag<?>, List<ValueInSingleWindow<?>>> outputs; private InMemoryStateInternals<?> stateInternals; private InMemoryTimerInternals timerInternals; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d9891234/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GatherAllPanes.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GatherAllPanes.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GatherAllPanes.java deleted file mode 100644 index 52a2ba8..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GatherAllPanes.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util; - -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.GroupByKey; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.Values; -import org.apache.beam.sdk.transforms.WithKeys; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.Never; -import org.apache.beam.sdk.transforms.windowing.Window; -import org.apache.beam.sdk.transforms.windowing.WindowFn; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.TypeDescriptor; - -/** - * Gathers all panes of each window into exactly one output. - * - * <p>Note that this will delay the output of a window until the garbage collection time (when the - * watermark passes the end of the window plus allowed lateness) even if the upstream triggers - * closed the window earlier. - */ -public class GatherAllPanes<T> - extends PTransform<PCollection<T>, PCollection<Iterable<WindowedValue<T>>>> { - /** - * Gathers all panes of each window into a single output element. - * - * <p>This will gather all output panes into a single element, which causes them to be colocated - * on a single worker. As a result, this is only suitable for {@link PCollection PCollections} - * where all of the output elements for each pane fit in memory, such as in tests. 
- */ - public static <T> GatherAllPanes<T> globally() { - return new GatherAllPanes<>(); - } - - private GatherAllPanes() {} - - @Override - public PCollection<Iterable<WindowedValue<T>>> apply(PCollection<T> input) { - WindowFn<?, ?> originalWindowFn = input.getWindowingStrategy().getWindowFn(); - - return input - .apply(ParDo.of(new ReifyTimestampsAndWindowsFn<T>())) - .setCoder( - WindowedValue.FullWindowedValueCoder.of( - input.getCoder(), input.getWindowingStrategy().getWindowFn().windowCoder())) - .apply( - WithKeys.<Integer, WindowedValue<T>>of(0).withKeyType(new TypeDescriptor<Integer>() {})) - .apply( - Window.into( - new IdentityWindowFn<KV<Integer, WindowedValue<T>>>( - originalWindowFn.windowCoder())) - .triggering(Never.ever()) - .withAllowedLateness(input.getWindowingStrategy().getAllowedLateness()) - .discardingFiredPanes()) - // all values have the same key so they all appear as a single output element - .apply(GroupByKey.<Integer, WindowedValue<T>>create()) - .apply(Values.<Iterable<WindowedValue<T>>>create()) - .setWindowingStrategyInternal(input.getWindowingStrategy()); - } - - private static class ReifyTimestampsAndWindowsFn<T> extends DoFn<T, WindowedValue<T>> { - @DoFn.ProcessElement - public void processElement(ProcessContext c, BoundedWindow window) { - c.output(WindowedValue.of(c.element(), c.timestamp(), window, c.pane())); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d9891234/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IdentityWindowFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IdentityWindowFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IdentityWindowFn.java index 8ca1bfd..c02e1f4 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IdentityWindowFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IdentityWindowFn.java @@ -45,7 +45,7 @@ import 
org.joda.time.Instant; * <p>This {@link WindowFn} is an internal implementation detail of sdk-provided utilities, and * should not be used by {@link Pipeline} writers. */ -class IdentityWindowFn<T> extends NonMergingWindowFn<T, BoundedWindow> { +public class IdentityWindowFn<T> extends NonMergingWindowFn<T, BoundedWindow> { /** * The coder of the type of windows of the input {@link PCollection}. This is not an arbitrary http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d9891234/sdks/java/core/src/test/java/org/apache/beam/sdk/WindowMatchers.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/WindowMatchers.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/WindowMatchers.java deleted file mode 100644 index 3531a86..0000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/WindowMatchers.java +++ /dev/null @@ -1,204 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.sdk; - -import com.google.common.collect.Lists; -import java.util.Collection; -import java.util.Objects; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.IntervalWindow; -import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.util.WindowedValue; -import org.hamcrest.Description; -import org.hamcrest.Matcher; -import org.hamcrest.Matchers; -import org.hamcrest.TypeSafeMatcher; -import org.joda.time.Instant; - -/** - * Matchers that are useful for working with Windowing, Timestamps, etc. - */ -public class WindowMatchers { - - public static <T> Matcher<WindowedValue<? extends T>> isWindowedValue( - T value, - Instant timestamp, - Collection<? extends BoundedWindow> windows, - PaneInfo paneInfo) { - - Collection<Matcher<? super BoundedWindow>> windowMatchers = - Lists.newArrayListWithCapacity(windows.size()); - for (BoundedWindow window : windows) { - windowMatchers.add(Matchers.equalTo(window)); - } - - return isWindowedValue( - Matchers.equalTo(value), - Matchers.equalTo(timestamp), - Matchers.containsInAnyOrder(windowMatchers), - Matchers.equalTo(paneInfo)); - } - - public static <T> Matcher<WindowedValue<? extends T>> isWindowedValue( - Matcher<? super T> valueMatcher, - Matcher<? super Instant> timestampMatcher, - Matcher<? super Collection<? extends BoundedWindow>> windowsMatcher, - Matcher<? super PaneInfo> paneInfoMatcher) { - return new WindowedValueMatcher<>( - valueMatcher, timestampMatcher, windowsMatcher, paneInfoMatcher); - } - - public static <T> Matcher<WindowedValue<? extends T>> isWindowedValue( - Matcher<? super T> valueMatcher, - Matcher<? super Instant> timestampMatcher, - Matcher<? super Collection<? extends BoundedWindow>> windowsMatcher) { - return new WindowedValueMatcher<>( - valueMatcher, timestampMatcher, windowsMatcher, Matchers.anything()); - } - - public static <T> Matcher<WindowedValue<? 
extends T>> isWindowedValue( - Matcher<? super T> valueMatcher, Matcher<? super Instant> timestampMatcher) { - return new WindowedValueMatcher<>( - valueMatcher, timestampMatcher, Matchers.anything(), Matchers.anything()); - } - - public static <T> Matcher<WindowedValue<? extends T>> isWindowedValue( - Matcher<? super T> valueMatcher) { - return new WindowedValueMatcher<>( - valueMatcher, Matchers.anything(), Matchers.anything(), Matchers.anything()); - } - - public static <T> Matcher<WindowedValue<? extends T>> isSingleWindowedValue( - T value, long timestamp, long windowStart, long windowEnd) { - return WindowMatchers.<T>isSingleWindowedValue( - Matchers.equalTo(value), timestamp, windowStart, windowEnd); - } - - public static <T> Matcher<WindowedValue<? extends T>> isSingleWindowedValue( - T value, Instant timestamp, BoundedWindow window, PaneInfo paneInfo) { - return WindowMatchers.<T>isSingleWindowedValue( - Matchers.equalTo(value), - Matchers.equalTo(timestamp), - Matchers.equalTo(window), - Matchers.equalTo(paneInfo)); - } - - public static <T> Matcher<WindowedValue<? extends T>> isSingleWindowedValue( - T value, Instant timestamp, BoundedWindow window) { - return WindowMatchers.<T>isSingleWindowedValue( - Matchers.equalTo(value), Matchers.equalTo(timestamp), Matchers.equalTo(window)); - } - - public static <T> Matcher<WindowedValue<? extends T>> isSingleWindowedValue( - Matcher<T> valueMatcher, long timestamp, long windowStart, long windowEnd) { - IntervalWindow intervalWindow = - new IntervalWindow(new Instant(windowStart), new Instant(windowEnd)); - return WindowMatchers.<T>isSingleWindowedValue( - valueMatcher, - Matchers.describedAs("%0", Matchers.equalTo(new Instant(timestamp)), timestamp), - Matchers.<BoundedWindow>equalTo(intervalWindow), - Matchers.anything()); - } - - public static <T> Matcher<WindowedValue<? extends T>> isSingleWindowedValue( - Matcher<? super T> valueMatcher, - Matcher<? super Instant> timestampMatcher, - Matcher<? 
super BoundedWindow> windowMatcher) { - return new WindowedValueMatcher<T>( - valueMatcher, timestampMatcher, Matchers.contains(windowMatcher), Matchers.anything()); - } - - public static <T> Matcher<WindowedValue<? extends T>> isSingleWindowedValue( - Matcher<? super T> valueMatcher, - Matcher<? super Instant> timestampMatcher, - Matcher<? super BoundedWindow> windowMatcher, - Matcher<? super PaneInfo> paneInfoMatcher) { - return new WindowedValueMatcher<T>( - valueMatcher, timestampMatcher, Matchers.contains(windowMatcher), paneInfoMatcher); - } - - public static Matcher<IntervalWindow> intervalWindow(long start, long end) { - return Matchers.equalTo(new IntervalWindow(new Instant(start), new Instant(end))); - } - - public static <T> Matcher<WindowedValue<? extends T>> valueWithPaneInfo(final PaneInfo paneInfo) { - return new TypeSafeMatcher<WindowedValue<? extends T>>() { - @Override - public void describeTo(Description description) { - description - .appendText("WindowedValue(paneInfo = ").appendValue(paneInfo).appendText(")"); - } - - @Override - protected boolean matchesSafely(WindowedValue<? extends T> item) { - return Objects.equals(item.getPane(), paneInfo); - } - - @Override - protected void describeMismatchSafely( - WindowedValue<? extends T> item, Description mismatchDescription) { - mismatchDescription.appendValue(item.getPane()); - } - }; - } - - @SuppressWarnings({"unchecked", "rawtypes"}) - @SafeVarargs - public static final <W extends BoundedWindow> Matcher<Iterable<W>> ofWindows( - Matcher<W>... windows) { - return (Matcher) Matchers.<W>containsInAnyOrder(windows); - } - - private WindowMatchers() {} - - private static class WindowedValueMatcher<T> extends TypeSafeMatcher<WindowedValue<? extends T>> { - - private Matcher<? super T> valueMatcher; - private Matcher<? super Instant> timestampMatcher; - private Matcher<? super Collection<? extends BoundedWindow>> windowsMatcher; - private Matcher<? 
super PaneInfo> paneInfoMatcher; - - private WindowedValueMatcher( - Matcher<? super T> valueMatcher, - Matcher<? super Instant> timestampMatcher, - Matcher<? super Collection<? extends BoundedWindow>> windowsMatcher, - Matcher<? super PaneInfo> paneInfoMatcher) { - this.valueMatcher = valueMatcher; - this.timestampMatcher = timestampMatcher; - this.windowsMatcher = windowsMatcher; - this.paneInfoMatcher = paneInfoMatcher; - } - - @Override - public void describeTo(Description description) { - description - .appendText("a WindowedValue(").appendValue(valueMatcher) - .appendText(", ").appendValue(timestampMatcher) - .appendText(", ").appendValue(windowsMatcher) - .appendText(", ").appendValue(paneInfoMatcher) - .appendText(")"); - } - - @Override - protected boolean matchesSafely(WindowedValue<? extends T> windowedValue) { - return valueMatcher.matches(windowedValue.getValue()) - && timestampMatcher.matches(windowedValue.getTimestamp()) - && windowsMatcher.matches(windowedValue.getWindows()); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d9891234/sdks/java/core/src/test/java/org/apache/beam/sdk/WindowMatchersTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/WindowMatchersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/WindowMatchersTest.java deleted file mode 100644 index 89637e2..0000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/WindowMatchersTest.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk; - -import static org.junit.Assert.assertThat; - -import com.google.common.collect.ImmutableList; -import org.apache.beam.sdk.transforms.windowing.IntervalWindow; -import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.util.WindowedValue; -import org.joda.time.Instant; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** - * Tests for {@link WindowMatchers}. - */ -@RunWith(JUnit4.class) -public class WindowMatchersTest { - - @Test - public void testIsWindowedValueExact() { - long timestamp = 100; - long windowStart = 0; - long windowEnd = 200; - - assertThat( - WindowedValue.of( - "hello", - new Instant(timestamp), - new IntervalWindow(new Instant(windowStart), new Instant(windowEnd)), - PaneInfo.NO_FIRING), - WindowMatchers.isWindowedValue( - "hello", - new Instant(timestamp), - ImmutableList.of(new IntervalWindow(new Instant(windowStart), new Instant(windowEnd))), - PaneInfo.NO_FIRING)); - } - - @Test - public void testIsWindowedValueReorderedWindows() { - long timestamp = 100; - long windowStart = 0; - long windowEnd = 200; - long windowStart2 = 50; - long windowEnd2 = 150; - - assertThat( - WindowedValue.of( - "hello", - new Instant(timestamp), - ImmutableList.of( - new IntervalWindow(new Instant(windowStart), new Instant(windowEnd)), - new IntervalWindow(new Instant(windowStart2), new Instant(windowEnd2))), - PaneInfo.NO_FIRING), - WindowMatchers.isWindowedValue( - "hello", - new Instant(timestamp), - ImmutableList.of( - new 
IntervalWindow(new Instant(windowStart), new Instant(windowEnd)), - new IntervalWindow(new Instant(windowStart2), new Instant(windowEnd2))), - PaneInfo.NO_FIRING)); - } -} - - http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d9891234/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/GatherAllPanesTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/GatherAllPanesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/GatherAllPanesTest.java new file mode 100644 index 0000000..417147f --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/GatherAllPanesTest.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.testing; + +import static org.junit.Assert.fail; + +import com.google.common.collect.Iterables; +import java.io.Serializable; +import org.apache.beam.sdk.io.CountingInput; +import org.apache.beam.sdk.transforms.Flatten; +import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.Values; +import org.apache.beam.sdk.transforms.WithKeys; +import org.apache.beam.sdk.transforms.WithTimestamps; +import org.apache.beam.sdk.transforms.windowing.AfterPane; +import org.apache.beam.sdk.transforms.windowing.AfterWatermark; +import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionList; +import org.apache.beam.sdk.values.TypeDescriptor; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link GatherAllPanes}. 
*/ +@RunWith(JUnit4.class) +public class GatherAllPanesTest implements Serializable { + @Test + @Category(NeedsRunner.class) + public void singlePaneSingleReifiedPane() { + TestPipeline p = TestPipeline.create(); + PCollection<Iterable<ValueInSingleWindow<Iterable<Long>>>> accumulatedPanes = + p.apply(CountingInput.upTo(20000)) + .apply( + WithTimestamps.of( + new SerializableFunction<Long, Instant>() { + @Override + public Instant apply(Long input) { + return new Instant(input * 10); + } + })) + .apply( + Window.<Long>into(FixedWindows.of(Duration.standardMinutes(1))) + .triggering(AfterWatermark.pastEndOfWindow()) + .withAllowedLateness(Duration.ZERO) + .discardingFiredPanes()) + .apply(WithKeys.<Void, Long>of((Void) null).withKeyType(new TypeDescriptor<Void>() {})) + .apply(GroupByKey.<Void, Long>create()) + .apply(Values.<Iterable<Long>>create()) + .apply(GatherAllPanes.<Iterable<Long>>globally()); + + PAssert.that(accumulatedPanes) + .satisfies( + new SerializableFunction< + Iterable<Iterable<ValueInSingleWindow<Iterable<Long>>>>, Void>() { + @Override + public Void apply(Iterable<Iterable<ValueInSingleWindow<Iterable<Long>>>> input) { + for (Iterable<ValueInSingleWindow<Iterable<Long>>> windowedInput : input) { + if (Iterables.size(windowedInput) > 1) { + fail("Expected all windows to have exactly one pane, got " + windowedInput); + return null; + } + } + return null; + } + }); + + p.run(); + } + + @Test + @Category(NeedsRunner.class) + public void multiplePanesMultipleReifiedPane() { + TestPipeline p = TestPipeline.create(); + + PCollection<Long> someElems = p.apply("someLongs", CountingInput.upTo(20000)); + PCollection<Long> otherElems = p.apply("otherLongs", CountingInput.upTo(20000)); + PCollection<Iterable<ValueInSingleWindow<Iterable<Long>>>> accumulatedPanes = + PCollectionList.of(someElems) + .and(otherElems) + .apply(Flatten.<Long>pCollections()) + .apply( + WithTimestamps.of( + new SerializableFunction<Long, Instant>() { + @Override + public Instant 
apply(Long input) { + return new Instant(input * 10); + } + })) + .apply( + Window.<Long>into(FixedWindows.of(Duration.standardMinutes(1))) + .triggering( + AfterWatermark.pastEndOfWindow() + .withEarlyFirings(AfterPane.elementCountAtLeast(1))) + .withAllowedLateness(Duration.ZERO) + .discardingFiredPanes()) + .apply(WithKeys.<Void, Long>of((Void) null).withKeyType(new TypeDescriptor<Void>() {})) + .apply(GroupByKey.<Void, Long>create()) + .apply(Values.<Iterable<Long>>create()) + .apply(GatherAllPanes.<Iterable<Long>>globally()); + + PAssert.that(accumulatedPanes) + .satisfies( + new SerializableFunction< + Iterable<Iterable<ValueInSingleWindow<Iterable<Long>>>>, Void>() { + @Override + public Void apply(Iterable<Iterable<ValueInSingleWindow<Iterable<Long>>>> input) { + for (Iterable<ValueInSingleWindow<Iterable<Long>>> windowedInput : input) { + if (Iterables.size(windowedInput) > 1) { + return null; + } + } + fail("Expected at least one window to have multiple panes"); + return null; + } + }); + + p.run(); + } +}