http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d9891234/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PaneExtractorsTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PaneExtractorsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PaneExtractorsTest.java index ef501d4..7df2f89 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PaneExtractorsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PaneExtractorsTest.java @@ -23,10 +23,10 @@ import static org.junit.Assert.assertThat; import com.google.common.collect.ImmutableList; import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing; -import org.apache.beam.sdk.util.WindowedValue; import org.joda.time.Instant; import org.junit.Rule; import org.junit.Test; @@ -34,32 +34,33 @@ import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; -/** - * Tests for {@link PaneExtractors}. - */ +/** Tests for {@link PaneExtractors}. */ @RunWith(JUnit4.class) public class PaneExtractorsTest { @Rule public ExpectedException thrown = ExpectedException.none(); @Test public void onlyPaneNoFiring() { - SerializableFunction<Iterable<WindowedValue<Integer>>, Iterable<Integer>> extractor = + SerializableFunction<Iterable<ValueInSingleWindow<Integer>>, Iterable<Integer>> extractor = PaneExtractors.onlyPane(); - Iterable<WindowedValue<Integer>> noFiring = + Iterable<ValueInSingleWindow<Integer>> noFiring = ImmutableList.of( - WindowedValue.valueInGlobalWindow(9), WindowedValue.valueInEmptyWindows(19)); + ValueInSingleWindow.of( + 9, BoundedWindow.TIMESTAMP_MIN_VALUE, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING), + ValueInSingleWindow.of( + 19, BoundedWindow.TIMESTAMP_MIN_VALUE, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING)); assertThat(extractor.apply(noFiring), containsInAnyOrder(9, 19)); } @Test public void onlyPaneOnlyOneFiring() { - SerializableFunction<Iterable<WindowedValue<Integer>>, Iterable<Integer>> extractor = + SerializableFunction<Iterable<ValueInSingleWindow<Integer>>, Iterable<Integer>> extractor = PaneExtractors.onlyPane(); - Iterable<WindowedValue<Integer>> onlyFiring = + Iterable<ValueInSingleWindow<Integer>> onlyFiring = ImmutableList.of( - WindowedValue.of( + ValueInSingleWindow.of( 2, new Instant(0L), GlobalWindow.INSTANCE, PaneInfo.ON_TIME_AND_ONLY_FIRING), - WindowedValue.of( + ValueInSingleWindow.of( 1, new Instant(0L), GlobalWindow.INSTANCE, PaneInfo.ON_TIME_AND_ONLY_FIRING)); assertThat(extractor.apply(onlyFiring), containsInAnyOrder(2, 1)); @@ -67,21 +68,21 @@ public class PaneExtractorsTest { @Test public void onlyPaneMultiplePanesFails() { - SerializableFunction<Iterable<WindowedValue<Integer>>, Iterable<Integer>> extractor = + SerializableFunction<Iterable<ValueInSingleWindow<Integer>>, Iterable<Integer>> extractor = PaneExtractors.onlyPane(); - Iterable<WindowedValue<Integer>> multipleFiring = + Iterable<ValueInSingleWindow<Integer>> multipleFiring = ImmutableList.of( - WindowedValue.of( + ValueInSingleWindow.of( 4, new Instant(0L), GlobalWindow.INSTANCE, PaneInfo.createPane(true, false, Timing.EARLY)), - WindowedValue.of( + ValueInSingleWindow.of( 2, new Instant(0L), GlobalWindow.INSTANCE, PaneInfo.createPane(false, false, Timing.ON_TIME, 1L, 0L)), - WindowedValue.of( + ValueInSingleWindow.of( 1, new Instant(0L), GlobalWindow.INSTANCE, @@ -94,16 +95,16 @@ public class PaneExtractorsTest { @Test public void onTimePane() { - SerializableFunction<Iterable<WindowedValue<Integer>>, Iterable<Integer>> extractor = + SerializableFunction<Iterable<ValueInSingleWindow<Integer>>, Iterable<Integer>> extractor = PaneExtractors.onTimePane(); - Iterable<WindowedValue<Integer>> onlyOnTime = + Iterable<ValueInSingleWindow<Integer>> onlyOnTime = ImmutableList.of( - WindowedValue.of( + ValueInSingleWindow.of( 4, new Instant(0L), GlobalWindow.INSTANCE, PaneInfo.createPane(false, false, Timing.ON_TIME, 1L, 0L)), - WindowedValue.of( + ValueInSingleWindow.of( 2, new Instant(0L), GlobalWindow.INSTANCE, @@ -114,26 +115,26 @@ public class PaneExtractorsTest { @Test public void onTimePaneOnlyEarlyAndLate() { - SerializableFunction<Iterable<WindowedValue<Integer>>, Iterable<Integer>> extractor = + SerializableFunction<Iterable<ValueInSingleWindow<Integer>>, Iterable<Integer>> extractor = PaneExtractors.onTimePane(); - Iterable<WindowedValue<Integer>> onlyOnTime = + Iterable<ValueInSingleWindow<Integer>> onlyOnTime = ImmutableList.of( - WindowedValue.of( + ValueInSingleWindow.of( 8, new Instant(0L), GlobalWindow.INSTANCE, PaneInfo.createPane(false, false, Timing.LATE, 2L, 1L)), - WindowedValue.of( + ValueInSingleWindow.of( 4, new Instant(0L), GlobalWindow.INSTANCE, PaneInfo.createPane(false, false, Timing.ON_TIME, 1L, 0L)), - WindowedValue.of( + ValueInSingleWindow.of( 2, new Instant(0L), GlobalWindow.INSTANCE, PaneInfo.createPane(false, false, Timing.ON_TIME, 1L, 0L)), - WindowedValue.of( + ValueInSingleWindow.of( 1, new Instant(0L), GlobalWindow.INSTANCE, @@ -144,21 +145,21 @@ public class PaneExtractorsTest { @Test public void finalPane() { - SerializableFunction<Iterable<WindowedValue<Integer>>, Iterable<Integer>> extractor = + SerializableFunction<Iterable<ValueInSingleWindow<Integer>>, Iterable<Integer>> extractor = PaneExtractors.finalPane(); - Iterable<WindowedValue<Integer>> onlyOnTime = + Iterable<ValueInSingleWindow<Integer>> onlyOnTime = ImmutableList.of( - WindowedValue.of( + ValueInSingleWindow.of( 8, new Instant(0L), GlobalWindow.INSTANCE, PaneInfo.createPane(false, true, Timing.LATE, 2L, 1L)), - WindowedValue.of( + ValueInSingleWindow.of( 4, new Instant(0L), GlobalWindow.INSTANCE, PaneInfo.createPane(false, false, Timing.ON_TIME, 1L, 0L)), - WindowedValue.of( + ValueInSingleWindow.of( 1, new Instant(0L), GlobalWindow.INSTANCE, @@ -169,21 +170,21 @@ public class PaneExtractorsTest { @Test public void finalPaneNoExplicitFinalEmpty() { - SerializableFunction<Iterable<WindowedValue<Integer>>, Iterable<Integer>> extractor = + SerializableFunction<Iterable<ValueInSingleWindow<Integer>>, Iterable<Integer>> extractor = PaneExtractors.finalPane(); - Iterable<WindowedValue<Integer>> onlyOnTime = + Iterable<ValueInSingleWindow<Integer>> onlyOnTime = ImmutableList.of( - WindowedValue.of( + ValueInSingleWindow.of( 8, new Instant(0L), GlobalWindow.INSTANCE, PaneInfo.createPane(false, false, Timing.LATE, 2L, 1L)), - WindowedValue.of( + ValueInSingleWindow.of( 4, new Instant(0L), GlobalWindow.INSTANCE, PaneInfo.createPane(false, false, Timing.ON_TIME, 1L, 0L)), - WindowedValue.of( + ValueInSingleWindow.of( 1, new Instant(0L), GlobalWindow.INSTANCE, @@ -194,15 +195,15 @@ public class PaneExtractorsTest { @Test public void nonLatePanesSingleOnTime() { - SerializableFunction<Iterable<WindowedValue<Integer>>, Iterable<Integer>> extractor = + SerializableFunction<Iterable<ValueInSingleWindow<Integer>>, Iterable<Integer>> extractor = PaneExtractors.nonLatePanes(); - Iterable<WindowedValue<Integer>> onlyOnTime = + Iterable<ValueInSingleWindow<Integer>> onlyOnTime = ImmutableList.of( - WindowedValue.of( + ValueInSingleWindow.of( 8, new Instant(0L), GlobalWindow.INSTANCE, PaneInfo.ON_TIME_AND_ONLY_FIRING), - WindowedValue.of( + ValueInSingleWindow.of( 4, new Instant(0L), GlobalWindow.INSTANCE, PaneInfo.ON_TIME_AND_ONLY_FIRING), - WindowedValue.of( + ValueInSingleWindow.of( 2, new Instant(0L), GlobalWindow.INSTANCE, PaneInfo.ON_TIME_AND_ONLY_FIRING)); assertThat(extractor.apply(onlyOnTime), containsInAnyOrder(2, 4, 8)); @@ -210,16 +211,16 @@ public class PaneExtractorsTest { @Test public void nonLatePanesSingleEarly() { - SerializableFunction<Iterable<WindowedValue<Integer>>, Iterable<Integer>> extractor = + SerializableFunction<Iterable<ValueInSingleWindow<Integer>>, Iterable<Integer>> extractor = PaneExtractors.nonLatePanes(); - Iterable<WindowedValue<Integer>> onlyOnTime = + Iterable<ValueInSingleWindow<Integer>> onlyOnTime = ImmutableList.of( - WindowedValue.of( + ValueInSingleWindow.of( 8, new Instant(0L), GlobalWindow.INSTANCE, PaneInfo.createPane(true, false, Timing.EARLY)), - WindowedValue.of( + ValueInSingleWindow.of( 4, new Instant(0L), GlobalWindow.INSTANCE, @@ -230,11 +231,11 @@ public class PaneExtractorsTest { @Test public void allPanesSingleLate() { - SerializableFunction<Iterable<WindowedValue<Integer>>, Iterable<Integer>> extractor = + SerializableFunction<Iterable<ValueInSingleWindow<Integer>>, Iterable<Integer>> extractor = PaneExtractors.nonLatePanes(); - Iterable<WindowedValue<Integer>> onlyOnTime = + Iterable<ValueInSingleWindow<Integer>> onlyOnTime = ImmutableList.of( - WindowedValue.of( + ValueInSingleWindow.of( 8, new Instant(0L), GlobalWindow.INSTANCE, @@ -245,22 +246,22 @@ public class PaneExtractorsTest { @Test public void nonLatePanesMultiplePanes() { - SerializableFunction<Iterable<WindowedValue<Integer>>, Iterable<Integer>> extractor = + SerializableFunction<Iterable<ValueInSingleWindow<Integer>>, Iterable<Integer>> extractor = PaneExtractors.nonLatePanes(); - Iterable<WindowedValue<Integer>> onlyOnTime = + Iterable<ValueInSingleWindow<Integer>> onlyOnTime = ImmutableList.of( - WindowedValue.of( + ValueInSingleWindow.of( 8, new Instant(0L), GlobalWindow.INSTANCE, PaneInfo.createPane(false, false, Timing.LATE, 2L, 1L)), - WindowedValue.of(7, new Instant(0L), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING), - WindowedValue.of( + ValueInSingleWindow.of(7, new Instant(0L), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING), + ValueInSingleWindow.of( 4, new Instant(0L), GlobalWindow.INSTANCE, PaneInfo.createPane(false, false, Timing.ON_TIME, 1L, 0L)), - WindowedValue.of( + ValueInSingleWindow.of( 1, new Instant(0L), GlobalWindow.INSTANCE, @@ -271,15 +272,15 @@ public class PaneExtractorsTest { @Test public void allPanesSinglePane() { - SerializableFunction<Iterable<WindowedValue<Integer>>, Iterable<Integer>> extractor = + SerializableFunction<Iterable<ValueInSingleWindow<Integer>>, Iterable<Integer>> extractor = PaneExtractors.allPanes(); - Iterable<WindowedValue<Integer>> onlyOnTime = + Iterable<ValueInSingleWindow<Integer>> onlyOnTime = ImmutableList.of( - WindowedValue.of( + ValueInSingleWindow.of( 8, new Instant(0L), GlobalWindow.INSTANCE, PaneInfo.ON_TIME_AND_ONLY_FIRING), - WindowedValue.of( + ValueInSingleWindow.of( 4, new Instant(0L), GlobalWindow.INSTANCE, PaneInfo.ON_TIME_AND_ONLY_FIRING), - WindowedValue.of( + ValueInSingleWindow.of( 2, new Instant(0L), GlobalWindow.INSTANCE, PaneInfo.ON_TIME_AND_ONLY_FIRING)); assertThat(extractor.apply(onlyOnTime), containsInAnyOrder(2, 4, 8)); @@ -287,21 +288,21 @@ public class PaneExtractorsTest { @Test public void allPanesMultiplePanes() { - SerializableFunction<Iterable<WindowedValue<Integer>>, Iterable<Integer>> extractor = + SerializableFunction<Iterable<ValueInSingleWindow<Integer>>, Iterable<Integer>> extractor = PaneExtractors.allPanes(); - Iterable<WindowedValue<Integer>> onlyOnTime = + Iterable<ValueInSingleWindow<Integer>> onlyOnTime = ImmutableList.of( - WindowedValue.of( + ValueInSingleWindow.of( 8, new Instant(0L), GlobalWindow.INSTANCE, PaneInfo.createPane(false, false, Timing.LATE, 2L, 1L)), - WindowedValue.of( + ValueInSingleWindow.of( 4, new Instant(0L), GlobalWindow.INSTANCE, PaneInfo.createPane(false, false, Timing.ON_TIME, 1L, 0L)), - WindowedValue.of( + ValueInSingleWindow.of( 1, new Instant(0L), GlobalWindow.INSTANCE, @@ -312,9 +313,9 @@ public class PaneExtractorsTest { @Test public void allPanesEmpty() { - SerializableFunction<Iterable<WindowedValue<Integer>>, Iterable<Integer>> extractor = + SerializableFunction<Iterable<ValueInSingleWindow<Integer>>, Iterable<Integer>> extractor = PaneExtractors.allPanes(); - Iterable<WindowedValue<Integer>> noPanes = ImmutableList.of(); + Iterable<ValueInSingleWindow<Integer>> noPanes = ImmutableList.of(); assertThat(extractor.apply(noPanes), emptyIterable()); }
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d9891234/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/ValueInSingleWindowCoderTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/ValueInSingleWindowCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/ValueInSingleWindowCoderTest.java new file mode 100644 index 0000000..daf73b6 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/ValueInSingleWindowCoderTest.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.testing; + +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link ValueInSingleWindow.Coder}. */ +@RunWith(JUnit4.class) +public class ValueInSingleWindowCoderTest { + @Test + public void testDecodeEncodeEqual() throws Exception { + Instant now = Instant.now(); + ValueInSingleWindow<String> value = + ValueInSingleWindow.of( + "foo", + now, + new IntervalWindow(now, now.plus(Duration.standardSeconds(10))), + PaneInfo.NO_FIRING); + + CoderProperties.coderDecodeEncodeEqual( + ValueInSingleWindow.Coder.of(StringUtf8Coder.of(), IntervalWindow.getCoder()), value); + } + + @Test + public void testCoderSerializable() throws Exception { + CoderProperties.coderSerializable( + ValueInSingleWindow.Coder.of(StringUtf8Coder.of(), IntervalWindow.getCoder())); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d9891234/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GatherAllPanesTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GatherAllPanesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GatherAllPanesTest.java deleted file mode 100644 index d195623..0000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GatherAllPanesTest.java +++ /dev/null @@ -1,143 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util; - -import static org.junit.Assert.fail; - -import com.google.common.collect.Iterables; -import java.io.Serializable; -import org.apache.beam.sdk.io.CountingInput; -import org.apache.beam.sdk.testing.NeedsRunner; -import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.transforms.Flatten; -import org.apache.beam.sdk.transforms.GroupByKey; -import org.apache.beam.sdk.transforms.SerializableFunction; -import org.apache.beam.sdk.transforms.Values; -import org.apache.beam.sdk.transforms.WithKeys; -import org.apache.beam.sdk.transforms.WithTimestamps; -import org.apache.beam.sdk.transforms.windowing.AfterPane; -import org.apache.beam.sdk.transforms.windowing.AfterWatermark; -import org.apache.beam.sdk.transforms.windowing.FixedWindows; -import org.apache.beam.sdk.transforms.windowing.Window; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionList; -import org.apache.beam.sdk.values.TypeDescriptor; -import org.joda.time.Duration; -import org.joda.time.Instant; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** - * Tests for {@link GatherAllPanes}. - */ -@RunWith(JUnit4.class) -public class GatherAllPanesTest implements Serializable { - @Test - @Category(NeedsRunner.class) - public void singlePaneSingleReifiedPane() { - TestPipeline p = TestPipeline.create(); - PCollection<Iterable<WindowedValue<Iterable<Long>>>> accumulatedPanes = - p.apply(CountingInput.upTo(20000)) - .apply( - WithTimestamps.of( - new SerializableFunction<Long, Instant>() { - @Override - public Instant apply(Long input) { - return new Instant(input * 10); - } - })) - .apply( - Window.<Long>into(FixedWindows.of(Duration.standardMinutes(1))) - .triggering(AfterWatermark.pastEndOfWindow()) - .withAllowedLateness(Duration.ZERO) - .discardingFiredPanes()) - .apply(WithKeys.<Void, Long>of((Void) null).withKeyType(new TypeDescriptor<Void>() {})) - .apply(GroupByKey.<Void, Long>create()) - .apply(Values.<Iterable<Long>>create()) - .apply(GatherAllPanes.<Iterable<Long>>globally()); - - PAssert.that(accumulatedPanes) - .satisfies( - new SerializableFunction<Iterable<Iterable<WindowedValue<Iterable<Long>>>>, Void>() { - @Override - public Void apply(Iterable<Iterable<WindowedValue<Iterable<Long>>>> input) { - for (Iterable<WindowedValue<Iterable<Long>>> windowedInput : input) { - if (Iterables.size(windowedInput) > 1) { - fail("Expected all windows to have exactly one pane, got " + windowedInput); - return null; - } - } - return null; - } - }); - - p.run(); - } - - @Test - @Category(NeedsRunner.class) - public void multiplePanesMultipleReifiedPane() { - TestPipeline p = TestPipeline.create(); - - PCollection<Long> someElems = p.apply("someLongs", CountingInput.upTo(20000)); - PCollection<Long> otherElems = p.apply("otherLongs", CountingInput.upTo(20000)); - PCollection<Iterable<WindowedValue<Iterable<Long>>>> accumulatedPanes = - PCollectionList.of(someElems) - .and(otherElems) - .apply(Flatten.<Long>pCollections()) - .apply( - WithTimestamps.of( - new SerializableFunction<Long, Instant>() { - @Override - public Instant apply(Long input) { - return new Instant(input * 10); - } - })) - .apply( - Window.<Long>into(FixedWindows.of(Duration.standardMinutes(1))) - .triggering( - AfterWatermark.pastEndOfWindow() - .withEarlyFirings(AfterPane.elementCountAtLeast(1))) - .withAllowedLateness(Duration.ZERO) - .discardingFiredPanes()) - .apply(WithKeys.<Void, Long>of((Void) null).withKeyType(new TypeDescriptor<Void>() {})) - .apply(GroupByKey.<Void, Long>create()) - .apply(Values.<Iterable<Long>>create()) - .apply(GatherAllPanes.<Iterable<Long>>globally()); - - PAssert.that(accumulatedPanes) - .satisfies( - new SerializableFunction<Iterable<Iterable<WindowedValue<Iterable<Long>>>>, Void>() { - @Override - public Void apply(Iterable<Iterable<WindowedValue<Iterable<Long>>>> input) { - for (Iterable<WindowedValue<Iterable<Long>>> windowedInput : input) { - if (Iterables.size(windowedInput) > 1) { - return null; - } - } - fail("Expected at least one window to have multiple panes"); - return null; - } - }); - - p.run(); - } -}