Add BundleFactory and ImmutabilityCheckingBundleFactory. A BundleFactory allows checks to be made on the contents of bundles. ImmutabilityCheckingBundleFactory produces bundles that verify that elements output to a bundle are not modified after being output.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/334ab99a Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/334ab99a Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/334ab99a Branch: refs/heads/master Commit: 334ab99ab39b7f0632848b789e2c0af1782b11c0 Parents: ac314ee Author: Thomas Groh <[email protected]> Authored: Thu Mar 17 17:39:45 2016 -0700 Committer: bchambers <[email protected]> Committed: Mon Apr 4 15:44:26 2016 -0700 ---------------------------------------------------------------------- .../sdk/runners/inprocess/BundleFactory.java | 50 +++++ .../ExecutorServiceParallelExecutor.java | 5 +- .../ImmutabilityCheckingBundleFactory.java | 131 +++++++++++ .../inprocess/InProcessBundleFactory.java | 157 +++++++++++++ .../inprocess/InProcessEvaluationContext.java | 18 +- .../inprocess/InProcessPipelineRunner.java | 5 + .../BoundedReadEvaluatorFactoryTest.java | 21 +- .../inprocess/FlattenEvaluatorFactoryTest.java | 11 +- .../GroupByKeyEvaluatorFactoryTest.java | 10 +- .../ImmutabilityCheckingBundleFactoryTest.java | 220 +++++++++++++++++++ .../inprocess/InMemoryWatermarkManagerTest.java | 49 +++-- .../inprocess/InProcessBundleFactoryTest.java | 197 +++++++++++++++++ .../InProcessEvaluationContextTest.java | 11 +- .../ParDoMultiEvaluatorFactoryTest.java | 95 ++++---- .../ParDoSingleEvaluatorFactoryTest.java | 129 ++++++----- .../inprocess/TransformExecutorTest.java | 10 +- .../UnboundedReadEvaluatorFactoryTest.java | 10 +- .../inprocess/ViewEvaluatorFactoryTest.java | 5 +- 18 files changed, 980 insertions(+), 154 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/334ab99a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/BundleFactory.java ---------------------------------------------------------------------- diff --git 
a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/BundleFactory.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/BundleFactory.java new file mode 100644 index 0000000..cb8a369 --- /dev/null +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/BundleFactory.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.dataflow.sdk.runners.inprocess; + +import com.google.cloud.dataflow.sdk.runners.inprocess.GroupByKeyEvaluatorFactory.InProcessGroupByKeyOnly; +import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; +import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle; +import com.google.cloud.dataflow.sdk.transforms.PTransform; +import com.google.cloud.dataflow.sdk.values.PCollection; + +/** + * A factory that creates {@link UncommittedBundle UncommittedBundles}. + */ +public interface BundleFactory { + /** + * Create an {@link UncommittedBundle} from an empty input. Elements added to the bundle belong to + * the {@code output} {@link PCollection}. 
+ */ + public <T> UncommittedBundle<T> createRootBundle(PCollection<T> output); + + /** + * Create an {@link UncommittedBundle} from the specified input. Elements added to the bundle + * belong to the {@code output} {@link PCollection}. + */ + public <T> UncommittedBundle<T> createBundle(CommittedBundle<?> input, PCollection<T> output); + + /** + * Create an {@link UncommittedBundle} with the specified keys at the specified step. For use by + * {@link InProcessGroupByKeyOnly} {@link PTransform PTransforms}. Elements added to the bundle + * belong to the {@code output} {@link PCollection}. + */ + public <T> UncommittedBundle<T> createKeyedBundle( + CommittedBundle<?> input, Object key, PCollection<T> output); +} + http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/334ab99a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java index 628f107..9af6f97 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java @@ -374,8 +374,9 @@ final class ExecutorServiceParallelExecutor implements InProcessExecutor { KeyedWorkItems.timersWorkItem(keyTimers.getKey(), delivery); @SuppressWarnings({"unchecked", "rawtypes"}) CommittedBundle<?> bundle = - InProcessBundle.<KeyedWorkItem<Object, Object>>keyed( - (PCollection) transform.getInput(), keyTimers.getKey()) + evaluationContext + .createKeyedBundle( + null, keyTimers.getKey(), (PCollection) transform.getInput()) .add(WindowedValue.valueInEmptyWindows(work)) 
.commit(Instant.now()); scheduleConsumption(transform, bundle, new TimerCompletionCallback(delivery)); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/334ab99a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityCheckingBundleFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityCheckingBundleFactory.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityCheckingBundleFactory.java new file mode 100644 index 0000000..44670e8 --- /dev/null +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityCheckingBundleFactory.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.google.cloud.dataflow.sdk.runners.inprocess; + +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.api.client.util.Throwables; +import com.google.cloud.dataflow.sdk.coders.Coder; +import com.google.cloud.dataflow.sdk.coders.CoderException; +import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; +import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle; +import com.google.cloud.dataflow.sdk.transforms.DoFn; +import com.google.cloud.dataflow.sdk.util.IllegalMutationException; +import com.google.cloud.dataflow.sdk.util.MutationDetector; +import com.google.cloud.dataflow.sdk.util.MutationDetectors; +import com.google.cloud.dataflow.sdk.util.SerializableUtils; +import com.google.cloud.dataflow.sdk.util.UserCodeException; +import com.google.cloud.dataflow.sdk.util.WindowedValue; +import com.google.cloud.dataflow.sdk.values.PCollection; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.SetMultimap; + +import org.joda.time.Instant; + +/** + * A {@link BundleFactory} that ensures that elements added to it are not mutated after being + * output. Immutability checks are enforced at the time {@link UncommittedBundle#commit(Instant)} is + * called, checking the value at that time against the value at the time the element was added. All + * elements added to the bundle will be encoded by the {@link Coder} of the underlying + * {@link PCollection}. + * + * <p>This catches errors during the execution of a {@link DoFn} caused by modifying an element + * after it is added to an output {@link PCollection}. + */ +class ImmutabilityCheckingBundleFactory implements BundleFactory { + /** + * Create a new {@link ImmutabilityCheckingBundleFactory} that uses the underlying + * {@link BundleFactory} to create the output bundle. 
+ */ + public static ImmutabilityCheckingBundleFactory create(BundleFactory underlying) { + return new ImmutabilityCheckingBundleFactory(underlying); + } + + private final BundleFactory underlying; + + private ImmutabilityCheckingBundleFactory(BundleFactory underlying) { + this.underlying = checkNotNull(underlying); + } + + @Override + public <T> UncommittedBundle<T> createRootBundle(PCollection<T> output) { + return new ImmutabilityEnforcingBundle<>(underlying.createRootBundle(output)); + } + + @Override + public <T> UncommittedBundle<T> createBundle(CommittedBundle<?> input, PCollection<T> output) { + return new ImmutabilityEnforcingBundle<>(underlying.createBundle(input, output)); + } + + @Override + public <T> UncommittedBundle<T> createKeyedBundle( + CommittedBundle<?> input, Object key, PCollection<T> output) { + return new ImmutabilityEnforcingBundle<>(underlying.createKeyedBundle(input, key, output)); + } + + private static class ImmutabilityEnforcingBundle<T> implements UncommittedBundle<T> { + private final UncommittedBundle<T> underlying; + private final SetMultimap<WindowedValue<T>, MutationDetector> mutationDetectors; + private Coder<T> coder; + + public ImmutabilityEnforcingBundle(UncommittedBundle<T> underlying) { + this.underlying = underlying; + mutationDetectors = HashMultimap.create(); + coder = SerializableUtils.clone(getPCollection().getCoder()); + } + + @Override + public PCollection<T> getPCollection() { + return underlying.getPCollection(); + } + + @Override + public UncommittedBundle<T> add(WindowedValue<T> element) { + try { + mutationDetectors.put( + element, MutationDetectors.forValueWithCoder(element.getValue(), coder)); + } catch (CoderException e) { + throw Throwables.propagate(e); + } + underlying.add(element); + return this; + } + + @Override + public CommittedBundle<T> commit(Instant synchronizedProcessingTime) { + for (MutationDetector detector : mutationDetectors.values()) { + try { + detector.verifyUnmodified(); + } catch 
(IllegalMutationException exn) { + throw UserCodeException.wrap( + new IllegalMutationException( + String.format( + "PTransform %s mutated value %s after it was output (new value was %s)." + + " Values must not be mutated in any way after being output.", + underlying.getPCollection().getProducingTransformInternal().getFullName(), + exn.getSavedValue(), + exn.getNewValue()), + exn.getSavedValue(), + exn.getNewValue(), + exn)); + } + } + return underlying.commit(synchronizedProcessingTime); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/334ab99a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessBundleFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessBundleFactory.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessBundleFactory.java new file mode 100644 index 0000000..7ca1b60 --- /dev/null +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessBundleFactory.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.google.cloud.dataflow.sdk.runners.inprocess; + +import static com.google.common.base.Preconditions.checkState; + +import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; +import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle; +import com.google.cloud.dataflow.sdk.util.WindowedValue; +import com.google.cloud.dataflow.sdk.values.PCollection; +import com.google.common.base.MoreObjects; +import com.google.common.collect.ImmutableList; + +import org.joda.time.Instant; + +import javax.annotation.Nullable; + +/** + * A factory that produces bundles that perform no additional validation. + */ +class InProcessBundleFactory implements BundleFactory { + public static InProcessBundleFactory create() { + return new InProcessBundleFactory(); + } + + private InProcessBundleFactory() {} + + @Override + public <T> UncommittedBundle<T> createRootBundle(PCollection<T> output) { + return InProcessBundle.unkeyed(output); + } + + @Override + public <T> UncommittedBundle<T> createBundle(CommittedBundle<?> input, PCollection<T> output) { + return input.isKeyed() + ? InProcessBundle.keyed(output, input.getKey()) + : InProcessBundle.unkeyed(output); + } + + @Override + public <T> UncommittedBundle<T> createKeyedBundle( + CommittedBundle<?> input, Object key, PCollection<T> output) { + return InProcessBundle.keyed(output, key); + } + + /** + * A {@link UncommittedBundle} that buffers elements in memory. + */ + private static final class InProcessBundle<T> implements UncommittedBundle<T> { + private final PCollection<T> pcollection; + private final boolean keyed; + private final Object key; + private boolean committed = false; + private ImmutableList.Builder<WindowedValue<T>> elements; + + /** + * Create a new {@link InProcessBundle} for the specified {@link PCollection} without a key. 
+ */ + public static <T> InProcessBundle<T> unkeyed(PCollection<T> pcollection) { + return new InProcessBundle<T>(pcollection, false, null); + } + + /** + * Create a new {@link InProcessBundle} for the specified {@link PCollection} with the specified + * key. + * + * <p>See {@link CommittedBundle#getKey()} and {@link CommittedBundle#isKeyed()} for more + * information. + */ + public static <T> InProcessBundle<T> keyed(PCollection<T> pcollection, Object key) { + return new InProcessBundle<T>(pcollection, true, key); + } + + private InProcessBundle(PCollection<T> pcollection, boolean keyed, Object key) { + this.pcollection = pcollection; + this.keyed = keyed; + this.key = key; + this.elements = ImmutableList.builder(); + } + + @Override + public PCollection<T> getPCollection() { + return pcollection; + } + + @Override + public InProcessBundle<T> add(WindowedValue<T> element) { + checkState( + !committed, + "Can't add element %s to committed bundle in PCollection %s", + element, + pcollection); + elements.add(element); + return this; + } + + @Override + public CommittedBundle<T> commit(final Instant synchronizedCompletionTime) { + checkState(!committed, "Can't commit already committed bundle %s", this); + committed = true; + final Iterable<WindowedValue<T>> committedElements = elements.build(); + return new CommittedBundle<T>() { + @Override + @Nullable + public Object getKey() { + return key; + } + + @Override + public boolean isKeyed() { + return keyed; + } + + @Override + public Iterable<WindowedValue<T>> getElements() { + return committedElements; + } + + @Override + public PCollection<T> getPCollection() { + return pcollection; + } + + @Override + public Instant getSynchronizedProcessingOutputWatermark() { + return synchronizedCompletionTime; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .omitNullValues() + .add("pcollection", pcollection) + .add("key", key) + .add("elements", committedElements) + .toString(); + } + }; 
+ } + } +} + http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/334ab99a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContext.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContext.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContext.java index dcbbf40..078827d 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContext.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContext.java @@ -77,6 +77,7 @@ class InProcessEvaluationContext { /** The options that were used to create this {@link Pipeline}. */ private final InProcessPipelineOptions options; + private final BundleFactory bundleFactory; /** The current processing time and event time watermarks and timers. 
*/ private final InMemoryWatermarkManager watermarkManager; @@ -93,21 +94,24 @@ class InProcessEvaluationContext { public static InProcessEvaluationContext create( InProcessPipelineOptions options, + BundleFactory bundleFactory, Collection<AppliedPTransform<?, ?, ?>> rootTransforms, Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers, Map<AppliedPTransform<?, ?, ?>, String> stepNames, Collection<PCollectionView<?>> views) { return new InProcessEvaluationContext( - options, rootTransforms, valueToConsumers, stepNames, views); + options, bundleFactory, rootTransforms, valueToConsumers, stepNames, views); } private InProcessEvaluationContext( InProcessPipelineOptions options, + BundleFactory bundleFactory, Collection<AppliedPTransform<?, ?, ?>> rootTransforms, Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers, Map<AppliedPTransform<?, ?, ?>, String> stepNames, Collection<PCollectionView<?>> views) { this.options = checkNotNull(options); + this.bundleFactory = checkNotNull(bundleFactory); checkNotNull(rootTransforms); checkNotNull(valueToConsumers); checkNotNull(stepNames); @@ -207,7 +211,7 @@ class InProcessEvaluationContext { * Create a {@link UncommittedBundle} for use by a source. */ public <T> UncommittedBundle<T> createRootBundle(PCollection<T> output) { - return InProcessBundle.unkeyed(output); + return bundleFactory.createRootBundle(output); } /** @@ -215,9 +219,7 @@ class InProcessEvaluationContext { * PCollection}. */ public <T> UncommittedBundle<T> createBundle(CommittedBundle<?> input, PCollection<T> output) { - return input.isKeyed() - ? 
InProcessBundle.keyed(output, input.getKey()) - : InProcessBundle.unkeyed(output); + return bundleFactory.createBundle(input, output); } /** @@ -226,7 +228,7 @@ class InProcessEvaluationContext { */ public <T> UncommittedBundle<T> createKeyedBundle( CommittedBundle<?> input, Object key, PCollection<T> output) { - return InProcessBundle.keyed(output, key); + return bundleFactory.createKeyedBundle(input, key, output); } /** @@ -355,7 +357,9 @@ class InProcessEvaluationContext { * for each time they are set. */ public Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> extractFiredTimers() { - return watermarkManager.extractFiredTimers(); + Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> fired = + watermarkManager.extractFiredTimers(); + return fired; } /** http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/334ab99a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java index 8123711..4fb01b7 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessPipelineRunner.java @@ -232,6 +232,7 @@ public class InProcessPipelineRunner InProcessEvaluationContext context = InProcessEvaluationContext.create( getPipelineOptions(), + createBundleFactory(getPipelineOptions()), consumerTrackingVisitor.getRootTransforms(), consumerTrackingVisitor.getValueToConsumers(), consumerTrackingVisitor.getStepNames(), @@ -271,6 +272,10 @@ public class InProcessPipelineRunner return Collections.emptyMap(); } + private BundleFactory createBundleFactory(InProcessPipelineOptions 
pipelineOptions) { + return InProcessBundleFactory.create(); + } + /** * The result of running a {@link Pipeline} with the {@link InProcessPipelineRunner}. * http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/334ab99a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactoryTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactoryTest.java index ebece5f..8e92caf 100644 --- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactoryTest.java +++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactoryTest.java @@ -22,7 +22,6 @@ import static org.hamcrest.Matchers.emptyIterable; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertThat; -import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import com.google.cloud.dataflow.sdk.coders.BigEndianLongCoder; @@ -48,6 +47,7 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; import org.mockito.Mock; +import org.mockito.MockitoAnnotations; import java.io.IOException; import java.util.Arrays; @@ -63,20 +63,22 @@ public class BoundedReadEvaluatorFactoryTest { private PCollection<Long> longs; private TransformEvaluatorFactory factory; @Mock private InProcessEvaluationContext context; + private BundleFactory bundleFactory; @Before public void setup() { + MockitoAnnotations.initMocks(this); source = CountingSource.upTo(10L); TestPipeline p = TestPipeline.create(); longs = p.apply(Read.from(source)); factory = new BoundedReadEvaluatorFactory(); - context = mock(InProcessEvaluationContext.class); 
+ bundleFactory = InProcessBundleFactory.create(); } @Test public void boundedSourceInMemoryTransformEvaluatorProducesElements() throws Exception { - UncommittedBundle<Long> output = InProcessBundle.unkeyed(longs); + UncommittedBundle<Long> output = bundleFactory.createRootBundle(longs); when(context.createRootBundle(longs)).thenReturn(output); TransformEvaluator<?> evaluator = @@ -96,8 +98,7 @@ public class BoundedReadEvaluatorFactoryTest { */ @Test public void boundedSourceInMemoryTransformEvaluatorAfterFinishIsEmpty() throws Exception { - UncommittedBundle<Long> output = - InProcessBundle.unkeyed(longs); + UncommittedBundle<Long> output = bundleFactory.createRootBundle(longs); when(context.createRootBundle(longs)).thenReturn(output); TransformEvaluator<?> evaluator = @@ -111,7 +112,7 @@ public class BoundedReadEvaluatorFactoryTest { containsInAnyOrder( gw(1L), gw(2L), gw(4L), gw(8L), gw(9L), gw(7L), gw(6L), gw(5L), gw(3L), gw(0L))); - UncommittedBundle<Long> secondOutput = InProcessBundle.unkeyed(longs); + UncommittedBundle<Long> secondOutput = bundleFactory.createRootBundle(longs); when(context.createRootBundle(longs)).thenReturn(secondOutput); TransformEvaluator<?> secondEvaluator = factory.forApplication(longs.getProducingTransformInternal(), null, context); @@ -132,8 +133,8 @@ public class BoundedReadEvaluatorFactoryTest { */ @Test public void boundedSourceEvaluatorSimultaneousEvaluations() throws Exception { - UncommittedBundle<Long> output = InProcessBundle.unkeyed(longs); - UncommittedBundle<Long> secondOutput = InProcessBundle.unkeyed(longs); + UncommittedBundle<Long> output = bundleFactory.createRootBundle(longs); + UncommittedBundle<Long> secondOutput = bundleFactory.createRootBundle(longs); when(context.createRootBundle(longs)).thenReturn(output).thenReturn(secondOutput); // create both evaluators before finishing either. 
@@ -171,7 +172,7 @@ public class BoundedReadEvaluatorFactoryTest { PCollection<Long> pcollection = p.apply(Read.from(source)); AppliedPTransform<?, ?, ?> sourceTransform = pcollection.getProducingTransformInternal(); - UncommittedBundle<Long> output = InProcessBundle.unkeyed(longs); + UncommittedBundle<Long> output = bundleFactory.createRootBundle(pcollection); when(context.createRootBundle(pcollection)).thenReturn(output); TransformEvaluator<?> evaluator = factory.forApplication(sourceTransform, null, context); @@ -189,7 +190,7 @@ public class BoundedReadEvaluatorFactoryTest { PCollection<Long> pcollection = p.apply(Read.from(source)); AppliedPTransform<?, ?, ?> sourceTransform = pcollection.getProducingTransformInternal(); - UncommittedBundle<Long> output = InProcessBundle.unkeyed(longs); + UncommittedBundle<Long> output = bundleFactory.createRootBundle(pcollection); when(context.createRootBundle(pcollection)).thenReturn(output); TransformEvaluator<?> evaluator = factory.forApplication(sourceTransform, null, context); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/334ab99a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/FlattenEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/FlattenEvaluatorFactoryTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/FlattenEvaluatorFactoryTest.java index 39cc54a..f93abd8 100644 --- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/FlattenEvaluatorFactoryTest.java +++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/FlattenEvaluatorFactoryTest.java @@ -45,6 +45,7 @@ import org.junit.runners.JUnit4; */ @RunWith(JUnit4.class) public class FlattenEvaluatorFactoryTest { + private BundleFactory bundleFactory = InProcessBundleFactory.create(); @Test public void 
testFlattenInMemoryEvaluator() throws Exception { TestPipeline p = TestPipeline.create(); @@ -54,13 +55,15 @@ public class FlattenEvaluatorFactoryTest { PCollection<Integer> flattened = list.apply(Flatten.<Integer>pCollections()); - CommittedBundle<Integer> leftBundle = InProcessBundle.unkeyed(left).commit(Instant.now()); - CommittedBundle<Integer> rightBundle = InProcessBundle.unkeyed(right).commit(Instant.now()); + CommittedBundle<Integer> leftBundle = + bundleFactory.createRootBundle(left).commit(Instant.now()); + CommittedBundle<Integer> rightBundle = + bundleFactory.createRootBundle(right).commit(Instant.now()); InProcessEvaluationContext context = mock(InProcessEvaluationContext.class); - UncommittedBundle<Integer> flattenedLeftBundle = InProcessBundle.unkeyed(flattened); - UncommittedBundle<Integer> flattenedRightBundle = InProcessBundle.unkeyed(flattened); + UncommittedBundle<Integer> flattenedLeftBundle = bundleFactory.createRootBundle(flattened); + UncommittedBundle<Integer> flattenedRightBundle = bundleFactory.createRootBundle(flattened); when(context.createBundle(leftBundle, flattened)).thenReturn(flattenedLeftBundle); when(context.createBundle(rightBundle, flattened)).thenReturn(flattenedRightBundle); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/334ab99a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactoryTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactoryTest.java index 9c0957d..e80125f 100644 --- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactoryTest.java +++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactoryTest.java @@ 
-50,6 +50,8 @@ import org.junit.runners.JUnit4; */ @RunWith(JUnit4.class) public class GroupByKeyEvaluatorFactoryTest { + private BundleFactory bundleFactory = InProcessBundleFactory.create(); + @Test public void testInMemoryEvaluator() throws Exception { TestPipeline p = TestPipeline.create(); @@ -67,15 +69,15 @@ public class GroupByKeyEvaluatorFactoryTest { kvs.apply(new GroupByKeyEvaluatorFactory.InProcessGroupByKeyOnly<String, Integer>()); CommittedBundle<KV<String, WindowedValue<Integer>>> inputBundle = - InProcessBundle.unkeyed(kvs).commit(Instant.now()); + bundleFactory.createRootBundle(kvs).commit(Instant.now()); InProcessEvaluationContext evaluationContext = mock(InProcessEvaluationContext.class); UncommittedBundle<KeyedWorkItem<String, Integer>> fooBundle = - InProcessBundle.keyed(groupedKvs, "foo"); + bundleFactory.createKeyedBundle(null, "foo", groupedKvs); UncommittedBundle<KeyedWorkItem<String, Integer>> barBundle = - InProcessBundle.keyed(groupedKvs, "bar"); + bundleFactory.createKeyedBundle(null, "bar", groupedKvs); UncommittedBundle<KeyedWorkItem<String, Integer>> bazBundle = - InProcessBundle.keyed(groupedKvs, "baz"); + bundleFactory.createKeyedBundle(null, "baz", groupedKvs); when(evaluationContext.createKeyedBundle(inputBundle, "foo", groupedKvs)).thenReturn(fooBundle); when(evaluationContext.createKeyedBundle(inputBundle, "bar", groupedKvs)).thenReturn(barBundle); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/334ab99a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityCheckingBundleFactoryTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityCheckingBundleFactoryTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityCheckingBundleFactoryTest.java new file mode 100644 index 0000000..40b1d5a --- /dev/null +++ 
b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ImmutabilityCheckingBundleFactoryTest.java @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.dataflow.sdk.runners.inprocess; + +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.isA; +import static org.junit.Assert.assertThat; + +import com.google.cloud.dataflow.sdk.coders.ByteArrayCoder; +import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; +import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle; +import com.google.cloud.dataflow.sdk.testing.TestPipeline; +import com.google.cloud.dataflow.sdk.transforms.Create; +import com.google.cloud.dataflow.sdk.transforms.DoFn; +import com.google.cloud.dataflow.sdk.transforms.ParDo; +import com.google.cloud.dataflow.sdk.transforms.windowing.IntervalWindow; +import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo; +import com.google.cloud.dataflow.sdk.util.IllegalMutationException; +import com.google.cloud.dataflow.sdk.util.UserCodeException; +import com.google.cloud.dataflow.sdk.util.WindowedValue; +import 
com.google.cloud.dataflow.sdk.values.PCollection; + +import org.joda.time.Instant; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link ImmutabilityCheckingBundleFactory}. + */ +@RunWith(JUnit4.class) +public class ImmutabilityCheckingBundleFactoryTest { + @Rule public ExpectedException thrown = ExpectedException.none(); + private ImmutabilityCheckingBundleFactory factory; + private PCollection<byte[]> created; + private PCollection<byte[]> transformed; + + @Before + public void setup() { + TestPipeline p = TestPipeline.create(); + created = p.apply(Create.<byte[]>of().withCoder(ByteArrayCoder.of())); + transformed = created.apply(ParDo.of(new IdentityDoFn<byte[]>())); + factory = ImmutabilityCheckingBundleFactory.create(InProcessBundleFactory.create()); + } + + @Test + public void noMutationRootBundleSucceeds() { + UncommittedBundle<byte[]> root = factory.createRootBundle(created); + byte[] array = new byte[] {0, 1, 2}; + root.add(WindowedValue.valueInGlobalWindow(array)); + CommittedBundle<byte[]> committed = root.commit(Instant.now()); + + assertThat( + committed.getElements(), containsInAnyOrder(WindowedValue.valueInGlobalWindow(array))); + } + + @Test + public void noMutationKeyedBundleSucceeds() { + CommittedBundle<byte[]> root = factory.createRootBundle(created).commit(Instant.now()); + UncommittedBundle<byte[]> keyed = factory.createKeyedBundle(root, "mykey", transformed); + + WindowedValue<byte[]> windowedArray = + WindowedValue.of( + new byte[] {4, 8, 12}, + new Instant(891L), + new IntervalWindow(new Instant(0), new Instant(1000)), + PaneInfo.ON_TIME_AND_ONLY_FIRING); + keyed.add(windowedArray); + + CommittedBundle<byte[]> committed = keyed.commit(Instant.now()); + assertThat(committed.getElements(), containsInAnyOrder(windowedArray)); + } + + @Test + public void noMutationCreateBundleSucceeds() { 
+ CommittedBundle<byte[]> root = factory.createRootBundle(created).commit(Instant.now()); + UncommittedBundle<byte[]> intermediate = factory.createBundle(root, transformed); + + WindowedValue<byte[]> windowedArray = + WindowedValue.of( + new byte[] {4, 8, 12}, + new Instant(891L), + new IntervalWindow(new Instant(0), new Instant(1000)), + PaneInfo.ON_TIME_AND_ONLY_FIRING); + intermediate.add(windowedArray); + + CommittedBundle<byte[]> committed = intermediate.commit(Instant.now()); + assertThat(committed.getElements(), containsInAnyOrder(windowedArray)); + } + + @Test + public void mutationBeforeAddRootBundleSucceeds() { + UncommittedBundle<byte[]> root = factory.createRootBundle(created); + byte[] array = new byte[] {0, 1, 2}; + array[1] = 2; + root.add(WindowedValue.valueInGlobalWindow(array)); + CommittedBundle<byte[]> committed = root.commit(Instant.now()); + + assertThat( + committed.getElements(), containsInAnyOrder(WindowedValue.valueInGlobalWindow(array))); + } + + @Test + public void mutationBeforeAddKeyedBundleSucceeds() { + CommittedBundle<byte[]> root = factory.createRootBundle(created).commit(Instant.now()); + UncommittedBundle<byte[]> keyed = factory.createKeyedBundle(root, "mykey", transformed); + + byte[] array = new byte[] {4, 8, 12}; + array[0] = Byte.MAX_VALUE; + WindowedValue<byte[]> windowedArray = + WindowedValue.of( + array, + new Instant(891L), + new IntervalWindow(new Instant(0), new Instant(1000)), + PaneInfo.ON_TIME_AND_ONLY_FIRING); + keyed.add(windowedArray); + + CommittedBundle<byte[]> committed = keyed.commit(Instant.now()); + assertThat(committed.getElements(), containsInAnyOrder(windowedArray)); + } + + @Test + public void mutationBeforeAddCreateBundleSucceeds() { + CommittedBundle<byte[]> root = factory.createRootBundle(created).commit(Instant.now()); + UncommittedBundle<byte[]> intermediate = factory.createBundle(root, transformed); + + byte[] array = new byte[] {4, 8, 12}; + WindowedValue<byte[]> windowedArray = + 
WindowedValue.of( + array, + new Instant(891L), + new IntervalWindow(new Instant(0), new Instant(1000)), + PaneInfo.ON_TIME_AND_ONLY_FIRING); + array[2] = -3; + intermediate.add(windowedArray); + + CommittedBundle<byte[]> committed = intermediate.commit(Instant.now()); + assertThat(committed.getElements(), containsInAnyOrder(windowedArray)); + } + + @Test + public void mutationAfterAddRootBundleThrows() { + UncommittedBundle<byte[]> root = factory.createRootBundle(created); + byte[] array = new byte[] {0, 1, 2}; + root.add(WindowedValue.valueInGlobalWindow(array)); + + array[1] = 2; + thrown.expect(UserCodeException.class); + thrown.expectCause(isA(IllegalMutationException.class)); + thrown.expectMessage("Values must not be mutated in any way after being output"); + CommittedBundle<byte[]> committed = root.commit(Instant.now()); + } + + @Test + public void mutationAfterAddKeyedBundleThrows() { + CommittedBundle<byte[]> root = factory.createRootBundle(created).commit(Instant.now()); + UncommittedBundle<byte[]> keyed = factory.createKeyedBundle(root, "mykey", transformed); + + byte[] array = new byte[] {4, 8, 12}; + WindowedValue<byte[]> windowedArray = + WindowedValue.of( + array, + new Instant(891L), + new IntervalWindow(new Instant(0), new Instant(1000)), + PaneInfo.ON_TIME_AND_ONLY_FIRING); + keyed.add(windowedArray); + + array[0] = Byte.MAX_VALUE; + thrown.expect(UserCodeException.class); + thrown.expectCause(isA(IllegalMutationException.class)); + thrown.expectMessage("Values must not be mutated in any way after being output"); + CommittedBundle<byte[]> committed = keyed.commit(Instant.now()); + } + + @Test + public void mutationAfterAddCreateBundleThrows() { + CommittedBundle<byte[]> root = factory.createRootBundle(created).commit(Instant.now()); + UncommittedBundle<byte[]> intermediate = factory.createBundle(root, transformed); + + byte[] array = new byte[] {4, 8, 12}; + WindowedValue<byte[]> windowedArray = + WindowedValue.of( + array, + new Instant(891L), + 
new IntervalWindow(new Instant(0), new Instant(1000)), + PaneInfo.ON_TIME_AND_ONLY_FIRING); + intermediate.add(windowedArray); + + array[2] = -3; + thrown.expect(UserCodeException.class); + thrown.expectCause(isA(IllegalMutationException.class)); + thrown.expectMessage("Values must not be mutated in any way after being output"); + CommittedBundle<byte[]> committed = intermediate.commit(Instant.now()); + } + + private static class IdentityDoFn<T> extends DoFn<T, T> { + @Override + public void processElement(DoFn<T, T>.ProcessContext c) throws Exception { + c.output(c.element()); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/334ab99a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManagerTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManagerTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManagerTest.java index 140ac05..93d2a42 100644 --- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManagerTest.java +++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManagerTest.java @@ -84,6 +84,7 @@ public class InMemoryWatermarkManagerTest implements Serializable { private transient PCollection<Integer> flattened; private transient InMemoryWatermarkManager manager; + private transient BundleFactory bundleFactory; @Before public void setup() { @@ -131,6 +132,7 @@ public class InMemoryWatermarkManagerTest implements Serializable { clock = MockClock.fromInstant(new Instant(1000)); manager = InMemoryWatermarkManager.create(clock, rootTransforms, consumers); + bundleFactory = InProcessBundleFactory.create(); } /** @@ -246,7 +248,7 @@ public class InMemoryWatermarkManagerTest implements Serializable { 
assertThat(withBufferedElements.getOutputWatermark(), equalTo(firstCollectionTimestamp)); CommittedBundle<?> completedFlattenBundle = - InProcessBundle.unkeyed(flattened).commit(BoundedWindow.TIMESTAMP_MAX_VALUE); + bundleFactory.createRootBundle(flattened).commit(BoundedWindow.TIMESTAMP_MAX_VALUE); manager.updateWatermarks(firstPcollectionBundle, flattened.getProducingTransformInternal(), TimerUpdate.empty(), Collections.<CommittedBundle<?>>singleton(completedFlattenBundle), null); @@ -342,13 +344,13 @@ public class InMemoryWatermarkManagerTest implements Serializable { @Test public void updateWatermarkWithKeyedWatermarkHolds() { CommittedBundle<Integer> firstKeyBundle = - InProcessBundle.keyed(createdInts, "Odd") + bundleFactory.createKeyedBundle(null, "Odd", createdInts) .add(WindowedValue.timestampedValueInGlobalWindow(1, new Instant(1_000_000L))) .add(WindowedValue.timestampedValueInGlobalWindow(3, new Instant(-1000L))) .commit(clock.now()); CommittedBundle<Integer> secondKeyBundle = - InProcessBundle.keyed(createdInts, "Even") + bundleFactory.createKeyedBundle(null, "Even", createdInts) .add(WindowedValue.timestampedValueInGlobalWindow(2, new Instant(1234L))) .commit(clock.now()); @@ -368,7 +370,7 @@ public class InMemoryWatermarkManagerTest implements Serializable { assertThat(filteredWatermarks.getOutputWatermark(), not(laterThan(new Instant(-1000L)))); CommittedBundle<Integer> fauxFirstKeyTimerBundle = - InProcessBundle.keyed(createdInts, "Odd").commit(clock.now()); + bundleFactory.createKeyedBundle(null, "Odd", createdInts).commit(clock.now()); manager.updateWatermarks(fauxFirstKeyTimerBundle, filtered.getProducingTransformInternal(), TimerUpdate.empty(), Collections.<CommittedBundle<?>>emptyList(), BoundedWindow.TIMESTAMP_MAX_VALUE); @@ -376,7 +378,7 @@ public class InMemoryWatermarkManagerTest implements Serializable { assertThat(filteredWatermarks.getOutputWatermark(), equalTo(new Instant(1234L))); CommittedBundle<Integer> fauxSecondKeyTimerBundle = - 
InProcessBundle.keyed(createdInts, "Even").commit(clock.now()); + bundleFactory.createKeyedBundle(null, "Even", createdInts).commit(clock.now()); manager.updateWatermarks(fauxSecondKeyTimerBundle, filtered.getProducingTransformInternal(), TimerUpdate.empty(), Collections.<CommittedBundle<?>>emptyList(), new Instant(5678L)); assertThat(filteredWatermarks.getOutputWatermark(), equalTo(new Instant(5678L))); @@ -396,7 +398,7 @@ public class InMemoryWatermarkManagerTest implements Serializable { @Test public void updateOutputWatermarkShouldBeMonotonic() { CommittedBundle<?> firstInput = - InProcessBundle.unkeyed(createdInts).commit(BoundedWindow.TIMESTAMP_MAX_VALUE); + bundleFactory.createRootBundle(createdInts).commit(BoundedWindow.TIMESTAMP_MAX_VALUE); manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), Collections.<CommittedBundle<?>>singleton(firstInput), new Instant(0L)); TransformWatermarks firstWatermarks = @@ -404,7 +406,7 @@ public class InMemoryWatermarkManagerTest implements Serializable { assertThat(firstWatermarks.getOutputWatermark(), equalTo(new Instant(0L))); CommittedBundle<?> secondInput = - InProcessBundle.unkeyed(createdInts).commit(BoundedWindow.TIMESTAMP_MAX_VALUE); + bundleFactory.createRootBundle(createdInts).commit(BoundedWindow.TIMESTAMP_MAX_VALUE); manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), Collections.<CommittedBundle<?>>singleton(secondInput), new Instant(-250L)); TransformWatermarks secondWatermarks = @@ -579,7 +581,7 @@ public class InMemoryWatermarkManagerTest implements Serializable { not(laterThan(BoundedWindow.TIMESTAMP_MIN_VALUE))); CommittedBundle<Integer> createOutput = - InProcessBundle.unkeyed(createdInts).commit(new Instant(1250L)); + bundleFactory.createRootBundle(createdInts).commit(new Instant(1250L)); manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), 
Collections.<CommittedBundle<?>>singleton(createOutput), BoundedWindow.TIMESTAMP_MAX_VALUE); @@ -606,7 +608,7 @@ public class InMemoryWatermarkManagerTest implements Serializable { not(laterThan(new Instant(1250L)))); CommittedBundle<?> filterOutputBundle = - InProcessBundle.unkeyed(intsToFlatten).commit(new Instant(1250L)); + bundleFactory.createRootBundle(intsToFlatten).commit(new Instant(1250L)); manager.updateWatermarks(createOutput, filtered.getProducingTransformInternal(), TimerUpdate.empty(), Collections.<CommittedBundle<?>>singleton(filterOutputBundle), BoundedWindow.TIMESTAMP_MAX_VALUE); @@ -673,9 +675,11 @@ public class InMemoryWatermarkManagerTest implements Serializable { assertThat(filteredDoubledWms.getSynchronizedProcessingOutputTime(), not(laterThan(startTime))); CommittedBundle<Integer> filteredTimerBundle = - InProcessBundle.keyed(filtered, "key").commit(BoundedWindow.TIMESTAMP_MAX_VALUE); + bundleFactory + .createKeyedBundle(null, "key", filtered) + .commit(BoundedWindow.TIMESTAMP_MAX_VALUE); CommittedBundle<Integer> filteredTimerResult = - InProcessBundle.keyed(filteredTimesTwo, "key") + bundleFactory.createKeyedBundle(null, "key", filteredTimesTwo) .commit(filteredWms.getSynchronizedProcessingOutputTime()); // Complete the processing time timer manager.updateWatermarks(filteredTimerBundle, filtered.getProducingTransformInternal(), @@ -725,7 +729,7 @@ public class InMemoryWatermarkManagerTest implements Serializable { not(laterThan(BoundedWindow.TIMESTAMP_MIN_VALUE))); CommittedBundle<Integer> createOutput = - InProcessBundle.unkeyed(createdInts).commit(new Instant(1250L)); + bundleFactory.createRootBundle(createdInts).commit(new Instant(1250L)); manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), Collections.<CommittedBundle<?>>singleton(createOutput), BoundedWindow.TIMESTAMP_MAX_VALUE); @@ -736,7 +740,7 @@ public class InMemoryWatermarkManagerTest implements Serializable { 
createAfterUpdate.getSynchronizedProcessingOutputTime(), not(laterThan(clock.now()))); CommittedBundle<Integer> createSecondOutput = - InProcessBundle.unkeyed(createdInts).commit(new Instant(750L)); + bundleFactory.createRootBundle(createdInts).commit(new Instant(750L)); manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), Collections.<CommittedBundle<?>>singleton(createSecondOutput), BoundedWindow.TIMESTAMP_MAX_VALUE); @@ -784,13 +788,20 @@ public class InMemoryWatermarkManagerTest implements Serializable { @Test public void synchronizedProcessingInputTimeIsHeldToPendingBundleTimes() { CommittedBundle<Integer> created = globallyWindowedBundle(createdInts, 1, 2, 3); - manager.updateWatermarks(null, createdInts.getProducingTransformInternal(), TimerUpdate.empty(), - Collections.<CommittedBundle<?>>singleton(created), new Instant(29_919_235L)); + manager.updateWatermarks( + null, + createdInts.getProducingTransformInternal(), + TimerUpdate.empty(), + Collections.<CommittedBundle<?>>singleton(created), + new Instant(29_919_235L)); Instant upstreamHold = new Instant(2048L); CommittedBundle<Integer> filteredBundle = - InProcessBundle.keyed(filtered, "key").commit(upstreamHold); - manager.updateWatermarks(created, filtered.getProducingTransformInternal(), TimerUpdate.empty(), + bundleFactory.createKeyedBundle(null, "key", filtered).commit(upstreamHold); + manager.updateWatermarks( + created, + filtered.getProducingTransformInternal(), + TimerUpdate.empty(), Collections.<CommittedBundle<?>>singleton(filteredBundle), BoundedWindow.TIMESTAMP_MAX_VALUE); @@ -1094,7 +1105,7 @@ public class InMemoryWatermarkManagerTest implements Serializable { @SafeVarargs private final <T> CommittedBundle<T> timestampedBundle( PCollection<T> pc, TimestampedValue<T>... 
values) { - UncommittedBundle<T> bundle = InProcessBundle.unkeyed(pc); + UncommittedBundle<T> bundle = bundleFactory.createRootBundle(pc); for (TimestampedValue<T> value : values) { bundle.add( WindowedValue.timestampedValueInGlobalWindow(value.getValue(), value.getTimestamp())); @@ -1104,7 +1115,7 @@ public class InMemoryWatermarkManagerTest implements Serializable { @SafeVarargs private final <T> CommittedBundle<T> globallyWindowedBundle(PCollection<T> pc, T... values) { - UncommittedBundle<T> bundle = InProcessBundle.unkeyed(pc); + UncommittedBundle<T> bundle = bundleFactory.createRootBundle(pc); for (T value : values) { bundle.add(WindowedValue.valueInGlobalWindow(value)); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/334ab99a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessBundleFactoryTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessBundleFactoryTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessBundleFactoryTest.java new file mode 100644 index 0000000..060d43c --- /dev/null +++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessBundleFactoryTest.java @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.dataflow.sdk.runners.inprocess; + +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; +import static org.junit.Assert.assertThat; + +import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; +import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle; +import com.google.cloud.dataflow.sdk.testing.TestPipeline; +import com.google.cloud.dataflow.sdk.transforms.Create; +import com.google.cloud.dataflow.sdk.transforms.WithKeys; +import com.google.cloud.dataflow.sdk.util.WindowedValue; +import com.google.cloud.dataflow.sdk.values.KV; +import com.google.cloud.dataflow.sdk.values.PCollection; +import com.google.common.collect.ImmutableList; + +import org.hamcrest.Matcher; +import org.hamcrest.Matchers; +import org.joda.time.Instant; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; + +/** + * Tests for {@link InProcessBundleFactory}. 
+ */ +@RunWith(JUnit4.class) +public class InProcessBundleFactoryTest { + @Rule public ExpectedException thrown = ExpectedException.none(); + + private InProcessBundleFactory bundleFactory = InProcessBundleFactory.create(); + + private PCollection<Integer> created; + private PCollection<KV<String, Integer>> downstream; + + @Before + public void setup() { + TestPipeline p = TestPipeline.create(); + created = p.apply(Create.of(1, 2, 3)); + downstream = created.apply(WithKeys.<String, Integer>of("foo")); + } + + @Test + public void createRootBundleShouldCreateWithNullKey() { + PCollection<Integer> pcollection = TestPipeline.create().apply(Create.of(1)); + + UncommittedBundle<Integer> inFlightBundle = bundleFactory.createRootBundle(pcollection); + + CommittedBundle<Integer> bundle = inFlightBundle.commit(Instant.now()); + + assertThat(bundle.isKeyed(), is(false)); + assertThat(bundle.getKey(), nullValue()); + } + + private void createKeyedBundle(Object key) { + PCollection<Integer> pcollection = TestPipeline.create().apply(Create.of(1)); + + UncommittedBundle<Integer> inFlightBundle = + bundleFactory.createKeyedBundle(null, key, pcollection); + + CommittedBundle<Integer> bundle = inFlightBundle.commit(Instant.now()); + assertThat(bundle.isKeyed(), is(true)); + assertThat(bundle.getKey(), equalTo(key)); + } + + @Test + public void keyedWithNullKeyShouldCreateKeyedBundle() { + createKeyedBundle(null); + } + + @Test + public void keyedWithKeyShouldCreateKeyedBundle() { + createKeyedBundle(new Object()); + } + + private <T> void afterCommitGetElementsShouldHaveAddedElements(Iterable<WindowedValue<T>> elems) { + PCollection<T> pcollection = TestPipeline.create().apply(Create.<T>of()); + + UncommittedBundle<T> bundle = bundleFactory.createRootBundle(pcollection); + Collection<Matcher<? super WindowedValue<T>>> expectations = new ArrayList<>(); + for (WindowedValue<T> elem : elems) { + bundle.add(elem); + expectations.add(equalTo(elem)); + } + Matcher<Iterable<? 
extends WindowedValue<T>>> containsMatcher = + Matchers.<WindowedValue<T>>containsInAnyOrder(expectations); + assertThat(bundle.commit(Instant.now()).getElements(), containsMatcher); + } + + @Test + public void getElementsBeforeAddShouldReturnEmptyIterable() { + afterCommitGetElementsShouldHaveAddedElements(Collections.<WindowedValue<Integer>>emptyList()); + } + + @Test + public void getElementsAfterAddShouldReturnAddedElements() { + WindowedValue<Integer> firstValue = WindowedValue.valueInGlobalWindow(1); + WindowedValue<Integer> secondValue = + WindowedValue.timestampedValueInGlobalWindow(2, new Instant(1000L)); + + afterCommitGetElementsShouldHaveAddedElements(ImmutableList.of(firstValue, secondValue)); + } + + @Test + public void addAfterCommitShouldThrowException() { + PCollection<Integer> pcollection = TestPipeline.create().apply(Create.<Integer>of()); + + UncommittedBundle<Integer> bundle = bundleFactory.createRootBundle(pcollection); + bundle.add(WindowedValue.valueInGlobalWindow(1)); + CommittedBundle<Integer> firstCommit = bundle.commit(Instant.now()); + assertThat(firstCommit.getElements(), containsInAnyOrder(WindowedValue.valueInGlobalWindow(1))); + + thrown.expect(IllegalStateException.class); + thrown.expectMessage("3"); + thrown.expectMessage("committed"); + + bundle.add(WindowedValue.valueInGlobalWindow(3)); + } + + @Test + public void commitAfterCommitShouldThrowException() { + PCollection<Integer> pcollection = TestPipeline.create().apply(Create.<Integer>of()); + + UncommittedBundle<Integer> bundle = bundleFactory.createRootBundle(pcollection); + bundle.add(WindowedValue.valueInGlobalWindow(1)); + CommittedBundle<Integer> firstCommit = bundle.commit(Instant.now()); + assertThat(firstCommit.getElements(), containsInAnyOrder(WindowedValue.valueInGlobalWindow(1))); + + thrown.expect(IllegalStateException.class); + thrown.expectMessage("committed"); + + bundle.commit(Instant.now()); + } + + @Test + public void createBundleUnkeyedResultUnkeyed() { + 
CommittedBundle<KV<String, Integer>> newBundle = + bundleFactory + .createBundle(bundleFactory.createRootBundle(created).commit(Instant.now()), downstream) + .commit(Instant.now()); + assertThat(newBundle.isKeyed(), is(false)); + } + + @Test + public void createBundleKeyedResultPropagatesKey() { + CommittedBundle<KV<String, Integer>> newBundle = + bundleFactory + .createBundle( + bundleFactory.createKeyedBundle(null, "foo", created).commit(Instant.now()), + downstream) + .commit(Instant.now()); + assertThat(newBundle.isKeyed(), is(true)); + assertThat(newBundle.getKey(), Matchers.<Object>equalTo("foo")); + } + + @Test + public void createRootBundleUnkeyed() { + assertThat(bundleFactory.createRootBundle(created).commit(Instant.now()).isKeyed(), is(false)); + } + + @Test + public void createKeyedBundleKeyed() { + CommittedBundle<KV<String, Integer>> keyedBundle = + bundleFactory + .createKeyedBundle( + bundleFactory.createRootBundle(created).commit(Instant.now()), "foo", downstream) + .commit(Instant.now()); + assertThat(keyedBundle.isKeyed(), is(true)); + assertThat(keyedBundle.getKey(), Matchers.<Object>equalTo("foo")); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/334ab99a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContextTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContextTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContextTest.java index 564f3f2..fde2cb4 100644 --- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContextTest.java +++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContextTest.java @@ -124,6 +124,7 @@ public class InProcessEvaluationContextTest { 
Collection<PCollectionView<?>> views = ImmutableList.<PCollectionView<?>>of(view); context = InProcessEvaluationContext.create( runner.getPipelineOptions(), + InProcessBundleFactory.create(), rootTransforms, valueToConsumers, stepNames, @@ -170,7 +171,9 @@ public class InProcessEvaluationContextTest { stepContext.stateInternals().state(StateNamespaces.global(), intBag).add(1); context.handleResult( - InProcessBundle.keyed(created, "foo").commit(Instant.now()), + InProcessBundleFactory.create() + .createKeyedBundle(null, "foo", created) + .commit(Instant.now()), ImmutableList.<TimerData>of(), StepTransformResult.withoutHold(created.getProducingTransformInternal()) .withState(stepContext.commitState()) @@ -262,7 +265,7 @@ public class InProcessEvaluationContextTest { .withCounters(againCounters) .build(); context.handleResult( - InProcessBundle.unkeyed(created).commit(Instant.now()), + context.createRootBundle(created).commit(Instant.now()), ImmutableList.<TimerData>of(), secondResult); assertThat((Long) context.getCounters().getExistingCounter("foo").getAggregate(), equalTo(12L)); @@ -289,7 +292,7 @@ public class InProcessEvaluationContextTest { .build(); context.handleResult( - InProcessBundle.keyed(created, myKey).commit(Instant.now()), + context.createKeyedBundle(null, myKey, created).commit(Instant.now()), ImmutableList.<TimerData>of(), stateResult); @@ -371,7 +374,7 @@ public class InProcessEvaluationContextTest { // haven't added any timers, must be empty assertThat(context.extractFiredTimers().entrySet(), emptyIterable()); context.handleResult( - InProcessBundle.keyed(created, key).commit(Instant.now()), + context.createKeyedBundle(null, key, created).commit(Instant.now()), ImmutableList.<TimerData>of(), timerResult); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/334ab99a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoMultiEvaluatorFactoryTest.java 
---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoMultiEvaluatorFactoryTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoMultiEvaluatorFactoryTest.java index 664161c..87247ac 100644 --- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoMultiEvaluatorFactoryTest.java +++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoMultiEvaluatorFactoryTest.java @@ -69,6 +69,8 @@ import java.io.Serializable; */ @RunWith(JUnit4.class) public class ParDoMultiEvaluatorFactoryTest implements Serializable { + private transient BundleFactory bundleFactory = InProcessBundleFactory.create(); + @Test public void testParDoMultiInMemoryTransformEvaluator() throws Exception { TestPipeline p = TestPipeline.create(); @@ -80,26 +82,30 @@ public class ParDoMultiEvaluatorFactoryTest implements Serializable { final TupleTag<Integer> lengthTag = new TupleTag<>(); BoundMulti<String, KV<String, Integer>> pardo = - ParDo.of(new DoFn<String, KV<String, Integer>>() { - @Override - public void processElement(ProcessContext c) { - c.output(KV.<String, Integer>of(c.element(), c.element().length())); - c.sideOutput(elementTag, c.element()); - c.sideOutput(lengthTag, c.element().length()); - } - }).withOutputTags(mainOutputTag, TupleTagList.of(elementTag).and(lengthTag)); + ParDo.of( + new DoFn<String, KV<String, Integer>>() { + @Override + public void processElement(ProcessContext c) { + c.output(KV.<String, Integer>of(c.element(), c.element().length())); + c.sideOutput(elementTag, c.element()); + c.sideOutput(lengthTag, c.element().length()); + } + }) + .withOutputTags(mainOutputTag, TupleTagList.of(elementTag).and(lengthTag)); PCollectionTuple outputTuple = input.apply(pardo); - CommittedBundle<String> inputBundle = InProcessBundle.unkeyed(input).commit(Instant.now()); + 
CommittedBundle<String> inputBundle = + bundleFactory.createRootBundle(input).commit(Instant.now()); PCollection<KV<String, Integer>> mainOutput = outputTuple.get(mainOutputTag); PCollection<String> elementOutput = outputTuple.get(elementTag); PCollection<Integer> lengthOutput = outputTuple.get(lengthTag); InProcessEvaluationContext evaluationContext = mock(InProcessEvaluationContext.class); - UncommittedBundle<KV<String, Integer>> mainOutputBundle = InProcessBundle.unkeyed(mainOutput); - UncommittedBundle<String> elementOutputBundle = InProcessBundle.unkeyed(elementOutput); - UncommittedBundle<Integer> lengthOutputBundle = InProcessBundle.unkeyed(lengthOutput); + UncommittedBundle<KV<String, Integer>> mainOutputBundle = + bundleFactory.createRootBundle(mainOutput); + UncommittedBundle<String> elementOutputBundle = bundleFactory.createRootBundle(elementOutput); + UncommittedBundle<Integer> lengthOutputBundle = bundleFactory.createRootBundle(lengthOutput); when(evaluationContext.createBundle(inputBundle, mainOutput)).thenReturn(mainOutputBundle); when(evaluationContext.createBundle(inputBundle, elementOutput)) @@ -114,8 +120,9 @@ public class ParDoMultiEvaluatorFactoryTest implements Serializable { when(evaluationContext.createCounterSet()).thenReturn(counters); com.google.cloud.dataflow.sdk.runners.inprocess.TransformEvaluator<String> evaluator = - new ParDoMultiEvaluatorFactory().forApplication( - mainOutput.getProducingTransformInternal(), inputBundle, evaluationContext); + new ParDoMultiEvaluatorFactory() + .forApplication( + mainOutput.getProducingTransformInternal(), inputBundle, evaluationContext); evaluator.processElement(WindowedValue.valueInGlobalWindow("foo")); evaluator.processElement( @@ -163,24 +170,28 @@ public class ParDoMultiEvaluatorFactoryTest implements Serializable { final TupleTag<Integer> lengthTag = new TupleTag<>(); BoundMulti<String, KV<String, Integer>> pardo = - ParDo.of(new DoFn<String, KV<String, Integer>>() { - @Override - public void 
processElement(ProcessContext c) { - c.output(KV.<String, Integer>of(c.element(), c.element().length())); - c.sideOutput(elementTag, c.element()); - c.sideOutput(lengthTag, c.element().length()); - } - }).withOutputTags(mainOutputTag, TupleTagList.of(elementTag)); + ParDo.of( + new DoFn<String, KV<String, Integer>>() { + @Override + public void processElement(ProcessContext c) { + c.output(KV.<String, Integer>of(c.element(), c.element().length())); + c.sideOutput(elementTag, c.element()); + c.sideOutput(lengthTag, c.element().length()); + } + }) + .withOutputTags(mainOutputTag, TupleTagList.of(elementTag)); PCollectionTuple outputTuple = input.apply(pardo); - CommittedBundle<String> inputBundle = InProcessBundle.unkeyed(input).commit(Instant.now()); + CommittedBundle<String> inputBundle = + bundleFactory.createRootBundle(input).commit(Instant.now()); PCollection<KV<String, Integer>> mainOutput = outputTuple.get(mainOutputTag); PCollection<String> elementOutput = outputTuple.get(elementTag); InProcessEvaluationContext evaluationContext = mock(InProcessEvaluationContext.class); - UncommittedBundle<KV<String, Integer>> mainOutputBundle = InProcessBundle.unkeyed(mainOutput); - UncommittedBundle<String> elementOutputBundle = InProcessBundle.unkeyed(elementOutput); + UncommittedBundle<KV<String, Integer>> mainOutputBundle = + bundleFactory.createRootBundle(mainOutput); + UncommittedBundle<String> elementOutputBundle = bundleFactory.createRootBundle(elementOutput); when(evaluationContext.createBundle(inputBundle, mainOutput)).thenReturn(mainOutputBundle); when(evaluationContext.createBundle(inputBundle, elementOutput)) @@ -194,8 +205,9 @@ public class ParDoMultiEvaluatorFactoryTest implements Serializable { when(evaluationContext.createCounterSet()).thenReturn(counters); com.google.cloud.dataflow.sdk.runners.inprocess.TransformEvaluator<String> evaluator = - new ParDoMultiEvaluatorFactory().forApplication( - mainOutput.getProducingTransformInternal(), inputBundle, 
evaluationContext); + new ParDoMultiEvaluatorFactory() + .forApplication( + mainOutput.getProducingTransformInternal(), inputBundle, evaluationContext); evaluator.processElement(WindowedValue.valueInGlobalWindow("foo")); evaluator.processElement( @@ -206,8 +218,7 @@ public class ParDoMultiEvaluatorFactoryTest implements Serializable { InProcessTransformResult result = evaluator.finishBundle(); assertThat( result.getOutputBundles(), - Matchers.<UncommittedBundle<?>>containsInAnyOrder( - mainOutputBundle, elementOutputBundle)); + Matchers.<UncommittedBundle<?>>containsInAnyOrder(mainOutputBundle, elementOutputBundle)); assertThat(result.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE)); assertThat(result.getCounters(), equalTo(counters)); @@ -261,14 +272,16 @@ public class ParDoMultiEvaluatorFactoryTest implements Serializable { .withOutputTags(mainOutputTag, TupleTagList.of(elementTag)); PCollectionTuple outputTuple = input.apply(pardo); - CommittedBundle<String> inputBundle = InProcessBundle.unkeyed(input).commit(Instant.now()); + CommittedBundle<String> inputBundle = + bundleFactory.createRootBundle(input).commit(Instant.now()); PCollection<KV<String, Integer>> mainOutput = outputTuple.get(mainOutputTag); PCollection<String> elementOutput = outputTuple.get(elementTag); InProcessEvaluationContext evaluationContext = mock(InProcessEvaluationContext.class); - UncommittedBundle<KV<String, Integer>> mainOutputBundle = InProcessBundle.unkeyed(mainOutput); - UncommittedBundle<String> elementOutputBundle = InProcessBundle.unkeyed(elementOutput); + UncommittedBundle<KV<String, Integer>> mainOutputBundle = + bundleFactory.createRootBundle(mainOutput); + UncommittedBundle<String> elementOutputBundle = bundleFactory.createRootBundle(elementOutput); when(evaluationContext.createBundle(inputBundle, mainOutput)).thenReturn(mainOutputBundle); when(evaluationContext.createBundle(inputBundle, elementOutput)) @@ -282,8 +295,9 @@ public class 
ParDoMultiEvaluatorFactoryTest implements Serializable { when(evaluationContext.createCounterSet()).thenReturn(counters); com.google.cloud.dataflow.sdk.runners.inprocess.TransformEvaluator<String> evaluator = - new ParDoMultiEvaluatorFactory().forApplication( - mainOutput.getProducingTransformInternal(), inputBundle, evaluationContext); + new ParDoMultiEvaluatorFactory() + .forApplication( + mainOutput.getProducingTransformInternal(), inputBundle, evaluationContext); evaluator.processElement(WindowedValue.valueInGlobalWindow("foo")); evaluator.processElement( @@ -368,14 +382,16 @@ public class ParDoMultiEvaluatorFactoryTest implements Serializable { .withOutputTags(mainOutputTag, TupleTagList.of(elementTag)); PCollectionTuple outputTuple = input.apply(pardo); - CommittedBundle<String> inputBundle = InProcessBundle.unkeyed(input).commit(Instant.now()); + CommittedBundle<String> inputBundle = + bundleFactory.createRootBundle(input).commit(Instant.now()); PCollection<KV<String, Integer>> mainOutput = outputTuple.get(mainOutputTag); PCollection<String> elementOutput = outputTuple.get(elementTag); InProcessEvaluationContext evaluationContext = mock(InProcessEvaluationContext.class); - UncommittedBundle<KV<String, Integer>> mainOutputBundle = InProcessBundle.unkeyed(mainOutput); - UncommittedBundle<String> elementOutputBundle = InProcessBundle.unkeyed(elementOutput); + UncommittedBundle<KV<String, Integer>> mainOutputBundle = + bundleFactory.createRootBundle(mainOutput); + UncommittedBundle<String> elementOutputBundle = bundleFactory.createRootBundle(elementOutput); when(evaluationContext.createBundle(inputBundle, mainOutput)).thenReturn(mainOutputBundle); when(evaluationContext.createBundle(inputBundle, elementOutput)) @@ -389,8 +405,9 @@ public class ParDoMultiEvaluatorFactoryTest implements Serializable { when(evaluationContext.createCounterSet()).thenReturn(counters); com.google.cloud.dataflow.sdk.runners.inprocess.TransformEvaluator<String> evaluator = - new 
ParDoMultiEvaluatorFactory().forApplication( - mainOutput.getProducingTransformInternal(), inputBundle, evaluationContext); + new ParDoMultiEvaluatorFactory() + .forApplication( + mainOutput.getProducingTransformInternal(), inputBundle, evaluationContext); evaluator.processElement(WindowedValue.valueInGlobalWindow("foo")); evaluator.processElement( http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/334ab99a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoSingleEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoSingleEvaluatorFactoryTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoSingleEvaluatorFactoryTest.java index 9943fd7..704eb2a 100644 --- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoSingleEvaluatorFactoryTest.java +++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoSingleEvaluatorFactoryTest.java @@ -66,21 +66,27 @@ import java.io.Serializable; */ @RunWith(JUnit4.class) public class ParDoSingleEvaluatorFactoryTest implements Serializable { + private transient BundleFactory bundleFactory = InProcessBundleFactory.create(); + @Test public void testParDoInMemoryTransformEvaluator() throws Exception { TestPipeline p = TestPipeline.create(); PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam")); - PCollection<Integer> collection = input.apply(ParDo.of(new DoFn<String, Integer>() { - @Override public void processElement(ProcessContext c) { - c.output(c.element().length()); - } - })); - CommittedBundle<String> inputBundle = InProcessBundle.unkeyed(input).commit(Instant.now()); + PCollection<Integer> collection = + input.apply( + ParDo.of( + new DoFn<String, Integer>() { + @Override + public void processElement(ProcessContext c) { + 
c.output(c.element().length()); + } + })); + CommittedBundle<String> inputBundle = + bundleFactory.createRootBundle(input).commit(Instant.now()); InProcessEvaluationContext evaluationContext = mock(InProcessEvaluationContext.class); - UncommittedBundle<Integer> outputBundle = - InProcessBundle.unkeyed(collection); + UncommittedBundle<Integer> outputBundle = bundleFactory.createRootBundle(collection); when(evaluationContext.createBundle(inputBundle, collection)).thenReturn(outputBundle); InProcessExecutionContext executionContext = new InProcessExecutionContext(null, null, null, null); @@ -90,8 +96,9 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable { when(evaluationContext.createCounterSet()).thenReturn(counters); com.google.cloud.dataflow.sdk.runners.inprocess.TransformEvaluator<String> evaluator = - new ParDoSingleEvaluatorFactory().forApplication( - collection.getProducingTransformInternal(), inputBundle, evaluationContext); + new ParDoSingleEvaluatorFactory() + .forApplication( + collection.getProducingTransformInternal(), inputBundle, evaluationContext); evaluator.processElement(WindowedValue.valueInGlobalWindow("foo")); evaluator.processElement( @@ -118,16 +125,20 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable { PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam")); final TupleTag<Integer> sideOutputTag = new TupleTag<Integer>() {}; - PCollection<Integer> collection = input.apply(ParDo.of(new DoFn<String, Integer>() { - @Override public void processElement(ProcessContext c) { - c.sideOutput(sideOutputTag, c.element().length()); - } - })); - CommittedBundle<String> inputBundle = InProcessBundle.unkeyed(input).commit(Instant.now()); + PCollection<Integer> collection = + input.apply( + ParDo.of( + new DoFn<String, Integer>() { + @Override + public void processElement(ProcessContext c) { + c.sideOutput(sideOutputTag, c.element().length()); + } + })); + CommittedBundle<String> inputBundle = + 
bundleFactory.createRootBundle(input).commit(Instant.now()); InProcessEvaluationContext evaluationContext = mock(InProcessEvaluationContext.class); - UncommittedBundle<Integer> outputBundle = - InProcessBundle.unkeyed(collection); + UncommittedBundle<Integer> outputBundle = bundleFactory.createRootBundle(collection); when(evaluationContext.createBundle(inputBundle, collection)).thenReturn(outputBundle); InProcessExecutionContext executionContext = new InProcessExecutionContext(null, null, null, null); @@ -137,8 +148,9 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable { when(evaluationContext.createCounterSet()).thenReturn(counters); TransformEvaluator<String> evaluator = - new ParDoSingleEvaluatorFactory().forApplication( - collection.getProducingTransformInternal(), inputBundle, evaluationContext); + new ParDoSingleEvaluatorFactory() + .forApplication( + collection.getProducingTransformInternal(), inputBundle, evaluationContext); evaluator.processElement(WindowedValue.valueInGlobalWindow("foo")); evaluator.processElement( @@ -183,10 +195,12 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable { }); PCollection<KV<String, Integer>> mainOutput = input.apply(pardo); - CommittedBundle<String> inputBundle = InProcessBundle.unkeyed(input).commit(Instant.now()); + CommittedBundle<String> inputBundle = + bundleFactory.createRootBundle(input).commit(Instant.now()); InProcessEvaluationContext evaluationContext = mock(InProcessEvaluationContext.class); - UncommittedBundle<KV<String, Integer>> mainOutputBundle = InProcessBundle.unkeyed(mainOutput); + UncommittedBundle<KV<String, Integer>> mainOutputBundle = + bundleFactory.createRootBundle(mainOutput); when(evaluationContext.createBundle(inputBundle, mainOutput)).thenReturn(mainOutputBundle); @@ -246,42 +260,44 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable { ParDo.Bound<String, KV<String, Integer>> pardo = ParDo.of( - new DoFn<String, KV<String, Integer>>() { 
- @Override - public void processElement(ProcessContext c) { - c.windowingInternals().stateInternals(); - c.windowingInternals() - .timerInternals() - .setTimer( - TimerData.of( - StateNamespaces.window( - IntervalWindow.getCoder(), - new IntervalWindow( - new Instant(0).plus(Duration.standardMinutes(5)), - new Instant(1) - .plus(Duration.standardMinutes(5)) - .plus(Duration.standardHours(1)))), - new Instant(54541L), - TimeDomain.EVENT_TIME)); - c.windowingInternals() - .timerInternals() - .deleteTimer( - TimerData.of( - StateNamespaces.window( - IntervalWindow.getCoder(), - new IntervalWindow( - new Instant(0), - new Instant(0).plus(Duration.standardHours(1)))), - new Instant(3400000), - TimeDomain.SYNCHRONIZED_PROCESSING_TIME)); - } - }); + new DoFn<String, KV<String, Integer>>() { + @Override + public void processElement(ProcessContext c) { + c.windowingInternals().stateInternals(); + c.windowingInternals() + .timerInternals() + .setTimer( + TimerData.of( + StateNamespaces.window( + IntervalWindow.getCoder(), + new IntervalWindow( + new Instant(0).plus(Duration.standardMinutes(5)), + new Instant(1) + .plus(Duration.standardMinutes(5)) + .plus(Duration.standardHours(1)))), + new Instant(54541L), + TimeDomain.EVENT_TIME)); + c.windowingInternals() + .timerInternals() + .deleteTimer( + TimerData.of( + StateNamespaces.window( + IntervalWindow.getCoder(), + new IntervalWindow( + new Instant(0), + new Instant(0).plus(Duration.standardHours(1)))), + new Instant(3400000), + TimeDomain.SYNCHRONIZED_PROCESSING_TIME)); + } + }); PCollection<KV<String, Integer>> mainOutput = input.apply(pardo); - CommittedBundle<String> inputBundle = InProcessBundle.unkeyed(input).commit(Instant.now()); + CommittedBundle<String> inputBundle = + bundleFactory.createRootBundle(input).commit(Instant.now()); InProcessEvaluationContext evaluationContext = mock(InProcessEvaluationContext.class); - UncommittedBundle<KV<String, Integer>> mainOutputBundle = InProcessBundle.unkeyed(mainOutput); + 
UncommittedBundle<KV<String, Integer>> mainOutputBundle = + bundleFactory.createRootBundle(mainOutput); when(evaluationContext.createBundle(inputBundle, mainOutput)).thenReturn(mainOutputBundle); @@ -303,10 +319,7 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable { assertThat( result.getTimerUpdate(), equalTo( - TimerUpdate.builder("myKey") - .setTimer(addedTimer) - .deletedTimer(deletedTimer) - .build())); + TimerUpdate.builder("myKey").setTimer(addedTimer).deletedTimer(deletedTimer).build())); } }
