http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java
new file mode 100644
index 0000000..236ad17
--- /dev/null
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java
@@ -0,0 +1,349 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.apache.beam.runners.direct.InMemoryWatermarkManager.TimerUpdate;
+import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.apache.beam.sdk.util.TimerInternals.TimerData;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.common.CounterSet;
+import org.apache.beam.sdk.util.state.BagState;
+import org.apache.beam.sdk.util.state.StateNamespace;
+import org.apache.beam.sdk.util.state.StateNamespaces;
+import org.apache.beam.sdk.util.state.StateTag;
+import org.apache.beam.sdk.util.state.StateTags;
+import org.apache.beam.sdk.util.state.WatermarkHoldState;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TupleTag;
+
+import org.hamcrest.Matchers;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.io.Serializable;
+
+/**
+ * Tests for {@link ParDoSingleEvaluatorFactory}.
+ *
+ * <p>Every test applies a {@link ParDo} to a small {@link Create}, mocks the
+ * {@link InProcessEvaluationContext}, feeds elements to the evaluator returned by the factory,
+ * and inspects the {@link InProcessTransformResult} returned by {@code finishBundle()}.
+ */
+@RunWith(JUnit4.class)
+public class ParDoSingleEvaluatorFactoryTest implements Serializable {
+  // Transient because this class is Serializable; presumably that is only so the anonymous
+  // DoFns below can be serialized together with their enclosing instance -- TODO confirm.
+  private transient BundleFactory bundleFactory = InProcessBundleFactory.create();
+
+  /**
+   * A single-output ParDo emits one element per input, preserving each input's timestamp and
+   * pane, and reports a watermark hold of {@link BoundedWindow#TIMESTAMP_MAX_VALUE}.
+   */
+  @Test
+  public void testParDoInMemoryTransformEvaluator() throws Exception {
+    TestPipeline p = TestPipeline.create();
+
+    PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam"));
+    PCollection<Integer> collection =
+        input.apply(
+            ParDo.of(
+                new DoFn<String, Integer>() {
+                  @Override
+                  public void processElement(ProcessContext c) {
+                    c.output(c.element().length());
+                  }
+                }));
+    CommittedBundle<String> inputBundle =
+        bundleFactory.createRootBundle(input).commit(Instant.now());
+
+    InProcessEvaluationContext evaluationContext = mock(InProcessEvaluationContext.class);
+    UncommittedBundle<Integer> outputBundle = bundleFactory.createRootBundle(collection);
+    when(evaluationContext.createBundle(inputBundle, collection)).thenReturn(outputBundle);
+    InProcessExecutionContext executionContext =
+        new InProcessExecutionContext(null, null, null, null);
+    when(evaluationContext.getExecutionContext(collection.getProducingTransformInternal(), null))
+        .thenReturn(executionContext);
+    CounterSet counters = new CounterSet();
+    when(evaluationContext.createCounterSet()).thenReturn(counters);
+
+    TransformEvaluator<String> evaluator =
+        new ParDoSingleEvaluatorFactory()
+            .forApplication(
+                collection.getProducingTransformInternal(), inputBundle, evaluationContext);
+
+    evaluator.processElement(WindowedValue.valueInGlobalWindow("foo"));
+    evaluator.processElement(
+        WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000)));
+    evaluator.processElement(
+        WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING));
+
+    InProcessTransformResult result = evaluator.finishBundle();
+    assertThat(result.getOutputBundles(), Matchers.<UncommittedBundle<?>>contains(outputBundle));
+    assertThat(result.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE));
+    assertThat(result.getCounters(), equalTo(counters));
+
+    assertThat(
+        outputBundle.commit(Instant.now()).getElements(),
+        Matchers.<WindowedValue<Integer>>containsInAnyOrder(
+            WindowedValue.valueInGlobalWindow(3),
+            WindowedValue.timestampedValueInGlobalWindow(4, new Instant(1000)),
+            WindowedValue.valueInGlobalWindow(5, PaneInfo.ON_TIME_AND_ONLY_FIRING)));
+  }
+
+  /**
+   * Writing to a side output tag that was never declared does not fail the evaluator, and the
+   * main output bundle is the only bundle in the result.
+   */
+  @Test
+  public void testSideOutputToUndeclaredSideOutputSucceeds() throws Exception {
+    TestPipeline p = TestPipeline.create();
+
+    PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam"));
+    final TupleTag<Integer> sideOutputTag = new TupleTag<Integer>() {};
+    PCollection<Integer> collection =
+        input.apply(
+            ParDo.of(
+                new DoFn<String, Integer>() {
+                  @Override
+                  public void processElement(ProcessContext c) {
+                    c.sideOutput(sideOutputTag, c.element().length());
+                  }
+                }));
+    CommittedBundle<String> inputBundle =
+        bundleFactory.createRootBundle(input).commit(Instant.now());
+
+    InProcessEvaluationContext evaluationContext = mock(InProcessEvaluationContext.class);
+    UncommittedBundle<Integer> outputBundle = bundleFactory.createRootBundle(collection);
+    when(evaluationContext.createBundle(inputBundle, collection)).thenReturn(outputBundle);
+    InProcessExecutionContext executionContext =
+        new InProcessExecutionContext(null, null, null, null);
+    when(evaluationContext.getExecutionContext(collection.getProducingTransformInternal(), null))
+        .thenReturn(executionContext);
+    CounterSet counters = new CounterSet();
+    when(evaluationContext.createCounterSet()).thenReturn(counters);
+
+    TransformEvaluator<String> evaluator =
+        new ParDoSingleEvaluatorFactory()
+            .forApplication(
+                collection.getProducingTransformInternal(), inputBundle, evaluationContext);
+
+    evaluator.processElement(WindowedValue.valueInGlobalWindow("foo"));
+    evaluator.processElement(
+        WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000)));
+    evaluator.processElement(
+        WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING));
+
+    InProcessTransformResult result = evaluator.finishBundle();
+    assertThat(
+        result.getOutputBundles(), Matchers.<UncommittedBundle<?>>containsInAnyOrder(outputBundle));
+    assertThat(result.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE));
+    assertThat(result.getCounters(), equalTo(counters));
+  }
+
+  /**
+   * State written through {@code windowingInternals().stateInternals()} is returned in the
+   * result, and the earliest hold added to the watermark state becomes the result's hold.
+   */
+  @Test
+  public void finishBundleWithStatePutsStateInResult() throws Exception {
+    TestPipeline p = TestPipeline.create();
+
+    PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam"));
+
+    final StateTag<Object, WatermarkHoldState<BoundedWindow>> watermarkTag =
+        StateTags.watermarkStateInternal("myId", OutputTimeFns.outputAtEarliestInputTimestamp());
+    final StateTag<Object, BagState<String>> bagTag = StateTags.bag("myBag", StringUtf8Coder.of());
+    final StateNamespace windowNs =
+        StateNamespaces.window(GlobalWindow.Coder.INSTANCE, GlobalWindow.INSTANCE);
+    ParDo.Bound<String, KV<String, Integer>> pardo =
+        ParDo.of(
+            new DoFn<String, KV<String, Integer>>() {
+              @Override
+              public void processElement(ProcessContext c) {
+                c.windowingInternals()
+                    .stateInternals()
+                    .state(StateNamespaces.global(), watermarkTag)
+                    .add(new Instant(124443L - c.element().length()));
+                c.windowingInternals()
+                    .stateInternals()
+                    .state(
+                        StateNamespaces.window(GlobalWindow.Coder.INSTANCE, GlobalWindow.INSTANCE),
+                        bagTag)
+                    .add(c.element());
+              }
+            });
+    PCollection<KV<String, Integer>> mainOutput = input.apply(pardo);
+
+    CommittedBundle<String> inputBundle =
+        bundleFactory.createRootBundle(input).commit(Instant.now());
+
+    InProcessEvaluationContext evaluationContext = mock(InProcessEvaluationContext.class);
+    UncommittedBundle<KV<String, Integer>> mainOutputBundle =
+        bundleFactory.createRootBundle(mainOutput);
+
+    when(evaluationContext.createBundle(inputBundle, mainOutput)).thenReturn(mainOutputBundle);
+
+    InProcessExecutionContext executionContext =
+        new InProcessExecutionContext(null, "myKey", null, null);
+    when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(), null))
+        .thenReturn(executionContext);
+    CounterSet counters = new CounterSet();
+    when(evaluationContext.createCounterSet()).thenReturn(counters);
+
+    TransformEvaluator<String> evaluator =
+        new ParDoSingleEvaluatorFactory()
+            .forApplication(
+                mainOutput.getProducingTransformInternal(), inputBundle, evaluationContext);
+
+    evaluator.processElement(WindowedValue.valueInGlobalWindow("foo"));
+    evaluator.processElement(
+        WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000)));
+    evaluator.processElement(
+        WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING));
+
+    InProcessTransformResult result = evaluator.finishBundle();
+    // "bazam" is the longest input (5 characters), so the earliest hold is 124443 - 5 = 124438.
+    assertThat(result.getWatermarkHold(), equalTo(new Instant(124438L)));
+    assertThat(result.getState(), not(nullValue()));
+    assertThat(
+        result.getState().state(StateNamespaces.global(), watermarkTag).read(),
+        equalTo(new Instant(124438L)));
+    assertThat(
+        result.getState().state(windowNs, bagTag).read(),
+        containsInAnyOrder("foo", "bara", "bazam"));
+  }
+
+  /**
+   * Timers set and deleted through {@code windowingInternals().timerInternals()} are reported
+   * in the result's {@link TimerUpdate}, which is expected to be built for {@code "myKey"}.
+   */
+  @Test
+  public void finishBundleWithStateAndTimersPutsTimersInResult() throws Exception {
+    TestPipeline p = TestPipeline.create();
+
+    PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam"));
+
+    final TimerData addedTimer =
+        TimerData.of(
+            StateNamespaces.window(
+                IntervalWindow.getCoder(),
+                new IntervalWindow(
+                    new Instant(0).plus(Duration.standardMinutes(5)),
+                    new Instant(1)
+                        .plus(Duration.standardMinutes(5))
+                        .plus(Duration.standardHours(1)))),
+            new Instant(54541L),
+            TimeDomain.EVENT_TIME);
+    final TimerData deletedTimer =
+        TimerData.of(
+            StateNamespaces.window(
+                IntervalWindow.getCoder(),
+                new IntervalWindow(new Instant(0), new Instant(0).plus(Duration.standardHours(1)))),
+            new Instant(3400000),
+            TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
+
+    ParDo.Bound<String, KV<String, Integer>> pardo =
+        ParDo.of(
+            new DoFn<String, KV<String, Integer>>() {
+              @Override
+              public void processElement(ProcessContext c) {
+                c.windowingInternals().stateInternals();
+                // The timers are rebuilt here instead of capturing addedTimer/deletedTimer;
+                // presumably TimerData is not serializable -- TODO confirm before sharing them.
+                c.windowingInternals()
+                    .timerInternals()
+                    .setTimer(
+                        TimerData.of(
+                            StateNamespaces.window(
+                                IntervalWindow.getCoder(),
+                                new IntervalWindow(
+                                    new Instant(0).plus(Duration.standardMinutes(5)),
+                                    new Instant(1)
+                                        .plus(Duration.standardMinutes(5))
+                                        .plus(Duration.standardHours(1)))),
+                            new Instant(54541L),
+                            TimeDomain.EVENT_TIME));
+                c.windowingInternals()
+                    .timerInternals()
+                    .deleteTimer(
+                        TimerData.of(
+                            StateNamespaces.window(
+                                IntervalWindow.getCoder(),
+                                new IntervalWindow(
+                                    new Instant(0),
+                                    new Instant(0).plus(Duration.standardHours(1)))),
+                            new Instant(3400000),
+                            TimeDomain.SYNCHRONIZED_PROCESSING_TIME));
+              }
+            });
+    PCollection<KV<String, Integer>> mainOutput = input.apply(pardo);
+
+    CommittedBundle<String> inputBundle =
+        bundleFactory.createRootBundle(input).commit(Instant.now());
+
+    InProcessEvaluationContext evaluationContext = mock(InProcessEvaluationContext.class);
+    UncommittedBundle<KV<String, Integer>> mainOutputBundle =
+        bundleFactory.createRootBundle(mainOutput);
+
+    when(evaluationContext.createBundle(inputBundle, mainOutput)).thenReturn(mainOutputBundle);
+
+    InProcessExecutionContext executionContext =
+        new InProcessExecutionContext(null, "myKey", null, null);
+    when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(), null))
+        .thenReturn(executionContext);
+    CounterSet counters = new CounterSet();
+    when(evaluationContext.createCounterSet()).thenReturn(counters);
+
+    TransformEvaluator<String> evaluator =
+        new ParDoSingleEvaluatorFactory()
+            .forApplication(
+                mainOutput.getProducingTransformInternal(), inputBundle, evaluationContext);
+
+    evaluator.processElement(WindowedValue.valueInGlobalWindow("foo"));
+
+    InProcessTransformResult result = evaluator.finishBundle();
+    assertThat(
+        result.getTimerUpdate(),
+        equalTo(
+            TimerUpdate.builder("myKey").setTimer(addedTimer).deletedTimer(deletedTimer).build()));
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TextIOShardedWriteFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TextIOShardedWriteFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TextIOShardedWriteFactoryTest.java
new file mode 100644
index 0000000..49c9061
--- /dev/null
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TextIOShardedWriteFactoryTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.theInstance;
+import static org.junit.Assert.assertThat;
+
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.io.TextIOTest;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+
+import org.hamcrest.Matchers;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.io.File;
+
+/**
+ * Tests for {@link TextIOShardedWriteFactory}.
+ */
+@RunWith(JUnit4.class)
+public class TextIOShardedWriteFactoryTest {
+  @Rule public TemporaryFolder tmp = new TemporaryFolder();
+  private TextIOShardedWriteFactory factory;
+
+  @Before
+  public void setup() {
+    factory = new TextIOShardedWriteFactory();
+  }
+
+  /** A write that explicitly disables sharding is returned unchanged. */
+  @Test
+  public void originalWithoutShardingReturnsOriginal() throws Exception {
+    File file = tmp.newFile("foo");
+    PTransform<PCollection<String>, PDone> original =
+        TextIO.Write.to(file.getAbsolutePath()).withoutSharding();
+    PTransform<PCollection<String>, PDone> overridden = factory.override(original);
+
+    assertThat(overridden, theInstance(original));
+  }
+
+  /** A write with no shard count configured is returned unchanged. */
+  @Test
+  public void originalShardingNotSpecifiedReturnsOriginal() throws Exception {
+    File file = tmp.newFile("foo");
+    PTransform<PCollection<String>, PDone> original = TextIO.Write.to(file.getAbsolutePath());
+    PTransform<PCollection<String>, PDone> overridden = factory.override(original);
+
+    assertThat(overridden, theInstance(original));
+  }
+
+  /** A write pinned to one shard is replaced; the replacement's output files are verified. */
+  @Test
+  public void originalShardedToOneReturnsExplicitlySharded() throws Exception {
+    File file = tmp.newFile("foo");
+    TextIO.Write.Bound<String> original =
+        TextIO.Write.to(file.getAbsolutePath()).withNumShards(1);
+    PTransform<PCollection<String>, PDone> overridden = factory.override(original);
+
+    assertThat(overridden, not(Matchers.<PTransform<PCollection<String>, PDone>>equalTo(original)));
+
+    TestPipeline p = TestPipeline.create();
+    String[] elems = new String[] {"foo", "bar", "baz"};
+    p.apply(Create.<String>of(elems)).apply(overridden);
+
+    // tmp.newFile created a placeholder at the output path; remove it before running and fail
+    // fast if that does not happen, rather than ignoring the result of delete().
+    assertThat(file.delete(), Matchers.is(true));
+
+    p.run();
+    TextIOTest.assertOutputFiles(
+        elems, StringUtf8Coder.of(), 1, tmp, "foo", original.getShardNameTemplate());
+  }
+
+  /** A write pinned to three shards is replaced; the replacement's output files are verified. */
+  @Test
+  public void originalShardedToManyReturnsExplicitlySharded() throws Exception {
+    File file = tmp.newFile("foo");
+    TextIO.Write.Bound<String> original = TextIO.Write.to(file.getAbsolutePath()).withNumShards(3);
+    PTransform<PCollection<String>, PDone> overridden = factory.override(original);
+
+    assertThat(overridden, not(Matchers.<PTransform<PCollection<String>, PDone>>equalTo(original)));
+
+    TestPipeline p = TestPipeline.create();
+    String[] elems = new String[] {"foo", "bar", "baz", "spam", "ham", "eggs"};
+    p.apply(Create.<String>of(elems)).apply(overridden);
+
+    // tmp.newFile created a placeholder at the output path; remove it before running and fail
+    // fast if that does not happen, rather than ignoring the result of delete().
+    assertThat(file.delete(), Matchers.is(true));
+    p.run();
+    TextIOTest.assertOutputFiles(
+        elems, StringUtf8Coder.of(), 3, tmp, "foo", original.getShardNameTemplate());
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorServicesTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorServicesTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorServicesTest.java
new file mode 100644
index 0000000..7dd5830
--- /dev/null
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorServicesTest.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import static org.hamcrest.Matchers.any;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+
+import com.google.common.util.concurrent.MoreExecutors;
+
+import org.hamcrest.Matchers;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * Tests for {@link TransformExecutorServices}.
+ */
+@RunWith(JUnit4.class)
+public class TransformExecutorServicesTest {
+  @Rule public ExpectedException thrown = ExpectedException.none();
+
+  private ExecutorService executorService;
+  private Map<TransformExecutor<?>, Boolean> scheduled;
+
+  @Before
+  public void setup() {
+    // A direct executor service runs each submitted task on the submitting thread, which is
+    // what lets the tests verify call() immediately after schedule().
+    executorService = MoreExecutors.newDirectExecutorService();
+    scheduled = new ConcurrentHashMap<>();
+  }
+
+  /** The parallel service runs every scheduled executor at once and forgets it on complete. */
+  @Test
+  public void parallelScheduleMultipleSchedulesBothImmediately() {
+    @SuppressWarnings("unchecked")
+    TransformExecutor<Object> first = mock(TransformExecutor.class);
+    @SuppressWarnings("unchecked")
+    TransformExecutor<Object> second = mock(TransformExecutor.class);
+
+    TransformExecutorService parallel =
+        TransformExecutorServices.parallel(executorService, scheduled);
+    parallel.schedule(first);
+    parallel.schedule(second);
+
+    verify(first).call();
+    verify(second).call();
+    assertThat(
+        scheduled,
+        Matchers.allOf(
+            Matchers.<TransformExecutor<?>, Boolean>hasEntry(first, true),
+            Matchers.<TransformExecutor<?>, Boolean>hasEntry(second, true)));
+
+    parallel.complete(first);
+    assertThat(scheduled, Matchers.<TransformExecutor<?>, Boolean>hasEntry(second, true));
+    assertThat(
+        scheduled,
+        not(
+            Matchers.<TransformExecutor<?>, Boolean>hasEntry(
+                Matchers.<TransformExecutor<?>>equalTo(first), any(Boolean.class))));
+    parallel.complete(second);
+    assertThat(scheduled.isEmpty(), is(true));
+  }
+
+  /** The serial service holds back the second executor until the first has completed. */
+  @Test
+  public void serialScheduleTwoWaitsForFirstToComplete() {
+    @SuppressWarnings("unchecked")
+    TransformExecutor<Object> first = mock(TransformExecutor.class);
+    @SuppressWarnings("unchecked")
+    TransformExecutor<Object> second = mock(TransformExecutor.class);
+
+    TransformExecutorService serial = TransformExecutorServices.serial(executorService, scheduled);
+    serial.schedule(first);
+    verify(first).call();
+
+    serial.schedule(second);
+    verify(second, never()).call();
+
+    assertThat(scheduled, Matchers.<TransformExecutor<?>, Boolean>hasEntry(first, true));
+    assertThat(
+        scheduled,
+        not(
+            Matchers.<TransformExecutor<?>, Boolean>hasEntry(
+                Matchers.<TransformExecutor<?>>equalTo(second), any(Boolean.class))));
+
+    serial.complete(first);
+    verify(second).call();
+    assertThat(scheduled, Matchers.<TransformExecutor<?>, Boolean>hasEntry(second, true));
+    assertThat(
+        scheduled,
+        not(
+            Matchers.<TransformExecutor<?>, Boolean>hasEntry(
+                Matchers.<TransformExecutor<?>>equalTo(first), any(Boolean.class))));
+
+    serial.complete(second);
+    assertThat(scheduled.isEmpty(), is(true));
+  }
+
+  /** Completing an executor that is not the one currently running is rejected. */
+  @Test
+  public void serialCompleteNotExecutingTaskThrows() {
+    @SuppressWarnings("unchecked")
+    TransformExecutor<Object> first = mock(TransformExecutor.class);
+    @SuppressWarnings("unchecked")
+    TransformExecutor<Object> second = mock(TransformExecutor.class);
+
+    TransformExecutorService serial = TransformExecutorServices.serial(executorService, scheduled);
+    serial.schedule(first);
+    thrown.expect(IllegalStateException.class);
+    thrown.expectMessage("unexpected currently executing");
+
+    serial.complete(second);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
new file mode 100644
index 0000000..959e9d3
--- /dev/null
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
@@ -0,0 +1,538 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.direct; + +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.isA; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.nullValue; +import static org.junit.Assert.assertThat; +import static org.mockito.Mockito.when; + +import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle; +import org.apache.beam.sdk.coders.ByteArrayCoder; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.AppliedPTransform; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.WithKeys; +import org.apache.beam.sdk.util.IllegalMutationException; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; + +import com.google.common.util.concurrent.MoreExecutors; + +import org.hamcrest.Matchers; +import org.joda.time.Instant; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; 
+ +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Tests for {@link TransformExecutor}. + */ +@RunWith(JUnit4.class) +public class TransformExecutorTest { + @Rule public ExpectedException thrown = ExpectedException.none(); + private PCollection<String> created; + private PCollection<KV<Integer, String>> downstream; + + private CountDownLatch evaluatorCompleted; + + private RegisteringCompletionCallback completionCallback; + private TransformExecutorService transformEvaluationState; + private BundleFactory bundleFactory; + @Mock private InProcessEvaluationContext evaluationContext; + @Mock private TransformEvaluatorRegistry registry; + private Map<TransformExecutor<?>, Boolean> scheduled; + + @Before + public void setup() { + MockitoAnnotations.initMocks(this); + + bundleFactory = InProcessBundleFactory.create(); + + scheduled = new HashMap<>(); + transformEvaluationState = + TransformExecutorServices.parallel(MoreExecutors.newDirectExecutorService(), scheduled); + + evaluatorCompleted = new CountDownLatch(1); + completionCallback = new RegisteringCompletionCallback(evaluatorCompleted); + + TestPipeline p = TestPipeline.create(); + created = p.apply(Create.of("foo", "spam", "third")); + downstream = created.apply(WithKeys.<Integer, String>of(3)); + } + + @Test + public void callWithNullInputBundleFinishesBundleAndCompletes() throws Exception { + final InProcessTransformResult result = + StepTransformResult.withoutHold(created.getProducingTransformInternal()).build(); + final AtomicBoolean finishCalled = new AtomicBoolean(false); + TransformEvaluator<Object> evaluator = + new TransformEvaluator<Object>() { + @Override + public void processElement(WindowedValue<Object> 
element) throws Exception { + throw new IllegalArgumentException("Shouldn't be called"); + } + + @Override + public InProcessTransformResult finishBundle() throws Exception { + finishCalled.set(true); + return result; + } + }; + + when(registry.forApplication(created.getProducingTransformInternal(), null, evaluationContext)) + .thenReturn(evaluator); + + TransformExecutor<Object> executor = + TransformExecutor.create( + registry, + Collections.<ModelEnforcementFactory>emptyList(), + evaluationContext, + null, + created.getProducingTransformInternal(), + completionCallback, + transformEvaluationState); + executor.call(); + + assertThat(finishCalled.get(), is(true)); + assertThat(completionCallback.handledResult, equalTo(result)); + assertThat(completionCallback.handledThrowable, is(nullValue())); + assertThat(scheduled, not(Matchers.<TransformExecutor<?>>hasKey(executor))); + } + + @Test + public void inputBundleProcessesEachElementFinishesAndCompletes() throws Exception { + final InProcessTransformResult result = + StepTransformResult.withoutHold(downstream.getProducingTransformInternal()).build(); + final Collection<WindowedValue<String>> elementsProcessed = new ArrayList<>(); + TransformEvaluator<String> evaluator = + new TransformEvaluator<String>() { + @Override + public void processElement(WindowedValue<String> element) throws Exception { + elementsProcessed.add(element); + return; + } + + @Override + public InProcessTransformResult finishBundle() throws Exception { + return result; + } + }; + + WindowedValue<String> foo = WindowedValue.valueInGlobalWindow("foo"); + WindowedValue<String> spam = WindowedValue.valueInGlobalWindow("spam"); + WindowedValue<String> third = WindowedValue.valueInGlobalWindow("third"); + CommittedBundle<String> inputBundle = + bundleFactory.createRootBundle(created).add(foo).add(spam).add(third).commit(Instant.now()); + when( + registry.<String>forApplication( + downstream.getProducingTransformInternal(), inputBundle, 
evaluationContext)) + .thenReturn(evaluator); + + TransformExecutor<String> executor = + TransformExecutor.create( + registry, + Collections.<ModelEnforcementFactory>emptyList(), + evaluationContext, + inputBundle, + downstream.getProducingTransformInternal(), + completionCallback, + transformEvaluationState); + + Executors.newSingleThreadExecutor().submit(executor); + + evaluatorCompleted.await(); + + assertThat(elementsProcessed, containsInAnyOrder(spam, third, foo)); + assertThat(completionCallback.handledResult, equalTo(result)); + assertThat(completionCallback.handledThrowable, is(nullValue())); + assertThat(scheduled, not(Matchers.<TransformExecutor<?>>hasKey(executor))); + } + + @Test + public void processElementThrowsExceptionCallsback() throws Exception { + final InProcessTransformResult result = + StepTransformResult.withoutHold(downstream.getProducingTransformInternal()).build(); + final Exception exception = new Exception(); + TransformEvaluator<String> evaluator = + new TransformEvaluator<String>() { + @Override + public void processElement(WindowedValue<String> element) throws Exception { + throw exception; + } + + @Override + public InProcessTransformResult finishBundle() throws Exception { + return result; + } + }; + + WindowedValue<String> foo = WindowedValue.valueInGlobalWindow("foo"); + CommittedBundle<String> inputBundle = + bundleFactory.createRootBundle(created).add(foo).commit(Instant.now()); + when( + registry.<String>forApplication( + downstream.getProducingTransformInternal(), inputBundle, evaluationContext)) + .thenReturn(evaluator); + + TransformExecutor<String> executor = + TransformExecutor.create( + registry, + Collections.<ModelEnforcementFactory>emptyList(), + evaluationContext, + inputBundle, + downstream.getProducingTransformInternal(), + completionCallback, + transformEvaluationState); + Executors.newSingleThreadExecutor().submit(executor); + + evaluatorCompleted.await(); + + assertThat(completionCallback.handledResult, 
is(nullValue())); + assertThat(completionCallback.handledThrowable, Matchers.<Throwable>equalTo(exception)); + assertThat(scheduled, not(Matchers.<TransformExecutor<?>>hasKey(executor))); + } + + @Test + public void finishBundleThrowsExceptionCallsback() throws Exception { + final Exception exception = new Exception(); + TransformEvaluator<String> evaluator = + new TransformEvaluator<String>() { + @Override + public void processElement(WindowedValue<String> element) throws Exception {} + + @Override + public InProcessTransformResult finishBundle() throws Exception { + throw exception; + } + }; + + CommittedBundle<String> inputBundle = + bundleFactory.createRootBundle(created).commit(Instant.now()); + when( + registry.<String>forApplication( + downstream.getProducingTransformInternal(), inputBundle, evaluationContext)) + .thenReturn(evaluator); + + TransformExecutor<String> executor = + TransformExecutor.create( + registry, + Collections.<ModelEnforcementFactory>emptyList(), + evaluationContext, + inputBundle, + downstream.getProducingTransformInternal(), + completionCallback, + transformEvaluationState); + Executors.newSingleThreadExecutor().submit(executor); + + evaluatorCompleted.await(); + + assertThat(completionCallback.handledResult, is(nullValue())); + assertThat(completionCallback.handledThrowable, Matchers.<Throwable>equalTo(exception)); + assertThat(scheduled, not(Matchers.<TransformExecutor<?>>hasKey(executor))); + } + + @Test + public void duringCallGetThreadIsNonNull() throws Exception { + final InProcessTransformResult result = + StepTransformResult.withoutHold(downstream.getProducingTransformInternal()).build(); + final CountDownLatch testLatch = new CountDownLatch(1); + final CountDownLatch evaluatorLatch = new CountDownLatch(1); + TransformEvaluator<Object> evaluator = + new TransformEvaluator<Object>() { + @Override + public void processElement(WindowedValue<Object> element) throws Exception { + throw new IllegalArgumentException("Shouldn't be 
called"); + } + + @Override + public InProcessTransformResult finishBundle() throws Exception { + testLatch.countDown(); + evaluatorLatch.await(); + return result; + } + }; + + when(registry.forApplication(created.getProducingTransformInternal(), null, evaluationContext)) + .thenReturn(evaluator); + + TransformExecutor<String> executor = + TransformExecutor.create( + registry, + Collections.<ModelEnforcementFactory>emptyList(), + evaluationContext, + null, + created.getProducingTransformInternal(), + completionCallback, + transformEvaluationState); + + Executors.newSingleThreadExecutor().submit(executor); + testLatch.await(); + assertThat(executor.getThread(), not(nullValue())); + + // Finish the execution so everything can get closed down cleanly. + evaluatorLatch.countDown(); + } + + @Test + public void callWithEnforcementAppliesEnforcement() throws Exception { + final InProcessTransformResult result = + StepTransformResult.withoutHold(downstream.getProducingTransformInternal()).build(); + + TransformEvaluator<Object> evaluator = + new TransformEvaluator<Object>() { + @Override + public void processElement(WindowedValue<Object> element) throws Exception { + } + + @Override + public InProcessTransformResult finishBundle() throws Exception { + return result; + } + }; + + WindowedValue<String> fooElem = WindowedValue.valueInGlobalWindow("foo"); + WindowedValue<String> barElem = WindowedValue.valueInGlobalWindow("bar"); + CommittedBundle<String> inputBundle = + bundleFactory.createRootBundle(created).add(fooElem).add(barElem).commit(Instant.now()); + when( + registry.forApplication( + downstream.getProducingTransformInternal(), inputBundle, evaluationContext)) + .thenReturn(evaluator); + + TestEnforcementFactory enforcement = new TestEnforcementFactory(); + TransformExecutor<String> executor = + TransformExecutor.create( + registry, + Collections.<ModelEnforcementFactory>singleton(enforcement), + evaluationContext, + inputBundle, + 
downstream.getProducingTransformInternal(), + completionCallback, + transformEvaluationState); + + executor.call(); + TestEnforcement<?> testEnforcement = enforcement.instance; + assertThat( + testEnforcement.beforeElements, + Matchers.<WindowedValue<?>>containsInAnyOrder(barElem, fooElem)); + assertThat( + testEnforcement.afterElements, + Matchers.<WindowedValue<?>>containsInAnyOrder(barElem, fooElem)); + assertThat(testEnforcement.finishedBundles, contains(result)); + } + + @Test + public void callWithEnforcementThrowsOnFinishPropagates() throws Exception { + PCollection<byte[]> pcBytes = + created.apply( + new PTransform<PCollection<String>, PCollection<byte[]>>() { + @Override + public PCollection<byte[]> apply(PCollection<String> input) { + return PCollection.<byte[]>createPrimitiveOutputInternal( + input.getPipeline(), input.getWindowingStrategy(), input.isBounded()) + .setCoder(ByteArrayCoder.of()); + } + }); + + final InProcessTransformResult result = + StepTransformResult.withoutHold(pcBytes.getProducingTransformInternal()).build(); + final CountDownLatch testLatch = new CountDownLatch(1); + final CountDownLatch evaluatorLatch = new CountDownLatch(1); + + TransformEvaluator<Object> evaluator = + new TransformEvaluator<Object>() { + @Override + public void processElement(WindowedValue<Object> element) throws Exception {} + + @Override + public InProcessTransformResult finishBundle() throws Exception { + testLatch.countDown(); + evaluatorLatch.await(); + return result; + } + }; + + WindowedValue<byte[]> fooBytes = WindowedValue.valueInGlobalWindow("foo".getBytes()); + CommittedBundle<byte[]> inputBundle = + bundleFactory.createRootBundle(pcBytes).add(fooBytes).commit(Instant.now()); + when( + registry.forApplication( + pcBytes.getProducingTransformInternal(), inputBundle, evaluationContext)) + .thenReturn(evaluator); + + TransformExecutor<byte[]> executor = + TransformExecutor.create( + registry, + 
Collections.<ModelEnforcementFactory>singleton(ImmutabilityEnforcementFactory.create()), + evaluationContext, + inputBundle, + pcBytes.getProducingTransformInternal(), + completionCallback, + transformEvaluationState); + + Future<InProcessTransformResult> task = Executors.newSingleThreadExecutor().submit(executor); + testLatch.await(); + fooBytes.getValue()[0] = 'b'; + evaluatorLatch.countDown(); + + thrown.expectCause(isA(IllegalMutationException.class)); + task.get(); + } + + @Test + public void callWithEnforcementThrowsOnElementPropagates() throws Exception { + PCollection<byte[]> pcBytes = + created.apply( + new PTransform<PCollection<String>, PCollection<byte[]>>() { + @Override + public PCollection<byte[]> apply(PCollection<String> input) { + return PCollection.<byte[]>createPrimitiveOutputInternal( + input.getPipeline(), input.getWindowingStrategy(), input.isBounded()) + .setCoder(ByteArrayCoder.of()); + } + }); + + final InProcessTransformResult result = + StepTransformResult.withoutHold(pcBytes.getProducingTransformInternal()).build(); + final CountDownLatch testLatch = new CountDownLatch(1); + final CountDownLatch evaluatorLatch = new CountDownLatch(1); + + TransformEvaluator<Object> evaluator = + new TransformEvaluator<Object>() { + @Override + public void processElement(WindowedValue<Object> element) throws Exception { + testLatch.countDown(); + evaluatorLatch.await(); + } + + @Override + public InProcessTransformResult finishBundle() throws Exception { + return result; + } + }; + + WindowedValue<byte[]> fooBytes = WindowedValue.valueInGlobalWindow("foo".getBytes()); + CommittedBundle<byte[]> inputBundle = + bundleFactory.createRootBundle(pcBytes).add(fooBytes).commit(Instant.now()); + when( + registry.forApplication( + pcBytes.getProducingTransformInternal(), inputBundle, evaluationContext)) + .thenReturn(evaluator); + + TransformExecutor<byte[]> executor = + TransformExecutor.create( + registry, + 
Collections.<ModelEnforcementFactory>singleton(ImmutabilityEnforcementFactory.create()), + evaluationContext, + inputBundle, + pcBytes.getProducingTransformInternal(), + completionCallback, + transformEvaluationState); + + Future<InProcessTransformResult> task = Executors.newSingleThreadExecutor().submit(executor); + testLatch.await(); + fooBytes.getValue()[0] = 'b'; + evaluatorLatch.countDown(); + + thrown.expectCause(isA(IllegalMutationException.class)); + task.get(); + } + + private static class RegisteringCompletionCallback implements CompletionCallback { + private InProcessTransformResult handledResult = null; + private Throwable handledThrowable = null; + private final CountDownLatch onMethod; + + private RegisteringCompletionCallback(CountDownLatch onMethod) { + this.onMethod = onMethod; + } + + @Override + public CommittedResult handleResult( + CommittedBundle<?> inputBundle, InProcessTransformResult result) { + handledResult = result; + onMethod.countDown(); + return CommittedResult.create(result, Collections.<CommittedBundle<?>>emptyList()); + } + + @Override + public void handleThrowable(CommittedBundle<?> inputBundle, Throwable t) { + handledThrowable = t; + onMethod.countDown(); + } + } + + private static class TestEnforcementFactory implements ModelEnforcementFactory { + private TestEnforcement<?> instance; + @Override + public <T> TestEnforcement<T> forBundle( + CommittedBundle<T> input, AppliedPTransform<?, ?, ?> consumer) { + TestEnforcement<T> newEnforcement = new TestEnforcement<>(); + instance = newEnforcement; + return newEnforcement; + } + } + + private static class TestEnforcement<T> implements ModelEnforcement<T> { + private final List<WindowedValue<T>> beforeElements = new ArrayList<>(); + private final List<WindowedValue<T>> afterElements = new ArrayList<>(); + private final List<InProcessTransformResult> finishedBundles = new ArrayList<>(); + + @Override + public void beforeElement(WindowedValue<T> element) { + 
beforeElements.add(element); + } + + @Override + public void afterElement(WindowedValue<T> element) { + afterElements.add(element); + } + + @Override + public void afterFinish( + CommittedBundle<T> input, + InProcessTransformResult result, + Iterable<? extends CommittedBundle<?>> outputs) { + finishedBundles.add(result); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java new file mode 100644 index 0000000..9f3909e --- /dev/null +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java @@ -0,0 +1,334 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.direct; + +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.emptyIterable; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; +import static org.junit.Assert.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle; +import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle; +import org.apache.beam.sdk.coders.AtomicCoder; +import org.apache.beam.sdk.coders.BigEndianLongCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.io.CountingSource; +import org.apache.beam.sdk.io.Read; +import org.apache.beam.sdk.io.UnboundedSource; +import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.AppliedPTransform; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.PCollection; + +import com.google.common.collect.ImmutableList; + +import org.hamcrest.Matchers; +import org.joda.time.DateTime; +import org.joda.time.Instant; +import org.joda.time.ReadableInstant; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Arrays; +import java.util.List; +import java.util.NoSuchElementException; + +import javax.annotation.Nullable; +/** + * Tests for {@link UnboundedReadEvaluatorFactory}. 
+ */ +@RunWith(JUnit4.class) +public class UnboundedReadEvaluatorFactoryTest { + private PCollection<Long> longs; + private TransformEvaluatorFactory factory; + private InProcessEvaluationContext context; + private UncommittedBundle<Long> output; + + private BundleFactory bundleFactory = InProcessBundleFactory.create(); + + @Before + public void setup() { + UnboundedSource<Long, ?> source = + CountingSource.unboundedWithTimestampFn(new LongToInstantFn()); + TestPipeline p = TestPipeline.create(); + longs = p.apply(Read.from(source)); + + factory = new UnboundedReadEvaluatorFactory(); + context = mock(InProcessEvaluationContext.class); + output = bundleFactory.createRootBundle(longs); + when(context.createRootBundle(longs)).thenReturn(output); + } + + @Test + public void unboundedSourceInMemoryTransformEvaluatorProducesElements() throws Exception { + TransformEvaluator<?> evaluator = + factory.forApplication(longs.getProducingTransformInternal(), null, context); + + InProcessTransformResult result = evaluator.finishBundle(); + assertThat( + result.getWatermarkHold(), Matchers.<ReadableInstant>lessThan(DateTime.now().toInstant())); + assertThat( + output.commit(Instant.now()).getElements(), + containsInAnyOrder( + tgw(1L), tgw(2L), tgw(4L), tgw(8L), tgw(9L), tgw(7L), tgw(6L), tgw(5L), tgw(3L), + tgw(0L))); + } + + /** + * Demonstrate that multiple sequential creations will produce additional elements if the source + * can provide them. 
+ */ + @Test + public void unboundedSourceInMemoryTransformEvaluatorMultipleSequentialCalls() throws Exception { + TransformEvaluator<?> evaluator = + factory.forApplication(longs.getProducingTransformInternal(), null, context); + + InProcessTransformResult result = evaluator.finishBundle(); + assertThat( + result.getWatermarkHold(), Matchers.<ReadableInstant>lessThan(DateTime.now().toInstant())); + assertThat( + output.commit(Instant.now()).getElements(), + containsInAnyOrder( + tgw(1L), tgw(2L), tgw(4L), tgw(8L), tgw(9L), tgw(7L), tgw(6L), tgw(5L), tgw(3L), + tgw(0L))); + + UncommittedBundle<Long> secondOutput = bundleFactory.createRootBundle(longs); + when(context.createRootBundle(longs)).thenReturn(secondOutput); + TransformEvaluator<?> secondEvaluator = + factory.forApplication(longs.getProducingTransformInternal(), null, context); + InProcessTransformResult secondResult = secondEvaluator.finishBundle(); + assertThat( + secondResult.getWatermarkHold(), + Matchers.<ReadableInstant>lessThan(DateTime.now().toInstant())); + assertThat( + secondOutput.commit(Instant.now()).getElements(), + containsInAnyOrder(tgw(11L), tgw(12L), tgw(14L), tgw(18L), tgw(19L), tgw(17L), tgw(16L), + tgw(15L), tgw(13L), tgw(10L))); + } + + @Test + public void boundedSourceEvaluatorClosesReader() throws Exception { + TestUnboundedSource<Long> source = + new TestUnboundedSource<>(BigEndianLongCoder.of(), 1L, 2L, 3L); + + TestPipeline p = TestPipeline.create(); + PCollection<Long> pcollection = p.apply(Read.from(source)); + AppliedPTransform<?, ?, ?> sourceTransform = pcollection.getProducingTransformInternal(); + + UncommittedBundle<Long> output = bundleFactory.createRootBundle(pcollection); + when(context.createRootBundle(pcollection)).thenReturn(output); + + TransformEvaluator<?> evaluator = factory.forApplication(sourceTransform, null, context); + evaluator.finishBundle(); + CommittedBundle<Long> committed = output.commit(Instant.now()); + 
assertThat(ImmutableList.copyOf(committed.getElements()), hasSize(3)); + assertThat(TestUnboundedSource.readerClosedCount, equalTo(1)); + } + + @Test + public void boundedSourceEvaluatorNoElementsClosesReader() throws Exception { + TestUnboundedSource<Long> source = new TestUnboundedSource<>(BigEndianLongCoder.of()); + + TestPipeline p = TestPipeline.create(); + PCollection<Long> pcollection = p.apply(Read.from(source)); + AppliedPTransform<?, ?, ?> sourceTransform = pcollection.getProducingTransformInternal(); + + UncommittedBundle<Long> output = bundleFactory.createRootBundle(pcollection); + when(context.createRootBundle(pcollection)).thenReturn(output); + + TransformEvaluator<?> evaluator = factory.forApplication(sourceTransform, null, context); + evaluator.finishBundle(); + CommittedBundle<Long> committed = output.commit(Instant.now()); + assertThat(committed.getElements(), emptyIterable()); + assertThat(TestUnboundedSource.readerClosedCount, equalTo(1)); + } + + // TODO: Once the source is split into multiple sources before evaluating, this test will have to + // be updated. + /** + * Demonstrate that only a single unfinished instance of TransformEvaluator can be created at a + * time, with other calls returning an empty evaluator. 
+ */ + @Test + public void unboundedSourceWithMultipleSimultaneousEvaluatorsIndependent() throws Exception { + UncommittedBundle<Long> secondOutput = bundleFactory.createRootBundle(longs); + + TransformEvaluator<?> evaluator = + factory.forApplication(longs.getProducingTransformInternal(), null, context); + + TransformEvaluator<?> secondEvaluator = + factory.forApplication(longs.getProducingTransformInternal(), null, context); + + InProcessTransformResult secondResult = secondEvaluator.finishBundle(); + InProcessTransformResult result = evaluator.finishBundle(); + + assertThat( + result.getWatermarkHold(), Matchers.<ReadableInstant>lessThan(DateTime.now().toInstant())); + assertThat( + output.commit(Instant.now()).getElements(), + containsInAnyOrder( + tgw(1L), tgw(2L), tgw(4L), tgw(8L), tgw(9L), tgw(7L), tgw(6L), tgw(5L), tgw(3L), + tgw(0L))); + + assertThat(secondResult.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MIN_VALUE)); + assertThat(secondOutput.commit(Instant.now()).getElements(), emptyIterable()); + } + + /** + * A terse alias for producing timestamped longs in the {@link GlobalWindow}, where + * the timestamp is the epoch offset by the value of the element. + */ + private static WindowedValue<Long> tgw(Long elem) { + return WindowedValue.timestampedValueInGlobalWindow(elem, new Instant(elem)); + } + + private static class LongToInstantFn implements SerializableFunction<Long, Instant> { + @Override + public Instant apply(Long input) { + return new Instant(input); + } + } + + private static class TestUnboundedSource<T> extends UnboundedSource<T, TestCheckpointMark> { + static int readerClosedCount; + private final Coder<T> coder; + private final List<T> elems; + + public TestUnboundedSource(Coder<T> coder, T... elems) { + readerClosedCount = 0; + this.coder = coder; + this.elems = Arrays.asList(elems); + } + + @Override + public List<? 
extends UnboundedSource<T, TestCheckpointMark>> generateInitialSplits( + int desiredNumSplits, PipelineOptions options) throws Exception { + return ImmutableList.of(this); + } + + @Override + public UnboundedSource.UnboundedReader<T> createReader( + PipelineOptions options, TestCheckpointMark checkpointMark) { + return new TestUnboundedReader(elems); + } + + @Override + @Nullable + public Coder<TestCheckpointMark> getCheckpointMarkCoder() { + return new TestCheckpointMark.Coder(); + } + + @Override + public void validate() {} + + @Override + public Coder<T> getDefaultOutputCoder() { + return coder; + } + + private class TestUnboundedReader extends UnboundedReader<T> { + private final List<T> elems; + private int index; + + public TestUnboundedReader(List<T> elems) { + this.elems = elems; + this.index = -1; + } + + @Override + public boolean start() throws IOException { + return advance(); + } + + @Override + public boolean advance() throws IOException { + if (index + 1 < elems.size()) { + index++; + return true; + } + return false; + } + + @Override + public Instant getWatermark() { + return Instant.now(); + } + + @Override + public CheckpointMark getCheckpointMark() { + return new TestCheckpointMark(); + } + + @Override + public UnboundedSource<T, ?> getCurrentSource() { + TestUnboundedSource<T> source = TestUnboundedSource.this; + return source; + } + + @Override + public T getCurrent() throws NoSuchElementException { + return elems.get(index); + } + + @Override + public Instant getCurrentTimestamp() throws NoSuchElementException { + return Instant.now(); + } + + @Override + public void close() throws IOException { + readerClosedCount++; + } + } + } + + private static class TestCheckpointMark implements CheckpointMark { + @Override + public void finalizeCheckpoint() throws IOException {} + + public static class Coder extends AtomicCoder<TestCheckpointMark> { + @Override + public void encode( + TestCheckpointMark value, + OutputStream outStream, + 
org.apache.beam.sdk.coders.Coder.Context context) + throws CoderException, IOException {} + + @Override + public TestCheckpointMark decode( + InputStream inStream, org.apache.beam.sdk.coders.Coder.Context context) + throws CoderException, IOException { + return new TestCheckpointMark(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java new file mode 100644 index 0000000..859418b --- /dev/null +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.direct; + +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.nullValue; +import static org.junit.Assert.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle; +import org.apache.beam.runners.direct.InProcessPipelineRunner.PCollectionViewWriter; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.coders.VoidCoder; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.Values; +import org.apache.beam.sdk.transforms.View.CreatePCollectionView; +import org.apache.beam.sdk.transforms.WithKeys; +import org.apache.beam.sdk.util.PCollectionViews; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionView; + +import com.google.common.collect.ImmutableList; + +import org.joda.time.Instant; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link ViewEvaluatorFactory}. 
+ */ +@RunWith(JUnit4.class) +public class ViewEvaluatorFactoryTest { + private BundleFactory bundleFactory = InProcessBundleFactory.create(); + + @Test + public void testInMemoryEvaluator() throws Exception { + TestPipeline p = TestPipeline.create(); + + PCollection<String> input = p.apply(Create.of("foo", "bar")); + CreatePCollectionView<String, Iterable<String>> createView = + CreatePCollectionView.of( + PCollectionViews.iterableView(p, input.getWindowingStrategy(), StringUtf8Coder.of())); + PCollection<Iterable<String>> concat = + input.apply(WithKeys.<Void, String>of((Void) null)) + .setCoder(KvCoder.of(VoidCoder.of(), StringUtf8Coder.of())) + .apply(GroupByKey.<Void, String>create()) + .apply(Values.<Iterable<String>>create()); + PCollectionView<Iterable<String>> view = + concat.apply(new ViewEvaluatorFactory.WriteView<>(createView)); + + InProcessEvaluationContext context = mock(InProcessEvaluationContext.class); + TestViewWriter<String, Iterable<String>> viewWriter = new TestViewWriter<>(); + when(context.createPCollectionViewWriter(concat, view)).thenReturn(viewWriter); + + CommittedBundle<String> inputBundle = + bundleFactory.createRootBundle(input).commit(Instant.now()); + TransformEvaluator<Iterable<String>> evaluator = + new ViewEvaluatorFactory() + .forApplication(view.getProducingTransformInternal(), inputBundle, context); + + evaluator.processElement( + WindowedValue.<Iterable<String>>valueInGlobalWindow(ImmutableList.of("foo", "bar"))); + assertThat(viewWriter.latest, nullValue()); + + evaluator.finishBundle(); + assertThat( + viewWriter.latest, + containsInAnyOrder( + WindowedValue.valueInGlobalWindow("foo"), WindowedValue.valueInGlobalWindow("bar"))); + } + + private static class TestViewWriter<ElemT, ViewT> implements PCollectionViewWriter<ElemT, ViewT> { + private Iterable<WindowedValue<ElemT>> latest; + + @Override + public void add(Iterable<WindowedValue<ElemT>> values) { + latest = values; + } + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkCallbackExecutorTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkCallbackExecutorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkCallbackExecutorTest.java new file mode 100644 index 0000000..d47cf5e --- /dev/null +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkCallbackExecutorTest.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.direct; + +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertThat; + +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.AppliedPTransform; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.Sum; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.transforms.windowing.WindowFn; +import org.apache.beam.sdk.util.WindowingStrategy; +import org.apache.beam.sdk.values.PCollection; + +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +/** + * Tests for {@link WatermarkCallbackExecutor}. 
+ */ +@RunWith(JUnit4.class) +public class WatermarkCallbackExecutorTest { + private WatermarkCallbackExecutor executor = WatermarkCallbackExecutor.create(); + private AppliedPTransform<?, ?, ?> create; + private AppliedPTransform<?, ?, ?> sum; + + @Before + public void setup() { + TestPipeline p = TestPipeline.create(); + PCollection<Integer> created = p.apply(Create.of(1, 2, 3)); + create = created.getProducingTransformInternal(); + sum = created.apply(Sum.integersGlobally()).getProducingTransformInternal(); + } + + @Test + public void onGuaranteedFiringFiresAfterTrigger() throws Exception { + CountDownLatch latch = new CountDownLatch(1); + executor.callOnGuaranteedFiring( + create, + GlobalWindow.INSTANCE, + WindowingStrategy.globalDefault(), + new CountDownLatchCallback(latch)); + + executor.fireForWatermark(create, BoundedWindow.TIMESTAMP_MAX_VALUE); + assertThat(latch.await(500, TimeUnit.MILLISECONDS), equalTo(true)); + } + + @Test + public void multipleCallbacksShouldFireFires() throws Exception { + CountDownLatch latch = new CountDownLatch(2); + WindowFn<Object, IntervalWindow> windowFn = FixedWindows.of(Duration.standardMinutes(10)); + IntervalWindow window = + new IntervalWindow(new Instant(0L), new Instant(0L).plus(Duration.standardMinutes(10))); + executor.callOnGuaranteedFiring( + create, window, WindowingStrategy.of(windowFn), new CountDownLatchCallback(latch)); + executor.callOnGuaranteedFiring( + create, window, WindowingStrategy.of(windowFn), new CountDownLatchCallback(latch)); + + executor.fireForWatermark(create, new Instant(0L).plus(Duration.standardMinutes(10))); + assertThat(latch.await(500, TimeUnit.MILLISECONDS), equalTo(true)); + } + + @Test + public void noCallbacksShouldFire() throws Exception { + CountDownLatch latch = new CountDownLatch(1); + WindowFn<Object, IntervalWindow> windowFn = FixedWindows.of(Duration.standardMinutes(10)); + IntervalWindow window = + new IntervalWindow(new Instant(0L), new 
Instant(0L).plus(Duration.standardMinutes(10))); + executor.callOnGuaranteedFiring( + create, window, WindowingStrategy.of(windowFn), new CountDownLatchCallback(latch)); + + executor.fireForWatermark(create, new Instant(0L).plus(Duration.standardMinutes(5))); + assertThat(latch.await(500, TimeUnit.MILLISECONDS), equalTo(false)); + } + + @Test + public void unrelatedStepShouldNotFire() throws Exception { + CountDownLatch latch = new CountDownLatch(1); + WindowFn<Object, IntervalWindow> windowFn = FixedWindows.of(Duration.standardMinutes(10)); + IntervalWindow window = + new IntervalWindow(new Instant(0L), new Instant(0L).plus(Duration.standardMinutes(10))); + executor.callOnGuaranteedFiring( + sum, window, WindowingStrategy.of(windowFn), new CountDownLatchCallback(latch)); + + executor.fireForWatermark(create, new Instant(0L).plus(Duration.standardMinutes(20))); + assertThat(latch.await(500, TimeUnit.MILLISECONDS), equalTo(false)); + } + + private static class CountDownLatchCallback implements Runnable { + private final CountDownLatch latch; + + public CountDownLatchCallback(CountDownLatch latch) { + this.latch = latch; + } + + @Override + public void run() { + latch.countDown(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java new file mode 100644 index 0000000..64eb8ea --- /dev/null +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.direct; + +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.junit.Assert.assertThat; +import static org.mockito.Mockito.when; + +import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle; +import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.AppliedPTransform; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.windowing.AfterPane; +import org.apache.beam.sdk.transforms.windowing.AfterWatermark; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.transforms.windowing.SlidingWindows; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.transforms.windowing.Window.Bound; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.PCollection; + +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; + +import org.hamcrest.Matchers; 
+import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +/** + * Tests for {@link WindowEvaluatorFactory}. + */ +@RunWith(JUnit4.class) +public class WindowEvaluatorFactoryTest { + private static final Instant EPOCH = new Instant(0); + + private PCollection<Long> input; + private WindowEvaluatorFactory factory; + + @Mock private InProcessEvaluationContext evaluationContext; + + private BundleFactory bundleFactory; + + /* The three input elements: first (3 at timestamp 2) and second (1 at EPOCH plus 3 days) are created in the global window; third (2 at timestamp -10) is created in the IntervalWindow from -100 to EPOCH with PaneInfo.NO_FIRING. */ private WindowedValue<Long> first = + WindowedValue.timestampedValueInGlobalWindow(3L, new Instant(2L)); + private WindowedValue<Long> second = + WindowedValue.timestampedValueInGlobalWindow( + Long.valueOf(1L), EPOCH.plus(Duration.standardDays(3))); + private WindowedValue<Long> third = + WindowedValue.of( + Long.valueOf(2L), + new Instant(-10L), + new IntervalWindow(new Instant(-100), EPOCH), + PaneInfo.NO_FIRING); + + /** Initializes the Mockito-annotated fields, then creates the Long input PCollection, the bundle factory and the factory under test. */ @Before + public void setup() { + MockitoAnnotations.initMocks(this); + TestPipeline p = TestPipeline.create(); + input = p.apply(Create.of(1L, 2L, 3L)); + + bundleFactory = InProcessBundleFactory.create(); + factory = new WindowEvaluatorFactory(); + } + + /** Applies a Window transform built from Window.triggering(...) (no Window.into call) and expects the committed output to contain exactly the unmodified first, second and third elements. */ @Test + public void nullWindowFunSucceeds() throws Exception { + Bound<Long> transform = + Window.<Long>triggering( + AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(1))) + .accumulatingFiredPanes(); + PCollection<Long> triggering = input.apply(transform); + + CommittedBundle<Long> inputBundle = createInputBundle(); + + UncommittedBundle<Long> outputBundle = createOutputBundle(triggering, inputBundle); + + InProcessTransformResult result = runEvaluator(triggering, inputBundle, transform); + + assertThat( + Iterables.getOnlyElement(result.getOutputBundles()), + Matchers.<UncommittedBundle<?>>equalTo(outputBundle)); + CommittedBundle<Long> committed = 
outputBundle.commit(Instant.now()); + assertThat(committed.getElements(), containsInAnyOrder(third, first, second)); + } + + /** Applies 7-day FixedWindows and expects first and second in the IntervalWindow from EPOCH to EPOCH plus 7 days and third in the IntervalWindow from EPOCH minus 7 days to EPOCH, each with PaneInfo.NO_FIRING. */ @Test + public void singleWindowFnSucceeds() throws Exception { + Duration windowDuration = Duration.standardDays(7); + Bound<Long> transform = Window.<Long>into(FixedWindows.of(windowDuration)); + PCollection<Long> windowed = input.apply(transform); + + CommittedBundle<Long> inputBundle = createInputBundle(); + + UncommittedBundle<Long> outputBundle = createOutputBundle(windowed, inputBundle); + + BoundedWindow firstSecondWindow = new IntervalWindow(EPOCH, EPOCH.plus(windowDuration)); + BoundedWindow thirdWindow = new IntervalWindow(EPOCH.minus(windowDuration), EPOCH); + + InProcessTransformResult result = runEvaluator(windowed, inputBundle, transform); + + assertThat( + Iterables.getOnlyElement(result.getOutputBundles()), + Matchers.<UncommittedBundle<?>>equalTo(outputBundle)); + CommittedBundle<Long> committed = outputBundle.commit(Instant.now()); + + WindowedValue<Long> expectedNewFirst = + WindowedValue.of(3L, new Instant(2L), firstSecondWindow, PaneInfo.NO_FIRING); + WindowedValue<Long> expectedNewSecond = + WindowedValue.of( + 1L, EPOCH.plus(Duration.standardDays(3)), firstSecondWindow, PaneInfo.NO_FIRING); + WindowedValue<Long> expectedNewThird = + WindowedValue.of(2L, new Instant(-10L), thirdWindow, PaneInfo.NO_FIRING); + assertThat( + committed.getElements(), + containsInAnyOrder(expectedNewFirst, expectedNewSecond, expectedNewThird)); + } + + /** Applies 6-day SlidingWindows sliding every 3 days and expects each element in two windows: first in w1 and wMinusSlide, second in w1 and w2, third in wMinus1 and wMinusSlide. */ @Test + public void multipleWindowsWindowFnSucceeds() throws Exception { + Duration windowDuration = Duration.standardDays(6); + Duration slidingBy = Duration.standardDays(3); + Bound<Long> transform = Window.into(SlidingWindows.of(windowDuration).every(slidingBy)); + PCollection<Long> windowed = input.apply(transform); + + CommittedBundle<Long> inputBundle = createInputBundle(); + UncommittedBundle<Long> outputBundle = createOutputBundle(windowed, inputBundle); + + InProcessTransformResult 
result = runEvaluator(windowed, inputBundle, transform); + + assertThat( + Iterables.getOnlyElement(result.getOutputBundles()), + Matchers.<UncommittedBundle<?>>equalTo(outputBundle)); + CommittedBundle<Long> committed = outputBundle.commit(Instant.now()); + + BoundedWindow w1 = new IntervalWindow(EPOCH, EPOCH.plus(windowDuration)); + BoundedWindow w2 = + new IntervalWindow(EPOCH.plus(slidingBy), EPOCH.plus(slidingBy).plus(windowDuration)); + BoundedWindow wMinus1 = new IntervalWindow(EPOCH.minus(windowDuration), EPOCH); + BoundedWindow wMinusSlide = + new IntervalWindow(EPOCH.minus(windowDuration).plus(slidingBy), EPOCH.plus(slidingBy)); + + WindowedValue<Long> expectedFirst = + WindowedValue.of( + first.getValue(), + first.getTimestamp(), + ImmutableSet.of(w1, wMinusSlide), + PaneInfo.NO_FIRING); + WindowedValue<Long> expectedSecond = + WindowedValue.of( + second.getValue(), second.getTimestamp(), ImmutableSet.of(w1, w2), PaneInfo.NO_FIRING); + WindowedValue<Long> expectedThird = + WindowedValue.of( + third.getValue(), + third.getTimestamp(), + ImmutableSet.of(wMinus1, wMinusSlide), + PaneInfo.NO_FIRING); + + assertThat( + committed.getElements(), containsInAnyOrder(expectedFirst, expectedSecond, expectedThird)); + } + + /** Returns a root bundle for {@code input} containing first, second and third, committed at Instant.now(). */ private CommittedBundle<Long> createInputBundle() { + CommittedBundle<Long> inputBundle = + bundleFactory + .createRootBundle(input) + .add(first) + .add(second) + .add(third) + .commit(Instant.now()); + return inputBundle; + } + + /** Creates an output bundle for {@code output} from the bundle factory and stubs the mocked evaluation context to return that same bundle for the same arguments. */ private UncommittedBundle<Long> createOutputBundle( + PCollection<Long> output, CommittedBundle<Long> inputBundle) { + UncommittedBundle<Long> outputBundle = bundleFactory.createBundle(inputBundle, output); + when(evaluationContext.createBundle(inputBundle, output)).thenReturn(outputBundle); + return outputBundle; + } + + /** Obtains an evaluator from the factory for an AppliedPTransform named "Window" (input to {@code windowed}), processes first, second and third, and returns the finishBundle() result. */ private InProcessTransformResult runEvaluator( + PCollection<Long> windowed, + CommittedBundle<Long> inputBundle, + Window.Bound<Long> windowTransform /* Required while Window.Bound is a composite */) 
+ throws Exception { + TransformEvaluator<Long> evaluator = + factory.forApplication( + AppliedPTransform.of("Window", input, windowed, windowTransform), + inputBundle, + evaluationContext); + + evaluator.processElement(first); + evaluator.processElement(second); + evaluator.processElement(third); + InProcessTransformResult result = evaluator.finishBundle(); + return result; + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/pom.xml ---------------------------------------------------------------------- diff --git a/runners/pom.xml b/runners/pom.xml index 7b2f356..74812e8 100644 --- a/runners/pom.xml +++ b/runners/pom.xml @@ -35,6 +35,7 @@ <name>Apache Beam :: Runners</name> <modules> + <module>direct-java</module> <module>flink</module> <module>spark</module> </modules> http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/AbstractModelEnforcement.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/AbstractModelEnforcement.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/AbstractModelEnforcement.java deleted file mode 100644 index fe9c165..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/AbstractModelEnforcement.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.runners.inprocess; - -import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; -import org.apache.beam.sdk.util.WindowedValue; - -/** - * An abstract {@link ModelEnforcement} that provides default empty implementations for each method. - */ -abstract class AbstractModelEnforcement<T> implements ModelEnforcement<T> { - /* Intentionally empty: does nothing before an element. */ @Override - public void beforeElement(WindowedValue<T> element) {} - - /* Intentionally empty: does nothing after an element. */ @Override - public void afterElement(WindowedValue<T> element) {} - - /* Intentionally empty: does nothing with the input bundle, result or outputs. */ @Override - public void afterFinish( - CommittedBundle<T> input, - InProcessTransformResult result, - Iterable<? extends CommittedBundle<?>> outputs) {} -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/AvroIOShardedWriteFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/AvroIOShardedWriteFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/AvroIOShardedWriteFactory.java deleted file mode 100644 index e19ffe4..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/AvroIOShardedWriteFactory.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.runners.inprocess; - -import org.apache.beam.sdk.io.AvroIO; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.util.IOChannelUtils; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PDone; -import org.apache.beam.sdk.values.PInput; -import org.apache.beam.sdk.values.POutput; - -/** A PTransformOverrideFactory that substitutes an AvroIOShardedWrite for sharded AvroIO.Write.Bound transforms. */ class AvroIOShardedWriteFactory implements PTransformOverrideFactory { - /** Returns an AvroIOShardedWrite wrapping the transform when it is an AvroIO.Write.Bound with more than one shard, or with exactly one shard and a non-empty shard name template; otherwise returns the transform unchanged. */ @Override - public <InputT extends PInput, OutputT extends POutput> PTransform<InputT, OutputT> override( - PTransform<InputT, OutputT> transform) { - if (transform instanceof AvroIO.Write.Bound) { - @SuppressWarnings("unchecked") - AvroIO.Write.Bound<InputT> originalWrite = (AvroIO.Write.Bound<InputT>) transform; - if (originalWrite.getNumShards() > 1 - || (originalWrite.getNumShards() == 1 - && !"".equals(originalWrite.getShardNameTemplate()))) { - @SuppressWarnings("unchecked") - PTransform<InputT, OutputT> override = - (PTransform<InputT, OutputT>) new AvroIOShardedWrite<InputT>(originalWrite); - return override; - } - } - return transform; - } - - /** ShardControlledWrite backed by the original AvroIO.Write.Bound, which supplies the shard count and naming parts. */ private class AvroIOShardedWrite<InputT> extends ShardControlledWrite<InputT> { - private final AvroIO.Write.Bound<InputT> initial; - - private AvroIOShardedWrite(AvroIO.Write.Bound<InputT> initial) { - this.initial = initial; - } - - /* Shard count is taken from the original write. */ @Override - int getNumShards() { - return initial.getNumShards(); - } 
- - /** Builds the name for shard {@code shardNum} with IOChannelUtils.constructName from the original filename prefix, shard name template and suffix, and returns the original write without sharding, directed to that name with an empty suffix. */ @Override - PTransform<? super PCollection<InputT>, PDone> getSingleShardTransform(int shardNum) { - String shardName = - IOChannelUtils.constructName( - initial.getFilenamePrefix(), - initial.getShardNameTemplate(), - initial.getFilenameSuffix(), - shardNum, - getNumShards()); - return initial.withoutSharding().to(shardName).withSuffix(""); - } - - /* Delegates to the original, unmodified write. */ @Override - protected PTransform<PCollection<InputT>, PDone> delegate() { - return initial; - } - } -}
