http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessEvaluationContextTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessEvaluationContextTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessEvaluationContextTest.java deleted file mode 100644 index d1ea51a..0000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessEvaluationContextTest.java +++ /dev/null @@ -1,526 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.sdk.runners.inprocess; - -import static org.hamcrest.Matchers.contains; -import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.hamcrest.Matchers.emptyIterable; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.not; -import static org.junit.Assert.assertThat; - -import org.apache.beam.sdk.coders.VarIntCoder; -import org.apache.beam.sdk.io.CountingInput; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.runners.inprocess.InMemoryWatermarkManager.FiredTimers; -import org.apache.beam.sdk.runners.inprocess.InMemoryWatermarkManager.TimerUpdate; -import org.apache.beam.sdk.runners.inprocess.InProcessExecutionContext.InProcessStepContext; -import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; -import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.PCollectionViewWriter; -import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.transforms.AppliedPTransform; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.View; -import org.apache.beam.sdk.transforms.WithKeys; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.GlobalWindow; -import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing; -import org.apache.beam.sdk.util.SideInputReader; -import org.apache.beam.sdk.util.TimeDomain; -import org.apache.beam.sdk.util.TimerInternals.TimerData; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.util.WindowingStrategy; -import org.apache.beam.sdk.util.common.Counter; -import org.apache.beam.sdk.util.common.Counter.AggregationKind; -import org.apache.beam.sdk.util.common.CounterSet; -import 
org.apache.beam.sdk.util.state.BagState; -import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals; -import org.apache.beam.sdk.util.state.StateNamespaces; -import org.apache.beam.sdk.util.state.StateTag; -import org.apache.beam.sdk.util.state.StateTags; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollection.IsBounded; -import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.PValue; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterables; - -import org.hamcrest.Matchers; -import org.joda.time.Instant; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -import java.util.Collection; -import java.util.Collections; -import java.util.Map; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -/** - * Tests for {@link InProcessEvaluationContext}. 
- */ -@RunWith(JUnit4.class) -public class InProcessEvaluationContextTest { - private TestPipeline p; - private InProcessEvaluationContext context; - - private PCollection<Integer> created; - private PCollection<KV<String, Integer>> downstream; - private PCollectionView<Iterable<Integer>> view; - private PCollection<Long> unbounded; - private Collection<AppliedPTransform<?, ?, ?>> rootTransforms; - private Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers; - - private BundleFactory bundleFactory; - - @Before - public void setup() { - InProcessPipelineRunner runner = - InProcessPipelineRunner.fromOptions(PipelineOptionsFactory.create()); - - p = TestPipeline.create(); - - created = p.apply(Create.of(1, 2, 3)); - downstream = created.apply(WithKeys.<String, Integer>of("foo")); - view = created.apply(View.<Integer>asIterable()); - unbounded = p.apply(CountingInput.unbounded()); - - ConsumerTrackingPipelineVisitor cVis = new ConsumerTrackingPipelineVisitor(); - p.traverseTopologically(cVis); - rootTransforms = cVis.getRootTransforms(); - valueToConsumers = cVis.getValueToConsumers(); - - bundleFactory = InProcessBundleFactory.create(); - - context = - InProcessEvaluationContext.create( - runner.getPipelineOptions(), - InProcessBundleFactory.create(), - rootTransforms, - valueToConsumers, - cVis.getStepNames(), - cVis.getViews()); - } - - @Test - public void writeToViewWriterThenReadReads() { - PCollectionViewWriter<Integer, Iterable<Integer>> viewWriter = - context.createPCollectionViewWriter( - PCollection.<Iterable<Integer>>createPrimitiveOutputInternal( - p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED), - view); - BoundedWindow window = new TestBoundedWindow(new Instant(1024L)); - BoundedWindow second = new TestBoundedWindow(new Instant(899999L)); - WindowedValue<Integer> firstValue = - WindowedValue.of(1, new Instant(1222), window, PaneInfo.ON_TIME_AND_ONLY_FIRING); - WindowedValue<Integer> secondValue = - WindowedValue.of( - 2, new 
Instant(8766L), second, PaneInfo.createPane(true, false, Timing.ON_TIME, 0, 0)); - Iterable<WindowedValue<Integer>> values = ImmutableList.of(firstValue, secondValue); - viewWriter.add(values); - - SideInputReader reader = - context.createSideInputReader(ImmutableList.<PCollectionView<?>>of(view)); - assertThat(reader.get(view, window), containsInAnyOrder(1)); - assertThat(reader.get(view, second), containsInAnyOrder(2)); - - WindowedValue<Integer> overrittenSecondValue = - WindowedValue.of( - 4444, new Instant(8677L), second, PaneInfo.createPane(false, true, Timing.LATE, 1, 1)); - viewWriter.add(Collections.singleton(overrittenSecondValue)); - assertThat(reader.get(view, second), containsInAnyOrder(4444)); - } - - @Test - public void getExecutionContextSameStepSameKeyState() { - InProcessExecutionContext fooContext = - context.getExecutionContext(created.getProducingTransformInternal(), "foo"); - - StateTag<Object, BagState<Integer>> intBag = StateTags.bag("myBag", VarIntCoder.of()); - - InProcessStepContext stepContext = fooContext.getOrCreateStepContext("s1", "s1"); - stepContext.stateInternals().state(StateNamespaces.global(), intBag).add(1); - - context.handleResult( - InProcessBundleFactory.create() - .createKeyedBundle(null, "foo", created) - .commit(Instant.now()), - ImmutableList.<TimerData>of(), - StepTransformResult.withoutHold(created.getProducingTransformInternal()) - .withState(stepContext.commitState()) - .build()); - - InProcessExecutionContext secondFooContext = - context.getExecutionContext(created.getProducingTransformInternal(), "foo"); - assertThat( - secondFooContext - .getOrCreateStepContext("s1", "s1") - .stateInternals() - .state(StateNamespaces.global(), intBag) - .read(), - contains(1)); - } - - - @Test - public void getExecutionContextDifferentKeysIndependentState() { - InProcessExecutionContext fooContext = - context.getExecutionContext(created.getProducingTransformInternal(), "foo"); - - StateTag<Object, BagState<Integer>> intBag = 
StateTags.bag("myBag", VarIntCoder.of()); - - fooContext - .getOrCreateStepContext("s1", "s1") - .stateInternals() - .state(StateNamespaces.global(), intBag) - .add(1); - - InProcessExecutionContext barContext = - context.getExecutionContext(created.getProducingTransformInternal(), "bar"); - assertThat(barContext, not(equalTo(fooContext))); - assertThat( - barContext - .getOrCreateStepContext("s1", "s1") - .stateInternals() - .state(StateNamespaces.global(), intBag) - .read(), - emptyIterable()); - } - - @Test - public void getExecutionContextDifferentStepsIndependentState() { - String myKey = "foo"; - InProcessExecutionContext fooContext = - context.getExecutionContext(created.getProducingTransformInternal(), myKey); - - StateTag<Object, BagState<Integer>> intBag = StateTags.bag("myBag", VarIntCoder.of()); - - fooContext - .getOrCreateStepContext("s1", "s1") - .stateInternals() - .state(StateNamespaces.global(), intBag) - .add(1); - - InProcessExecutionContext barContext = - context.getExecutionContext(downstream.getProducingTransformInternal(), myKey); - assertThat( - barContext - .getOrCreateStepContext("s1", "s1") - .stateInternals() - .state(StateNamespaces.global(), intBag) - .read(), - emptyIterable()); - } - - @Test - public void handleResultMergesCounters() { - CounterSet counters = context.createCounterSet(); - Counter<Long> myCounter = Counter.longs("foo", AggregationKind.SUM); - counters.addCounter(myCounter); - - myCounter.addValue(4L); - InProcessTransformResult result = - StepTransformResult.withoutHold(created.getProducingTransformInternal()) - .withCounters(counters) - .build(); - context.handleResult(null, ImmutableList.<TimerData>of(), result); - assertThat((Long) context.getCounters().getExistingCounter("foo").getAggregate(), equalTo(4L)); - - CounterSet againCounters = context.createCounterSet(); - Counter<Long> myLongCounterAgain = Counter.longs("foo", AggregationKind.SUM); - againCounters.add(myLongCounterAgain); - 
myLongCounterAgain.addValue(8L); - - InProcessTransformResult secondResult = - StepTransformResult.withoutHold(downstream.getProducingTransformInternal()) - .withCounters(againCounters) - .build(); - context.handleResult( - context.createRootBundle(created).commit(Instant.now()), - ImmutableList.<TimerData>of(), - secondResult); - assertThat((Long) context.getCounters().getExistingCounter("foo").getAggregate(), equalTo(12L)); - } - - @Test - public void handleResultStoresState() { - String myKey = "foo"; - InProcessExecutionContext fooContext = - context.getExecutionContext(downstream.getProducingTransformInternal(), myKey); - - StateTag<Object, BagState<Integer>> intBag = StateTags.bag("myBag", VarIntCoder.of()); - - CopyOnAccessInMemoryStateInternals<Object> state = - fooContext.getOrCreateStepContext("s1", "s1").stateInternals(); - BagState<Integer> bag = state.state(StateNamespaces.global(), intBag); - bag.add(1); - bag.add(2); - bag.add(4); - - InProcessTransformResult stateResult = - StepTransformResult.withoutHold(downstream.getProducingTransformInternal()) - .withState(state) - .build(); - - context.handleResult( - context.createKeyedBundle(null, myKey, created).commit(Instant.now()), - ImmutableList.<TimerData>of(), - stateResult); - - InProcessExecutionContext afterResultContext = - context.getExecutionContext(downstream.getProducingTransformInternal(), myKey); - - CopyOnAccessInMemoryStateInternals<Object> afterResultState = - afterResultContext.getOrCreateStepContext("s1", "s1").stateInternals(); - assertThat(afterResultState.state(StateNamespaces.global(), intBag).read(), contains(1, 2, 4)); - } - - @Test - public void callAfterOutputMustHaveBeenProducedAfterEndOfWatermarkCallsback() throws Exception { - final CountDownLatch callLatch = new CountDownLatch(1); - Runnable callback = - new Runnable() { - @Override - public void run() { - callLatch.countDown(); - } - }; - - // Should call back after the end of the global window - 
context.scheduleAfterOutputWouldBeProduced( - downstream, GlobalWindow.INSTANCE, WindowingStrategy.globalDefault(), callback); - - InProcessTransformResult result = - StepTransformResult.withHold(created.getProducingTransformInternal(), new Instant(0)) - .build(); - - context.handleResult(null, ImmutableList.<TimerData>of(), result); - - // Difficult to demonstrate that we took no action in a multithreaded world; poll for a bit - // will likely be flaky if this logic is broken - assertThat(callLatch.await(500L, TimeUnit.MILLISECONDS), is(false)); - - InProcessTransformResult finishedResult = - StepTransformResult.withoutHold(created.getProducingTransformInternal()).build(); - context.handleResult(null, ImmutableList.<TimerData>of(), finishedResult); - // Obtain the value via blocking call - assertThat(callLatch.await(1, TimeUnit.SECONDS), is(true)); - } - - @Test - public void callAfterOutputMustHaveBeenProducedAlreadyAfterCallsImmediately() throws Exception { - InProcessTransformResult finishedResult = - StepTransformResult.withoutHold(created.getProducingTransformInternal()).build(); - context.handleResult(null, ImmutableList.<TimerData>of(), finishedResult); - - final CountDownLatch callLatch = new CountDownLatch(1); - Runnable callback = - new Runnable() { - @Override - public void run() { - callLatch.countDown(); - } - }; - context.scheduleAfterOutputWouldBeProduced( - downstream, GlobalWindow.INSTANCE, WindowingStrategy.globalDefault(), callback); - assertThat(callLatch.await(1, TimeUnit.SECONDS), is(true)); - } - - @Test - public void extractFiredTimersExtractsTimers() { - InProcessTransformResult holdResult = - StepTransformResult.withHold(created.getProducingTransformInternal(), new Instant(0)) - .build(); - context.handleResult(null, ImmutableList.<TimerData>of(), holdResult); - - String key = "foo"; - TimerData toFire = - TimerData.of(StateNamespaces.global(), new Instant(100L), TimeDomain.EVENT_TIME); - InProcessTransformResult timerResult = - 
StepTransformResult.withoutHold(downstream.getProducingTransformInternal()) - .withState(CopyOnAccessInMemoryStateInternals.withUnderlying(key, null)) - .withTimerUpdate(TimerUpdate.builder(key).setTimer(toFire).build()) - .build(); - - // haven't added any timers, must be empty - assertThat(context.extractFiredTimers().entrySet(), emptyIterable()); - context.handleResult( - context.createKeyedBundle(null, key, created).commit(Instant.now()), - ImmutableList.<TimerData>of(), - timerResult); - - // timer hasn't fired - assertThat(context.extractFiredTimers().entrySet(), emptyIterable()); - - InProcessTransformResult advanceResult = - StepTransformResult.withoutHold(created.getProducingTransformInternal()).build(); - // Should cause the downstream timer to fire - context.handleResult(null, ImmutableList.<TimerData>of(), advanceResult); - - Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> fired = context.extractFiredTimers(); - assertThat( - fired, - Matchers.<AppliedPTransform<?, ?, ?>>hasKey(downstream.getProducingTransformInternal())); - Map<Object, FiredTimers> downstreamFired = - fired.get(downstream.getProducingTransformInternal()); - assertThat(downstreamFired, Matchers.<Object>hasKey(key)); - - FiredTimers firedForKey = downstreamFired.get(key); - assertThat(firedForKey.getTimers(TimeDomain.PROCESSING_TIME), emptyIterable()); - assertThat(firedForKey.getTimers(TimeDomain.SYNCHRONIZED_PROCESSING_TIME), emptyIterable()); - assertThat(firedForKey.getTimers(TimeDomain.EVENT_TIME), contains(toFire)); - - // Don't reextract timers - assertThat(context.extractFiredTimers().entrySet(), emptyIterable()); - } - - @Test - public void createBundleKeyedResultPropagatesKey() { - CommittedBundle<KV<String, Integer>> newBundle = - context - .createBundle( - bundleFactory.createKeyedBundle(null, "foo", created).commit(Instant.now()), - downstream) - .commit(Instant.now()); - assertThat(newBundle.getKey(), Matchers.<Object>equalTo("foo")); - } - - @Test - public void 
createKeyedBundleKeyed() { - CommittedBundle<KV<String, Integer>> keyedBundle = - context - .createKeyedBundle( - bundleFactory.createRootBundle(created).commit(Instant.now()), "foo", downstream) - .commit(Instant.now()); - assertThat(keyedBundle.getKey(), Matchers.<Object>equalTo("foo")); - } - - @Test - public void isDoneWithUnboundedPCollectionAndShutdown() { - context.getPipelineOptions().setShutdownUnboundedProducersWithMaxWatermark(true); - assertThat(context.isDone(unbounded.getProducingTransformInternal()), is(false)); - - context.handleResult( - null, - ImmutableList.<TimerData>of(), - StepTransformResult.withoutHold(unbounded.getProducingTransformInternal()).build()); - assertThat(context.isDone(unbounded.getProducingTransformInternal()), is(true)); - } - - @Test - public void isDoneWithUnboundedPCollectionAndNotShutdown() { - context.getPipelineOptions().setShutdownUnboundedProducersWithMaxWatermark(false); - assertThat(context.isDone(unbounded.getProducingTransformInternal()), is(false)); - - context.handleResult( - null, - ImmutableList.<TimerData>of(), - StepTransformResult.withoutHold(unbounded.getProducingTransformInternal()).build()); - assertThat(context.isDone(unbounded.getProducingTransformInternal()), is(false)); - } - - @Test - public void isDoneWithOnlyBoundedPCollections() { - context.getPipelineOptions().setShutdownUnboundedProducersWithMaxWatermark(false); - assertThat(context.isDone(created.getProducingTransformInternal()), is(false)); - - context.handleResult( - null, - ImmutableList.<TimerData>of(), - StepTransformResult.withoutHold(created.getProducingTransformInternal()).build()); - assertThat(context.isDone(created.getProducingTransformInternal()), is(true)); - } - - @Test - public void isDoneWithPartiallyDone() { - context.getPipelineOptions().setShutdownUnboundedProducersWithMaxWatermark(true); - assertThat(context.isDone(), is(false)); - - UncommittedBundle<Integer> rootBundle = context.createRootBundle(created); - 
rootBundle.add(WindowedValue.valueInGlobalWindow(1)); - CommittedResult handleResult = - context.handleResult( - null, - ImmutableList.<TimerData>of(), - StepTransformResult.withoutHold(created.getProducingTransformInternal()) - .addOutput(rootBundle) - .build()); - @SuppressWarnings("unchecked") - CommittedBundle<Integer> committedBundle = - (CommittedBundle<Integer>) Iterables.getOnlyElement(handleResult.getOutputs()); - context.handleResult( - null, - ImmutableList.<TimerData>of(), - StepTransformResult.withoutHold(unbounded.getProducingTransformInternal()).build()); - assertThat(context.isDone(), is(false)); - - for (AppliedPTransform<?, ?, ?> consumers : valueToConsumers.get(created)) { - context.handleResult( - committedBundle, - ImmutableList.<TimerData>of(), - StepTransformResult.withoutHold(consumers).build()); - } - assertThat(context.isDone(), is(true)); - } - - @Test - public void isDoneWithUnboundedAndNotShutdown() { - context.getPipelineOptions().setShutdownUnboundedProducersWithMaxWatermark(false); - assertThat(context.isDone(), is(false)); - - context.handleResult( - null, - ImmutableList.<TimerData>of(), - StepTransformResult.withoutHold(created.getProducingTransformInternal()).build()); - context.handleResult( - null, - ImmutableList.<TimerData>of(), - StepTransformResult.withoutHold(unbounded.getProducingTransformInternal()).build()); - context.handleResult( - context.createRootBundle(created).commit(Instant.now()), - ImmutableList.<TimerData>of(), - StepTransformResult.withoutHold(downstream.getProducingTransformInternal()).build()); - assertThat(context.isDone(), is(false)); - - context.handleResult( - context.createRootBundle(created).commit(Instant.now()), - ImmutableList.<TimerData>of(), - StepTransformResult.withoutHold(view.getProducingTransformInternal()).build()); - assertThat(context.isDone(), is(false)); - } - - private static class TestBoundedWindow extends BoundedWindow { - private final Instant ts; - - public 
TestBoundedWindow(Instant ts) { - this.ts = ts; - } - - @Override - public Instant maxTimestamp() { - return ts; - } - } -}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessPipelineRegistrarTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessPipelineRegistrarTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessPipelineRegistrarTest.java deleted file mode 100644 index 59a96ed..0000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessPipelineRegistrarTest.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.sdk.runners.inprocess; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; - -import org.apache.beam.sdk.options.PipelineOptionsRegistrar; -import org.apache.beam.sdk.runners.PipelineRunnerRegistrar; -import org.apache.beam.sdk.runners.inprocess.InProcessRegistrar.InProcessRunner; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; - -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -import java.util.ServiceLoader; - -/** Tests for {@link InProcessRunner}. */ -@RunWith(JUnit4.class) -public class InProcessPipelineRegistrarTest { - @Test - public void testCorrectOptionsAreReturned() { - assertEquals( - ImmutableList.of(InProcessPipelineOptions.class), - new InProcessRegistrar.InProcessOptions().getPipelineOptions()); - } - - @Test - public void testCorrectRunnersAreReturned() { - assertEquals( - ImmutableList.of(InProcessPipelineRunner.class), - new InProcessRegistrar.InProcessRunner().getPipelineRunners()); - } - - @Test - public void testServiceLoaderForOptions() { - for (PipelineOptionsRegistrar registrar : - Lists.newArrayList(ServiceLoader.load(PipelineOptionsRegistrar.class).iterator())) { - if (registrar instanceof InProcessRegistrar.InProcessOptions) { - return; - } - } - fail("Expected to find " + InProcessRegistrar.InProcessOptions.class); - } - - @Test - public void testServiceLoaderForRunner() { - for (PipelineRunnerRegistrar registrar : - Lists.newArrayList(ServiceLoader.load(PipelineRunnerRegistrar.class).iterator())) { - if (registrar instanceof InProcessRegistrar.InProcessRunner) { - return; - } - } - fail("Expected to find " + InProcessRegistrar.InProcessRunner.class); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessPipelineRunnerTest.java 
---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessPipelineRunnerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessPipelineRunnerTest.java deleted file mode 100644 index e9e9e36..0000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessPipelineRunnerTest.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.sdk.runners.inprocess; - -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.InProcessPipelineResult; -import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.transforms.Count; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.MapElements; -import org.apache.beam.sdk.transforms.SimpleFunction; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; - -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -import java.io.Serializable; - -/** - * Tests for basic {@link InProcessPipelineRunner} functionality. - */ -@RunWith(JUnit4.class) -public class InProcessPipelineRunnerTest implements Serializable { - @Test - public void wordCountShouldSucceed() throws Throwable { - Pipeline p = getPipeline(); - - PCollection<KV<String, Long>> counts = - p.apply(Create.of("foo", "bar", "foo", "baz", "bar", "foo")) - .apply(MapElements.via(new SimpleFunction<String, String>() { - @Override - public String apply(String input) { - return input; - } - })) - .apply(Count.<String>perElement()); - PCollection<String> countStrs = - counts.apply(MapElements.via(new SimpleFunction<KV<String, Long>, String>() { - @Override - public String apply(KV<String, Long> input) { - String str = String.format("%s: %s", input.getKey(), input.getValue()); - return str; - } - })); - - PAssert.that(countStrs).containsInAnyOrder("baz: 1", "bar: 2", "foo: 3"); - - InProcessPipelineResult result = ((InProcessPipelineResult) p.run()); - result.awaitCompletion(); - } - - private Pipeline getPipeline() { - PipelineOptions opts = PipelineOptionsFactory.create(); - opts.setRunner(InProcessPipelineRunner.class); - - Pipeline p = Pipeline.create(opts); - return p; - } -} 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessSideInputContainerTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessSideInputContainerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessSideInputContainerTest.java deleted file mode 100644 index 03443f8..0000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessSideInputContainerTest.java +++ /dev/null @@ -1,496 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.sdk.runners.inprocess; - -import static org.hamcrest.Matchers.contains; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.hasEntry; -import static org.hamcrest.Matchers.is; -import static org.junit.Assert.assertThat; -import static org.mockito.Mockito.doAnswer; - -import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.runners.inprocess.InProcessEvaluationContext.ReadyCheckingSideInputReader; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.Mean; -import org.apache.beam.sdk.transforms.View; -import org.apache.beam.sdk.transforms.WithKeys; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.GlobalWindow; -import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing; -import org.apache.beam.sdk.util.PCollectionViews; -import org.apache.beam.sdk.util.SideInputReader; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.util.WindowingStrategy; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionView; - -import com.google.common.collect.ImmutableList; - -import org.joda.time.Instant; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; -import org.mockito.Mock; -import org.mockito.Mockito; -import org.mockito.MockitoAnnotations; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; - -import java.util.Map; -import java.util.concurrent.Callable; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; - -/** - * Tests for {@link 
InProcessSideInputContainer}. - */ -@RunWith(JUnit4.class) -public class InProcessSideInputContainerTest { - private static final BoundedWindow FIRST_WINDOW = - new BoundedWindow() { - @Override - public Instant maxTimestamp() { - return new Instant(789541L); - } - - @Override - public String toString() { - return "firstWindow"; - } - }; - - private static final BoundedWindow SECOND_WINDOW = - new BoundedWindow() { - @Override - public Instant maxTimestamp() { - return new Instant(14564786L); - } - - @Override - public String toString() { - return "secondWindow"; - } - }; - - @Rule - public ExpectedException thrown = ExpectedException.none(); - - @Mock - private InProcessEvaluationContext context; - - private TestPipeline pipeline; - - private InProcessSideInputContainer container; - - private PCollectionView<Map<String, Integer>> mapView; - private PCollectionView<Double> singletonView; - - // Not present in container. - private PCollectionView<Iterable<Integer>> iterableView; - - @Before - public void setup() { - MockitoAnnotations.initMocks(this); - pipeline = TestPipeline.create(); - - PCollection<Integer> create = - pipeline.apply("forBaseCollection", Create.<Integer>of(1, 2, 3, 4)); - - mapView = - create.apply("forKeyTypes", WithKeys.<String, Integer>of("foo")) - .apply("asMapView", View.<String, Integer>asMap()); - - singletonView = create.apply("forCombinedTypes", Mean.<Integer>globally().asSingletonView()); - iterableView = create.apply("asIterableView", View.<Integer>asIterable()); - - container = InProcessSideInputContainer.create( - context, ImmutableList.of(iterableView, mapView, singletonView)); - } - - @Test - public void getAfterWriteReturnsPaneInWindow() throws Exception { - WindowedValue<KV<String, Integer>> one = - WindowedValue.of( - KV.of("one", 1), new Instant(1L), FIRST_WINDOW, PaneInfo.ON_TIME_AND_ONLY_FIRING); - WindowedValue<KV<String, Integer>> two = - WindowedValue.of( - KV.of("two", 2), new Instant(20L), FIRST_WINDOW, 
PaneInfo.ON_TIME_AND_ONLY_FIRING); - container.write(mapView, ImmutableList.<WindowedValue<?>>of(one, two)); - - Map<String, Integer> viewContents = - container - .createReaderForViews(ImmutableList.<PCollectionView<?>>of(mapView)) - .get(mapView, FIRST_WINDOW); - assertThat(viewContents, hasEntry("one", 1)); - assertThat(viewContents, hasEntry("two", 2)); - assertThat(viewContents.size(), is(2)); - } - - @Test - public void getReturnsLatestPaneInWindow() throws Exception { - WindowedValue<KV<String, Integer>> one = - WindowedValue.of( - KV.of("one", 1), - new Instant(1L), - SECOND_WINDOW, - PaneInfo.createPane(true, false, Timing.EARLY)); - WindowedValue<KV<String, Integer>> two = - WindowedValue.of( - KV.of("two", 2), - new Instant(20L), - SECOND_WINDOW, - PaneInfo.createPane(true, false, Timing.EARLY)); - container.write(mapView, ImmutableList.<WindowedValue<?>>of(one, two)); - - Map<String, Integer> viewContents = - container - .createReaderForViews(ImmutableList.<PCollectionView<?>>of(mapView)) - .get(mapView, SECOND_WINDOW); - assertThat(viewContents, hasEntry("one", 1)); - assertThat(viewContents, hasEntry("two", 2)); - assertThat(viewContents.size(), is(2)); - - WindowedValue<KV<String, Integer>> three = - WindowedValue.of( - KV.of("three", 3), - new Instant(300L), - SECOND_WINDOW, - PaneInfo.createPane(false, false, Timing.EARLY, 1, -1)); - container.write(mapView, ImmutableList.<WindowedValue<?>>of(three)); - - Map<String, Integer> overwrittenViewContents = - container - .createReaderForViews(ImmutableList.<PCollectionView<?>>of(mapView)) - .get(mapView, SECOND_WINDOW); - assertThat(overwrittenViewContents, hasEntry("three", 3)); - assertThat(overwrittenViewContents.size(), is(1)); - } - - /** - * Demonstrates that calling get() on a window that currently has no data does not return until - * there is data in the pane. 
- */ - @Test - public void getBlocksUntilPaneAvailable() throws Exception { - BoundedWindow window = - new BoundedWindow() { - @Override - public Instant maxTimestamp() { - return new Instant(1024L); - } - }; - Future<Double> singletonFuture = - getFutureOfView( - container.createReaderForViews(ImmutableList.<PCollectionView<?>>of(singletonView)), - singletonView, - window); - - WindowedValue<Double> singletonValue = - WindowedValue.of(4.75, new Instant(475L), window, PaneInfo.ON_TIME_AND_ONLY_FIRING); - - assertThat(singletonFuture.isDone(), is(false)); - container.write(singletonView, ImmutableList.<WindowedValue<?>>of(singletonValue)); - assertThat(singletonFuture.get(), equalTo(4.75)); - } - - @Test - public void withPCollectionViewsWithPutInOriginalReturnsContents() throws Exception { - BoundedWindow window = new BoundedWindow() { - @Override - public Instant maxTimestamp() { - return new Instant(1024L); - } - }; - SideInputReader newReader = - container.createReaderForViews(ImmutableList.<PCollectionView<?>>of(singletonView)); - Future<Double> singletonFuture = getFutureOfView(newReader, singletonView, window); - - WindowedValue<Double> singletonValue = - WindowedValue.of(24.125, new Instant(475L), window, PaneInfo.ON_TIME_AND_ONLY_FIRING); - - assertThat(singletonFuture.isDone(), is(false)); - container.write(singletonView, ImmutableList.<WindowedValue<?>>of(singletonValue)); - assertThat(singletonFuture.get(), equalTo(24.125)); - } - - @Test - public void withPCollectionViewsErrorsForContainsNotInViews() { - PCollectionView<Map<String, Iterable<String>>> newView = - PCollectionViews.multimapView( - pipeline, - WindowingStrategy.globalDefault(), - KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())); - - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage("with unknown views " + ImmutableList.of(newView).toString()); - - container.createReaderForViews(ImmutableList.<PCollectionView<?>>of(newView)); - } - - @Test - public void 
withViewsForViewNotInContainerFails() { - PCollectionView<Map<String, Iterable<String>>> newView = - PCollectionViews.multimapView( - pipeline, - WindowingStrategy.globalDefault(), - KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())); - - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage("unknown views"); - thrown.expectMessage(newView.toString()); - - container.createReaderForViews(ImmutableList.<PCollectionView<?>>of(newView)); - } - - @Test - public void getOnReaderForViewNotInReaderFails() { - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage("unknown view: " + iterableView.toString()); - - container.createReaderForViews(ImmutableList.<PCollectionView<?>>of(mapView)) - .get(iterableView, GlobalWindow.INSTANCE); - } - - @Test - public void writeForMultipleElementsInDifferentWindowsSucceeds() throws Exception { - WindowedValue<Double> firstWindowedValue = - WindowedValue.of( - 2.875, - FIRST_WINDOW.maxTimestamp().minus(200L), - FIRST_WINDOW, - PaneInfo.ON_TIME_AND_ONLY_FIRING); - WindowedValue<Double> secondWindowedValue = - WindowedValue.of( - 4.125, - SECOND_WINDOW.maxTimestamp().minus(2_000_000L), - SECOND_WINDOW, - PaneInfo.ON_TIME_AND_ONLY_FIRING); - container.write(singletonView, ImmutableList.of(firstWindowedValue, secondWindowedValue)); - assertThat( - container - .createReaderForViews(ImmutableList.<PCollectionView<?>>of(singletonView)) - .get(singletonView, FIRST_WINDOW), - equalTo(2.875)); - assertThat( - container - .createReaderForViews(ImmutableList.<PCollectionView<?>>of(singletonView)) - .get(singletonView, SECOND_WINDOW), - equalTo(4.125)); - } - - @Test - public void writeForMultipleIdenticalElementsInSameWindowSucceeds() throws Exception { - WindowedValue<Integer> firstValue = - WindowedValue.of( - 44, - FIRST_WINDOW.maxTimestamp().minus(200L), - FIRST_WINDOW, - PaneInfo.ON_TIME_AND_ONLY_FIRING); - WindowedValue<Integer> secondValue = - WindowedValue.of( - 44, - 
FIRST_WINDOW.maxTimestamp().minus(200L), - FIRST_WINDOW, - PaneInfo.ON_TIME_AND_ONLY_FIRING); - - container.write(iterableView, ImmutableList.of(firstValue, secondValue)); - - assertThat( - container - .createReaderForViews(ImmutableList.<PCollectionView<?>>of(iterableView)) - .get(iterableView, FIRST_WINDOW), - contains(44, 44)); - } - - @Test - public void writeForElementInMultipleWindowsSucceeds() throws Exception { - WindowedValue<Double> multiWindowedValue = - WindowedValue.of( - 2.875, - FIRST_WINDOW.maxTimestamp().minus(200L), - ImmutableList.of(FIRST_WINDOW, SECOND_WINDOW), - PaneInfo.ON_TIME_AND_ONLY_FIRING); - container.write(singletonView, ImmutableList.of(multiWindowedValue)); - assertThat( - container - .createReaderForViews(ImmutableList.<PCollectionView<?>>of(singletonView)) - .get(singletonView, FIRST_WINDOW), - equalTo(2.875)); - assertThat( - container - .createReaderForViews(ImmutableList.<PCollectionView<?>>of(singletonView)) - .get(singletonView, SECOND_WINDOW), - equalTo(2.875)); - } - - @Test - public void finishDoesNotOverwriteWrittenElements() throws Exception { - WindowedValue<KV<String, Integer>> one = - WindowedValue.of( - KV.of("one", 1), - new Instant(1L), - SECOND_WINDOW, - PaneInfo.createPane(true, false, Timing.EARLY)); - WindowedValue<KV<String, Integer>> two = - WindowedValue.of( - KV.of("two", 2), - new Instant(20L), - SECOND_WINDOW, - PaneInfo.createPane(true, false, Timing.EARLY)); - container.write(mapView, ImmutableList.<WindowedValue<?>>of(one, two)); - - immediatelyInvokeCallback(mapView, SECOND_WINDOW); - - Map<String, Integer> viewContents = - container - .createReaderForViews(ImmutableList.<PCollectionView<?>>of(mapView)) - .get(mapView, SECOND_WINDOW); - - assertThat(viewContents, hasEntry("one", 1)); - assertThat(viewContents, hasEntry("two", 2)); - assertThat(viewContents.size(), is(2)); - } - - @Test - public void finishOnPendingViewsSetsEmptyElements() throws Exception { - immediatelyInvokeCallback(mapView, 
SECOND_WINDOW); - Future<Map<String, Integer>> mapFuture = - getFutureOfView( - container.createReaderForViews(ImmutableList.<PCollectionView<?>>of(mapView)), - mapView, - SECOND_WINDOW); - - assertThat(mapFuture.get().isEmpty(), is(true)); - } - - /** - * Demonstrates that calling isReady on an empty container throws an - * {@link IllegalArgumentException}. - */ - @Test - public void isReadyInEmptyReaderThrows() { - ReadyCheckingSideInputReader reader = - container.createReaderForViews(ImmutableList.<PCollectionView<?>>of()); - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage("does not contain"); - thrown.expectMessage(ImmutableList.of().toString()); - reader.isReady(mapView, GlobalWindow.INSTANCE); - } - - /** - * Demonstrates that calling isReady returns false until elements are written to the - * {@link PCollectionView}, {@link BoundedWindow} pair, at which point it returns true. - */ - @Test - public void isReadyForSomeNotReadyViewsFalseUntilElements() { - container.write( - mapView, - ImmutableList.of( - WindowedValue.of( - KV.of("one", 1), - SECOND_WINDOW.maxTimestamp().minus(100L), - SECOND_WINDOW, - PaneInfo.ON_TIME_AND_ONLY_FIRING))); - - ReadyCheckingSideInputReader reader = - container.createReaderForViews(ImmutableList.of(mapView, singletonView)); - assertThat(reader.isReady(mapView, FIRST_WINDOW), is(false)); - assertThat(reader.isReady(mapView, SECOND_WINDOW), is(true)); - - assertThat(reader.isReady(singletonView, SECOND_WINDOW), is(false)); - - container.write( - mapView, - ImmutableList.of( - WindowedValue.of( - KV.of("too", 2), - FIRST_WINDOW.maxTimestamp().minus(100L), - FIRST_WINDOW, - PaneInfo.ON_TIME_AND_ONLY_FIRING))); - assertThat(reader.isReady(mapView, FIRST_WINDOW), is(true)); - - container.write( - singletonView, - ImmutableList.of( - WindowedValue.of( - 1.25, - SECOND_WINDOW.maxTimestamp().minus(100L), - SECOND_WINDOW, - PaneInfo.ON_TIME_AND_ONLY_FIRING))); - assertThat(reader.isReady(mapView, SECOND_WINDOW), 
is(true)); - assertThat(reader.isReady(singletonView, SECOND_WINDOW), is(true)); - - assertThat(reader.isReady(mapView, GlobalWindow.INSTANCE), is(false)); - assertThat(reader.isReady(singletonView, GlobalWindow.INSTANCE), is(false)); - } - - @Test - public void isReadyForEmptyWindowTrue() { - immediatelyInvokeCallback(mapView, GlobalWindow.INSTANCE); - - ReadyCheckingSideInputReader reader = - container.createReaderForViews(ImmutableList.of(mapView, singletonView)); - assertThat(reader.isReady(mapView, GlobalWindow.INSTANCE), is(true)); - assertThat(reader.isReady(singletonView, GlobalWindow.INSTANCE), is(false)); - - immediatelyInvokeCallback(singletonView, GlobalWindow.INSTANCE); - assertThat(reader.isReady(singletonView, GlobalWindow.INSTANCE), is(true)); - } - - /** - * When a callAfterWindowCloses with the specified view's producing transform, window, and - * windowing strategy is invoked, immediately execute the callback. - */ - private void immediatelyInvokeCallback(PCollectionView<?> view, BoundedWindow window) { - doAnswer( - new Answer<Void>() { - @Override - public Void answer(InvocationOnMock invocation) throws Throwable { - Object callback = invocation.getArguments()[3]; - Runnable callbackRunnable = (Runnable) callback; - callbackRunnable.run(); - return null; - } - }) - .when(context) - .scheduleAfterOutputWouldBeProduced( - Mockito.eq(view), - Mockito.eq(window), - Mockito.eq(view.getWindowingStrategyInternal()), - Mockito.any(Runnable.class)); - } - - private <ValueT> Future<ValueT> getFutureOfView(final SideInputReader myReader, - final PCollectionView<ValueT> view, final BoundedWindow window) { - Callable<ValueT> callable = new Callable<ValueT>() { - @Override - public ValueT call() throws Exception { - return myReader.get(view, window); - } - }; - return Executors.newSingleThreadExecutor().submit(callable); - } -} 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessTimerInternalsTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessTimerInternalsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessTimerInternalsTest.java deleted file mode 100644 index b496981..0000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessTimerInternalsTest.java +++ /dev/null @@ -1,133 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.sdk.runners.inprocess; - -import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.hamcrest.Matchers.equalTo; -import static org.junit.Assert.assertThat; -import static org.mockito.Mockito.when; - -import org.apache.beam.sdk.runners.inprocess.InMemoryWatermarkManager.TimerUpdate; -import org.apache.beam.sdk.runners.inprocess.InMemoryWatermarkManager.TimerUpdate.TimerUpdateBuilder; -import org.apache.beam.sdk.runners.inprocess.InMemoryWatermarkManager.TransformWatermarks; -import org.apache.beam.sdk.util.TimeDomain; -import org.apache.beam.sdk.util.TimerInternals.TimerData; -import org.apache.beam.sdk.util.state.StateNamespaces; - -import org.joda.time.Duration; -import org.joda.time.Instant; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; -import org.mockito.Mock; -import org.mockito.MockitoAnnotations; - -/** - * Tests for {@link InProcessTimerInternals}. - */ -@RunWith(JUnit4.class) -public class InProcessTimerInternalsTest { - private MockClock clock; - @Mock private TransformWatermarks watermarks; - - private TimerUpdateBuilder timerUpdateBuilder; - - private InProcessTimerInternals internals; - - @Before - public void setup() { - MockitoAnnotations.initMocks(this); - clock = MockClock.fromInstant(new Instant(0)); - - timerUpdateBuilder = TimerUpdate.builder(1234); - - internals = InProcessTimerInternals.create(clock, watermarks, timerUpdateBuilder); - } - - @Test - public void setTimerAddsToBuilder() { - TimerData eventTimer = - TimerData.of(StateNamespaces.global(), new Instant(20145L), TimeDomain.EVENT_TIME); - TimerData processingTimer = - TimerData.of(StateNamespaces.global(), new Instant(125555555L), TimeDomain.PROCESSING_TIME); - TimerData synchronizedProcessingTimer = - TimerData.of( - StateNamespaces.global(), - new Instant(98745632189L), - TimeDomain.SYNCHRONIZED_PROCESSING_TIME); - internals.setTimer(eventTimer); - 
internals.setTimer(processingTimer); - internals.setTimer(synchronizedProcessingTimer); - - assertThat( - internals.getTimerUpdate().getSetTimers(), - containsInAnyOrder(eventTimer, synchronizedProcessingTimer, processingTimer)); - } - - @Test - public void deleteTimerDeletesOnBuilder() { - TimerData eventTimer = - TimerData.of(StateNamespaces.global(), new Instant(20145L), TimeDomain.EVENT_TIME); - TimerData processingTimer = - TimerData.of(StateNamespaces.global(), new Instant(125555555L), TimeDomain.PROCESSING_TIME); - TimerData synchronizedProcessingTimer = - TimerData.of( - StateNamespaces.global(), - new Instant(98745632189L), - TimeDomain.SYNCHRONIZED_PROCESSING_TIME); - internals.deleteTimer(eventTimer); - internals.deleteTimer(processingTimer); - internals.deleteTimer(synchronizedProcessingTimer); - - assertThat( - internals.getTimerUpdate().getDeletedTimers(), - containsInAnyOrder(eventTimer, synchronizedProcessingTimer, processingTimer)); - } - - @Test - public void getProcessingTimeIsClockNow() { - assertThat(internals.currentProcessingTime(), equalTo(clock.now())); - Instant oldProcessingTime = internals.currentProcessingTime(); - - clock.advance(Duration.standardHours(12)); - - assertThat(internals.currentProcessingTime(), equalTo(clock.now())); - assertThat( - internals.currentProcessingTime(), - equalTo(oldProcessingTime.plus(Duration.standardHours(12)))); - } - - @Test - public void getSynchronizedProcessingTimeIsWatermarkSynchronizedInputTime() { - when(watermarks.getSynchronizedProcessingInputTime()).thenReturn(new Instant(12345L)); - assertThat(internals.currentSynchronizedProcessingTime(), equalTo(new Instant(12345L))); - } - - @Test - public void getInputWatermarkTimeUsesWatermarkTime() { - when(watermarks.getInputWatermark()).thenReturn(new Instant(8765L)); - assertThat(internals.currentInputWatermarkTime(), equalTo(new Instant(8765L))); - } - - @Test - public void getOutputWatermarkTimeUsesWatermarkTime() { - 
when(watermarks.getOutputWatermark()).thenReturn(new Instant(25525L)); - assertThat(internals.currentOutputWatermarkTime(), equalTo(new Instant(25525L))); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/KeyedPValueTrackingVisitorTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/KeyedPValueTrackingVisitorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/KeyedPValueTrackingVisitorTest.java deleted file mode 100644 index b89340e..0000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/KeyedPValueTrackingVisitorTest.java +++ /dev/null @@ -1,192 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.sdk.runners.inprocess; - -import static org.hamcrest.Matchers.hasItem; -import static org.hamcrest.Matchers.not; -import static org.junit.Assert.assertThat; - -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.coders.IterableCoder; -import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.coders.VarIntCoder; -import org.apache.beam.sdk.coders.VoidCoder; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.GroupByKey; -import org.apache.beam.sdk.transforms.Keys; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; - -import com.google.common.collect.ImmutableSet; - -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -import java.util.Collections; -import java.util.Set; - -/** - * Tests for {@link KeyedPValueTrackingVisitor}. - */ -@RunWith(JUnit4.class) -public class KeyedPValueTrackingVisitorTest { - @Rule public ExpectedException thrown = ExpectedException.none(); - - private KeyedPValueTrackingVisitor visitor; - private Pipeline p; - - @Before - public void setup() { - PipelineOptions options = PipelineOptionsFactory.create(); - - p = Pipeline.create(options); - @SuppressWarnings("rawtypes") - Set<Class<? extends PTransform>> producesKeyed = - ImmutableSet.<Class<? 
extends PTransform>>of(PrimitiveKeyer.class, CompositeKeyer.class); - visitor = KeyedPValueTrackingVisitor.create(producesKeyed); - } - - @Test - public void primitiveProducesKeyedOutputUnkeyedInputKeyedOutput() { - PCollection<Integer> keyed = - p.apply(Create.<Integer>of(1, 2, 3)).apply(new PrimitiveKeyer<Integer>()); - - p.traverseTopologically(visitor); - assertThat(visitor.getKeyedPValues(), hasItem(keyed)); - } - - @Test - public void primitiveProducesKeyedOutputKeyedInputKeyedOutut() { - PCollection<Integer> keyed = - p.apply(Create.<Integer>of(1, 2, 3)) - .apply("firstKey", new PrimitiveKeyer<Integer>()) - .apply("secondKey", new PrimitiveKeyer<Integer>()); - - p.traverseTopologically(visitor); - assertThat(visitor.getKeyedPValues(), hasItem(keyed)); - } - - @Test - public void compositeProducesKeyedOutputUnkeyedInputKeyedOutput() { - PCollection<Integer> keyed = - p.apply(Create.<Integer>of(1, 2, 3)).apply(new CompositeKeyer<Integer>()); - - p.traverseTopologically(visitor); - assertThat(visitor.getKeyedPValues(), hasItem(keyed)); - } - - @Test - public void compositeProducesKeyedOutputKeyedInputKeyedOutut() { - PCollection<Integer> keyed = - p.apply(Create.<Integer>of(1, 2, 3)) - .apply("firstKey", new CompositeKeyer<Integer>()) - .apply("secondKey", new CompositeKeyer<Integer>()); - - p.traverseTopologically(visitor); - assertThat(visitor.getKeyedPValues(), hasItem(keyed)); - } - - - @Test - public void noInputUnkeyedOutput() { - PCollection<KV<Integer, Iterable<Void>>> unkeyed = - p.apply( - Create.of(KV.<Integer, Iterable<Void>>of(-1, Collections.<Void>emptyList())) - .withCoder(KvCoder.of(VarIntCoder.of(), IterableCoder.of(VoidCoder.of())))); - - p.traverseTopologically(visitor); - assertThat(visitor.getKeyedPValues(), not(hasItem(unkeyed))); - } - - @Test - public void keyedInputNotProducesKeyedOutputUnkeyedOutput() { - PCollection<Integer> onceKeyed = - p.apply(Create.<Integer>of(1, 2, 3)) - .apply(new PrimitiveKeyer<Integer>()) - 
.apply(ParDo.of(new IdentityFn<Integer>())); - - p.traverseTopologically(visitor); - assertThat(visitor.getKeyedPValues(), not(hasItem(onceKeyed))); - } - - @Test - public void unkeyedInputNotProducesKeyedOutputUnkeyedOutput() { - PCollection<Integer> unkeyed = - p.apply(Create.<Integer>of(1, 2, 3)).apply(ParDo.of(new IdentityFn<Integer>())); - - p.traverseTopologically(visitor); - assertThat(visitor.getKeyedPValues(), not(hasItem(unkeyed))); - } - - @Test - public void traverseMultipleTimesThrows() { - p.apply( - Create.<KV<Integer, Void>>of( - KV.of(1, (Void) null), KV.of(2, (Void) null), KV.of(3, (Void) null)) - .withCoder(KvCoder.of(VarIntCoder.of(), VoidCoder.of()))) - .apply(GroupByKey.<Integer, Void>create()) - .apply(Keys.<Integer>create()); - - p.traverseTopologically(visitor); - - thrown.expect(IllegalStateException.class); - thrown.expectMessage("already been finalized"); - thrown.expectMessage(KeyedPValueTrackingVisitor.class.getSimpleName()); - p.traverseTopologically(visitor); - } - - @Test - public void getKeyedPValuesBeforeTraverseThrows() { - thrown.expect(IllegalStateException.class); - thrown.expectMessage("completely traversed"); - thrown.expectMessage("getKeyedPValues"); - visitor.getKeyedPValues(); - } - - private static class PrimitiveKeyer<K> extends PTransform<PCollection<K>, PCollection<K>> { - @Override - public PCollection<K> apply(PCollection<K> input) { - return PCollection.<K>createPrimitiveOutputInternal( - input.getPipeline(), input.getWindowingStrategy(), input.isBounded()) - .setCoder(input.getCoder()); - } - } - - private static class CompositeKeyer<K> extends PTransform<PCollection<K>, PCollection<K>> { - @Override - public PCollection<K> apply(PCollection<K> input) { - return input.apply(new PrimitiveKeyer<K>()).apply(ParDo.of(new IdentityFn<K>())); - } - } - - private static class IdentityFn<K> extends DoFn<K, K> { - @Override - public void processElement(DoFn<K, K>.ProcessContext c) throws Exception { - c.output(c.element()); 
- } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/MockClock.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/MockClock.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/MockClock.java deleted file mode 100644 index 152cac4..0000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/MockClock.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.runners.inprocess; - -import static com.google.common.base.Preconditions.checkArgument; - -import org.joda.time.Duration; -import org.joda.time.Instant; - -/** - * A clock that returns a constant value for now which can be set with calls to - * {@link #set(Instant)}. - * - * <p>For uses of the {@link Clock} interface in unit tests. 
- */ -public class MockClock implements Clock { - - private Instant now; - - public static MockClock fromInstant(Instant initial) { - return new MockClock(initial); - } - - private MockClock(Instant initialNow) { - this.now = initialNow; - } - - public void set(Instant newNow) { - checkArgument(!newNow.isBefore(now), "Cannot move MockClock backwards in time from %s to %s", - now, newNow); - this.now = newNow; - } - - public void advance(Duration duration) { - checkArgument( - duration.getMillis() > 0, - "Cannot move MockClock backwards in time by duration %s", - duration); - set(now.plus(duration)); - } - - @Override - public Instant now() { - return now; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/ParDoMultiEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/ParDoMultiEvaluatorFactoryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/ParDoMultiEvaluatorFactoryTest.java deleted file mode 100644 index a048e3a..0000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/ParDoMultiEvaluatorFactoryTest.java +++ /dev/null @@ -1,431 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.runners.inprocess; - -import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.not; -import static org.hamcrest.Matchers.nullValue; -import static org.junit.Assert.assertThat; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.runners.inprocess.InMemoryWatermarkManager.TimerUpdate; -import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; -import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.ParDo.BoundMulti; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.GlobalWindow; -import org.apache.beam.sdk.transforms.windowing.IntervalWindow; -import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; -import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.util.TimeDomain; -import org.apache.beam.sdk.util.TimerInternals.TimerData; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.util.common.CounterSet; -import org.apache.beam.sdk.util.state.BagState; -import org.apache.beam.sdk.util.state.StateNamespace; -import 
org.apache.beam.sdk.util.state.StateNamespaces;
import org.apache.beam.sdk.util.state.StateTag;
import org.apache.beam.sdk.util.state.StateTags;
import org.apache.beam.sdk.util.state.WatermarkHoldState;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;

import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

import java.io.Serializable;

/**
 * Tests for {@link ParDoMultiEvaluatorFactory}.
 *
 * <p>Every test applies a multi-output {@code ParDo} to the same three-element input
 * ({@code "foo"}, {@code "bara"}, {@code "bazam"}), evaluates it against a mocked
 * {@link InProcessEvaluationContext}, and inspects the {@link InProcessTransformResult} returned
 * by {@code finishBundle()}. The shared mock wiring and element feeding live in the private
 * helpers at the bottom of the class.
 */
@RunWith(JUnit4.class)
public class ParDoMultiEvaluatorFactoryTest implements Serializable {
  private transient BundleFactory bundleFactory = InProcessBundleFactory.create();

  /**
   * With a main output and two declared side outputs, the result holds one bundle per output and
   * each bundle carries the elements with the timestamp and pane of the corresponding input.
   */
  @Test
  public void testParDoMultiInMemoryTransformEvaluator() throws Exception {
    TestPipeline p = TestPipeline.create();

    PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam"));

    TupleTag<KV<String, Integer>> mainOutputTag = new TupleTag<KV<String, Integer>>() {};
    final TupleTag<String> elementTag = new TupleTag<>();
    final TupleTag<Integer> lengthTag = new TupleTag<>();

    BoundMulti<String, KV<String, Integer>> pardo =
        ParDo.of(
                new DoFn<String, KV<String, Integer>>() {
                  @Override
                  public void processElement(ProcessContext c) {
                    c.output(KV.<String, Integer>of(c.element(), c.element().length()));
                    c.sideOutput(elementTag, c.element());
                    c.sideOutput(lengthTag, c.element().length());
                  }
                })
            .withOutputTags(mainOutputTag, TupleTagList.of(elementTag).and(lengthTag));
    PCollectionTuple outputTuple = input.apply(pardo);

    CommittedBundle<String> inputBundle =
        bundleFactory.createRootBundle(input).commit(Instant.now());

    PCollection<KV<String, Integer>> mainOutput = outputTuple.get(mainOutputTag);
    PCollection<String> elementOutput = outputTuple.get(elementTag);
    PCollection<Integer> lengthOutput = outputTuple.get(lengthTag);

    InProcessEvaluationContext evaluationContext = mock(InProcessEvaluationContext.class);
    UncommittedBundle<KV<String, Integer>> mainOutputBundle =
        stubOutputBundle(evaluationContext, inputBundle, mainOutput);
    UncommittedBundle<String> elementOutputBundle =
        stubOutputBundle(evaluationContext, inputBundle, elementOutput);
    UncommittedBundle<Integer> lengthOutputBundle =
        stubOutputBundle(evaluationContext, inputBundle, lengthOutput);

    InProcessExecutionContext executionContext =
        new InProcessExecutionContext(null, null, null, null);
    CounterSet counters = stubExecutionContext(evaluationContext, mainOutput, executionContext);

    InProcessTransformResult result =
        processElementsAndFinish(mainOutput, inputBundle, evaluationContext);

    assertThat(
        result.getOutputBundles(),
        Matchers.<UncommittedBundle<?>>containsInAnyOrder(
            lengthOutputBundle, mainOutputBundle, elementOutputBundle));
    assertThat(result.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE));
    assertThat(result.getCounters(), equalTo(counters));

    assertThat(
        mainOutputBundle.commit(Instant.now()).getElements(),
        Matchers.<WindowedValue<KV<String, Integer>>>containsInAnyOrder(
            WindowedValue.valueInGlobalWindow(KV.of("foo", 3)),
            WindowedValue.timestampedValueInGlobalWindow(KV.of("bara", 4), new Instant(1000)),
            WindowedValue.valueInGlobalWindow(
                KV.of("bazam", 5), PaneInfo.ON_TIME_AND_ONLY_FIRING)));
    assertThat(
        elementOutputBundle.commit(Instant.now()).getElements(),
        Matchers.<WindowedValue<String>>containsInAnyOrder(
            WindowedValue.valueInGlobalWindow("foo"),
            WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000)),
            WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING)));
    assertThat(
        lengthOutputBundle.commit(Instant.now()).getElements(),
        Matchers.<WindowedValue<Integer>>containsInAnyOrder(
            WindowedValue.valueInGlobalWindow(3),
            WindowedValue.timestampedValueInGlobalWindow(4, new Instant(1000)),
            WindowedValue.valueInGlobalWindow(5, PaneInfo.ON_TIME_AND_ONLY_FIRING)));
  }

  /**
   * The {@code DoFn} writes to {@code lengthTag}, which is not passed to
   * {@code withOutputTags}; the result must contain exactly the main and element bundles, with
   * their contents unaffected.
   */
  @Test
  public void testParDoMultiUndeclaredSideOutput() throws Exception {
    TestPipeline p = TestPipeline.create();

    PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam"));

    TupleTag<KV<String, Integer>> mainOutputTag = new TupleTag<KV<String, Integer>>() {};
    final TupleTag<String> elementTag = new TupleTag<>();
    final TupleTag<Integer> lengthTag = new TupleTag<>();

    BoundMulti<String, KV<String, Integer>> pardo =
        ParDo.of(
                new DoFn<String, KV<String, Integer>>() {
                  @Override
                  public void processElement(ProcessContext c) {
                    c.output(KV.<String, Integer>of(c.element(), c.element().length()));
                    c.sideOutput(elementTag, c.element());
                    c.sideOutput(lengthTag, c.element().length());
                  }
                })
            .withOutputTags(mainOutputTag, TupleTagList.of(elementTag));
    PCollectionTuple outputTuple = input.apply(pardo);

    CommittedBundle<String> inputBundle =
        bundleFactory.createRootBundle(input).commit(Instant.now());

    PCollection<KV<String, Integer>> mainOutput = outputTuple.get(mainOutputTag);
    PCollection<String> elementOutput = outputTuple.get(elementTag);

    InProcessEvaluationContext evaluationContext = mock(InProcessEvaluationContext.class);
    UncommittedBundle<KV<String, Integer>> mainOutputBundle =
        stubOutputBundle(evaluationContext, inputBundle, mainOutput);
    UncommittedBundle<String> elementOutputBundle =
        stubOutputBundle(evaluationContext, inputBundle, elementOutput);

    InProcessExecutionContext executionContext =
        new InProcessExecutionContext(null, null, null, null);
    CounterSet counters = stubExecutionContext(evaluationContext, mainOutput, executionContext);

    InProcessTransformResult result =
        processElementsAndFinish(mainOutput, inputBundle, evaluationContext);

    assertThat(
        result.getOutputBundles(),
        Matchers.<UncommittedBundle<?>>containsInAnyOrder(mainOutputBundle, elementOutputBundle));
    assertThat(result.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE));
    assertThat(result.getCounters(), equalTo(counters));

    assertThat(
        mainOutputBundle.commit(Instant.now()).getElements(),
        Matchers.<WindowedValue<KV<String, Integer>>>containsInAnyOrder(
            WindowedValue.valueInGlobalWindow(KV.of("foo", 3)),
            WindowedValue.timestampedValueInGlobalWindow(KV.of("bara", 4), new Instant(1000)),
            WindowedValue.valueInGlobalWindow(
                KV.of("bazam", 5), PaneInfo.ON_TIME_AND_ONLY_FIRING)));
    assertThat(
        elementOutputBundle.commit(Instant.now()).getElements(),
        Matchers.<WindowedValue<String>>containsInAnyOrder(
            WindowedValue.valueInGlobalWindow("foo"),
            WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000)),
            WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING)));
  }

  /**
   * State written by the {@code DoFn} (a watermark hold in the global namespace and a bag in the
   * global-window namespace) must be readable from the result, and the result's watermark hold
   * must be {@code 20205}, i.e. the hold added for {@code "foo"} ({@code 20202 + 3}).
   */
  @Test
  public void finishBundleWithStatePutsStateInResult() throws Exception {
    TestPipeline p = TestPipeline.create();

    PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam"));

    TupleTag<KV<String, Integer>> mainOutputTag = new TupleTag<KV<String, Integer>>() {};
    final TupleTag<String> elementTag = new TupleTag<>();

    final StateTag<Object, WatermarkHoldState<BoundedWindow>> watermarkTag =
        StateTags.watermarkStateInternal("myId", OutputTimeFns.outputAtEndOfWindow());
    final StateTag<Object, BagState<String>> bagTag = StateTags.bag("myBag", StringUtf8Coder.of());
    // Same namespace the DoFn below builds inline for the bag state.
    final StateNamespace windowNs =
        StateNamespaces.window(GlobalWindow.Coder.INSTANCE, GlobalWindow.INSTANCE);
    BoundMulti<String, KV<String, Integer>> pardo =
        ParDo.of(
                new DoFn<String, KV<String, Integer>>() {
                  @Override
                  public void processElement(ProcessContext c) {
                    c.windowingInternals()
                        .stateInternals()
                        .state(StateNamespaces.global(), watermarkTag)
                        .add(new Instant(20202L + c.element().length()));
                    c.windowingInternals()
                        .stateInternals()
                        .state(
                            StateNamespaces.window(
                                GlobalWindow.Coder.INSTANCE, GlobalWindow.INSTANCE),
                            bagTag)
                        .add(c.element());
                  }
                })
            .withOutputTags(mainOutputTag, TupleTagList.of(elementTag));
    PCollectionTuple outputTuple = input.apply(pardo);

    CommittedBundle<String> inputBundle =
        bundleFactory.createRootBundle(input).commit(Instant.now());

    PCollection<KV<String, Integer>> mainOutput = outputTuple.get(mainOutputTag);
    PCollection<String> elementOutput = outputTuple.get(elementTag);

    InProcessEvaluationContext evaluationContext = mock(InProcessEvaluationContext.class);
    UncommittedBundle<KV<String, Integer>> mainOutputBundle =
        stubOutputBundle(evaluationContext, inputBundle, mainOutput);
    UncommittedBundle<String> elementOutputBundle =
        stubOutputBundle(evaluationContext, inputBundle, elementOutput);

    InProcessExecutionContext executionContext =
        new InProcessExecutionContext(null, "myKey", null, null);
    stubExecutionContext(evaluationContext, mainOutput, executionContext);

    InProcessTransformResult result =
        processElementsAndFinish(mainOutput, inputBundle, evaluationContext);

    assertThat(
        result.getOutputBundles(),
        Matchers.<UncommittedBundle<?>>containsInAnyOrder(mainOutputBundle, elementOutputBundle));
    assertThat(result.getWatermarkHold(), equalTo(new Instant(20205L)));
    assertThat(result.getState(), not(nullValue()));
    assertThat(
        result.getState().state(StateNamespaces.global(), watermarkTag).read(),
        equalTo(new Instant(20205L)));
    assertThat(
        result.getState().state(windowNs, bagTag).read(),
        containsInAnyOrder("foo", "bara", "bazam"));
  }

  /**
   * Timers set and deleted by the {@code DoFn} must be reported in the result's
   * {@link TimerUpdate}: one set and one delete per processed element, keyed by {@code "myKey"}.
   */
  @Test
  public void finishBundleWithStateAndTimersPutsTimersInResult() throws Exception {
    TestPipeline p = TestPipeline.create();

    PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam"));

    TupleTag<KV<String, Integer>> mainOutputTag = new TupleTag<KV<String, Integer>>() {};
    final TupleTag<String> elementTag = new TupleTag<>();

    // Expected values for the assertion. The DoFn below builds equal TimerData inline instead of
    // capturing these locals.
    // NOTE(review): presumably to keep the anonymous DoFn free of captured state it would have
    // to serialize - confirm before sharing the instances.
    final TimerData addedTimer =
        TimerData.of(
            StateNamespaces.window(
                IntervalWindow.getCoder(),
                new IntervalWindow(
                    new Instant(0).plus(Duration.standardMinutes(5)),
                    new Instant(1)
                        .plus(Duration.standardMinutes(5))
                        .plus(Duration.standardHours(1)))),
            new Instant(54541L),
            TimeDomain.EVENT_TIME);
    final TimerData deletedTimer =
        TimerData.of(
            StateNamespaces.window(
                IntervalWindow.getCoder(),
                new IntervalWindow(new Instant(0), new Instant(0).plus(Duration.standardHours(1)))),
            new Instant(3400000),
            TimeDomain.SYNCHRONIZED_PROCESSING_TIME);

    BoundMulti<String, KV<String, Integer>> pardo =
        ParDo.of(
                new DoFn<String, KV<String, Integer>>() {
                  @Override
                  public void processElement(ProcessContext c) {
                    c.windowingInternals().stateInternals();
                    c.windowingInternals()
                        .timerInternals()
                        .setTimer(
                            TimerData.of(
                                StateNamespaces.window(
                                    IntervalWindow.getCoder(),
                                    new IntervalWindow(
                                        new Instant(0).plus(Duration.standardMinutes(5)),
                                        new Instant(1)
                                            .plus(Duration.standardMinutes(5))
                                            .plus(Duration.standardHours(1)))),
                                new Instant(54541L),
                                TimeDomain.EVENT_TIME));
                    c.windowingInternals()
                        .timerInternals()
                        .deleteTimer(
                            TimerData.of(
                                StateNamespaces.window(
                                    IntervalWindow.getCoder(),
                                    new IntervalWindow(
                                        new Instant(0),
                                        new Instant(0).plus(Duration.standardHours(1)))),
                                new Instant(3400000),
                                TimeDomain.SYNCHRONIZED_PROCESSING_TIME));
                  }
                })
            .withOutputTags(mainOutputTag, TupleTagList.of(elementTag));
    PCollectionTuple outputTuple = input.apply(pardo);

    CommittedBundle<String> inputBundle =
        bundleFactory.createRootBundle(input).commit(Instant.now());

    PCollection<KV<String, Integer>> mainOutput = outputTuple.get(mainOutputTag);
    PCollection<String> elementOutput = outputTuple.get(elementTag);

    InProcessEvaluationContext evaluationContext = mock(InProcessEvaluationContext.class);
    stubOutputBundle(evaluationContext, inputBundle, mainOutput);
    stubOutputBundle(evaluationContext, inputBundle, elementOutput);

    InProcessExecutionContext executionContext =
        new InProcessExecutionContext(null, "myKey", null, null);
    stubExecutionContext(evaluationContext, mainOutput, executionContext);

    InProcessTransformResult result =
        processElementsAndFinish(mainOutput, inputBundle, evaluationContext);

    assertThat(
        result.getTimerUpdate(),
        equalTo(
            TimerUpdate.builder("myKey")
                .setTimer(addedTimer)
                .setTimer(addedTimer)
                .setTimer(addedTimer)
                .deletedTimer(deletedTimer)
                .deletedTimer(deletedTimer)
                .deletedTimer(deletedTimer)
                .build()));
  }

  /**
   * Creates a root bundle for {@code output} and stubs
   * {@code evaluationContext.createBundle(inputBundle, output)} to return it.
   *
   * @return the bundle the mocked context will hand out for {@code output}
   */
  private <T> UncommittedBundle<T> stubOutputBundle(
      InProcessEvaluationContext evaluationContext,
      CommittedBundle<String> inputBundle,
      PCollection<T> output) {
    UncommittedBundle<T> outputBundle = bundleFactory.createRootBundle(output);
    when(evaluationContext.createBundle(inputBundle, output)).thenReturn(outputBundle);
    return outputBundle;
  }

  /**
   * Stubs the mocked context to return {@code executionContext} for the transform producing
   * {@code mainOutput} (with a {@code null} key argument) and to return a fresh
   * {@link CounterSet} from {@code createCounterSet()}.
   *
   * @return the counter set the mocked context will hand out
   */
  private static CounterSet stubExecutionContext(
      InProcessEvaluationContext evaluationContext,
      PCollection<KV<String, Integer>> mainOutput,
      InProcessExecutionContext executionContext) {
    when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(), null))
        .thenReturn(executionContext);
    CounterSet counters = new CounterSet();
    when(evaluationContext.createCounterSet()).thenReturn(counters);
    return counters;
  }

  /**
   * Builds an evaluator for the transform producing {@code mainOutput}, feeds it the three
   * standard elements ({@code "foo"} with default timestamp and pane, {@code "bara"} at
   * timestamp 1000, {@code "bazam"} with an on-time-and-only-firing pane) and finishes the
   * bundle.
   *
   * @return the result of {@code finishBundle()}
   */
  private static InProcessTransformResult processElementsAndFinish(
      PCollection<KV<String, Integer>> mainOutput,
      CommittedBundle<String> inputBundle,
      InProcessEvaluationContext evaluationContext)
      throws Exception {
    org.apache.beam.sdk.runners.inprocess.TransformEvaluator<String> evaluator =
        new ParDoMultiEvaluatorFactory()
            .forApplication(
                mainOutput.getProducingTransformInternal(), inputBundle, evaluationContext);

    evaluator.processElement(WindowedValue.valueInGlobalWindow("foo"));
    evaluator.processElement(
        WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000)));
    evaluator.processElement(
        WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING));

    return evaluator.finishBundle();
  }
}
