Repository: incubator-beam Updated Branches: refs/heads/master bba4c64d3 -> b9116ac42
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/ParDoSingleEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/ParDoSingleEvaluatorFactoryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/ParDoSingleEvaluatorFactoryTest.java deleted file mode 100644 index dfd857e..0000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/ParDoSingleEvaluatorFactoryTest.java +++ /dev/null @@ -1,324 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.sdk.runners.inprocess; - -import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.not; -import static org.hamcrest.Matchers.nullValue; -import static org.junit.Assert.assertThat; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.runners.inprocess.InMemoryWatermarkManager.TimerUpdate; -import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; -import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.GlobalWindow; -import org.apache.beam.sdk.transforms.windowing.IntervalWindow; -import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; -import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.util.TimeDomain; -import org.apache.beam.sdk.util.TimerInternals.TimerData; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.util.common.CounterSet; -import org.apache.beam.sdk.util.state.BagState; -import org.apache.beam.sdk.util.state.StateNamespace; -import org.apache.beam.sdk.util.state.StateNamespaces; -import org.apache.beam.sdk.util.state.StateTag; -import org.apache.beam.sdk.util.state.StateTags; -import org.apache.beam.sdk.util.state.WatermarkHoldState; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.TupleTag; - -import org.hamcrest.Matchers; -import org.joda.time.Duration; -import org.joda.time.Instant; -import org.junit.Test; -import org.junit.runner.RunWith; 
-import org.junit.runners.JUnit4; - -import java.io.Serializable; - -/** - * Tests for {@link ParDoSingleEvaluatorFactory}. - */ -@RunWith(JUnit4.class) -public class ParDoSingleEvaluatorFactoryTest implements Serializable { - private transient BundleFactory bundleFactory = InProcessBundleFactory.create(); - - @Test - public void testParDoInMemoryTransformEvaluator() throws Exception { - TestPipeline p = TestPipeline.create(); - - PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam")); - PCollection<Integer> collection = - input.apply( - ParDo.of( - new DoFn<String, Integer>() { - @Override - public void processElement(ProcessContext c) { - c.output(c.element().length()); - } - })); - CommittedBundle<String> inputBundle = - bundleFactory.createRootBundle(input).commit(Instant.now()); - - InProcessEvaluationContext evaluationContext = mock(InProcessEvaluationContext.class); - UncommittedBundle<Integer> outputBundle = bundleFactory.createRootBundle(collection); - when(evaluationContext.createBundle(inputBundle, collection)).thenReturn(outputBundle); - InProcessExecutionContext executionContext = - new InProcessExecutionContext(null, null, null, null); - when(evaluationContext.getExecutionContext(collection.getProducingTransformInternal(), null)) - .thenReturn(executionContext); - CounterSet counters = new CounterSet(); - when(evaluationContext.createCounterSet()).thenReturn(counters); - - org.apache.beam.sdk.runners.inprocess.TransformEvaluator<String> evaluator = - new ParDoSingleEvaluatorFactory() - .forApplication( - collection.getProducingTransformInternal(), inputBundle, evaluationContext); - - evaluator.processElement(WindowedValue.valueInGlobalWindow("foo")); - evaluator.processElement( - WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000))); - evaluator.processElement( - WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING)); - - InProcessTransformResult result = evaluator.finishBundle(); - 
assertThat(result.getOutputBundles(), Matchers.<UncommittedBundle<?>>contains(outputBundle)); - assertThat(result.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE)); - assertThat(result.getCounters(), equalTo(counters)); - - assertThat( - outputBundle.commit(Instant.now()).getElements(), - Matchers.<WindowedValue<Integer>>containsInAnyOrder( - WindowedValue.valueInGlobalWindow(3), - WindowedValue.timestampedValueInGlobalWindow(4, new Instant(1000)), - WindowedValue.valueInGlobalWindow(5, PaneInfo.ON_TIME_AND_ONLY_FIRING))); - } - - @Test - public void testSideOutputToUndeclaredSideOutputSucceeds() throws Exception { - TestPipeline p = TestPipeline.create(); - - PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam")); - final TupleTag<Integer> sideOutputTag = new TupleTag<Integer>() {}; - PCollection<Integer> collection = - input.apply( - ParDo.of( - new DoFn<String, Integer>() { - @Override - public void processElement(ProcessContext c) { - c.sideOutput(sideOutputTag, c.element().length()); - } - })); - CommittedBundle<String> inputBundle = - bundleFactory.createRootBundle(input).commit(Instant.now()); - - InProcessEvaluationContext evaluationContext = mock(InProcessEvaluationContext.class); - UncommittedBundle<Integer> outputBundle = bundleFactory.createRootBundle(collection); - when(evaluationContext.createBundle(inputBundle, collection)).thenReturn(outputBundle); - InProcessExecutionContext executionContext = - new InProcessExecutionContext(null, null, null, null); - when(evaluationContext.getExecutionContext(collection.getProducingTransformInternal(), null)) - .thenReturn(executionContext); - CounterSet counters = new CounterSet(); - when(evaluationContext.createCounterSet()).thenReturn(counters); - - TransformEvaluator<String> evaluator = - new ParDoSingleEvaluatorFactory() - .forApplication( - collection.getProducingTransformInternal(), inputBundle, evaluationContext); - - 
evaluator.processElement(WindowedValue.valueInGlobalWindow("foo")); - evaluator.processElement( - WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000))); - evaluator.processElement( - WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING)); - - InProcessTransformResult result = evaluator.finishBundle(); - assertThat( - result.getOutputBundles(), Matchers.<UncommittedBundle<?>>containsInAnyOrder(outputBundle)); - assertThat(result.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE)); - assertThat(result.getCounters(), equalTo(counters)); - } - - @Test - public void finishBundleWithStatePutsStateInResult() throws Exception { - TestPipeline p = TestPipeline.create(); - - PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam")); - - final StateTag<Object, WatermarkHoldState<BoundedWindow>> watermarkTag = - StateTags.watermarkStateInternal("myId", OutputTimeFns.outputAtEarliestInputTimestamp()); - final StateTag<Object, BagState<String>> bagTag = StateTags.bag("myBag", StringUtf8Coder.of()); - final StateNamespace windowNs = - StateNamespaces.window(GlobalWindow.Coder.INSTANCE, GlobalWindow.INSTANCE); - ParDo.Bound<String, KV<String, Integer>> pardo = - ParDo.of( - new DoFn<String, KV<String, Integer>>() { - @Override - public void processElement(ProcessContext c) { - c.windowingInternals() - .stateInternals() - .state(StateNamespaces.global(), watermarkTag) - .add(new Instant(124443L - c.element().length())); - c.windowingInternals() - .stateInternals() - .state( - StateNamespaces.window(GlobalWindow.Coder.INSTANCE, GlobalWindow.INSTANCE), - bagTag) - .add(c.element()); - } - }); - PCollection<KV<String, Integer>> mainOutput = input.apply(pardo); - - CommittedBundle<String> inputBundle = - bundleFactory.createRootBundle(input).commit(Instant.now()); - - InProcessEvaluationContext evaluationContext = mock(InProcessEvaluationContext.class); - UncommittedBundle<KV<String, Integer>> mainOutputBundle = - 
bundleFactory.createRootBundle(mainOutput); - - when(evaluationContext.createBundle(inputBundle, mainOutput)).thenReturn(mainOutputBundle); - - InProcessExecutionContext executionContext = - new InProcessExecutionContext(null, "myKey", null, null); - when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(), null)) - .thenReturn(executionContext); - CounterSet counters = new CounterSet(); - when(evaluationContext.createCounterSet()).thenReturn(counters); - - org.apache.beam.sdk.runners.inprocess.TransformEvaluator<String> evaluator = - new ParDoSingleEvaluatorFactory() - .forApplication( - mainOutput.getProducingTransformInternal(), inputBundle, evaluationContext); - - evaluator.processElement(WindowedValue.valueInGlobalWindow("foo")); - evaluator.processElement( - WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000))); - evaluator.processElement( - WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING)); - - InProcessTransformResult result = evaluator.finishBundle(); - assertThat(result.getWatermarkHold(), equalTo(new Instant(124438L))); - assertThat(result.getState(), not(nullValue())); - assertThat( - result.getState().state(StateNamespaces.global(), watermarkTag).read(), - equalTo(new Instant(124438L))); - assertThat( - result.getState().state(windowNs, bagTag).read(), - containsInAnyOrder("foo", "bara", "bazam")); - } - - @Test - public void finishBundleWithStateAndTimersPutsTimersInResult() throws Exception { - TestPipeline p = TestPipeline.create(); - - PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam")); - - final TimerData addedTimer = - TimerData.of( - StateNamespaces.window( - IntervalWindow.getCoder(), - new IntervalWindow( - new Instant(0).plus(Duration.standardMinutes(5)), - new Instant(1) - .plus(Duration.standardMinutes(5)) - .plus(Duration.standardHours(1)))), - new Instant(54541L), - TimeDomain.EVENT_TIME); - final TimerData deletedTimer = - TimerData.of( - 
StateNamespaces.window( - IntervalWindow.getCoder(), - new IntervalWindow(new Instant(0), new Instant(0).plus(Duration.standardHours(1)))), - new Instant(3400000), - TimeDomain.SYNCHRONIZED_PROCESSING_TIME); - - ParDo.Bound<String, KV<String, Integer>> pardo = - ParDo.of( - new DoFn<String, KV<String, Integer>>() { - @Override - public void processElement(ProcessContext c) { - c.windowingInternals().stateInternals(); - c.windowingInternals() - .timerInternals() - .setTimer( - TimerData.of( - StateNamespaces.window( - IntervalWindow.getCoder(), - new IntervalWindow( - new Instant(0).plus(Duration.standardMinutes(5)), - new Instant(1) - .plus(Duration.standardMinutes(5)) - .plus(Duration.standardHours(1)))), - new Instant(54541L), - TimeDomain.EVENT_TIME)); - c.windowingInternals() - .timerInternals() - .deleteTimer( - TimerData.of( - StateNamespaces.window( - IntervalWindow.getCoder(), - new IntervalWindow( - new Instant(0), - new Instant(0).plus(Duration.standardHours(1)))), - new Instant(3400000), - TimeDomain.SYNCHRONIZED_PROCESSING_TIME)); - } - }); - PCollection<KV<String, Integer>> mainOutput = input.apply(pardo); - - CommittedBundle<String> inputBundle = - bundleFactory.createRootBundle(input).commit(Instant.now()); - - InProcessEvaluationContext evaluationContext = mock(InProcessEvaluationContext.class); - UncommittedBundle<KV<String, Integer>> mainOutputBundle = - bundleFactory.createRootBundle(mainOutput); - - when(evaluationContext.createBundle(inputBundle, mainOutput)).thenReturn(mainOutputBundle); - - InProcessExecutionContext executionContext = - new InProcessExecutionContext(null, "myKey", null, null); - when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(), null)) - .thenReturn(executionContext); - CounterSet counters = new CounterSet(); - when(evaluationContext.createCounterSet()).thenReturn(counters); - - TransformEvaluator<String> evaluator = - new ParDoSingleEvaluatorFactory() - .forApplication( - 
mainOutput.getProducingTransformInternal(), inputBundle, evaluationContext); - - evaluator.processElement(WindowedValue.valueInGlobalWindow("foo")); - - InProcessTransformResult result = evaluator.finishBundle(); - assertThat( - result.getTimerUpdate(), - equalTo( - TimerUpdate.builder("myKey").setTimer(addedTimer).deletedTimer(deletedTimer).build())); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/TextIOShardedWriteFactoryTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/TextIOShardedWriteFactoryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/TextIOShardedWriteFactoryTest.java deleted file mode 100644 index 239ce27..0000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/TextIOShardedWriteFactoryTest.java +++ /dev/null @@ -1,112 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.sdk.runners.inprocess; - -import static org.hamcrest.Matchers.not; -import static org.hamcrest.Matchers.theInstance; -import static org.junit.Assert.assertThat; - -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.io.TextIO; -import org.apache.beam.sdk.io.TextIOTest; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PDone; - -import org.hamcrest.Matchers; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -import java.io.File; - -/** - * Tests for {@link TextIOShardedWriteFactory}. - */ -@RunWith(JUnit4.class) -public class TextIOShardedWriteFactoryTest { - @Rule public TemporaryFolder tmp = new TemporaryFolder(); - private TextIOShardedWriteFactory factory; - - @Before - public void setup() { - factory = new TextIOShardedWriteFactory(); - } - - @Test - public void originalWithoutShardingReturnsOriginal() throws Exception { - File file = tmp.newFile("foo"); - PTransform<PCollection<String>, PDone> original = - TextIO.Write.to(file.getAbsolutePath()).withoutSharding(); - PTransform<PCollection<String>, PDone> overridden = factory.override(original); - - assertThat(overridden, theInstance(original)); - } - - @Test - public void originalShardingNotSpecifiedReturnsOriginal() throws Exception { - File file = tmp.newFile("foo"); - PTransform<PCollection<String>, PDone> original = TextIO.Write.to(file.getAbsolutePath()); - PTransform<PCollection<String>, PDone> overridden = factory.override(original); - - assertThat(overridden, theInstance(original)); - } - - @Test - public void originalShardedToOneReturnsExplicitlySharded() throws Exception { - File file = tmp.newFile("foo"); - TextIO.Write.Bound<String> 
original = - TextIO.Write.to(file.getAbsolutePath()).withNumShards(1); - PTransform<PCollection<String>, PDone> overridden = factory.override(original); - - assertThat(overridden, not(Matchers.<PTransform<PCollection<String>, PDone>>equalTo(original))); - - TestPipeline p = TestPipeline.create(); - String[] elems = new String[] {"foo", "bar", "baz"}; - p.apply(Create.<String>of(elems)).apply(overridden); - - file.delete(); - - p.run(); - TextIOTest.assertOutputFiles( - elems, StringUtf8Coder.of(), 1, tmp, "foo", original.getShardNameTemplate()); - } - - @Test - public void originalShardedToManyReturnsExplicitlySharded() throws Exception { - File file = tmp.newFile("foo"); - TextIO.Write.Bound<String> original = TextIO.Write.to(file.getAbsolutePath()).withNumShards(3); - PTransform<PCollection<String>, PDone> overridden = factory.override(original); - - assertThat(overridden, not(Matchers.<PTransform<PCollection<String>, PDone>>equalTo(original))); - - TestPipeline p = TestPipeline.create(); - String[] elems = new String[] {"foo", "bar", "baz", "spam", "ham", "eggs"}; - p.apply(Create.<String>of(elems)).apply(overridden); - - file.delete(); - p.run(); - TextIOTest.assertOutputFiles( - elems, StringUtf8Coder.of(), 3, tmp, "foo", original.getShardNameTemplate()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/TransformExecutorServicesTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/TransformExecutorServicesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/TransformExecutorServicesTest.java deleted file mode 100644 index 33dbbdc..0000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/TransformExecutorServicesTest.java +++ /dev/null @@ -1,136 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under 
one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.runners.inprocess; - -import static org.hamcrest.Matchers.any; -import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.not; -import static org.junit.Assert.assertThat; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.verify; - -import com.google.common.util.concurrent.MoreExecutors; - -import org.hamcrest.Matchers; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; - -/** - * Tests for {@link TransformExecutorServices}. 
- */ -@RunWith(JUnit4.class) -public class TransformExecutorServicesTest { - @Rule public ExpectedException thrown = ExpectedException.none(); - - private ExecutorService executorService; - private Map<TransformExecutor<?>, Boolean> scheduled; - - @Before - public void setup() { - executorService = MoreExecutors.newDirectExecutorService(); - scheduled = new ConcurrentHashMap<>(); - } - - @Test - public void parallelScheduleMultipleSchedulesBothImmediately() { - @SuppressWarnings("unchecked") - TransformExecutor<Object> first = mock(TransformExecutor.class); - @SuppressWarnings("unchecked") - TransformExecutor<Object> second = mock(TransformExecutor.class); - - TransformExecutorService parallel = - TransformExecutorServices.parallel(executorService, scheduled); - parallel.schedule(first); - parallel.schedule(second); - - verify(first).call(); - verify(second).call(); - assertThat( - scheduled, - Matchers.allOf( - Matchers.<TransformExecutor<?>, Boolean>hasEntry(first, true), - Matchers.<TransformExecutor<?>, Boolean>hasEntry(second, true))); - - parallel.complete(first); - assertThat(scheduled, Matchers.<TransformExecutor<?>, Boolean>hasEntry(second, true)); - assertThat( - scheduled, - not( - Matchers.<TransformExecutor<?>, Boolean>hasEntry( - Matchers.<TransformExecutor<?>>equalTo(first), any(Boolean.class)))); - parallel.complete(second); - assertThat(scheduled.isEmpty(), is(true)); - } - - @Test - public void serialScheduleTwoWaitsForFirstToComplete() { - @SuppressWarnings("unchecked") - TransformExecutor<Object> first = mock(TransformExecutor.class); - @SuppressWarnings("unchecked") - TransformExecutor<Object> second = mock(TransformExecutor.class); - - TransformExecutorService serial = TransformExecutorServices.serial(executorService, scheduled); - serial.schedule(first); - verify(first).call(); - - serial.schedule(second); - verify(second, never()).call(); - - assertThat(scheduled, Matchers.<TransformExecutor<?>, Boolean>hasEntry(first, true)); - assertThat( 
- scheduled, - not( - Matchers.<TransformExecutor<?>, Boolean>hasEntry( - Matchers.<TransformExecutor<?>>equalTo(second), any(Boolean.class)))); - - serial.complete(first); - verify(second).call(); - assertThat(scheduled, Matchers.<TransformExecutor<?>, Boolean>hasEntry(second, true)); - assertThat( - scheduled, - not( - Matchers.<TransformExecutor<?>, Boolean>hasEntry( - Matchers.<TransformExecutor<?>>equalTo(first), any(Boolean.class)))); - - serial.complete(second); - } - - @Test - public void serialCompleteNotExecutingTaskThrows() { - @SuppressWarnings("unchecked") - TransformExecutor<Object> first = mock(TransformExecutor.class); - @SuppressWarnings("unchecked") - TransformExecutor<Object> second = mock(TransformExecutor.class); - - TransformExecutorService serial = TransformExecutorServices.serial(executorService, scheduled); - serial.schedule(first); - thrown.expect(IllegalStateException.class); - thrown.expectMessage("unexpected currently executing"); - - serial.complete(second); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/TransformExecutorTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/TransformExecutorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/TransformExecutorTest.java deleted file mode 100644 index 31cb29a..0000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/TransformExecutorTest.java +++ /dev/null @@ -1,538 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.runners.inprocess; - -import static org.hamcrest.Matchers.contains; -import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.isA; -import static org.hamcrest.Matchers.not; -import static org.hamcrest.Matchers.nullValue; -import static org.junit.Assert.assertThat; -import static org.mockito.Mockito.when; - -import org.apache.beam.sdk.coders.ByteArrayCoder; -import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.transforms.AppliedPTransform; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.WithKeys; -import org.apache.beam.sdk.util.IllegalMutationException; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; - -import com.google.common.util.concurrent.MoreExecutors; - -import org.hamcrest.Matchers; -import org.joda.time.Instant; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; -import org.mockito.Mock; -import 
org.mockito.MockitoAnnotations; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.atomic.AtomicBoolean; - -/** - * Tests for {@link TransformExecutor}. - */ -@RunWith(JUnit4.class) -public class TransformExecutorTest { - @Rule public ExpectedException thrown = ExpectedException.none(); - private PCollection<String> created; - private PCollection<KV<Integer, String>> downstream; - - private CountDownLatch evaluatorCompleted; - - private RegisteringCompletionCallback completionCallback; - private TransformExecutorService transformEvaluationState; - private BundleFactory bundleFactory; - @Mock private InProcessEvaluationContext evaluationContext; - @Mock private TransformEvaluatorRegistry registry; - private Map<TransformExecutor<?>, Boolean> scheduled; - - @Before - public void setup() { - MockitoAnnotations.initMocks(this); - - bundleFactory = InProcessBundleFactory.create(); - - scheduled = new HashMap<>(); - transformEvaluationState = - TransformExecutorServices.parallel(MoreExecutors.newDirectExecutorService(), scheduled); - - evaluatorCompleted = new CountDownLatch(1); - completionCallback = new RegisteringCompletionCallback(evaluatorCompleted); - - TestPipeline p = TestPipeline.create(); - created = p.apply(Create.of("foo", "spam", "third")); - downstream = created.apply(WithKeys.<Integer, String>of(3)); - } - - @Test - public void callWithNullInputBundleFinishesBundleAndCompletes() throws Exception { - final InProcessTransformResult result = - StepTransformResult.withoutHold(created.getProducingTransformInternal()).build(); - final AtomicBoolean finishCalled = new AtomicBoolean(false); - TransformEvaluator<Object> evaluator = - new TransformEvaluator<Object>() { - @Override - public void 
processElement(WindowedValue<Object> element) throws Exception { - throw new IllegalArgumentException("Shouldn't be called"); - } - - @Override - public InProcessTransformResult finishBundle() throws Exception { - finishCalled.set(true); - return result; - } - }; - - when(registry.forApplication(created.getProducingTransformInternal(), null, evaluationContext)) - .thenReturn(evaluator); - - TransformExecutor<Object> executor = - TransformExecutor.create( - registry, - Collections.<ModelEnforcementFactory>emptyList(), - evaluationContext, - null, - created.getProducingTransformInternal(), - completionCallback, - transformEvaluationState); - executor.call(); - - assertThat(finishCalled.get(), is(true)); - assertThat(completionCallback.handledResult, equalTo(result)); - assertThat(completionCallback.handledThrowable, is(nullValue())); - assertThat(scheduled, not(Matchers.<TransformExecutor<?>>hasKey(executor))); - } - - @Test - public void inputBundleProcessesEachElementFinishesAndCompletes() throws Exception { - final InProcessTransformResult result = - StepTransformResult.withoutHold(downstream.getProducingTransformInternal()).build(); - final Collection<WindowedValue<String>> elementsProcessed = new ArrayList<>(); - TransformEvaluator<String> evaluator = - new TransformEvaluator<String>() { - @Override - public void processElement(WindowedValue<String> element) throws Exception { - elementsProcessed.add(element); - return; - } - - @Override - public InProcessTransformResult finishBundle() throws Exception { - return result; - } - }; - - WindowedValue<String> foo = WindowedValue.valueInGlobalWindow("foo"); - WindowedValue<String> spam = WindowedValue.valueInGlobalWindow("spam"); - WindowedValue<String> third = WindowedValue.valueInGlobalWindow("third"); - CommittedBundle<String> inputBundle = - bundleFactory.createRootBundle(created).add(foo).add(spam).add(third).commit(Instant.now()); - when( - registry.<String>forApplication( - 
downstream.getProducingTransformInternal(), inputBundle, evaluationContext)) - .thenReturn(evaluator); - - TransformExecutor<String> executor = - TransformExecutor.create( - registry, - Collections.<ModelEnforcementFactory>emptyList(), - evaluationContext, - inputBundle, - downstream.getProducingTransformInternal(), - completionCallback, - transformEvaluationState); - - Executors.newSingleThreadExecutor().submit(executor); - - evaluatorCompleted.await(); - - assertThat(elementsProcessed, containsInAnyOrder(spam, third, foo)); - assertThat(completionCallback.handledResult, equalTo(result)); - assertThat(completionCallback.handledThrowable, is(nullValue())); - assertThat(scheduled, not(Matchers.<TransformExecutor<?>>hasKey(executor))); - } - - @Test - public void processElementThrowsExceptionCallsback() throws Exception { - final InProcessTransformResult result = - StepTransformResult.withoutHold(downstream.getProducingTransformInternal()).build(); - final Exception exception = new Exception(); - TransformEvaluator<String> evaluator = - new TransformEvaluator<String>() { - @Override - public void processElement(WindowedValue<String> element) throws Exception { - throw exception; - } - - @Override - public InProcessTransformResult finishBundle() throws Exception { - return result; - } - }; - - WindowedValue<String> foo = WindowedValue.valueInGlobalWindow("foo"); - CommittedBundle<String> inputBundle = - bundleFactory.createRootBundle(created).add(foo).commit(Instant.now()); - when( - registry.<String>forApplication( - downstream.getProducingTransformInternal(), inputBundle, evaluationContext)) - .thenReturn(evaluator); - - TransformExecutor<String> executor = - TransformExecutor.create( - registry, - Collections.<ModelEnforcementFactory>emptyList(), - evaluationContext, - inputBundle, - downstream.getProducingTransformInternal(), - completionCallback, - transformEvaluationState); - Executors.newSingleThreadExecutor().submit(executor); - - evaluatorCompleted.await(); - 
- assertThat(completionCallback.handledResult, is(nullValue())); - assertThat(completionCallback.handledThrowable, Matchers.<Throwable>equalTo(exception)); - assertThat(scheduled, not(Matchers.<TransformExecutor<?>>hasKey(executor))); - } - - @Test - public void finishBundleThrowsExceptionCallsback() throws Exception { - final Exception exception = new Exception(); - TransformEvaluator<String> evaluator = - new TransformEvaluator<String>() { - @Override - public void processElement(WindowedValue<String> element) throws Exception {} - - @Override - public InProcessTransformResult finishBundle() throws Exception { - throw exception; - } - }; - - CommittedBundle<String> inputBundle = - bundleFactory.createRootBundle(created).commit(Instant.now()); - when( - registry.<String>forApplication( - downstream.getProducingTransformInternal(), inputBundle, evaluationContext)) - .thenReturn(evaluator); - - TransformExecutor<String> executor = - TransformExecutor.create( - registry, - Collections.<ModelEnforcementFactory>emptyList(), - evaluationContext, - inputBundle, - downstream.getProducingTransformInternal(), - completionCallback, - transformEvaluationState); - Executors.newSingleThreadExecutor().submit(executor); - - evaluatorCompleted.await(); - - assertThat(completionCallback.handledResult, is(nullValue())); - assertThat(completionCallback.handledThrowable, Matchers.<Throwable>equalTo(exception)); - assertThat(scheduled, not(Matchers.<TransformExecutor<?>>hasKey(executor))); - } - - @Test - public void duringCallGetThreadIsNonNull() throws Exception { - final InProcessTransformResult result = - StepTransformResult.withoutHold(downstream.getProducingTransformInternal()).build(); - final CountDownLatch testLatch = new CountDownLatch(1); - final CountDownLatch evaluatorLatch = new CountDownLatch(1); - TransformEvaluator<Object> evaluator = - new TransformEvaluator<Object>() { - @Override - public void processElement(WindowedValue<Object> element) throws Exception { - throw 
new IllegalArgumentException("Shouldn't be called"); - } - - @Override - public InProcessTransformResult finishBundle() throws Exception { - testLatch.countDown(); - evaluatorLatch.await(); - return result; - } - }; - - when(registry.forApplication(created.getProducingTransformInternal(), null, evaluationContext)) - .thenReturn(evaluator); - - TransformExecutor<String> executor = - TransformExecutor.create( - registry, - Collections.<ModelEnforcementFactory>emptyList(), - evaluationContext, - null, - created.getProducingTransformInternal(), - completionCallback, - transformEvaluationState); - - Executors.newSingleThreadExecutor().submit(executor); - testLatch.await(); - assertThat(executor.getThread(), not(nullValue())); - - // Finish the execution so everything can get closed down cleanly. - evaluatorLatch.countDown(); - } - - @Test - public void callWithEnforcementAppliesEnforcement() throws Exception { - final InProcessTransformResult result = - StepTransformResult.withoutHold(downstream.getProducingTransformInternal()).build(); - - TransformEvaluator<Object> evaluator = - new TransformEvaluator<Object>() { - @Override - public void processElement(WindowedValue<Object> element) throws Exception { - } - - @Override - public InProcessTransformResult finishBundle() throws Exception { - return result; - } - }; - - WindowedValue<String> fooElem = WindowedValue.valueInGlobalWindow("foo"); - WindowedValue<String> barElem = WindowedValue.valueInGlobalWindow("bar"); - CommittedBundle<String> inputBundle = - bundleFactory.createRootBundle(created).add(fooElem).add(barElem).commit(Instant.now()); - when( - registry.forApplication( - downstream.getProducingTransformInternal(), inputBundle, evaluationContext)) - .thenReturn(evaluator); - - TestEnforcementFactory enforcement = new TestEnforcementFactory(); - TransformExecutor<String> executor = - TransformExecutor.create( - registry, - Collections.<ModelEnforcementFactory>singleton(enforcement), - evaluationContext, - 
inputBundle, - downstream.getProducingTransformInternal(), - completionCallback, - transformEvaluationState); - - executor.call(); - TestEnforcement<?> testEnforcement = enforcement.instance; - assertThat( - testEnforcement.beforeElements, - Matchers.<WindowedValue<?>>containsInAnyOrder(barElem, fooElem)); - assertThat( - testEnforcement.afterElements, - Matchers.<WindowedValue<?>>containsInAnyOrder(barElem, fooElem)); - assertThat(testEnforcement.finishedBundles, contains(result)); - } - - @Test - public void callWithEnforcementThrowsOnFinishPropagates() throws Exception { - PCollection<byte[]> pcBytes = - created.apply( - new PTransform<PCollection<String>, PCollection<byte[]>>() { - @Override - public PCollection<byte[]> apply(PCollection<String> input) { - return PCollection.<byte[]>createPrimitiveOutputInternal( - input.getPipeline(), input.getWindowingStrategy(), input.isBounded()) - .setCoder(ByteArrayCoder.of()); - } - }); - - final InProcessTransformResult result = - StepTransformResult.withoutHold(pcBytes.getProducingTransformInternal()).build(); - final CountDownLatch testLatch = new CountDownLatch(1); - final CountDownLatch evaluatorLatch = new CountDownLatch(1); - - TransformEvaluator<Object> evaluator = - new TransformEvaluator<Object>() { - @Override - public void processElement(WindowedValue<Object> element) throws Exception {} - - @Override - public InProcessTransformResult finishBundle() throws Exception { - testLatch.countDown(); - evaluatorLatch.await(); - return result; - } - }; - - WindowedValue<byte[]> fooBytes = WindowedValue.valueInGlobalWindow("foo".getBytes()); - CommittedBundle<byte[]> inputBundle = - bundleFactory.createRootBundle(pcBytes).add(fooBytes).commit(Instant.now()); - when( - registry.forApplication( - pcBytes.getProducingTransformInternal(), inputBundle, evaluationContext)) - .thenReturn(evaluator); - - TransformExecutor<byte[]> executor = - TransformExecutor.create( - registry, - 
Collections.<ModelEnforcementFactory>singleton(ImmutabilityEnforcementFactory.create()), - evaluationContext, - inputBundle, - pcBytes.getProducingTransformInternal(), - completionCallback, - transformEvaluationState); - - Future<InProcessTransformResult> task = Executors.newSingleThreadExecutor().submit(executor); - testLatch.await(); - fooBytes.getValue()[0] = 'b'; - evaluatorLatch.countDown(); - - thrown.expectCause(isA(IllegalMutationException.class)); - task.get(); - } - - @Test - public void callWithEnforcementThrowsOnElementPropagates() throws Exception { - PCollection<byte[]> pcBytes = - created.apply( - new PTransform<PCollection<String>, PCollection<byte[]>>() { - @Override - public PCollection<byte[]> apply(PCollection<String> input) { - return PCollection.<byte[]>createPrimitiveOutputInternal( - input.getPipeline(), input.getWindowingStrategy(), input.isBounded()) - .setCoder(ByteArrayCoder.of()); - } - }); - - final InProcessTransformResult result = - StepTransformResult.withoutHold(pcBytes.getProducingTransformInternal()).build(); - final CountDownLatch testLatch = new CountDownLatch(1); - final CountDownLatch evaluatorLatch = new CountDownLatch(1); - - TransformEvaluator<Object> evaluator = - new TransformEvaluator<Object>() { - @Override - public void processElement(WindowedValue<Object> element) throws Exception { - testLatch.countDown(); - evaluatorLatch.await(); - } - - @Override - public InProcessTransformResult finishBundle() throws Exception { - return result; - } - }; - - WindowedValue<byte[]> fooBytes = WindowedValue.valueInGlobalWindow("foo".getBytes()); - CommittedBundle<byte[]> inputBundle = - bundleFactory.createRootBundle(pcBytes).add(fooBytes).commit(Instant.now()); - when( - registry.forApplication( - pcBytes.getProducingTransformInternal(), inputBundle, evaluationContext)) - .thenReturn(evaluator); - - TransformExecutor<byte[]> executor = - TransformExecutor.create( - registry, - 
Collections.<ModelEnforcementFactory>singleton(ImmutabilityEnforcementFactory.create()), - evaluationContext, - inputBundle, - pcBytes.getProducingTransformInternal(), - completionCallback, - transformEvaluationState); - - Future<InProcessTransformResult> task = Executors.newSingleThreadExecutor().submit(executor); - testLatch.await(); - fooBytes.getValue()[0] = 'b'; - evaluatorLatch.countDown(); - - thrown.expectCause(isA(IllegalMutationException.class)); - task.get(); - } - - private static class RegisteringCompletionCallback implements CompletionCallback { - private InProcessTransformResult handledResult = null; - private Throwable handledThrowable = null; - private final CountDownLatch onMethod; - - private RegisteringCompletionCallback(CountDownLatch onMethod) { - this.onMethod = onMethod; - } - - @Override - public CommittedResult handleResult( - CommittedBundle<?> inputBundle, InProcessTransformResult result) { - handledResult = result; - onMethod.countDown(); - return CommittedResult.create(result, Collections.<CommittedBundle<?>>emptyList()); - } - - @Override - public void handleThrowable(CommittedBundle<?> inputBundle, Throwable t) { - handledThrowable = t; - onMethod.countDown(); - } - } - - private static class TestEnforcementFactory implements ModelEnforcementFactory { - private TestEnforcement<?> instance; - @Override - public <T> TestEnforcement<T> forBundle( - CommittedBundle<T> input, AppliedPTransform<?, ?, ?> consumer) { - TestEnforcement<T> newEnforcement = new TestEnforcement<>(); - instance = newEnforcement; - return newEnforcement; - } - } - - private static class TestEnforcement<T> implements ModelEnforcement<T> { - private final List<WindowedValue<T>> beforeElements = new ArrayList<>(); - private final List<WindowedValue<T>> afterElements = new ArrayList<>(); - private final List<InProcessTransformResult> finishedBundles = new ArrayList<>(); - - @Override - public void beforeElement(WindowedValue<T> element) { - 
beforeElements.add(element); - } - - @Override - public void afterElement(WindowedValue<T> element) { - afterElements.add(element); - } - - @Override - public void afterFinish( - CommittedBundle<T> input, - InProcessTransformResult result, - Iterable<? extends CommittedBundle<?>> outputs) { - finishedBundles.add(result); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/UnboundedReadEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/UnboundedReadEvaluatorFactoryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/UnboundedReadEvaluatorFactoryTest.java deleted file mode 100644 index 82657c0..0000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/UnboundedReadEvaluatorFactoryTest.java +++ /dev/null @@ -1,334 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.sdk.runners.inprocess; - -import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.hamcrest.Matchers.emptyIterable; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.hasSize; -import static org.junit.Assert.assertThat; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import org.apache.beam.sdk.coders.AtomicCoder; -import org.apache.beam.sdk.coders.BigEndianLongCoder; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CoderException; -import org.apache.beam.sdk.io.CountingSource; -import org.apache.beam.sdk.io.Read; -import org.apache.beam.sdk.io.UnboundedSource; -import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; -import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.transforms.AppliedPTransform; -import org.apache.beam.sdk.transforms.SerializableFunction; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.GlobalWindow; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.PCollection; - -import com.google.common.collect.ImmutableList; - -import org.hamcrest.Matchers; -import org.joda.time.DateTime; -import org.joda.time.Instant; -import org.joda.time.ReadableInstant; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.Arrays; -import java.util.List; -import java.util.NoSuchElementException; - -import javax.annotation.Nullable; -/** - * Tests for {@link UnboundedReadEvaluatorFactory}. 
- */
-@RunWith(JUnit4.class)
-public class UnboundedReadEvaluatorFactoryTest {
-  private PCollection<Long> longs; // output of reading the counting source; assigned in setup()
-  private TransformEvaluatorFactory factory; // the UnboundedReadEvaluatorFactory under test
-  private InProcessEvaluationContext context; // Mockito mock; assigned in setup()
-  private UncommittedBundle<Long> output; // bundle the mocked context returns for `longs`
-
-  private BundleFactory bundleFactory = InProcessBundleFactory.create(); // real factory, not a mock
-  /**
-   * Reads from an unbounded counting source whose timestamp function is {@link LongToInstantFn},
-   * and stubs the mocked context so {@code createRootBundle(longs)} returns {@link #output}.
-   */
-  @Before
-  public void setup() {
-    UnboundedSource<Long, ?> source =
-        CountingSource.unboundedWithTimestampFn(new LongToInstantFn());
-    TestPipeline p = TestPipeline.create();
-    longs = p.apply(Read.from(source));
-
-    factory = new UnboundedReadEvaluatorFactory();
-    context = mock(InProcessEvaluationContext.class);
-    output = bundleFactory.createRootBundle(longs);
-    when(context.createRootBundle(longs)).thenReturn(output);
-  }
-  /**
-   * Creates an evaluator with a {@code null} input bundle, finishes it, and expects the output
-   * bundle to hold the values 0 through 9 (as built by {@link #tgw}) and the reported watermark
-   * hold to be earlier than the current time.
-   */
-  @Test
-  public void unboundedSourceInMemoryTransformEvaluatorProducesElements() throws Exception {
-    TransformEvaluator<?> evaluator =
-        factory.forApplication(longs.getProducingTransformInternal(), null, context);
-
-    InProcessTransformResult result = evaluator.finishBundle();
-    assertThat(
-        result.getWatermarkHold(), Matchers.<ReadableInstant>lessThan(DateTime.now().toInstant()));
-    assertThat(
-        output.commit(Instant.now()).getElements(),
-        containsInAnyOrder(
-            tgw(1L), tgw(2L), tgw(4L), tgw(8L), tgw(9L), tgw(7L), tgw(6L), tgw(5L), tgw(3L),
-            tgw(0L)));
-  }
-
-  /**
-   * Demonstrate that multiple sequential creations will produce additional elements if the source
-   * can provide them.
-   */
-  @Test
-  public void unboundedSourceInMemoryTransformEvaluatorMultipleSequentialCalls() throws Exception {
-    TransformEvaluator<?> evaluator =
-        factory.forApplication(longs.getProducingTransformInternal(), null, context);
-
-    InProcessTransformResult result = evaluator.finishBundle();
-    assertThat(
-        result.getWatermarkHold(), Matchers.<ReadableInstant>lessThan(DateTime.now().toInstant()));
-    assertThat(
-        output.commit(Instant.now()).getElements(),
-        containsInAnyOrder(
-            tgw(1L), tgw(2L), tgw(4L), tgw(8L), tgw(9L), tgw(7L), tgw(6L), tgw(5L), tgw(3L),
-            tgw(0L)));
-    // Re-stub the context to return a fresh bundle; the second evaluator is expected to fill it with 10..19.
-    UncommittedBundle<Long> secondOutput = bundleFactory.createRootBundle(longs);
-    when(context.createRootBundle(longs)).thenReturn(secondOutput);
-    TransformEvaluator<?> secondEvaluator =
-        factory.forApplication(longs.getProducingTransformInternal(), null, context);
-    InProcessTransformResult secondResult = secondEvaluator.finishBundle();
-    assertThat(
-        secondResult.getWatermarkHold(),
-        Matchers.<ReadableInstant>lessThan(DateTime.now().toInstant()));
-    assertThat(
-        secondOutput.commit(Instant.now()).getElements(),
-        containsInAnyOrder(tgw(11L), tgw(12L), tgw(14L), tgw(18L), tgw(19L), tgw(17L), tgw(16L),
-            tgw(15L), tgw(13L), tgw(10L)));
-  }
-  /**
-   * Reads the three elements of a {@link TestUnboundedSource} and expects exactly one reader
-   * close, as counted by the static {@code readerClosedCount}.
-   *
-   * <p>NOTE(review): the name says "bounded" although the source is a TestUnboundedSource; this
-   * looks like a copy-paste name -- confirm before renaming.
-   */
-  @Test
-  public void boundedSourceEvaluatorClosesReader() throws Exception {
-    TestUnboundedSource<Long> source =
-        new TestUnboundedSource<>(BigEndianLongCoder.of(), 1L, 2L, 3L);
-
-    TestPipeline p = TestPipeline.create();
-    PCollection<Long> pcollection = p.apply(Read.from(source));
-    AppliedPTransform<?, ?, ?> sourceTransform = pcollection.getProducingTransformInternal();
-    // This local `output` shadows the field of the same name for the rest of the method.
-    UncommittedBundle<Long> output = bundleFactory.createRootBundle(pcollection);
-    when(context.createRootBundle(pcollection)).thenReturn(output);
-
-    TransformEvaluator<?> evaluator = factory.forApplication(sourceTransform, null, context);
-    evaluator.finishBundle();
-    CommittedBundle<Long> committed = output.commit(Instant.now());
-    assertThat(ImmutableList.copyOf(committed.getElements()), hasSize(3));
-    assertThat(TestUnboundedSource.readerClosedCount, equalTo(1));
-  }
-  /**
-   * Same as the test above but with a source that has no elements: the committed bundle must be
-   * empty and the reader must still have been closed exactly once.
-   */
-  @Test
-  public void boundedSourceEvaluatorNoElementsClosesReader() throws Exception {
-    TestUnboundedSource<Long> source = new TestUnboundedSource<>(BigEndianLongCoder.of());
-
-    TestPipeline p = TestPipeline.create();
-    PCollection<Long> pcollection = p.apply(Read.from(source));
-    AppliedPTransform<?, ?, ?> sourceTransform = pcollection.getProducingTransformInternal();
-    // This local `output` shadows the field of the same name for the rest of the method.
-    UncommittedBundle<Long> output = bundleFactory.createRootBundle(pcollection);
-    when(context.createRootBundle(pcollection)).thenReturn(output);
-
-    TransformEvaluator<?> evaluator = factory.forApplication(sourceTransform, null, context);
-    evaluator.finishBundle();
-    CommittedBundle<Long> committed = output.commit(Instant.now());
-    assertThat(committed.getElements(), emptyIterable());
-    assertThat(TestUnboundedSource.readerClosedCount, equalTo(1));
-  }
-
-  // TODO: Once the source is split into multiple sources before evaluating, this test will have to
-  // be updated.
-  /**
-   * Demonstrate that only a single unfinished instance of TransformEvaluator can be created at a
-   * time, with other calls returning an empty evaluator.
-   */
-  @Test
-  public void unboundedSourceWithMultipleSimultaneousEvaluatorsIndependent() throws Exception {
-    UncommittedBundle<Long> secondOutput = bundleFactory.createRootBundle(longs);
-    // NOTE(review): secondOutput is never stubbed into the mocked context in this test, so the final emptyIterable check holds trivially -- confirm intent.
-    TransformEvaluator<?> evaluator =
-        factory.forApplication(longs.getProducingTransformInternal(), null, context);
-
-    TransformEvaluator<?> secondEvaluator =
-        factory.forApplication(longs.getProducingTransformInternal(), null, context);
-    // The second evaluator is finished first, before the one that was created first.
-    InProcessTransformResult secondResult = secondEvaluator.finishBundle();
-    InProcessTransformResult result = evaluator.finishBundle();
-
-    assertThat(
-        result.getWatermarkHold(), Matchers.<ReadableInstant>lessThan(DateTime.now().toInstant()));
-    assertThat(
-        output.commit(Instant.now()).getElements(),
-        containsInAnyOrder(
-            tgw(1L), tgw(2L), tgw(4L), tgw(8L), tgw(9L), tgw(7L), tgw(6L), tgw(5L), tgw(3L),
-            tgw(0L)));
-
-    assertThat(secondResult.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MIN_VALUE));
-    assertThat(secondOutput.commit(Instant.now()).getElements(), emptyIterable());
-  }
-
-  /**
-   * A terse alias for producing timestamped longs in the {@link GlobalWindow}, where
-   * the timestamp is the epoch offset by the value of the element.
-   */
-  private static WindowedValue<Long> tgw(Long elem) {
-    return WindowedValue.timestampedValueInGlobalWindow(elem, new Instant(elem));
-  }
-  /** Timestamp function handed to the counting source: maps a value to {@code new Instant(value)}. */
-  private static class LongToInstantFn implements SerializableFunction<Long, Instant> {
-    @Override
-    public Instant apply(Long input) {
-      return new Instant(input);
-    }
-  }
-  /**
-   * Source over a fixed list of elements whose readers increment the static
-   * {@code readerClosedCount} when closed. Constructing a new source resets that counter to 0.
-   */
-  private static class TestUnboundedSource<T> extends UnboundedSource<T, TestCheckpointMark> {
-    static int readerClosedCount; // shared by all instances; reset by the constructor
-    private final Coder<T> coder;
-    private final List<T> elems;
-
-    public TestUnboundedSource(Coder<T> coder, T... elems) {
-      readerClosedCount = 0;
-      this.coder = coder;
-      this.elems = Arrays.asList(elems);
-    }
-    // Never splits: the only split returned is this source itself.
-    @Override
-    public List<? extends UnboundedSource<T, TestCheckpointMark>> generateInitialSplits(
-        int desiredNumSplits, PipelineOptions options) throws Exception {
-      return ImmutableList.of(this);
-    }
-    // The checkpoint mark argument is ignored; every reader starts over the full element list.
-    @Override
-    public UnboundedSource.UnboundedReader<T> createReader(
-        PipelineOptions options, TestCheckpointMark checkpointMark) {
-      return new TestUnboundedReader(elems);
-    }
-    // Always returns a new coder instance here, despite the @Nullable annotation.
-    @Override
-    @Nullable
-    public Coder<TestCheckpointMark> getCheckpointMarkCoder() {
-      return new TestCheckpointMark.Coder();
-    }
-
-    @Override
-    public void validate() {}
-
-    @Override
-    public Coder<T> getDefaultOutputCoder() {
-      return coder;
-    }
-    /** Reader that walks the element list by index, starting positioned before the first element. */
-    private class TestUnboundedReader extends UnboundedReader<T> {
-      private final List<T> elems;
-      private int index; // index of the current element; -1 until the first successful advance
-
-      public TestUnboundedReader(List<T> elems) {
-        this.elems = elems;
-        this.index = -1;
-      }
-
-      @Override
-      public boolean start() throws IOException {
-        return advance();
-      }
-      // Moves to the next element if one remains; otherwise leaves the index unchanged.
-      @Override
-      public boolean advance() throws IOException {
-        if (index + 1 < elems.size()) {
-          index++;
-          return true;
-        }
-        return false;
-      }
-
-      @Override
-      public Instant getWatermark() {
-        return Instant.now();
-      }
-
-      @Override
-      public CheckpointMark getCheckpointMark() {
-        return new TestCheckpointMark();
-      }
-
-      @Override
-      public UnboundedSource<T, ?> getCurrentSource() {
-        TestUnboundedSource<T> source = TestUnboundedSource.this;
-        return source;
-      }
-
-      @Override
-      public T getCurrent() throws NoSuchElementException {
-        return elems.get(index);
-      }
-
-      @Override
-      public Instant getCurrentTimestamp() throws NoSuchElementException {
-        return Instant.now();
-      }
-      // Counts the close on the enclosing source class's static counter, which the tests assert on.
-      @Override
-      public void close() throws IOException {
-        readerClosedCount++;
-      }
-    }
-  }
-  /** Checkpoint mark that carries no state; finalizing it does nothing. */
-  private static class TestCheckpointMark implements CheckpointMark {
-    @Override
-    public void finalizeCheckpoint() throws IOException {}
-    /** Coder that writes nothing on encode and returns a new mark on decode without reading. */
-    public static class Coder extends AtomicCoder<TestCheckpointMark> {
-      @Override
-      public void encode(
-          TestCheckpointMark value,
-          OutputStream outStream,
-          org.apache.beam.sdk.coders.Coder.Context context)
-          throws CoderException, IOException {}
-
-      @Override
-      public TestCheckpointMark decode(
-          InputStream inStream, org.apache.beam.sdk.coders.Coder.Context context)
-          throws CoderException, IOException {
-        return new TestCheckpointMark();
-      }
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/ViewEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/ViewEvaluatorFactoryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/ViewEvaluatorFactoryTest.java deleted file mode 100644 index 05346dc..0000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/ViewEvaluatorFactoryTest.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License.
- */ -package org.apache.beam.sdk.runners.inprocess; - -import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.hamcrest.Matchers.nullValue; -import static org.junit.Assert.assertThat; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.coders.VoidCoder; -import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; -import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.PCollectionViewWriter; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.GroupByKey; -import org.apache.beam.sdk.transforms.Values; -import org.apache.beam.sdk.transforms.View.CreatePCollectionView; -import org.apache.beam.sdk.transforms.WithKeys; -import org.apache.beam.sdk.util.PCollectionViews; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionView; - -import com.google.common.collect.ImmutableList; - -import org.joda.time.Instant; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** - * Tests for {@link ViewEvaluatorFactory}. 
- */
-@RunWith(JUnit4.class)
-public class ViewEvaluatorFactoryTest {
-  private BundleFactory bundleFactory = InProcessBundleFactory.create(); // real factory, not a mock
-  /**
-   * Pushes one element holding "foo" and "bar" through the evaluator and checks that the view
-   * writer has received nothing after {@code processElement}, and holds both values (each in the
-   * global window) once {@code finishBundle} has run.
-   */
-  @Test
-  public void testInMemoryEvaluator() throws Exception {
-    TestPipeline p = TestPipeline.create();
-    // Build the collection the view is written from: key by null, group by key, keep only the values.
-    PCollection<String> input = p.apply(Create.of("foo", "bar"));
-    CreatePCollectionView<String, Iterable<String>> createView =
-        CreatePCollectionView.of(
-            PCollectionViews.iterableView(p, input.getWindowingStrategy(), StringUtf8Coder.of()));
-    PCollection<Iterable<String>> concat =
-        input.apply(WithKeys.<Void, String>of((Void) null))
-            .setCoder(KvCoder.of(VoidCoder.of(), StringUtf8Coder.of()))
-            .apply(GroupByKey.<Void, String>create())
-            .apply(Values.<Iterable<String>>create());
-    PCollectionView<Iterable<String>> view =
-        concat.apply(new ViewEvaluatorFactory.WriteView<>(createView));
-    // The mocked context returns the recording writer only for this exact (concat, view) pair.
-    InProcessEvaluationContext context = mock(InProcessEvaluationContext.class);
-    TestViewWriter<String, Iterable<String>> viewWriter = new TestViewWriter<>();
-    when(context.createPCollectionViewWriter(concat, view)).thenReturn(viewWriter);
-    // NOTE(review): the bundle is built from `input`, while the view is applied to `concat` -- confirm this is intended.
-    CommittedBundle<String> inputBundle =
-        bundleFactory.createRootBundle(input).commit(Instant.now());
-    TransformEvaluator<Iterable<String>> evaluator =
-        new ViewEvaluatorFactory()
-            .forApplication(view.getProducingTransformInternal(), inputBundle, context);
-    // Expectation: processing an element alone does not reach the writer, so `latest` is still null.
-    evaluator.processElement(
-        WindowedValue.<Iterable<String>>valueInGlobalWindow(ImmutableList.of("foo", "bar")));
-    assertThat(viewWriter.latest, nullValue());
-    // Expectation: finishing the bundle is what delivers the values to the writer.
-    evaluator.finishBundle();
-    assertThat(
-        viewWriter.latest,
-        containsInAnyOrder(
-            WindowedValue.valueInGlobalWindow("foo"), WindowedValue.valueInGlobalWindow("bar")));
-  }
-  /** Test writer that remembers only the most recent batch passed to {@code add}. */
-  private static class TestViewWriter<ElemT, ViewT> implements PCollectionViewWriter<ElemT, ViewT> {
-    private Iterable<WindowedValue<ElemT>> latest; // null until add() is first called
-    // Overwrites, rather than appends to, whatever was recorded before.
-    @Override
-    public void add(Iterable<WindowedValue<ElemT>> values) {
-      latest = values;
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/WatermarkCallbackExecutorTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/WatermarkCallbackExecutorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/WatermarkCallbackExecutorTest.java deleted file mode 100644 index 3b36bc5..0000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/WatermarkCallbackExecutorTest.java +++ /dev/null @@ -1,128 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.sdk.runners.inprocess; - -import static org.hamcrest.Matchers.equalTo; -import static org.junit.Assert.assertThat; - -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.transforms.AppliedPTransform; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.Sum; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.FixedWindows; -import org.apache.beam.sdk.transforms.windowing.GlobalWindow; -import org.apache.beam.sdk.transforms.windowing.IntervalWindow; -import org.apache.beam.sdk.transforms.windowing.WindowFn; -import org.apache.beam.sdk.util.WindowingStrategy; -import org.apache.beam.sdk.values.PCollection; - -import org.joda.time.Duration; -import org.joda.time.Instant; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -/** - * Tests for {@link WatermarkCallbackExecutor}. 
- */
-@RunWith(JUnit4.class)
-public class WatermarkCallbackExecutorTest {
-  private WatermarkCallbackExecutor executor = WatermarkCallbackExecutor.create(); // executor under test
-  private AppliedPTransform<?, ?, ?> create; // producer of the Create.of(1, 2, 3) collection
-  private AppliedPTransform<?, ?, ?> sum; // producer of the Sum applied to that collection
-  /** Builds a small pipeline and captures the two applied transforms the tests use as steps. */
-  @Before
-  public void setup() {
-    TestPipeline p = TestPipeline.create();
-    PCollection<Integer> created = p.apply(Create.of(1, 2, 3));
-    create = created.getProducingTransformInternal();
-    sum = created.apply(Sum.integersGlobally()).getProducingTransformInternal();
-  }
-  /**
-   * Registers one callback on {@code create} for the global window, fires the watermark for
-   * {@code create} at {@code BoundedWindow.TIMESTAMP_MAX_VALUE}, and expects the callback to have
-   * run within 500 ms.
-   */
-  @Test
-  public void onGuaranteedFiringFiresAfterTrigger() throws Exception {
-    CountDownLatch latch = new CountDownLatch(1);
-    executor.callOnGuaranteedFiring(
-        create,
-        GlobalWindow.INSTANCE,
-        WindowingStrategy.globalDefault(),
-        new CountDownLatchCallback(latch));
-
-    executor.fireForWatermark(create, BoundedWindow.TIMESTAMP_MAX_VALUE);
-    assertThat(latch.await(500, TimeUnit.MILLISECONDS), equalTo(true));
-  }
-  /**
-   * Registers two callbacks sharing one latch (count 2) on the same window, which runs from the
-   * epoch to the epoch plus 10 minutes, then fires the watermark at the epoch plus 10 minutes.
-   * Both callbacks must run for the latch to open within 500 ms.
-   */
-  @Test
-  public void multipleCallbacksShouldFireFires() throws Exception {
-    CountDownLatch latch = new CountDownLatch(2);
-    WindowFn<Object, IntervalWindow> windowFn = FixedWindows.of(Duration.standardMinutes(10));
-    IntervalWindow window =
-        new IntervalWindow(new Instant(0L), new Instant(0L).plus(Duration.standardMinutes(10)));
-    executor.callOnGuaranteedFiring(
-        create, window, WindowingStrategy.of(windowFn), new CountDownLatchCallback(latch));
-    executor.callOnGuaranteedFiring(
-        create, window, WindowingStrategy.of(windowFn), new CountDownLatchCallback(latch));
-
-    executor.fireForWatermark(create, new Instant(0L).plus(Duration.standardMinutes(10)));
-    assertThat(latch.await(500, TimeUnit.MILLISECONDS), equalTo(true));
-  }
-  /**
-   * Fires the watermark at the epoch plus 5 minutes for a callback registered on a window that
-   * ends at the epoch plus 10 minutes, and expects the latch to still be closed after the 500 ms
-   * wait ({@code await} returns false), i.e. the callback did not run.
-   */
-  @Test
-  public void noCallbacksShouldFire() throws Exception {
-    CountDownLatch latch = new CountDownLatch(1);
-    WindowFn<Object, IntervalWindow> windowFn = FixedWindows.of(Duration.standardMinutes(10));
-    IntervalWindow window =
-        new IntervalWindow(new Instant(0L), new Instant(0L).plus(Duration.standardMinutes(10)));
-    executor.callOnGuaranteedFiring(
-        create, window, WindowingStrategy.of(windowFn), new CountDownLatchCallback(latch));
-
-    executor.fireForWatermark(create, new Instant(0L).plus(Duration.standardMinutes(5)));
-    assertThat(latch.await(500, TimeUnit.MILLISECONDS), equalTo(false));
-  }
-  /**
-   * Registers the callback on {@code sum} but fires the watermark for {@code create} (at the epoch
-   * plus 20 minutes), and expects the callback not to have run after the 500 ms wait.
-   */
-  @Test
-  public void unrelatedStepShouldNotFire() throws Exception {
-    CountDownLatch latch = new CountDownLatch(1);
-    WindowFn<Object, IntervalWindow> windowFn = FixedWindows.of(Duration.standardMinutes(10));
-    IntervalWindow window =
-        new IntervalWindow(new Instant(0L), new Instant(0L).plus(Duration.standardMinutes(10)));
-    executor.callOnGuaranteedFiring(
-        sum, window, WindowingStrategy.of(windowFn), new CountDownLatchCallback(latch));
-
-    executor.fireForWatermark(create, new Instant(0L).plus(Duration.standardMinutes(20)));
-    assertThat(latch.await(500, TimeUnit.MILLISECONDS), equalTo(false));
-  }
-  /** Callback that counts down the supplied latch once each time it is run. */
-  private static class CountDownLatchCallback implements Runnable {
-    private final CountDownLatch latch;
-
-    public CountDownLatchCallback(CountDownLatch latch) {
-      this.latch = latch;
-    }
-
-    @Override
-    public void run() {
-      latch.countDown();
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/WindowEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/WindowEvaluatorFactoryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/WindowEvaluatorFactoryTest.java deleted file mode 100644 index d41825d..0000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/WindowEvaluatorFactoryTest.java +++ /dev/null @@ -1,222 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements.
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.runners.inprocess; - -import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.junit.Assert.assertThat; -import static org.mockito.Mockito.when; - -import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; -import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.transforms.AppliedPTransform; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.windowing.AfterPane; -import org.apache.beam.sdk.transforms.windowing.AfterWatermark; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.FixedWindows; -import org.apache.beam.sdk.transforms.windowing.IntervalWindow; -import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.transforms.windowing.SlidingWindows; -import org.apache.beam.sdk.transforms.windowing.Window; -import org.apache.beam.sdk.transforms.windowing.Window.Bound; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.PCollection; - -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Iterables; - -import 
org.hamcrest.Matchers; -import org.joda.time.Duration; -import org.joda.time.Instant; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; -import org.mockito.Mock; -import org.mockito.MockitoAnnotations;

/**
 * Tests for {@link WindowEvaluatorFactory}.
 *
 * <p>Each test pushes the same three elements through an evaluator obtained from the factory and
 * checks the windows attached to the elements of the single output bundle.
 */
@RunWith(JUnit4.class)
public class WindowEvaluatorFactoryTest {
  private static final Instant EPOCH = new Instant(0);

  private PCollection<Long> input;
  private WindowEvaluatorFactory factory;

  @Mock private InProcessEvaluationContext evaluationContext;

  private BundleFactory bundleFactory;

  // Value 3 at timestamp 2, in the global window.
  private WindowedValue<Long> first =
      WindowedValue.timestampedValueInGlobalWindow(3L, new Instant(2L));
  // Value 1 at three days past EPOCH, in the global window.
  private WindowedValue<Long> second =
      WindowedValue.timestampedValueInGlobalWindow(1L, EPOCH.plus(Duration.standardDays(3)));
  // Value 2 at timestamp -10, already assigned to the interval window [-100, EPOCH).
  private WindowedValue<Long> third =
      WindowedValue.of(
          2L, new Instant(-10L), new IntervalWindow(new Instant(-100), EPOCH), PaneInfo.NO_FIRING);

  /** Initializes the mocks, the input {@code PCollection}, and the factories under test. */
  @Before
  public void setup() {
    MockitoAnnotations.initMocks(this);
    input = TestPipeline.create().apply(Create.of(1L, 2L, 3L));

    factory = new WindowEvaluatorFactory();
    bundleFactory = InProcessBundleFactory.create();
  }

  /**
   * Applies a {@code Window} transform that only sets a trigger and accumulation mode, and
   * expects the three input elements to appear in the output exactly as they went in.
   */
  @Test
  public void nullWindowFunSucceeds() throws Exception {
    Bound<Long> transform =
        Window.<Long>triggering(
                AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(1)))
            .accumulatingFiredPanes();
    PCollection<Long> triggering = input.apply(transform);

    CommittedBundle<Long> inputBundle = createInputBundle();
    UncommittedBundle<Long> outputBundle = createOutputBundle(triggering, inputBundle);

    InProcessTransformResult result = runEvaluator(triggering, inputBundle, transform);

    CommittedBundle<Long> committed = verifyOnlyOutputAndCommit(result, outputBundle);
    assertThat(committed.getElements(), containsInAnyOrder(third, first, second));
  }

  /**
   * Applies seven-day fixed windows and expects the first two elements in the window starting at
   * {@code EPOCH} and the third element in the window ending at {@code EPOCH}.
   */
  @Test
  public void singleWindowFnSucceeds() throws Exception {
    Duration windowDuration = Duration.standardDays(7);
    Bound<Long> transform = Window.<Long>into(FixedWindows.of(windowDuration));
    PCollection<Long> windowed = input.apply(transform);

    CommittedBundle<Long> inputBundle = createInputBundle();
    UncommittedBundle<Long> outputBundle = createOutputBundle(windowed, inputBundle);

    BoundedWindow fromEpoch = new IntervalWindow(EPOCH, EPOCH.plus(windowDuration));
    BoundedWindow untilEpoch = new IntervalWindow(EPOCH.minus(windowDuration), EPOCH);

    InProcessTransformResult result = runEvaluator(windowed, inputBundle, transform);

    CommittedBundle<Long> committed = verifyOnlyOutputAndCommit(result, outputBundle);

    WindowedValue<Long> rewindowedFirst =
        WindowedValue.of(3L, new Instant(2L), fromEpoch, PaneInfo.NO_FIRING);
    WindowedValue<Long> rewindowedSecond =
        WindowedValue.of(1L, EPOCH.plus(Duration.standardDays(3)), fromEpoch, PaneInfo.NO_FIRING);
    WindowedValue<Long> rewindowedThird =
        WindowedValue.of(2L, new Instant(-10L), untilEpoch, PaneInfo.NO_FIRING);
    assertThat(
        committed.getElements(),
        containsInAnyOrder(rewindowedFirst, rewindowedSecond, rewindowedThird));
  }

  /**
   * Applies six-day sliding windows that advance every three days and expects each element to be
   * placed in the two windows listed for it below.
   */
  @Test
  public void multipleWindowsWindowFnSucceeds() throws Exception {
    Duration windowDuration = Duration.standardDays(6);
    Duration slidingBy = Duration.standardDays(3);
    Bound<Long> transform = Window.into(SlidingWindows.of(windowDuration).every(slidingBy));
    PCollection<Long> windowed = input.apply(transform);

    CommittedBundle<Long> inputBundle = createInputBundle();
    UncommittedBundle<Long> outputBundle = createOutputBundle(windowed, inputBundle);

    InProcessTransformResult result = runEvaluator(windowed, inputBundle, transform);

    CommittedBundle<Long> committed = verifyOnlyOutputAndCommit(result, outputBundle);

    Instant oneSlideAfterEpoch = EPOCH.plus(slidingBy);
    BoundedWindow w1 = new IntervalWindow(EPOCH, EPOCH.plus(windowDuration));
    BoundedWindow w2 =
        new IntervalWindow(oneSlideAfterEpoch, oneSlideAfterEpoch.plus(windowDuration));
    BoundedWindow wMinus1 = new IntervalWindow(EPOCH.minus(windowDuration), EPOCH);
    BoundedWindow wMinusSlide =
        new IntervalWindow(EPOCH.minus(windowDuration).plus(slidingBy), oneSlideAfterEpoch);

    WindowedValue<Long> expectedFirst =
        WindowedValue.of(
            first.getValue(),
            first.getTimestamp(),
            ImmutableSet.of(w1, wMinusSlide),
            PaneInfo.NO_FIRING);
    WindowedValue<Long> expectedSecond =
        WindowedValue.of(
            second.getValue(),
            second.getTimestamp(),
            ImmutableSet.of(w1, w2),
            PaneInfo.NO_FIRING);
    WindowedValue<Long> expectedThird =
        WindowedValue.of(
            third.getValue(),
            third.getTimestamp(),
            ImmutableSet.of(wMinus1, wMinusSlide),
            PaneInfo.NO_FIRING);

    assertThat(
        committed.getElements(), containsInAnyOrder(expectedFirst, expectedSecond, expectedThird));
  }

  /**
   * Asserts that {@code result} carries exactly one output bundle and that it equals
   * {@code expectedOutput}, then commits {@code expectedOutput} at the current time.
   */
  private CommittedBundle<Long> verifyOnlyOutputAndCommit(
      InProcessTransformResult result, UncommittedBundle<Long> expectedOutput) {
    assertThat(
        Iterables.getOnlyElement(result.getOutputBundles()),
        Matchers.<UncommittedBundle<?>>equalTo(expectedOutput));
    return expectedOutput.commit(Instant.now());
  }

  /** Creates and commits a root bundle holding the three fixture elements. */
  private CommittedBundle<Long> createInputBundle() {
    return bundleFactory
        .createRootBundle(input)
        .add(first)
        .add(second)
        .add(third)
        .commit(Instant.now());
  }

  /**
   * Creates an output bundle for {@code output} and stubs the mocked evaluation context to return
   * it when asked for a bundle with the same input bundle and output collection.
   */
  private UncommittedBundle<Long> createOutputBundle(
      PCollection<Long> output, CommittedBundle<Long> inputBundle) {
    UncommittedBundle<Long> stubbedOutput = bundleFactory.createBundle(inputBundle, output);
    when(evaluationContext.createBundle(inputBundle, output)).thenReturn(stubbedOutput);
    return stubbedOutput;
  }

  /**
   * Obtains an evaluator from the factory for the given window transform, feeds it the three
   * fixture elements in order, and returns the result of finishing the bundle.
   */
  private InProcessTransformResult runEvaluator(
      PCollection<Long> windowed,
      CommittedBundle<Long> inputBundle,
      Window.Bound<Long> windowTransform /* Required while Window.Bound is a composite */)
      throws Exception {
    AppliedPTransform<?, ?, ?> application =
        AppliedPTransform.of("Window", input, windowed, windowTransform);
    TransformEvaluator<Long> evaluator =
        factory.forApplication(application, inputBundle, evaluationContext);

    evaluator.processElement(first);
    evaluator.processElement(second);
    evaluator.processElement(third);
    return evaluator.finishBundle();
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/pom.xml ---------------------------------------------------------------------- diff --git a/sdks/java/pom.xml b/sdks/java/pom.xml index a8972c2..54c841e 100644 --- a/sdks/java/pom.xml +++ b/sdks/java/pom.xml @@ -56,5 +56,4 @@ </modules> </profile> </profiles> - </project>
