Repository: incubator-beam Updated Branches: refs/heads/master b1f7013d8 -> 8042d52fc
Revert "Remove WindowedValue.valueInEmptyWindows" This reverts commit 0e49b150e83d85ae432c640da937a9497068e71b, which breaks some DataflowRunner integration tests. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/98ab5594 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/98ab5594 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/98ab5594 Branch: refs/heads/master Commit: 98ab559410bde425c9c1944bcd2f09293c3764dc Parents: b1f7013 Author: Kenneth Knowles <k...@google.com> Authored: Tue Nov 29 16:57:09 2016 -0800 Committer: Dan Halperin <dhalp...@google.com> Committed: Tue Nov 29 17:40:50 2016 -0800 ---------------------------------------------------------------------- .../direct/FlattenEvaluatorFactoryTest.java | 8 ++--- .../beam/runners/dataflow/DataflowRunner.java | 10 +++--- .../org/apache/beam/sdk/util/WindowedValue.java | 33 +++++++++++++++++--- .../beam/sdk/testing/PaneExtractorsTest.java | 2 +- .../apache/beam/sdk/util/WindowedValueTest.java | 10 ++++++ 5 files changed, 49 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/98ab5594/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java index 39c7cab..cb27fbc 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java @@ -78,9 +78,9 @@ public class FlattenEvaluatorFactoryTest { rightSideEvaluator.processElement(WindowedValue.valueInGlobalWindow(-1)); leftSideEvaluator.processElement( WindowedValue.timestampedValueInGlobalWindow(2, new Instant(1024))); - leftSideEvaluator.processElement(WindowedValue.valueInGlobalWindow(4, PaneInfo.NO_FIRING)); + leftSideEvaluator.processElement(WindowedValue.valueInEmptyWindows(4, PaneInfo.NO_FIRING)); rightSideEvaluator.processElement( - WindowedValue.valueInGlobalWindow(2, PaneInfo.ON_TIME_AND_ONLY_FIRING)); + WindowedValue.valueInEmptyWindows(2, PaneInfo.ON_TIME_AND_ONLY_FIRING)); rightSideEvaluator.processElement( WindowedValue.timestampedValueInGlobalWindow(-4, new Instant(-4096))); @@ -104,12 +104,12 @@ public class FlattenEvaluatorFactoryTest { flattenedLeftBundle.commit(Instant.now()).getElements(), containsInAnyOrder( WindowedValue.timestampedValueInGlobalWindow(2, new Instant(1024)), - WindowedValue.valueInGlobalWindow(4, PaneInfo.NO_FIRING), + WindowedValue.valueInEmptyWindows(4, PaneInfo.NO_FIRING), WindowedValue.valueInGlobalWindow(1))); assertThat( flattenedRightBundle.commit(Instant.now()).getElements(), containsInAnyOrder( - WindowedValue.valueInGlobalWindow(2, PaneInfo.ON_TIME_AND_ONLY_FIRING), + WindowedValue.valueInEmptyWindows(2, PaneInfo.ON_TIME_AND_ONLY_FIRING), WindowedValue.timestampedValueInGlobalWindow(-4, new Instant(-4096)), WindowedValue.valueInGlobalWindow(-1))); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/98ab5594/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 641daf4..0099856 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -23,7 +23,7 @@ import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Strings.isNullOrEmpty; import static org.apache.beam.sdk.util.StringUtils.approximatePTransformName; import static org.apache.beam.sdk.util.StringUtils.approximateSimpleName; -import static org.apache.beam.sdk.util.WindowedValue.valueInGlobalWindow; +import static org.apache.beam.sdk.util.WindowedValue.valueInEmptyWindows; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; @@ -1230,7 +1230,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { // are at a window boundary. c.output(IsmRecord.of( ImmutableList.of(previousWindow.get()), - valueInGlobalWindow(new TransformedMap<>(WindowedValueToValue.<V>of(), map)))); + valueInEmptyWindows(new TransformedMap<>(WindowedValueToValue.<V>of(), map)))); map = new HashMap<>(); } @@ -1251,7 +1251,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { // window boundary. c.output(IsmRecord.of( ImmutableList.of(previousWindow.get()), - valueInGlobalWindow(new TransformedMap<>(WindowedValueToValue.<V>of(), map)))); + valueInEmptyWindows(new TransformedMap<>(WindowedValueToValue.<V>of(), map)))); } } @@ -1718,7 +1718,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { Iterable<WindowedValue<V>>, Iterable<V>>>>of( ImmutableList.of(previousWindow.get()), - valueInGlobalWindow( + valueInEmptyWindows( new TransformedMap<>( IterableWithWindowedValuesToIterable.<V>of(), resultMap)))); multimap = HashMultimap.create(); @@ -1739,7 +1739,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { Iterable<WindowedValue<V>>, Iterable<V>>>>of( ImmutableList.of(previousWindow.get()), - valueInGlobalWindow( + valueInEmptyWindows( new TransformedMap<>(IterableWithWindowedValuesToIterable.<V>of(), resultMap)))); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/98ab5594/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java index 3251f09..a0b4cf5 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java @@ -55,7 +55,8 @@ import org.joda.time.Instant; public abstract class WindowedValue<T> { /** - * Returns a {@code WindowedValue} with the given value, timestamp, and windows. + * Returns a {@code WindowedValue} with the given value, timestamp, + * and windows. */ public static <T> WindowedValue<T> of( T value, @@ -63,10 +64,10 @@ public abstract class WindowedValue<T> { Collection<? extends BoundedWindow> windows, PaneInfo pane) { checkNotNull(pane); - checkArgument( - windows.size() > 0, "Cannot create %s in no windows", WindowedValue.class.getName()); - if (windows.size() == 1) { + if (windows.size() == 0 && BoundedWindow.TIMESTAMP_MIN_VALUE.equals(timestamp)) { + return valueInEmptyWindows(value, pane); + } else if (windows.size() == 1) { return of(value, timestamp, windows.iterator().next(), pane); } else { return new TimestampedValueInMultipleWindows<>(value, timestamp, windows, pane); @@ -122,6 +123,30 @@ public abstract class WindowedValue<T> { } /** + * Returns a {@code WindowedValue} with the given value in no windows, and the default timestamp + * and pane. + * + * @deprecated a value in no windows technically is not "in" a PCollection. It is allowed to drop + * it at any point, and benign runner implementation details could cause silent data loss. + */ + @Deprecated + public static <T> WindowedValue<T> valueInEmptyWindows(T value) { + return new ValueInEmptyWindows<T>(value, PaneInfo.NO_FIRING); + } + + /** + * Returns a {@code WindowedValue} with the given value in no windows, and the default timestamp + * and the specified pane. + * + * @deprecated a value in no windows technically is not "in" a PCollection. It is allowed to drop + * it at any point, and benign runner implementation details could cause silent data loss. + */ + @Deprecated + public static <T> WindowedValue<T> valueInEmptyWindows(T value, PaneInfo pane) { + return new ValueInEmptyWindows<T>(value, pane); + } + + /** * Returns a new {@code WindowedValue} that is a copy of this one, but with a different value, * which may have a new type {@code NewT}. */ http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/98ab5594/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PaneExtractorsTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PaneExtractorsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PaneExtractorsTest.java index 79106ea..ef501d4 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PaneExtractorsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PaneExtractorsTest.java @@ -47,7 +47,7 @@ public class PaneExtractorsTest { PaneExtractors.onlyPane(); Iterable<WindowedValue<Integer>> noFiring = ImmutableList.of( - WindowedValue.valueInGlobalWindow(9), WindowedValue.valueInGlobalWindow(19)); + WindowedValue.valueInGlobalWindow(9), WindowedValue.valueInEmptyWindows(19)); assertThat(extractor.apply(noFiring), containsInAnyOrder(9, 19)); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/98ab5594/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java index f7656cc..0c69a59 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowedValueTest.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.util; import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.emptyIterable; import static org.hamcrest.Matchers.equalTo; import static org.junit.Assert.assertThat; @@ -63,6 +64,15 @@ public class WindowedValueTest { } @Test + public void testExplodeWindowsInNoWindowsEmptyIterable() { + WindowedValue<String> value = + WindowedValue.of( + "foo", Instant.now(), ImmutableList.<BoundedWindow>of(), PaneInfo.NO_FIRING); + + assertThat(value.explodeWindows(), emptyIterable()); + } + + @Test public void testExplodeWindowsInOneWindowEquals() { Instant now = Instant.now(); BoundedWindow window = new IntervalWindow(now.minus(1000L), now.plus(1000L));