Move towards removing WindowedValue from SDK
- Introduces ValueInSingleWindow for purposes of PAssert
- Uses ValueInSingleWindow inside DoFnTester
- Moves WindowMatchers{,Test} to runners-core
After this commit, WindowedValue does not appear in any SDK APIs
used by Pipeline authors.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d9891234
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d9891234
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d9891234
Branch: refs/heads/gearpump-runner
Commit: d989123424a54699ecb47ba6c0a4e437316cabce
Parents: 0fb5610
Author: Eugene Kirpichov <[email protected]>
Authored: Mon Oct 31 15:46:25 2016 -0700
Committer: Thomas Groh <[email protected]>
Committed: Fri Dec 2 13:16:04 2016 -0800
----------------------------------------------------------------------
.../beam/runners/core/ReduceFnRunnerTest.java | 5 +-
.../beam/runners/core/SplittableParDoTest.java | 38 ++--
.../beam/runners/core/WindowMatchers.java | 204 +++++++++++++++++++
.../beam/runners/core/WindowMatchersTest.java | 82 ++++++++
.../direct/WindowEvaluatorFactoryTest.java | 4 +-
.../apache/beam/sdk/testing/GatherAllPanes.java | 88 ++++++++
.../org/apache/beam/sdk/testing/PAssert.java | 77 +++----
.../apache/beam/sdk/testing/PaneExtractors.java | 55 +++--
.../beam/sdk/testing/ValueInSingleWindow.java | 134 ++++++++++++
.../apache/beam/sdk/transforms/DoFnTester.java | 58 +++---
.../apache/beam/sdk/util/GatherAllPanes.java | 86 --------
.../apache/beam/sdk/util/IdentityWindowFn.java | 2 +-
.../org/apache/beam/sdk/WindowMatchers.java | 204 -------------------
.../org/apache/beam/sdk/WindowMatchersTest.java | 82 --------
.../beam/sdk/testing/GatherAllPanesTest.java | 140 +++++++++++++
.../beam/sdk/testing/PaneExtractorsTest.java | 133 ++++++------
.../testing/ValueInSingleWindowCoderTest.java | 51 +++++
.../beam/sdk/util/GatherAllPanesTest.java | 143 -------------
18 files changed, 893 insertions(+), 693 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d9891234/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
index 20eb08b..ba57567 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
@@ -18,8 +18,8 @@
package org.apache.beam.runners.core;
import static com.google.common.base.Preconditions.checkArgument;
-import static org.apache.beam.sdk.WindowMatchers.isSingleWindowedValue;
-import static org.apache.beam.sdk.WindowMatchers.isWindowedValue;
+import static
org.apache.beam.runners.core.WindowMatchers.isSingleWindowedValue;
+import static org.apache.beam.runners.core.WindowMatchers.isWindowedValue;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.emptyIterable;
@@ -39,7 +39,6 @@ import com.google.common.collect.Iterables;
import java.util.Iterator;
import java.util.List;
import org.apache.beam.runners.core.triggers.TriggerStateMachine;
-import org.apache.beam.sdk.WindowMatchers;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d9891234/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
index 990d892..b13d839 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
@@ -36,6 +36,7 @@ import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.ValueInSingleWindow;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnTester;
@@ -142,14 +143,15 @@ public class SplittableParDoTest {
PCollection.IsBounded.BOUNDED,
makeBoundedCollection(pipeline)
.apply("bounded to bounded", new
SplittableParDo<>(makeParDo(boundedFn)))
- .get(MAIN_OUTPUT_TAG).isBounded());
+ .get(MAIN_OUTPUT_TAG)
+ .isBounded());
assertEquals(
"Applying a bounded SDF to an unbounded collection produces an
unbounded collection",
PCollection.IsBounded.UNBOUNDED,
makeUnboundedCollection(pipeline)
- .apply(
- "bounded to unbounded", new
SplittableParDo<>(makeParDo(boundedFn)))
- .get(MAIN_OUTPUT_TAG).isBounded());
+ .apply("bounded to unbounded", new
SplittableParDo<>(makeParDo(boundedFn)))
+ .get(MAIN_OUTPUT_TAG)
+ .isBounded());
}
@Test
@@ -160,18 +162,16 @@ public class SplittableParDoTest {
"Applying an unbounded SDF to a bounded collection produces a bounded
collection",
PCollection.IsBounded.UNBOUNDED,
makeBoundedCollection(pipeline)
- .apply(
- "unbounded to bounded",
- new SplittableParDo<>(makeParDo(unboundedFn)))
- .get(MAIN_OUTPUT_TAG).isBounded());
+ .apply("unbounded to bounded", new
SplittableParDo<>(makeParDo(unboundedFn)))
+ .get(MAIN_OUTPUT_TAG)
+ .isBounded());
assertEquals(
"Applying an unbounded SDF to an unbounded collection produces an
unbounded collection",
PCollection.IsBounded.UNBOUNDED,
makeUnboundedCollection(pipeline)
- .apply(
- "unbounded to unbounded",
- new SplittableParDo<>(makeParDo(unboundedFn)))
- .get(MAIN_OUTPUT_TAG).isBounded());
+ .apply("unbounded to unbounded", new
SplittableParDo<>(makeParDo(unboundedFn)))
+ .get(MAIN_OUTPUT_TAG)
+ .isBounded());
}
// ------------------------------- Tests for ProcessFn
---------------------------------
@@ -224,9 +224,11 @@ public class SplittableParDoTest {
Instant timestamp,
Collection<? extends BoundedWindow> windows,
PaneInfo pane) {
- tester
- .getMutableOutput(tester.getMainOutputTag())
- .add(WindowedValue.of(output, timestamp, windows, pane));
+ for (BoundedWindow window : windows) {
+ tester
+ .getMutableOutput(tester.getMainOutputTag())
+ .add(ValueInSingleWindow.of(output, timestamp, window,
pane));
+ }
}
@Override
@@ -236,7 +238,11 @@ public class SplittableParDoTest {
Instant timestamp,
Collection<? extends BoundedWindow> windows,
PaneInfo pane) {
- tester.getMutableOutput(tag).add(WindowedValue.of(output,
timestamp, windows, pane));
+ for (BoundedWindow window : windows) {
+ tester
+ .getMutableOutput(tag)
+ .add(ValueInSingleWindow.of(output, timestamp, window,
pane));
+ }
}
});
// Do not clone since ProcessFn references non-serializable DoFnTester
itself
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d9891234/runners/core-java/src/test/java/org/apache/beam/runners/core/WindowMatchers.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/WindowMatchers.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/WindowMatchers.java
new file mode 100644
index 0000000..6c3a7e2
--- /dev/null
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/WindowMatchers.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import com.google.common.collect.Lists;
+import java.util.Collection;
+import java.util.Objects;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.hamcrest.Matchers;
+import org.hamcrest.TypeSafeMatcher;
+import org.joda.time.Instant;
+
+/**
+ * Matchers that are useful for working with Windowing, Timestamps, etc.
+ */
+public class WindowMatchers {
+
+ public static <T> Matcher<WindowedValue<? extends T>> isWindowedValue(
+ T value,
+ Instant timestamp,
+ Collection<? extends BoundedWindow> windows,
+ PaneInfo paneInfo) {
+
+ Collection<Matcher<? super BoundedWindow>> windowMatchers =
+ Lists.newArrayListWithCapacity(windows.size());
+ for (BoundedWindow window : windows) {
+ windowMatchers.add(Matchers.equalTo(window));
+ }
+
+ return isWindowedValue(
+ Matchers.equalTo(value),
+ Matchers.equalTo(timestamp),
+ Matchers.containsInAnyOrder(windowMatchers),
+ Matchers.equalTo(paneInfo));
+ }
+
+ public static <T> Matcher<WindowedValue<? extends T>> isWindowedValue(
+ Matcher<? super T> valueMatcher,
+ Matcher<? super Instant> timestampMatcher,
+ Matcher<? super Collection<? extends BoundedWindow>> windowsMatcher,
+ Matcher<? super PaneInfo> paneInfoMatcher) {
+ return new WindowedValueMatcher<>(
+ valueMatcher, timestampMatcher, windowsMatcher, paneInfoMatcher);
+ }
+
+ public static <T> Matcher<WindowedValue<? extends T>> isWindowedValue(
+ Matcher<? super T> valueMatcher,
+ Matcher<? super Instant> timestampMatcher,
+ Matcher<? super Collection<? extends BoundedWindow>> windowsMatcher) {
+ return new WindowedValueMatcher<>(
+ valueMatcher, timestampMatcher, windowsMatcher, Matchers.anything());
+ }
+
+ public static <T> Matcher<WindowedValue<? extends T>> isWindowedValue(
+ Matcher<? super T> valueMatcher, Matcher<? super Instant>
timestampMatcher) {
+ return new WindowedValueMatcher<>(
+ valueMatcher, timestampMatcher, Matchers.anything(),
Matchers.anything());
+ }
+
+ public static <T> Matcher<WindowedValue<? extends T>> isWindowedValue(
+ Matcher<? super T> valueMatcher) {
+ return new WindowedValueMatcher<>(
+ valueMatcher, Matchers.anything(), Matchers.anything(),
Matchers.anything());
+ }
+
+ public static <T> Matcher<WindowedValue<? extends T>> isSingleWindowedValue(
+ T value, long timestamp, long windowStart, long windowEnd) {
+ return WindowMatchers.<T>isSingleWindowedValue(
+ Matchers.equalTo(value), timestamp, windowStart, windowEnd);
+ }
+
+ public static <T> Matcher<WindowedValue<? extends T>> isSingleWindowedValue(
+ T value, Instant timestamp, BoundedWindow window, PaneInfo paneInfo) {
+ return WindowMatchers.<T>isSingleWindowedValue(
+ Matchers.equalTo(value),
+ Matchers.equalTo(timestamp),
+ Matchers.equalTo(window),
+ Matchers.equalTo(paneInfo));
+ }
+
+ public static <T> Matcher<WindowedValue<? extends T>> isSingleWindowedValue(
+ T value, Instant timestamp, BoundedWindow window) {
+ return WindowMatchers.<T>isSingleWindowedValue(
+ Matchers.equalTo(value), Matchers.equalTo(timestamp),
Matchers.equalTo(window));
+ }
+
+ public static <T> Matcher<WindowedValue<? extends T>> isSingleWindowedValue(
+ Matcher<T> valueMatcher, long timestamp, long windowStart, long
windowEnd) {
+ IntervalWindow intervalWindow =
+ new IntervalWindow(new Instant(windowStart), new Instant(windowEnd));
+ return WindowMatchers.<T>isSingleWindowedValue(
+ valueMatcher,
+ Matchers.describedAs("%0", Matchers.equalTo(new Instant(timestamp)),
timestamp),
+ Matchers.<BoundedWindow>equalTo(intervalWindow),
+ Matchers.anything());
+ }
+
+ public static <T> Matcher<WindowedValue<? extends T>> isSingleWindowedValue(
+ Matcher<? super T> valueMatcher,
+ Matcher<? super Instant> timestampMatcher,
+ Matcher<? super BoundedWindow> windowMatcher) {
+ return new WindowedValueMatcher<T>(
+ valueMatcher, timestampMatcher, Matchers.contains(windowMatcher),
Matchers.anything());
+ }
+
+ public static <T> Matcher<WindowedValue<? extends T>> isSingleWindowedValue(
+ Matcher<? super T> valueMatcher,
+ Matcher<? super Instant> timestampMatcher,
+ Matcher<? super BoundedWindow> windowMatcher,
+ Matcher<? super PaneInfo> paneInfoMatcher) {
+ return new WindowedValueMatcher<T>(
+ valueMatcher, timestampMatcher, Matchers.contains(windowMatcher),
paneInfoMatcher);
+ }
+
+ public static Matcher<IntervalWindow> intervalWindow(long start, long end) {
+ return Matchers.equalTo(new IntervalWindow(new Instant(start), new
Instant(end)));
+ }
+
+ public static <T> Matcher<WindowedValue<? extends T>>
valueWithPaneInfo(final PaneInfo paneInfo) {
+ return new TypeSafeMatcher<WindowedValue<? extends T>>() {
+ @Override
+ public void describeTo(Description description) {
+ description
+ .appendText("WindowedValue(paneInfo =
").appendValue(paneInfo).appendText(")");
+ }
+
+ @Override
+ protected boolean matchesSafely(WindowedValue<? extends T> item) {
+ return Objects.equals(item.getPane(), paneInfo);
+ }
+
+ @Override
+ protected void describeMismatchSafely(
+ WindowedValue<? extends T> item, Description mismatchDescription) {
+ mismatchDescription.appendValue(item.getPane());
+ }
+ };
+ }
+
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ @SafeVarargs
+ public static final <W extends BoundedWindow> Matcher<Iterable<W>> ofWindows(
+ Matcher<W>... windows) {
+ return (Matcher) Matchers.<W>containsInAnyOrder(windows);
+ }
+
+ private WindowMatchers() {}
+
+ private static class WindowedValueMatcher<T> extends
TypeSafeMatcher<WindowedValue<? extends T>> {
+
+ private Matcher<? super T> valueMatcher;
+ private Matcher<? super Instant> timestampMatcher;
+ private Matcher<? super Collection<? extends BoundedWindow>>
windowsMatcher;
+ private Matcher<? super PaneInfo> paneInfoMatcher;
+
+ private WindowedValueMatcher(
+ Matcher<? super T> valueMatcher,
+ Matcher<? super Instant> timestampMatcher,
+ Matcher<? super Collection<? extends BoundedWindow>> windowsMatcher,
+ Matcher<? super PaneInfo> paneInfoMatcher) {
+ this.valueMatcher = valueMatcher;
+ this.timestampMatcher = timestampMatcher;
+ this.windowsMatcher = windowsMatcher;
+ this.paneInfoMatcher = paneInfoMatcher;
+ }
+
+ @Override
+ public void describeTo(Description description) {
+ description
+ .appendText("a WindowedValue(").appendValue(valueMatcher)
+ .appendText(", ").appendValue(timestampMatcher)
+ .appendText(", ").appendValue(windowsMatcher)
+ .appendText(", ").appendValue(paneInfoMatcher)
+ .appendText(")");
+ }
+
+ @Override
+ protected boolean matchesSafely(WindowedValue<? extends T> windowedValue) {
+ return valueMatcher.matches(windowedValue.getValue())
+ && timestampMatcher.matches(windowedValue.getTimestamp())
+ && windowsMatcher.matches(windowedValue.getWindows());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d9891234/runners/core-java/src/test/java/org/apache/beam/runners/core/WindowMatchersTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/WindowMatchersTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/WindowMatchersTest.java
new file mode 100644
index 0000000..6f4741a
--- /dev/null
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/WindowMatchersTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import static org.junit.Assert.assertThat;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link WindowMatchers}.
+ */
+@RunWith(JUnit4.class)
+public class WindowMatchersTest {
+
+ @Test
+ public void testIsWindowedValueExact() {
+ long timestamp = 100;
+ long windowStart = 0;
+ long windowEnd = 200;
+
+ assertThat(
+ WindowedValue.of(
+ "hello",
+ new Instant(timestamp),
+ new IntervalWindow(new Instant(windowStart), new
Instant(windowEnd)),
+ PaneInfo.NO_FIRING),
+ WindowMatchers.isWindowedValue(
+ "hello",
+ new Instant(timestamp),
+ ImmutableList.of(new IntervalWindow(new Instant(windowStart), new
Instant(windowEnd))),
+ PaneInfo.NO_FIRING));
+ }
+
+ @Test
+ public void testIsWindowedValueReorderedWindows() {
+ long timestamp = 100;
+ long windowStart = 0;
+ long windowEnd = 200;
+ long windowStart2 = 50;
+ long windowEnd2 = 150;
+
+ assertThat(
+ WindowedValue.of(
+ "hello",
+ new Instant(timestamp),
+ ImmutableList.of(
+ new IntervalWindow(new Instant(windowStart), new
Instant(windowEnd)),
+ new IntervalWindow(new Instant(windowStart2), new
Instant(windowEnd2))),
+ PaneInfo.NO_FIRING),
+ WindowMatchers.isWindowedValue(
+ "hello",
+ new Instant(timestamp),
+ ImmutableList.of(
+ new IntervalWindow(new Instant(windowStart), new
Instant(windowEnd)),
+ new IntervalWindow(new Instant(windowStart2), new
Instant(windowEnd2))),
+ PaneInfo.NO_FIRING));
+ }
+}
+
+
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d9891234/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java
index e2f987c..66c28ce 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java
@@ -17,8 +17,8 @@
*/
package org.apache.beam.runners.direct;
-import static org.apache.beam.sdk.WindowMatchers.isSingleWindowedValue;
-import static org.apache.beam.sdk.WindowMatchers.isWindowedValue;
+import static
org.apache.beam.runners.core.WindowMatchers.isSingleWindowedValue;
+import static org.apache.beam.runners.core.WindowMatchers.isWindowedValue;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.when;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d9891234/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/GatherAllPanes.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/GatherAllPanes.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/GatherAllPanes.java
new file mode 100644
index 0000000..2b311b7
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/GatherAllPanes.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.testing;
+
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.Never;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.IdentityWindowFn;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+/**
+ * Gathers all panes of each window into exactly one output.
+ *
+ * <p>Note that this will delay the output of a window until the garbage
collection time (when the
+ * watermark passes the end of the window plus allowed lateness) even if the
upstream triggers
+ * closed the window earlier.
+ */
+class GatherAllPanes<T>
+ extends PTransform<PCollection<T>,
PCollection<Iterable<ValueInSingleWindow<T>>>> {
+ /**
+ * Gathers all panes of each window into a single output element.
+ *
+ * <p>This will gather all output panes into a single element, which causes
them to be colocated
+ * on a single worker. As a result, this is only suitable for {@link
PCollection PCollections}
+ * where all of the output elements for each pane fit in memory, such as in
tests.
+ */
+ public static <T> GatherAllPanes<T> globally() {
+ return new GatherAllPanes<>();
+ }
+
+ private GatherAllPanes() {}
+
+ @Override
+ public PCollection<Iterable<ValueInSingleWindow<T>>> apply(PCollection<T>
input) {
+ WindowFn<?, ?> originalWindowFn =
input.getWindowingStrategy().getWindowFn();
+
+ return input
+ .apply(ParDo.of(new ReifyTimestampsAndWindowsFn<T>()))
+ .setCoder(
+ ValueInSingleWindow.Coder.of(
+ input.getCoder(),
input.getWindowingStrategy().getWindowFn().windowCoder()))
+ .apply(
+ WithKeys.<Integer, ValueInSingleWindow<T>>of(0)
+ .withKeyType(new TypeDescriptor<Integer>() {}))
+ .apply(
+ Window.into(
+ new IdentityWindowFn<KV<Integer, ValueInSingleWindow<T>>>(
+ originalWindowFn.windowCoder()))
+ .triggering(Never.ever())
+
.withAllowedLateness(input.getWindowingStrategy().getAllowedLateness())
+ .discardingFiredPanes())
+ // all values have the same key so they all appear as a single output
element
+ .apply(GroupByKey.<Integer, ValueInSingleWindow<T>>create())
+ .apply(Values.<Iterable<ValueInSingleWindow<T>>>create())
+ .setWindowingStrategyInternal(input.getWindowingStrategy());
+ }
+
+ private static class ReifyTimestampsAndWindowsFn<T> extends DoFn<T,
ValueInSingleWindow<T>> {
+ @DoFn.ProcessElement
+ public void processElement(ProcessContext c, BoundedWindow window) {
+ c.output(ValueInSingleWindow.of(c.element(), c.timestamp(), window,
c.pane()));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d9891234/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
index b3a14aa..7dc78d8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java
@@ -63,8 +63,6 @@ import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
import org.apache.beam.sdk.util.CoderUtils;
-import org.apache.beam.sdk.util.GatherAllPanes;
-import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
@@ -349,7 +347,7 @@ public class PAssert {
private static class PCollectionContentsAssert<T> implements
IterableAssert<T> {
private final PCollection<T> actual;
private final AssertionWindows rewindowingStrategy;
- private final SimpleFunction<Iterable<WindowedValue<T>>, Iterable<T>>
paneExtractor;
+ private final SimpleFunction<Iterable<ValueInSingleWindow<T>>,
Iterable<T>> paneExtractor;
public PCollectionContentsAssert(PCollection<T> actual) {
this(actual, IntoGlobalWindow.<T>of(), PaneExtractors.<T>allPanes());
@@ -358,7 +356,7 @@ public class PAssert {
public PCollectionContentsAssert(
PCollection<T> actual,
AssertionWindows rewindowingStrategy,
- SimpleFunction<Iterable<WindowedValue<T>>, Iterable<T>> paneExtractor)
{
+ SimpleFunction<Iterable<ValueInSingleWindow<T>>, Iterable<T>>
paneExtractor) {
this.actual = actual;
this.rewindowingStrategy = rewindowingStrategy;
this.paneExtractor = paneExtractor;
@@ -391,7 +389,7 @@ public class PAssert {
private PCollectionContentsAssert<T> withPane(
BoundedWindow window,
- SimpleFunction<Iterable<WindowedValue<T>>, Iterable<T>> paneExtractor)
{
+ SimpleFunction<Iterable<ValueInSingleWindow<T>>, Iterable<T>>
paneExtractor) {
@SuppressWarnings({"unchecked", "rawtypes"})
Coder<BoundedWindow> windowCoder =
(Coder) actual.getWindowingStrategy().getWindowFn().windowCoder();
@@ -523,7 +521,7 @@ public class PAssert {
private final PCollection<Iterable<T>> actual;
private final Coder<T> elementCoder;
private final AssertionWindows rewindowingStrategy;
- private final SimpleFunction<Iterable<WindowedValue<Iterable<T>>>,
Iterable<Iterable<T>>>
+ private final SimpleFunction<Iterable<ValueInSingleWindow<Iterable<T>>>,
Iterable<Iterable<T>>>
paneExtractor;
public PCollectionSingletonIterableAssert(PCollection<Iterable<T>> actual)
{
@@ -533,7 +531,8 @@ public class PAssert {
public PCollectionSingletonIterableAssert(
PCollection<Iterable<T>> actual,
AssertionWindows rewindowingStrategy,
- SimpleFunction<Iterable<WindowedValue<Iterable<T>>>,
Iterable<Iterable<T>>> paneExtractor) {
+ SimpleFunction<Iterable<ValueInSingleWindow<Iterable<T>>>,
Iterable<Iterable<T>>>
+ paneExtractor) {
this.actual = actual;
@SuppressWarnings("unchecked")
@@ -571,7 +570,8 @@ public class PAssert {
private PCollectionSingletonIterableAssert<T> withPanes(
BoundedWindow window,
- SimpleFunction<Iterable<WindowedValue<Iterable<T>>>,
Iterable<Iterable<T>>> paneExtractor) {
+ SimpleFunction<Iterable<ValueInSingleWindow<Iterable<T>>>,
Iterable<Iterable<T>>>
+ paneExtractor) {
@SuppressWarnings({"unchecked", "rawtypes"})
Coder<BoundedWindow> windowCoder =
(Coder) actual.getWindowingStrategy().getWindowFn().windowCoder();
@@ -620,7 +620,8 @@ public class PAssert {
private final PCollection<ElemT> actual;
private final PTransform<PCollection<ElemT>, PCollectionView<ViewT>> view;
private final AssertionWindows rewindowActuals;
- private final SimpleFunction<Iterable<WindowedValue<ElemT>>,
Iterable<ElemT>> paneExtractor;
+ private final SimpleFunction<Iterable<ValueInSingleWindow<ElemT>>,
Iterable<ElemT>>
+ paneExtractor;
private final Coder<ViewT> coder;
protected PCollectionViewAssert(
@@ -634,7 +635,7 @@ public class PAssert {
PCollection<ElemT> actual,
PTransform<PCollection<ElemT>, PCollectionView<ViewT>> view,
AssertionWindows rewindowActuals,
- SimpleFunction<Iterable<WindowedValue<ElemT>>, Iterable<ElemT>>
paneExtractor,
+ SimpleFunction<Iterable<ValueInSingleWindow<ElemT>>, Iterable<ElemT>>
paneExtractor,
Coder<ViewT> coder) {
this.actual = actual;
this.view = view;
@@ -660,7 +661,7 @@ public class PAssert {
private PCollectionViewAssert<ElemT, ViewT> inPane(
BoundedWindow window,
- SimpleFunction<Iterable<WindowedValue<ElemT>>, Iterable<ElemT>>
paneExtractor) {
+ SimpleFunction<Iterable<ValueInSingleWindow<ElemT>>, Iterable<ElemT>>
paneExtractor) {
return new PCollectionViewAssert<>(
actual,
view,
@@ -738,13 +739,14 @@ public class PAssert {
private final transient PCollection<T> actual;
private final transient AssertionWindows rewindowActuals;
- private final transient SimpleFunction<Iterable<WindowedValue<T>>,
Iterable<T>> extractPane;
+ private final transient SimpleFunction<Iterable<ValueInSingleWindow<T>>,
Iterable<T>>
+ extractPane;
private final transient PTransform<PCollection<T>,
PCollectionView<ActualT>> actualView;
public static <T, ActualT> CreateActual<T, ActualT> from(
PCollection<T> actual,
AssertionWindows rewindowActuals,
- SimpleFunction<Iterable<WindowedValue<T>>, Iterable<T>> extractPane,
+ SimpleFunction<Iterable<ValueInSingleWindow<T>>, Iterable<T>>
extractPane,
PTransform<PCollection<T>, PCollectionView<ActualT>> actualView) {
return new CreateActual<>(actual, rewindowActuals, extractPane,
actualView);
}
@@ -752,7 +754,7 @@ public class PAssert {
private CreateActual(
PCollection<T> actual,
AssertionWindows rewindowActuals,
- SimpleFunction<Iterable<WindowedValue<T>>, Iterable<T>> extractPane,
+ SimpleFunction<Iterable<ValueInSingleWindow<T>>, Iterable<T>>
extractPane,
PTransform<PCollection<T>, PCollectionView<ActualT>> actualView) {
this.actual = actual;
this.rewindowActuals = rewindowActuals;
@@ -822,7 +824,7 @@ public class PAssert {
* a single empty iterable, even though in practice most runners will not
produce any element.
*/
private static class GroupGlobally<T>
- extends PTransform<PCollection<T>,
PCollection<Iterable<WindowedValue<T>>>>
+ extends PTransform<PCollection<T>,
PCollection<Iterable<ValueInSingleWindow<T>>>>
implements Serializable {
private final AssertionWindows rewindowingStrategy;
@@ -831,20 +833,20 @@ public class PAssert {
}
@Override
- public PCollection<Iterable<WindowedValue<T>>> apply(PCollection<T> input)
{
+ public PCollection<Iterable<ValueInSingleWindow<T>>> apply(PCollection<T>
input) {
final int combinedKey = 42;
// Remove the triggering on both
PTransform<
- PCollection<KV<Integer, Iterable<WindowedValue<T>>>>,
- PCollection<KV<Integer, Iterable<WindowedValue<T>>>>>
+ PCollection<KV<Integer, Iterable<ValueInSingleWindow<T>>>>,
+ PCollection<KV<Integer, Iterable<ValueInSingleWindow<T>>>>>
removeTriggering =
- Window.<KV<Integer,
Iterable<WindowedValue<T>>>>triggering(Never.ever())
+ Window.<KV<Integer,
Iterable<ValueInSingleWindow<T>>>>triggering(Never.ever())
.discardingFiredPanes()
.withAllowedLateness(input.getWindowingStrategy().getAllowedLateness());
// Group the contents by key. If it is empty, this PCollection will be
empty, too.
// Then key it again with a dummy key.
- PCollection<KV<Integer, Iterable<WindowedValue<T>>>> groupedContents =
+ PCollection<KV<Integer, Iterable<ValueInSingleWindow<T>>>>
groupedContents =
// TODO: Split the filtering from the rewindowing, and apply
filtering before the Gather
// if the grouping of extra records
input
@@ -852,45 +854,47 @@ public class PAssert {
.apply("GatherAllOutputs", GatherAllPanes.<T>globally())
.apply(
"RewindowActuals",
-
rewindowingStrategy.<Iterable<WindowedValue<T>>>windowActuals())
- .apply("KeyForDummy", WithKeys.<Integer,
Iterable<WindowedValue<T>>>of(combinedKey))
+
rewindowingStrategy.<Iterable<ValueInSingleWindow<T>>>windowActuals())
+ .apply(
+ "KeyForDummy",
+ WithKeys.<Integer,
Iterable<ValueInSingleWindow<T>>>of(combinedKey))
.apply("RemoveActualsTriggering", removeTriggering);
// Create another non-empty PCollection that is keyed with a distinct
dummy key
- PCollection<KV<Integer, Iterable<WindowedValue<T>>>> keyedDummy =
+ PCollection<KV<Integer, Iterable<ValueInSingleWindow<T>>>> keyedDummy =
input
.getPipeline()
.apply(
Create.of(
KV.of(
combinedKey,
- (Iterable<WindowedValue<T>>)
- Collections.<WindowedValue<T>>emptyList()))
+ (Iterable<ValueInSingleWindow<T>>)
+
Collections.<ValueInSingleWindow<T>>emptyList()))
.withCoder(groupedContents.getCoder()))
.apply(
"WindowIntoDummy",
- rewindowingStrategy.<KV<Integer,
Iterable<WindowedValue<T>>>>windowDummy())
+ rewindowingStrategy.<KV<Integer,
Iterable<ValueInSingleWindow<T>>>>windowDummy())
.apply("RemoveDummyTriggering", removeTriggering);
// Flatten them together and group by the combined key to get a single
element
- PCollection<KV<Integer, Iterable<Iterable<WindowedValue<T>>>>>
dummyAndContents =
+ PCollection<KV<Integer, Iterable<Iterable<ValueInSingleWindow<T>>>>>
dummyAndContents =
PCollectionList.of(groupedContents)
.and(keyedDummy)
.apply(
"FlattenDummyAndContents",
- Flatten.<KV<Integer,
Iterable<WindowedValue<T>>>>pCollections())
+ Flatten.<KV<Integer,
Iterable<ValueInSingleWindow<T>>>>pCollections())
.apply(
"NeverTrigger",
- Window.<KV<Integer,
Iterable<WindowedValue<T>>>>triggering(Never.ever())
+ Window.<KV<Integer,
Iterable<ValueInSingleWindow<T>>>>triggering(Never.ever())
.withAllowedLateness(input.getWindowingStrategy().getAllowedLateness())
.discardingFiredPanes())
.apply(
"GroupDummyAndContents",
- GroupByKey.<Integer, Iterable<WindowedValue<T>>>create());
+ GroupByKey.<Integer,
Iterable<ValueInSingleWindow<T>>>create());
return dummyAndContents
- .apply(Values.<Iterable<Iterable<WindowedValue<T>>>>create())
- .apply(ParDo.of(new ConcatFn<WindowedValue<T>>()));
+ .apply(Values.<Iterable<Iterable<ValueInSingleWindow<T>>>>create())
+ .apply(ParDo.of(new ConcatFn<ValueInSingleWindow<T>>()));
}
}
@@ -909,12 +913,12 @@ public class PAssert {
implements Serializable {
private final SerializableFunction<Iterable<T>, Void> checkerFn;
private final AssertionWindows rewindowingStrategy;
- private final SimpleFunction<Iterable<WindowedValue<T>>, Iterable<T>>
paneExtractor;
+ private final SimpleFunction<Iterable<ValueInSingleWindow<T>>,
Iterable<T>> paneExtractor;
private GroupThenAssert(
SerializableFunction<Iterable<T>, Void> checkerFn,
AssertionWindows rewindowingStrategy,
- SimpleFunction<Iterable<WindowedValue<T>>, Iterable<T>> paneExtractor)
{
+ SimpleFunction<Iterable<ValueInSingleWindow<T>>, Iterable<T>>
paneExtractor) {
this.checkerFn = checkerFn;
this.rewindowingStrategy = rewindowingStrategy;
this.paneExtractor = paneExtractor;
@@ -940,13 +944,14 @@ public class PAssert {
extends PTransform<PCollection<Iterable<T>>, PDone> implements
Serializable {
private final SerializableFunction<Iterable<T>, Void> checkerFn;
private final AssertionWindows rewindowingStrategy;
- private final SimpleFunction<Iterable<WindowedValue<Iterable<T>>>,
Iterable<Iterable<T>>>
+ private final SimpleFunction<Iterable<ValueInSingleWindow<Iterable<T>>>,
Iterable<Iterable<T>>>
paneExtractor;
private GroupThenAssertForSingleton(
SerializableFunction<Iterable<T>, Void> checkerFn,
AssertionWindows rewindowingStrategy,
- SimpleFunction<Iterable<WindowedValue<Iterable<T>>>,
Iterable<Iterable<T>>> paneExtractor) {
+ SimpleFunction<Iterable<ValueInSingleWindow<Iterable<T>>>,
Iterable<Iterable<T>>>
+ paneExtractor) {
this.checkerFn = checkerFn;
this.rewindowingStrategy = rewindowingStrategy;
this.paneExtractor = paneExtractor;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d9891234/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PaneExtractors.java
----------------------------------------------------------------------
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PaneExtractors.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PaneExtractors.java
index db72a0c..dd1fac9 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PaneExtractors.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PaneExtractors.java
@@ -25,14 +25,13 @@ import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
-import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
/**
- * {@link PTransform PTransforms} which take an {@link Iterable} of {@link
WindowedValue
- * WindowedValues} and outputs an {@link Iterable} of all values in the
specified pane, dropping the
- * {@link WindowedValue} metadata.
+ * {@link PTransform PTransforms} which take an {@link Iterable} of {@link ValueInSingleWindow
+ * ValueInSingleWindows} and output an {@link Iterable} of all values in the specified pane,
+ * dropping the {@link ValueInSingleWindow} metadata.
*
* <p>Although all of the method signatures return SimpleFunction, users
should ensure to set the
* coder of any output {@link PCollection}, as appropriate {@link
TypeDescriptor TypeDescriptors}
@@ -42,36 +41,36 @@ final class PaneExtractors {
private PaneExtractors() {
}
- static <T> SimpleFunction<Iterable<WindowedValue<T>>, Iterable<T>>
onlyPane() {
+ static <T> SimpleFunction<Iterable<ValueInSingleWindow<T>>, Iterable<T>>
onlyPane() {
return new ExtractOnlyPane<>();
}
- static <T> SimpleFunction<Iterable<WindowedValue<T>>, Iterable<T>>
onTimePane() {
+ static <T> SimpleFunction<Iterable<ValueInSingleWindow<T>>, Iterable<T>>
onTimePane() {
return new ExtractOnTimePane<>();
}
- static <T> SimpleFunction<Iterable<WindowedValue<T>>, Iterable<T>>
finalPane() {
+ static <T> SimpleFunction<Iterable<ValueInSingleWindow<T>>, Iterable<T>>
finalPane() {
return new ExtractFinalPane<>();
}
- static <T> SimpleFunction<Iterable<WindowedValue<T>>, Iterable<T>>
nonLatePanes() {
+ static <T> SimpleFunction<Iterable<ValueInSingleWindow<T>>, Iterable<T>>
nonLatePanes() {
return new ExtractNonLatePanes<>();
}
- static <T> SimpleFunction<Iterable<WindowedValue<T>>, Iterable<T>>
earlyPanes() {
+ static <T> SimpleFunction<Iterable<ValueInSingleWindow<T>>, Iterable<T>>
earlyPanes() {
return new ExtractEarlyPanes<>();
}
- static <T> SimpleFunction<Iterable<WindowedValue<T>>, Iterable<T>>
allPanes() {
+ static <T> SimpleFunction<Iterable<ValueInSingleWindow<T>>, Iterable<T>>
allPanes() {
return new ExtractAllPanes<>();
}
private static class ExtractOnlyPane<T>
- extends SimpleFunction<Iterable<WindowedValue<T>>, Iterable<T>> {
+ extends SimpleFunction<Iterable<ValueInSingleWindow<T>>, Iterable<T>> {
@Override
- public Iterable<T> apply(Iterable<WindowedValue<T>> input) {
+ public Iterable<T> apply(Iterable<ValueInSingleWindow<T>> input) {
List<T> outputs = new ArrayList<>();
- for (WindowedValue<T> value : input) {
+ for (ValueInSingleWindow<T> value : input) {
checkState(value.getPane().isFirst() && value.getPane().isLast(),
"Expected elements to be produced by a trigger that fires at most
once, but got"
+ "a value in a pane that is %s. Actual Pane Info: %s",
@@ -85,11 +84,11 @@ final class PaneExtractors {
private static class ExtractOnTimePane<T>
- extends SimpleFunction<Iterable<WindowedValue<T>>, Iterable<T>> {
+ extends SimpleFunction<Iterable<ValueInSingleWindow<T>>, Iterable<T>> {
@Override
- public Iterable<T> apply(Iterable<WindowedValue<T>> input) {
+ public Iterable<T> apply(Iterable<ValueInSingleWindow<T>> input) {
List<T> outputs = new ArrayList<>();
- for (WindowedValue<T> value : input) {
+ for (ValueInSingleWindow<T> value : input) {
if (value.getPane().getTiming().equals(Timing.ON_TIME)) {
outputs.add(value.getValue());
}
@@ -100,11 +99,11 @@ final class PaneExtractors {
private static class ExtractFinalPane<T>
- extends SimpleFunction<Iterable<WindowedValue<T>>, Iterable<T>> {
+ extends SimpleFunction<Iterable<ValueInSingleWindow<T>>, Iterable<T>> {
@Override
- public Iterable<T> apply(Iterable<WindowedValue<T>> input) {
+ public Iterable<T> apply(Iterable<ValueInSingleWindow<T>> input) {
List<T> outputs = new ArrayList<>();
- for (WindowedValue<T> value : input) {
+ for (ValueInSingleWindow<T> value : input) {
if (value.getPane().isLast()) {
outputs.add(value.getValue());
}
@@ -115,11 +114,11 @@ final class PaneExtractors {
private static class ExtractAllPanes<T>
- extends SimpleFunction<Iterable<WindowedValue<T>>, Iterable<T>> {
+ extends SimpleFunction<Iterable<ValueInSingleWindow<T>>, Iterable<T>> {
@Override
- public Iterable<T> apply(Iterable<WindowedValue<T>> input) {
+ public Iterable<T> apply(Iterable<ValueInSingleWindow<T>> input) {
List<T> outputs = new ArrayList<>();
- for (WindowedValue<T> value : input) {
+ for (ValueInSingleWindow<T> value : input) {
outputs.add(value.getValue());
}
return outputs;
@@ -128,11 +127,11 @@ final class PaneExtractors {
private static class ExtractNonLatePanes<T>
- extends SimpleFunction<Iterable<WindowedValue<T>>, Iterable<T>> {
+ extends SimpleFunction<Iterable<ValueInSingleWindow<T>>, Iterable<T>> {
@Override
- public Iterable<T> apply(Iterable<WindowedValue<T>> input) {
+ public Iterable<T> apply(Iterable<ValueInSingleWindow<T>> input) {
List<T> outputs = new ArrayList<>();
- for (WindowedValue<T> value : input) {
+ for (ValueInSingleWindow<T> value : input) {
if (value.getPane().getTiming() != PaneInfo.Timing.LATE) {
outputs.add(value.getValue());
}
@@ -142,11 +141,11 @@ final class PaneExtractors {
}
private static class ExtractEarlyPanes<T>
- extends SimpleFunction<Iterable<WindowedValue<T>>, Iterable<T>> {
+ extends SimpleFunction<Iterable<ValueInSingleWindow<T>>, Iterable<T>> {
@Override
- public Iterable<T> apply(Iterable<WindowedValue<T>> input) {
+ public Iterable<T> apply(Iterable<ValueInSingleWindow<T>> input) {
List<T> outputs = new ArrayList<>();
- for (WindowedValue<T> value : input) {
+ for (ValueInSingleWindow<T> value : input) {
if (value.getPane().getTiming() == PaneInfo.Timing.EARLY) {
outputs.add(value.getValue());
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d9891234/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/ValueInSingleWindow.java
----------------------------------------------------------------------
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/ValueInSingleWindow.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/ValueInSingleWindow.java
new file mode 100644
index 0000000..9ec030f
--- /dev/null
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/ValueInSingleWindow.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.testing;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.auto.value.AutoValue;
+import com.google.common.collect.ImmutableList;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.InstantCoder;
+import org.apache.beam.sdk.coders.StandardCoder;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.PropertyNames;
+import org.joda.time.Instant;
+
+/**
+ * An immutable tuple of value, timestamp, window, and pane.
+ *
+ * @param <T> the type of the value
+ */
+@AutoValue
+public abstract class ValueInSingleWindow<T> {
+  /** Returns the value of this {@code ValueInSingleWindow}. */
+  @Nullable
+  public abstract T getValue();
+
+  /** Returns the timestamp of this {@code ValueInSingleWindow}. */
+  public abstract Instant getTimestamp();
+
+  /** Returns the window of this {@code ValueInSingleWindow}. */
+  public abstract BoundedWindow getWindow();
+
+  /** Returns the pane of this {@code ValueInSingleWindow} in its window. */
+  public abstract PaneInfo getPane();
+
+  /**
+   * Creates a {@code ValueInSingleWindow} holding the given value, timestamp, window and pane.
+   *
+   * @param value the element value; may be {@code null}, matching {@link #getValue()}
+   * @param timestamp the timestamp of the value
+   * @param window the single window the value is in
+   * @param paneInfo the pane of the value within {@code window}
+   */
+  public static <T> ValueInSingleWindow<T> of(
+      @Nullable T value, Instant timestamp, BoundedWindow window, PaneInfo paneInfo) {
+    return new AutoValue_ValueInSingleWindow<>(value, timestamp, window, paneInfo);
+  }
+
+  /**
+   * A coder for {@link ValueInSingleWindow}.
+   *
+   * <p>Components are coded in the order value, timestamp, window, pane. The first three use the
+   * nested context; the pane, being the last component, uses the context supplied by the caller.
+   * {@link #encode} and {@link #decode} must stay symmetric in this respect.
+   */
+  public static class Coder<T> extends StandardCoder<ValueInSingleWindow<T>> {
+    private final org.apache.beam.sdk.coders.Coder<T> valueCoder;
+    private final org.apache.beam.sdk.coders.Coder<BoundedWindow> windowCoder;
+
+    /** Returns a coder that uses the given coders for the value and for the window. */
+    public static <T> Coder<T> of(
+        org.apache.beam.sdk.coders.Coder<T> valueCoder,
+        org.apache.beam.sdk.coders.Coder<? extends BoundedWindow> windowCoder) {
+      return new Coder<>(valueCoder, windowCoder);
+    }
+
+    /**
+     * Creates a coder from its component coders.
+     *
+     * @param components exactly two coders: the value coder followed by the window coder
+     * @throws IllegalArgumentException if {@code components} does not have exactly two elements
+     */
+    @JsonCreator
+    public static <T> Coder<T> of(
+        @JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
+            List<org.apache.beam.sdk.coders.Coder<?>> components) {
+      checkArgument(components.size() == 2, "Expecting 2 components, got %s", components.size());
+      @SuppressWarnings("unchecked")
+      org.apache.beam.sdk.coders.Coder<T> valueCoder =
+          (org.apache.beam.sdk.coders.Coder<T>) components.get(0);
+      @SuppressWarnings("unchecked")
+      org.apache.beam.sdk.coders.Coder<BoundedWindow> windowCoder =
+          (org.apache.beam.sdk.coders.Coder<BoundedWindow>) components.get(1);
+      return new Coder<>(valueCoder, windowCoder);
+    }
+
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    Coder(
+        org.apache.beam.sdk.coders.Coder<T> valueCoder,
+        org.apache.beam.sdk.coders.Coder<? extends BoundedWindow> windowCoder) {
+      this.valueCoder = valueCoder;
+      // Raw cast: the coder is stored as Coder<BoundedWindow> whatever window subtype it codes.
+      this.windowCoder = (org.apache.beam.sdk.coders.Coder) windowCoder;
+    }
+
+    /** Writes value, timestamp and window in the nested context, then the pane in {@code context}. */
+    @Override
+    public void encode(ValueInSingleWindow<T> windowedElem, OutputStream outStream, Context context)
+        throws IOException {
+      Context nestedContext = context.nested();
+      valueCoder.encode(windowedElem.getValue(), outStream, nestedContext);
+      InstantCoder.of().encode(windowedElem.getTimestamp(), outStream, nestedContext);
+      windowCoder.encode(windowedElem.getWindow(), outStream, nestedContext);
+      // The pane is the last component, so it is written with the caller's context.
+      PaneInfo.PaneInfoCoder.INSTANCE.encode(windowedElem.getPane(), outStream, context);
+    }
+
+    /** Reads the components in the same order and with the same contexts used by {@link #encode}. */
+    @Override
+    public ValueInSingleWindow<T> decode(InputStream inStream, Context context) throws IOException {
+      Context nestedContext = context.nested();
+      T value = valueCoder.decode(inStream, nestedContext);
+      Instant timestamp = InstantCoder.of().decode(inStream, nestedContext);
+      BoundedWindow window = windowCoder.decode(inStream, nestedContext);
+      // Mirror encode(): the pane was written with the caller's context, not the nested one.
+      PaneInfo pane = PaneInfo.PaneInfoCoder.INSTANCE.decode(inStream, context);
+      return new AutoValue_ValueInSingleWindow<>(value, timestamp, window, pane);
+    }
+
+    @Override
+    public List<? extends org.apache.beam.sdk.coders.Coder<?>> getCoderArguments() {
+      // Coder arguments are coders for the type parameters of the coder - i.e. only T.
+      return ImmutableList.of(valueCoder);
+    }
+
+    @Override
+    public List<? extends org.apache.beam.sdk.coders.Coder<?>> getComponents() {
+      // Coder components are all inner coders that it uses - i.e. both T and BoundedWindow.
+      return ImmutableList.of(valueCoder, windowCoder);
+    }
+
+    /** Delegates the determinism check to the value coder and then the window coder. */
+    @Override
+    public void verifyDeterministic() throws NonDeterministicException {
+      valueCoder.verifyDeterministic();
+      windowCoder.verifyDeterministic();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d9891234/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
----------------------------------------------------------------------
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
index 0c6043f..17fa612 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
@@ -23,7 +23,6 @@ import static com.google.common.base.Preconditions.checkState;
import com.google.common.base.Function;
import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Arrays;
@@ -35,8 +34,10 @@ import java.util.Map;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.ValueInSingleWindow;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.util.TimerInternals;
@@ -353,10 +354,10 @@ public class DoFnTester<InputT, OutputT> implements
AutoCloseable {
public List<TimestampedValue<OutputT>> peekOutputElementsWithTimestamp() {
// TODO: Should we return an unmodifiable list?
return Lists.transform(getImmutableOutput(mainOutputTag),
- new Function<WindowedValue<OutputT>, TimestampedValue<OutputT>>() {
+ new Function<ValueInSingleWindow<OutputT>,
TimestampedValue<OutputT>>() {
@Override
@SuppressWarnings("unchecked")
- public TimestampedValue<OutputT> apply(WindowedValue<OutputT> input)
{
+ public TimestampedValue<OutputT> apply(ValueInSingleWindow<OutputT>
input) {
return TimestampedValue.of(input.getValue(), input.getTimestamp());
}
});
@@ -378,8 +379,8 @@ public class DoFnTester<InputT, OutputT> implements
AutoCloseable {
TupleTag<OutputT> tag,
BoundedWindow window) {
ImmutableList.Builder<TimestampedValue<OutputT>> valuesBuilder =
ImmutableList.builder();
- for (WindowedValue<OutputT> value : getImmutableOutput(tag)) {
- if (value.getWindows().contains(window)) {
+ for (ValueInSingleWindow<OutputT> value : getImmutableOutput(tag)) {
+ if (value.getWindow().equals(window)) {
valuesBuilder.add(TimestampedValue.of(value.getValue(),
value.getTimestamp()));
}
}
@@ -434,10 +435,10 @@ public class DoFnTester<InputT, OutputT> implements
AutoCloseable {
public <T> List<T> peekSideOutputElements(TupleTag<T> tag) {
// TODO: Should we return an unmodifiable list?
return Lists.transform(getImmutableOutput(tag),
- new Function<WindowedValue<T>, T>() {
+ new Function<ValueInSingleWindow<T>, T>() {
@SuppressWarnings("unchecked")
@Override
- public T apply(WindowedValue<T> input) {
+ public T apply(ValueInSingleWindow<T> input) {
return input.getValue();
}});
}
@@ -510,16 +511,16 @@ public class DoFnTester<InputT, OutputT> implements
AutoCloseable {
return combiner.extractOutput(accumulator);
}
- private <T> List<WindowedValue<T>> getImmutableOutput(TupleTag<T> tag) {
+ private <T> List<ValueInSingleWindow<T>> getImmutableOutput(TupleTag<T> tag)
{
@SuppressWarnings({"unchecked", "rawtypes"})
- List<WindowedValue<T>> elems = (List) outputs.get(tag);
+ List<ValueInSingleWindow<T>> elems = (List) outputs.get(tag);
return ImmutableList.copyOf(
- MoreObjects.firstNonNull(elems,
Collections.<WindowedValue<T>>emptyList()));
+ MoreObjects.firstNonNull(elems,
Collections.<ValueInSingleWindow<T>>emptyList()));
}
@SuppressWarnings({"unchecked", "rawtypes"})
- public <T> List<WindowedValue<T>> getMutableOutput(TupleTag<T> tag) {
- List<WindowedValue<T>> outputList = (List) outputs.get(tag);
+ public <T> List<ValueInSingleWindow<T>> getMutableOutput(TupleTag<T> tag) {
+ List<ValueInSingleWindow<T>> outputList = (List) outputs.get(tag);
if (outputList == null) {
outputList = new ArrayList<>();
outputs.put(tag, (List) outputList);
@@ -612,23 +613,22 @@ public class DoFnTester<InputT, OutputT> implements
AutoCloseable {
sideOutputWithTimestamp(tag, output, BoundedWindow.TIMESTAMP_MIN_VALUE);
}
- public <T> void noteOutput(TupleTag<T> tag, WindowedValue<T> output) {
+ public <T> void noteOutput(TupleTag<T> tag, ValueInSingleWindow<T> output)
{
getMutableOutput(tag).add(output);
}
}
private TestProcessContext createProcessContext(TimestampedValue<InputT>
elem) {
- WindowedValue<InputT> windowedValue =
WindowedValue.timestampedValueInGlobalWindow(
- elem.getValue(), elem.getTimestamp());
-
- return new TestProcessContext(windowedValue);
+ return new TestProcessContext(
+ ValueInSingleWindow.of(
+ elem.getValue(), elem.getTimestamp(), GlobalWindow.INSTANCE,
PaneInfo.NO_FIRING));
}
private class TestProcessContext extends OldDoFn<InputT,
OutputT>.ProcessContext {
private final TestContext context;
- private final WindowedValue<InputT> element;
+ private final ValueInSingleWindow<InputT> element;
- private TestProcessContext(WindowedValue<InputT> element) {
+ private TestProcessContext(ValueInSingleWindow<InputT> element) {
fn.super();
this.context = createContext(fn);
this.element = element;
@@ -661,7 +661,7 @@ public class DoFnTester<InputT, OutputT> implements
AutoCloseable {
@Override
public BoundedWindow window() {
- return Iterables.getOnlyElement(element.getWindows());
+ return element.getWindow();
}
@Override
@@ -683,7 +683,10 @@ public class DoFnTester<InputT, OutputT> implements
AutoCloseable {
Instant timestamp,
Collection<? extends BoundedWindow> windows,
PaneInfo pane) {
- context.noteOutput(mainOutputTag, WindowedValue.of(output,
timestamp, windows, pane));
+ for (BoundedWindow window : windows) {
+ context.noteOutput(
+ mainOutputTag, ValueInSingleWindow.of(output, timestamp,
window, pane));
+ }
}
@Override
@@ -693,7 +696,10 @@ public class DoFnTester<InputT, OutputT> implements
AutoCloseable {
Instant timestamp,
Collection<? extends BoundedWindow> windows,
PaneInfo pane) {
- context.noteOutput(tag, WindowedValue.of(output, timestamp, windows,
pane));
+ for (BoundedWindow window : windows) {
+ context.noteOutput(
+ tag, ValueInSingleWindow.of(output, timestamp, window, pane));
+ }
}
@Override
@@ -703,7 +709,7 @@ public class DoFnTester<InputT, OutputT> implements
AutoCloseable {
@Override
public Collection<? extends BoundedWindow> windows() {
- return element.getWindows();
+ return Collections.singleton(element.getWindow());
}
@Override
@@ -742,8 +748,8 @@ public class DoFnTester<InputT, OutputT> implements
AutoCloseable {
@Override
public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant
timestamp) {
- context.noteOutput(tag,
- WindowedValue.of(output, timestamp, element.getWindows(),
element.getPane()));
+ context.noteOutput(
+ tag, ValueInSingleWindow.of(output, timestamp, element.getWindow(),
element.getPane()));
}
@Override
@@ -803,7 +809,7 @@ public class DoFnTester<InputT, OutputT> implements
AutoCloseable {
OldDoFn<InputT, OutputT> fn;
/** The outputs from the {@link DoFn} under test. */
- private Map<TupleTag<?>, List<WindowedValue<?>>> outputs;
+ private Map<TupleTag<?>, List<ValueInSingleWindow<?>>> outputs;
private InMemoryStateInternals<?> stateInternals;
private InMemoryTimerInternals timerInternals;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d9891234/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GatherAllPanes.java
----------------------------------------------------------------------
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GatherAllPanes.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GatherAllPanes.java
deleted file mode 100644
index 52a2ba8..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GatherAllPanes.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.Values;
-import org.apache.beam.sdk.transforms.WithKeys;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.Never;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.TypeDescriptor;
-
-/**
- * Gathers all panes of each window into exactly one output.
- *
- * <p>Note that this will delay the output of a window until the garbage
collection time (when the
- * watermark passes the end of the window plus allowed lateness) even if the
upstream triggers
- * closed the window earlier.
- */
-public class GatherAllPanes<T>
- extends PTransform<PCollection<T>,
PCollection<Iterable<WindowedValue<T>>>> {
- /**
- * Gathers all panes of each window into a single output element.
- *
- * <p>This will gather all output panes into a single element, which causes
them to be colocated
- * on a single worker. As a result, this is only suitable for {@link
PCollection PCollections}
- * where all of the output elements for each pane fit in memory, such as in
tests.
- */
- public static <T> GatherAllPanes<T> globally() {
- return new GatherAllPanes<>();
- }
-
- private GatherAllPanes() {}
-
- @Override
- public PCollection<Iterable<WindowedValue<T>>> apply(PCollection<T> input) {
- WindowFn<?, ?> originalWindowFn =
input.getWindowingStrategy().getWindowFn();
-
- return input
- .apply(ParDo.of(new ReifyTimestampsAndWindowsFn<T>()))
- .setCoder(
- WindowedValue.FullWindowedValueCoder.of(
- input.getCoder(),
input.getWindowingStrategy().getWindowFn().windowCoder()))
- .apply(
- WithKeys.<Integer, WindowedValue<T>>of(0).withKeyType(new
TypeDescriptor<Integer>() {}))
- .apply(
- Window.into(
- new IdentityWindowFn<KV<Integer, WindowedValue<T>>>(
- originalWindowFn.windowCoder()))
- .triggering(Never.ever())
-
.withAllowedLateness(input.getWindowingStrategy().getAllowedLateness())
- .discardingFiredPanes())
- // all values have the same key so they all appear as a single output
element
- .apply(GroupByKey.<Integer, WindowedValue<T>>create())
- .apply(Values.<Iterable<WindowedValue<T>>>create())
- .setWindowingStrategyInternal(input.getWindowingStrategy());
- }
-
- private static class ReifyTimestampsAndWindowsFn<T> extends DoFn<T,
WindowedValue<T>> {
- @DoFn.ProcessElement
- public void processElement(ProcessContext c, BoundedWindow window) {
- c.output(WindowedValue.of(c.element(), c.timestamp(), window, c.pane()));
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d9891234/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IdentityWindowFn.java
----------------------------------------------------------------------
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IdentityWindowFn.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IdentityWindowFn.java
index 8ca1bfd..c02e1f4 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IdentityWindowFn.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IdentityWindowFn.java
@@ -45,7 +45,7 @@ import org.joda.time.Instant;
* <p>This {@link WindowFn} is an internal implementation detail of
sdk-provided utilities, and
* should not be used by {@link Pipeline} writers.
*/
-class IdentityWindowFn<T> extends NonMergingWindowFn<T, BoundedWindow> {
+public class IdentityWindowFn<T> extends NonMergingWindowFn<T, BoundedWindow> {
/**
* The coder of the type of windows of the input {@link PCollection}. This
is not an arbitrary
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d9891234/sdks/java/core/src/test/java/org/apache/beam/sdk/WindowMatchers.java
----------------------------------------------------------------------
diff --git
a/sdks/java/core/src/test/java/org/apache/beam/sdk/WindowMatchers.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/WindowMatchers.java
deleted file mode 100644
index 3531a86..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/WindowMatchers.java
+++ /dev/null
@@ -1,204 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk;
-
-import com.google.common.collect.Lists;
-import java.util.Collection;
-import java.util.Objects;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.hamcrest.Description;
-import org.hamcrest.Matcher;
-import org.hamcrest.Matchers;
-import org.hamcrest.TypeSafeMatcher;
-import org.joda.time.Instant;
-
-/**
- * Matchers that are useful for working with Windowing, Timestamps, etc.
- */
-public class WindowMatchers {
-
- public static <T> Matcher<WindowedValue<? extends T>> isWindowedValue(
- T value,
- Instant timestamp,
- Collection<? extends BoundedWindow> windows,
- PaneInfo paneInfo) {
-
- Collection<Matcher<? super BoundedWindow>> windowMatchers =
- Lists.newArrayListWithCapacity(windows.size());
- for (BoundedWindow window : windows) {
- windowMatchers.add(Matchers.equalTo(window));
- }
-
- return isWindowedValue(
- Matchers.equalTo(value),
- Matchers.equalTo(timestamp),
- Matchers.containsInAnyOrder(windowMatchers),
- Matchers.equalTo(paneInfo));
- }
-
- public static <T> Matcher<WindowedValue<? extends T>> isWindowedValue(
- Matcher<? super T> valueMatcher,
- Matcher<? super Instant> timestampMatcher,
- Matcher<? super Collection<? extends BoundedWindow>> windowsMatcher,
- Matcher<? super PaneInfo> paneInfoMatcher) {
- return new WindowedValueMatcher<>(
- valueMatcher, timestampMatcher, windowsMatcher, paneInfoMatcher);
- }
-
- public static <T> Matcher<WindowedValue<? extends T>> isWindowedValue(
- Matcher<? super T> valueMatcher,
- Matcher<? super Instant> timestampMatcher,
- Matcher<? super Collection<? extends BoundedWindow>> windowsMatcher) {
- return new WindowedValueMatcher<>(
- valueMatcher, timestampMatcher, windowsMatcher, Matchers.anything());
- }
-
- public static <T> Matcher<WindowedValue<? extends T>> isWindowedValue(
- Matcher<? super T> valueMatcher, Matcher<? super Instant>
timestampMatcher) {
- return new WindowedValueMatcher<>(
- valueMatcher, timestampMatcher, Matchers.anything(),
Matchers.anything());
- }
-
- public static <T> Matcher<WindowedValue<? extends T>> isWindowedValue(
- Matcher<? super T> valueMatcher) {
- return new WindowedValueMatcher<>(
- valueMatcher, Matchers.anything(), Matchers.anything(),
Matchers.anything());
- }
-
- public static <T> Matcher<WindowedValue<? extends T>> isSingleWindowedValue(
- T value, long timestamp, long windowStart, long windowEnd) {
- return WindowMatchers.<T>isSingleWindowedValue(
- Matchers.equalTo(value), timestamp, windowStart, windowEnd);
- }
-
- public static <T> Matcher<WindowedValue<? extends T>> isSingleWindowedValue(
- T value, Instant timestamp, BoundedWindow window, PaneInfo paneInfo) {
- return WindowMatchers.<T>isSingleWindowedValue(
- Matchers.equalTo(value),
- Matchers.equalTo(timestamp),
- Matchers.equalTo(window),
- Matchers.equalTo(paneInfo));
- }
-
- public static <T> Matcher<WindowedValue<? extends T>> isSingleWindowedValue(
- T value, Instant timestamp, BoundedWindow window) {
- return WindowMatchers.<T>isSingleWindowedValue(
- Matchers.equalTo(value), Matchers.equalTo(timestamp),
Matchers.equalTo(window));
- }
-
- public static <T> Matcher<WindowedValue<? extends T>> isSingleWindowedValue(
- Matcher<T> valueMatcher, long timestamp, long windowStart, long
windowEnd) {
- IntervalWindow intervalWindow =
- new IntervalWindow(new Instant(windowStart), new Instant(windowEnd));
- return WindowMatchers.<T>isSingleWindowedValue(
- valueMatcher,
- Matchers.describedAs("%0", Matchers.equalTo(new Instant(timestamp)),
timestamp),
- Matchers.<BoundedWindow>equalTo(intervalWindow),
- Matchers.anything());
- }
-
- public static <T> Matcher<WindowedValue<? extends T>> isSingleWindowedValue(
- Matcher<? super T> valueMatcher,
- Matcher<? super Instant> timestampMatcher,
- Matcher<? super BoundedWindow> windowMatcher) {
- return new WindowedValueMatcher<T>(
- valueMatcher, timestampMatcher, Matchers.contains(windowMatcher),
Matchers.anything());
- }
-
- public static <T> Matcher<WindowedValue<? extends T>> isSingleWindowedValue(
- Matcher<? super T> valueMatcher,
- Matcher<? super Instant> timestampMatcher,
- Matcher<? super BoundedWindow> windowMatcher,
- Matcher<? super PaneInfo> paneInfoMatcher) {
- return new WindowedValueMatcher<T>(
- valueMatcher, timestampMatcher, Matchers.contains(windowMatcher),
paneInfoMatcher);
- }
-
- public static Matcher<IntervalWindow> intervalWindow(long start, long end) {
- return Matchers.equalTo(new IntervalWindow(new Instant(start), new
Instant(end)));
- }
-
- public static <T> Matcher<WindowedValue<? extends T>>
valueWithPaneInfo(final PaneInfo paneInfo) {
- return new TypeSafeMatcher<WindowedValue<? extends T>>() {
- @Override
- public void describeTo(Description description) {
- description
- .appendText("WindowedValue(paneInfo = ").appendValue(paneInfo).appendText(")");
- }
-
- @Override
- protected boolean matchesSafely(WindowedValue<? extends T> item) {
- return Objects.equals(item.getPane(), paneInfo);
- }
-
- @Override
- protected void describeMismatchSafely(
- WindowedValue<? extends T> item, Description mismatchDescription) {
- mismatchDescription.appendValue(item.getPane());
- }
- };
- }
-
- @SuppressWarnings({"unchecked", "rawtypes"})
- @SafeVarargs
- public static final <W extends BoundedWindow> Matcher<Iterable<W>> ofWindows(
- Matcher<W>... windows) {
- return (Matcher) Matchers.<W>containsInAnyOrder(windows);
- }
-
- private WindowMatchers() {}
-
- private static class WindowedValueMatcher<T> extends
TypeSafeMatcher<WindowedValue<? extends T>> {
-
- private Matcher<? super T> valueMatcher;
- private Matcher<? super Instant> timestampMatcher;
- private Matcher<? super Collection<? extends BoundedWindow>>
windowsMatcher;
- private Matcher<? super PaneInfo> paneInfoMatcher;
-
- private WindowedValueMatcher(
- Matcher<? super T> valueMatcher,
- Matcher<? super Instant> timestampMatcher,
- Matcher<? super Collection<? extends BoundedWindow>> windowsMatcher,
- Matcher<? super PaneInfo> paneInfoMatcher) {
- this.valueMatcher = valueMatcher;
- this.timestampMatcher = timestampMatcher;
- this.windowsMatcher = windowsMatcher;
- this.paneInfoMatcher = paneInfoMatcher;
- }
-
- @Override
- public void describeTo(Description description) {
- description
- .appendText("a WindowedValue(").appendValue(valueMatcher)
- .appendText(", ").appendValue(timestampMatcher)
- .appendText(", ").appendValue(windowsMatcher)
- .appendText(", ").appendValue(paneInfoMatcher)
- .appendText(")");
- }
-
- @Override
- protected boolean matchesSafely(WindowedValue<? extends T> windowedValue) {
- return valueMatcher.matches(windowedValue.getValue())
- && timestampMatcher.matches(windowedValue.getTimestamp())
- && windowsMatcher.matches(windowedValue.getWindows());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d9891234/sdks/java/core/src/test/java/org/apache/beam/sdk/WindowMatchersTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/WindowMatchersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/WindowMatchersTest.java
deleted file mode 100644
index 89637e2..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/WindowMatchersTest.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk;
-
-import static org.junit.Assert.assertThat;
-
-import com.google.common.collect.ImmutableList;
-import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.joda.time.Instant;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@link WindowMatchers}.
- */
-@RunWith(JUnit4.class)
-public class WindowMatchersTest {
-
- @Test
- public void testIsWindowedValueExact() {
- long timestamp = 100;
- long windowStart = 0;
- long windowEnd = 200;
-
- assertThat(
- WindowedValue.of(
- "hello",
- new Instant(timestamp),
- new IntervalWindow(new Instant(windowStart), new
Instant(windowEnd)),
- PaneInfo.NO_FIRING),
- WindowMatchers.isWindowedValue(
- "hello",
- new Instant(timestamp),
- ImmutableList.of(new IntervalWindow(new Instant(windowStart), new
Instant(windowEnd))),
- PaneInfo.NO_FIRING));
- }
-
- @Test
- public void testIsWindowedValueReorderedWindows() {
- long timestamp = 100;
- long windowStart = 0;
- long windowEnd = 200;
- long windowStart2 = 50;
- long windowEnd2 = 150;
-
- assertThat(
- WindowedValue.of(
- "hello",
- new Instant(timestamp),
- ImmutableList.of(
- new IntervalWindow(new Instant(windowStart), new
Instant(windowEnd)),
- new IntervalWindow(new Instant(windowStart2), new
Instant(windowEnd2))),
- PaneInfo.NO_FIRING),
- WindowMatchers.isWindowedValue(
- "hello",
- new Instant(timestamp),
- ImmutableList.of(
- new IntervalWindow(new Instant(windowStart), new
Instant(windowEnd)),
- new IntervalWindow(new Instant(windowStart2), new
Instant(windowEnd2))),
- PaneInfo.NO_FIRING));
- }
-}
-
-
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d9891234/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/GatherAllPanesTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/GatherAllPanesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/GatherAllPanesTest.java
new file mode 100644
index 0000000..417147f
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/GatherAllPanesTest.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.testing;
+
+import static org.junit.Assert.fail;
+
+import com.google.common.collect.Iterables;
+import java.io.Serializable;
+import org.apache.beam.sdk.io.CountingInput;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.transforms.WithTimestamps;
+import org.apache.beam.sdk.transforms.windowing.AfterPane;
+import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link GatherAllPanes}. */
+@RunWith(JUnit4.class)
+public class GatherAllPanesTest implements Serializable {
+ @Test
+ @Category(NeedsRunner.class)
+ public void singlePaneSingleReifiedPane() {
+ TestPipeline p = TestPipeline.create();
+ PCollection<Iterable<ValueInSingleWindow<Iterable<Long>>>>
accumulatedPanes =
+ p.apply(CountingInput.upTo(20000))
+ .apply(
+ WithTimestamps.of(
+ new SerializableFunction<Long, Instant>() {
+ @Override
+ public Instant apply(Long input) {
+ return new Instant(input * 10);
+ }
+ }))
+ .apply(
+ Window.<Long>into(FixedWindows.of(Duration.standardMinutes(1)))
+ .triggering(AfterWatermark.pastEndOfWindow())
+ .withAllowedLateness(Duration.ZERO)
+ .discardingFiredPanes())
+ .apply(WithKeys.<Void, Long>of((Void) null).withKeyType(new
TypeDescriptor<Void>() {}))
+ .apply(GroupByKey.<Void, Long>create())
+ .apply(Values.<Iterable<Long>>create())
+ .apply(GatherAllPanes.<Iterable<Long>>globally());
+
+ PAssert.that(accumulatedPanes)
+ .satisfies(
+ new SerializableFunction<
+ Iterable<Iterable<ValueInSingleWindow<Iterable<Long>>>>,
Void>() {
+ @Override
+ public Void
apply(Iterable<Iterable<ValueInSingleWindow<Iterable<Long>>>> input) {
+ for (Iterable<ValueInSingleWindow<Iterable<Long>>>
windowedInput : input) {
+ if (Iterables.size(windowedInput) > 1) {
+ fail("Expected all windows to have exactly one pane, got " + windowedInput);
+ return null;
+ }
+ }
+ return null;
+ }
+ });
+
+ p.run();
+ }
+
+ @Test
+ @Category(NeedsRunner.class)
+ public void multiplePanesMultipleReifiedPane() {
+ TestPipeline p = TestPipeline.create();
+
+ PCollection<Long> someElems = p.apply("someLongs",
CountingInput.upTo(20000));
+ PCollection<Long> otherElems = p.apply("otherLongs",
CountingInput.upTo(20000));
+ PCollection<Iterable<ValueInSingleWindow<Iterable<Long>>>>
accumulatedPanes =
+ PCollectionList.of(someElems)
+ .and(otherElems)
+ .apply(Flatten.<Long>pCollections())
+ .apply(
+ WithTimestamps.of(
+ new SerializableFunction<Long, Instant>() {
+ @Override
+ public Instant apply(Long input) {
+ return new Instant(input * 10);
+ }
+ }))
+ .apply(
+ Window.<Long>into(FixedWindows.of(Duration.standardMinutes(1)))
+ .triggering(
+ AfterWatermark.pastEndOfWindow()
+ .withEarlyFirings(AfterPane.elementCountAtLeast(1)))
+ .withAllowedLateness(Duration.ZERO)
+ .discardingFiredPanes())
+ .apply(WithKeys.<Void, Long>of((Void) null).withKeyType(new
TypeDescriptor<Void>() {}))
+ .apply(GroupByKey.<Void, Long>create())
+ .apply(Values.<Iterable<Long>>create())
+ .apply(GatherAllPanes.<Iterable<Long>>globally());
+
+ PAssert.that(accumulatedPanes)
+ .satisfies(
+ new SerializableFunction<
+ Iterable<Iterable<ValueInSingleWindow<Iterable<Long>>>>,
Void>() {
+ @Override
+ public Void
apply(Iterable<Iterable<ValueInSingleWindow<Iterable<Long>>>> input) {
+ for (Iterable<ValueInSingleWindow<Iterable<Long>>>
windowedInput : input) {
+ if (Iterables.size(windowedInput) > 1) {
+ return null;
+ }
+ }
+ fail("Expected at least one window to have multiple panes");
+ return null;
+ }
+ });
+
+ p.run();
+ }
+}