Add DirectGraphs to DirectRunner Tests Add getGraph(Pipeline) and getProducer(PValue), which use the DirectGraphVisitor and DirectGraph methods to provide access to the producing AppliedPTransform.
Remove getProducingTransformInternal from everywhere except DirectGraphVisitorTest Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d6c6ad37 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d6c6ad37 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d6c6ad37 Branch: refs/heads/master Commit: d6c6ad37149622e4d35af39727cdf774e6263d1e Parents: 077d911 Author: Thomas Groh <tg...@google.com> Authored: Fri Dec 2 10:56:36 2016 -0800 Committer: Thomas Groh <tg...@google.com> Committed: Tue Dec 6 10:46:39 2016 -0800 ---------------------------------------------------------------------- .../direct/BoundedReadEvaluatorFactoryTest.java | 18 +- .../runners/direct/DirectGraphVisitorTest.java | 1 + .../beam/runners/direct/DirectGraphs.java | 35 +++ .../runners/direct/EvaluationContextTest.java | 82 ++++--- .../direct/FlattenEvaluatorFactoryTest.java | 15 +- .../direct/GroupByKeyEvaluatorFactoryTest.java | 2 +- .../GroupByKeyOnlyEvaluatorFactoryTest.java | 3 +- .../ImmutabilityEnforcementFactoryTest.java | 2 +- .../beam/runners/direct/ParDoEvaluatorTest.java | 3 +- .../StatefulParDoEvaluatorFactoryTest.java | 4 +- .../runners/direct/StepTransformResultTest.java | 2 +- .../direct/TestStreamEvaluatorFactoryTest.java | 14 +- .../runners/direct/TransformExecutorTest.java | 9 +- .../UnboundedReadEvaluatorFactoryTest.java | 24 +- .../direct/ViewEvaluatorFactoryTest.java | 4 +- .../direct/WatermarkCallbackExecutorTest.java | 6 +- .../runners/direct/WatermarkManagerTest.java | 237 ++++++++----------- 17 files changed, 246 insertions(+), 215 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d6c6ad37/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java index b1ff689..acb1444 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java @@ -80,6 +80,7 @@ public class BoundedReadEvaluatorFactoryTest { private BoundedReadEvaluatorFactory factory; @Mock private EvaluationContext context; private BundleFactory bundleFactory; + private AppliedPTransform<?, ?, ?> longsProducer; @Before public void setup() { @@ -92,6 +93,7 @@ public class BoundedReadEvaluatorFactoryTest { new BoundedReadEvaluatorFactory( context, Long.MAX_VALUE /* minimum size for dynamic splits */); bundleFactory = ImmutableListBundleFactory.create(); + longsProducer = DirectGraphs.getProducer(longs); } @Test @@ -102,11 +104,11 @@ public class BoundedReadEvaluatorFactoryTest { Collection<CommittedBundle<?>> initialInputs = new BoundedReadEvaluatorFactory.InputProvider(context) - .getInitialInputs(longs.getProducingTransformInternal(), 1); + .getInitialInputs(longsProducer, 1); List<WindowedValue<?>> outputs = new ArrayList<>(); for (CommittedBundle<?> shardBundle : initialInputs) { TransformEvaluator<?> evaluator = - factory.forApplication(longs.getProducingTransformInternal(), null); + factory.forApplication(longsProducer, null); for (WindowedValue<?> shard : shardBundle.getElements()) { evaluator.processElement((WindowedValue) shard); } @@ -141,7 +143,7 @@ public class BoundedReadEvaluatorFactoryTest { } PCollection<Long> read = TestPipeline.create().apply(Read.from(new TestSource<>(VarLongCoder.of(), 5, elems))); - AppliedPTransform<?, ?, ?> transform = read.getProducingTransformInternal(); + AppliedPTransform<?, ?, ?> transform = DirectGraphs.getProducer(read); Collection<CommittedBundle<?>> unreadInputs = new BoundedReadEvaluatorFactory.InputProvider(context).getInitialInputs(transform, 1); @@ -191,7 +193,7 @@ public class BoundedReadEvaluatorFactoryTest { PCollection<Long> read = TestPipeline.create() .apply(Read.from(SourceTestUtils.toUnsplittableSource(CountingSource.upTo(10L)))); - AppliedPTransform<?, ?, ?> transform = read.getProducingTransformInternal(); + AppliedPTransform<?, ?, ?> transform = DirectGraphs.getProducer(read); when(context.createRootBundle()).thenReturn(bundleFactory.createRootBundle()); when(context.createRootBundle()).thenReturn(bundleFactory.createRootBundle()); @@ -238,7 +240,7 @@ public class BoundedReadEvaluatorFactoryTest { }); Collection<CommittedBundle<?>> initialInputs = new BoundedReadEvaluatorFactory.InputProvider(context) - .getInitialInputs(longs.getProducingTransformInternal(), 3); + .getInitialInputs(longsProducer, 3); assertThat(initialInputs, hasSize(allOf(greaterThanOrEqualTo(3), lessThanOrEqualTo(4)))); @@ -271,7 +273,7 @@ public class BoundedReadEvaluatorFactoryTest { CommittedBundle<BoundedSourceShard<Long>> shards = rootBundle.commit(Instant.now()); TransformEvaluator<BoundedSourceShard<Long>> evaluator = - factory.forApplication(longs.getProducingTransformInternal(), shards); + factory.forApplication(longsProducer, shards); for (WindowedValue<BoundedSourceShard<Long>> shard : shards.getElements()) { UncommittedBundle<Long> outputBundle = bundleFactory.createBundle(longs); when(context.createBundle(longs)).thenReturn(outputBundle); @@ -299,7 +301,7 @@ public class BoundedReadEvaluatorFactoryTest { TestPipeline p = TestPipeline.create(); PCollection<Long> pcollection = p.apply(Read.from(source)); - AppliedPTransform<?, ?, ?> sourceTransform = pcollection.getProducingTransformInternal(); + AppliedPTransform<?, ?, ?> sourceTransform = DirectGraphs.getProducer(pcollection); UncommittedBundle<Long> output = bundleFactory.createBundle(pcollection); when(context.createBundle(pcollection)).thenReturn(output); @@ -320,7 +322,7 @@ public class BoundedReadEvaluatorFactoryTest { TestPipeline p = TestPipeline.create(); PCollection<Long> pcollection = p.apply(Read.from(source)); - AppliedPTransform<?, ?, ?> sourceTransform = pcollection.getProducingTransformInternal(); + AppliedPTransform<?, ?, ?> sourceTransform = DirectGraphs.getProducer(pcollection); UncommittedBundle<Long> output = bundleFactory.createBundle(pcollection); when(context.createBundle(pcollection)).thenReturn(output); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d6c6ad37/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java index d218a81..fb84de8 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java @@ -48,6 +48,7 @@ import org.junit.runners.JUnit4; /** * Tests for {@link DirectGraphVisitor}. */ +// TODO: Replace uses of getProducing @RunWith(JUnit4.class) public class DirectGraphVisitorTest implements Serializable { @Rule public transient ExpectedException thrown = ExpectedException.none(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d6c6ad37/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphs.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphs.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphs.java new file mode 100644 index 0000000..73ada19 --- /dev/null +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphs.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.direct; + +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.transforms.AppliedPTransform; +import org.apache.beam.sdk.values.PValue; + +/** Test utilities for the {@link DirectRunner}. */ +final class DirectGraphs { + public static DirectGraph getGraph(Pipeline p) { + DirectGraphVisitor visitor = new DirectGraphVisitor(); + p.traverseTopologically(visitor); + return visitor.getGraph(); + } + + public static AppliedPTransform<?, ?, ?> getProducer(PValue value) { + return getGraph(value.getPipeline()).getProducer(value); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d6c6ad37/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java index 17cdea1..a2bb15e 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java @@ -86,9 +86,13 @@ public class EvaluationContextTest { private PCollectionView<Iterable<Integer>> view; private PCollection<Long> unbounded; - private BundleFactory bundleFactory; private DirectGraph graph; + private AppliedPTransform<?, ?, ?> createdProducer; + private AppliedPTransform<?, ?, ?> downstreamProducer; + private AppliedPTransform<?, ?, ?> viewProducer; + private AppliedPTransform<?, ?, ?> unboundedProducer; + @Before public void setup() { DirectRunner runner = @@ -101,14 +105,16 @@ public class EvaluationContextTest { view = created.apply(View.<Integer>asIterable()); unbounded = p.apply(CountingInput.unbounded()); - DirectGraphVisitor graphVisitor = new DirectGraphVisitor(); - p.traverseTopologically(graphVisitor); - - bundleFactory = ImmutableListBundleFactory.create(); - graph = graphVisitor.getGraph(); + BundleFactory bundleFactory = ImmutableListBundleFactory.create(); + graph = DirectGraphs.getGraph(p); context = EvaluationContext.create( runner.getPipelineOptions(), NanosOffsetClock.create(), bundleFactory, graph); + + createdProducer = graph.getProducer(created); + downstreamProducer = graph.getProducer(downstream); + viewProducer = graph.getProducer(view); + unboundedProducer = graph.getProducer(unbounded); } @Test @@ -146,7 +152,7 @@ public class EvaluationContextTest { @Test public void getExecutionContextSameStepSameKeyState() { DirectExecutionContext fooContext = - context.getExecutionContext(created.getProducingTransformInternal(), + context.getExecutionContext(createdProducer, StructuralKey.of("foo", StringUtf8Coder.of())); StateTag<Object, BagState<Integer>> intBag = StateTags.bag("myBag", VarIntCoder.of()); @@ -159,12 +165,12 @@ public class EvaluationContextTest { .createKeyedBundle(StructuralKey.of("foo", StringUtf8Coder.of()), created) .commit(Instant.now()), ImmutableList.<TimerData>of(), - StepTransformResult.withoutHold(created.getProducingTransformInternal()) + StepTransformResult.withoutHold(createdProducer) .withState(stepContext.commitState()) .build()); DirectExecutionContext secondFooContext = - context.getExecutionContext(created.getProducingTransformInternal(), + context.getExecutionContext(createdProducer, StructuralKey.of("foo", StringUtf8Coder.of())); assertThat( secondFooContext @@ -179,7 +185,7 @@ public class EvaluationContextTest { @Test public void getExecutionContextDifferentKeysIndependentState() { DirectExecutionContext fooContext = - context.getExecutionContext(created.getProducingTransformInternal(), + context.getExecutionContext(createdProducer, StructuralKey.of("foo", StringUtf8Coder.of())); StateTag<Object, BagState<Integer>> intBag = StateTags.bag("myBag", VarIntCoder.of()); @@ -191,7 +197,7 @@ public class EvaluationContextTest { .add(1); DirectExecutionContext barContext = - context.getExecutionContext(created.getProducingTransformInternal(), + context.getExecutionContext(createdProducer, StructuralKey.of("bar", StringUtf8Coder.of())); assertThat(barContext, not(equalTo(fooContext))); assertThat( @@ -207,7 +213,7 @@ public class EvaluationContextTest { public void getExecutionContextDifferentStepsIndependentState() { StructuralKey<?> myKey = StructuralKey.of("foo", StringUtf8Coder.of()); DirectExecutionContext fooContext = - context.getExecutionContext(created.getProducingTransformInternal(), myKey); + context.getExecutionContext(createdProducer, myKey); StateTag<Object, BagState<Integer>> intBag = StateTags.bag("myBag", VarIntCoder.of()); @@ -218,7 +224,7 @@ public class EvaluationContextTest { .add(1); DirectExecutionContext barContext = - context.getExecutionContext(downstream.getProducingTransformInternal(), myKey); + context.getExecutionContext(downstreamProducer, myKey); assertThat( barContext .getOrCreateStepContext("s1", "s1") @@ -232,15 +238,15 @@ public class EvaluationContextTest { public void handleResultCommitsAggregators() { Class<?> fn = getClass(); DirectExecutionContext fooContext = - context.getExecutionContext(created.getProducingTransformInternal(), null); + context.getExecutionContext(createdProducer, null); DirectExecutionContext.StepContext stepContext = fooContext.createStepContext( - "STEP", created.getProducingTransformInternal().getTransform().getName()); + "STEP", createdProducer.getTransform().getName()); AggregatorContainer container = context.getAggregatorContainer(); AggregatorContainer.Mutator mutator = container.createMutator(); mutator.createAggregatorForDoFn(fn, stepContext, "foo", new SumLongFn()).addValue(4L); TransformResult<?> result = - StepTransformResult.withoutHold(created.getProducingTransformInternal()) + StepTransformResult.withoutHold(createdProducer) .withAggregatorChanges(mutator) .build(); context.handleResult(null, ImmutableList.<TimerData>of(), result); @@ -250,7 +256,7 @@ public class EvaluationContextTest { mutatorAgain.createAggregatorForDoFn(fn, stepContext, "foo", new SumLongFn()).addValue(12L); TransformResult<?> secondResult = - StepTransformResult.withoutHold(downstream.getProducingTransformInternal()) + StepTransformResult.withoutHold(downstreamProducer) .withAggregatorChanges(mutatorAgain) .build(); context.handleResult( @@ -264,7 +270,7 @@ public class EvaluationContextTest { public void handleResultStoresState() { StructuralKey<?> myKey = StructuralKey.of("foo".getBytes(), ByteArrayCoder.of()); DirectExecutionContext fooContext = - context.getExecutionContext(downstream.getProducingTransformInternal(), myKey); + context.getExecutionContext(downstreamProducer, myKey); StateTag<Object, BagState<Integer>> intBag = StateTags.bag("myBag", VarIntCoder.of()); @@ -276,7 +282,7 @@ public class EvaluationContextTest { bag.add(4); TransformResult<?> stateResult = - StepTransformResult.withoutHold(downstream.getProducingTransformInternal()) + StepTransformResult.withoutHold(downstreamProducer) .withState(state) .build(); @@ -286,7 +292,7 @@ public class EvaluationContextTest { stateResult); DirectExecutionContext afterResultContext = - context.getExecutionContext(downstream.getProducingTransformInternal(), myKey); + context.getExecutionContext(downstreamProducer, myKey); CopyOnAccessInMemoryStateInternals<Object> afterResultState = afterResultContext.getOrCreateStepContext("s1", "s1").stateInternals(); @@ -309,7 +315,7 @@ public class EvaluationContextTest { downstream, GlobalWindow.INSTANCE, WindowingStrategy.globalDefault(), callback); TransformResult<?> result = - StepTransformResult.withHold(created.getProducingTransformInternal(), new Instant(0)) + StepTransformResult.withHold(createdProducer, new Instant(0)) .build(); context.handleResult(null, ImmutableList.<TimerData>of(), result); @@ -318,7 +324,7 @@ public class EvaluationContextTest { assertThat(callLatch.await(500L, TimeUnit.MILLISECONDS), is(false)); TransformResult<?> finishedResult = - StepTransformResult.withoutHold(created.getProducingTransformInternal()).build(); + StepTransformResult.withoutHold(createdProducer).build(); context.handleResult(null, ImmutableList.<TimerData>of(), finishedResult); context.forceRefresh(); // Obtain the value via blocking call @@ -328,7 +334,7 @@ public class EvaluationContextTest { @Test public void callAfterOutputMustHaveBeenProducedAlreadyAfterCallsImmediately() throws Exception { TransformResult<?> finishedResult = - StepTransformResult.withoutHold(created.getProducingTransformInternal()).build(); + StepTransformResult.withoutHold(createdProducer).build(); context.handleResult(null, ImmutableList.<TimerData>of(), finishedResult); final CountDownLatch callLatch = new CountDownLatch(1); @@ -348,7 +354,7 @@ public class EvaluationContextTest { @Test public void extractFiredTimersExtractsTimers() { TransformResult<?> holdResult = - StepTransformResult.withHold(created.getProducingTransformInternal(), new Instant(0)) + StepTransformResult.withHold(createdProducer, new Instant(0)) .build(); context.handleResult(null, ImmutableList.<TimerData>of(), holdResult); @@ -356,7 +362,7 @@ public class EvaluationContextTest { TimerData toFire = TimerData.of(StateNamespaces.global(), new Instant(100L), TimeDomain.EVENT_TIME); TransformResult<?> timerResult = - StepTransformResult.withoutHold(downstream.getProducingTransformInternal()) + StepTransformResult.withoutHold(downstreamProducer) .withState(CopyOnAccessInMemoryStateInternals.withUnderlying(key, null)) .withTimerUpdate(TimerUpdate.builder(key).setTimer(toFire).build()) .build(); @@ -372,7 +378,7 @@ public class EvaluationContextTest { assertThat(context.extractFiredTimers(), emptyIterable()); TransformResult<?> advanceResult = - StepTransformResult.withoutHold(created.getProducingTransformInternal()).build(); + StepTransformResult.withoutHold(createdProducer).build(); // Should cause the downstream timer to fire context.handleResult(null, ImmutableList.<TimerData>of(), advanceResult); @@ -403,14 +409,14 @@ public class EvaluationContextTest { @Test public void isDoneWithUnboundedPCollectionAndShutdown() { context.getPipelineOptions().setShutdownUnboundedProducersWithMaxWatermark(true); - assertThat(context.isDone(unbounded.getProducingTransformInternal()), is(false)); + assertThat(context.isDone(unboundedProducer), is(false)); context.handleResult( null, ImmutableList.<TimerData>of(), - StepTransformResult.withoutHold(unbounded.getProducingTransformInternal()).build()); + StepTransformResult.withoutHold(unboundedProducer).build()); context.extractFiredTimers(); - assertThat(context.isDone(unbounded.getProducingTransformInternal()), is(true)); + assertThat(context.isDone(unboundedProducer), is(true)); } @Test @@ -428,14 +434,14 @@ public class EvaluationContextTest { @Test public void isDoneWithOnlyBoundedPCollections() { context.getPipelineOptions().setShutdownUnboundedProducersWithMaxWatermark(false); - assertThat(context.isDone(created.getProducingTransformInternal()), is(false)); + assertThat(context.isDone(createdProducer), is(false)); context.handleResult( null, ImmutableList.<TimerData>of(), - StepTransformResult.withoutHold(created.getProducingTransformInternal()).build()); + StepTransformResult.withoutHold(createdProducer).build()); context.extractFiredTimers(); - assertThat(context.isDone(created.getProducingTransformInternal()), is(true)); + assertThat(context.isDone(createdProducer), is(true)); } @Test @@ -449,7 +455,7 @@ public class EvaluationContextTest { context.handleResult( null, ImmutableList.<TimerData>of(), - StepTransformResult.<Integer>withoutHold(created.getProducingTransformInternal()) + StepTransformResult.<Integer>withoutHold(createdProducer) .addOutput(rootBundle) .build()); @SuppressWarnings("unchecked") @@ -458,7 +464,7 @@ public class EvaluationContextTest { context.handleResult( null, ImmutableList.<TimerData>of(), - StepTransformResult.withoutHold(unbounded.getProducingTransformInternal()).build()); + StepTransformResult.withoutHold(unboundedProducer).build()); assertThat(context.isDone(), is(false)); for (AppliedPTransform<?, ?, ?> consumers : graph.getPrimitiveConsumers(created)) { @@ -479,22 +485,22 @@ public class EvaluationContextTest { context.handleResult( null, ImmutableList.<TimerData>of(), - StepTransformResult.withoutHold(created.getProducingTransformInternal()).build()); + StepTransformResult.withoutHold(createdProducer).build()); context.handleResult( null, ImmutableList.<TimerData>of(), - StepTransformResult.withoutHold(unbounded.getProducingTransformInternal()).build()); + StepTransformResult.withoutHold(unboundedProducer).build()); context.handleResult( context.createBundle(created).commit(Instant.now()), ImmutableList.<TimerData>of(), - StepTransformResult.withoutHold(downstream.getProducingTransformInternal()).build()); + StepTransformResult.withoutHold(downstreamProducer).build()); context.extractFiredTimers(); assertThat(context.isDone(), is(false)); context.handleResult( context.createBundle(created).commit(Instant.now()), ImmutableList.<TimerData>of(), - StepTransformResult.withoutHold(view.getProducingTransformInternal()).build()); + StepTransformResult.withoutHold(viewProducer).build()); context.extractFiredTimers(); assertThat(context.isDone(), is(false)); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d6c6ad37/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java index cb27fbc..9e22c36 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java @@ -47,6 +47,7 @@ import org.junit.runners.JUnit4; @RunWith(JUnit4.class) public class FlattenEvaluatorFactoryTest { private BundleFactory bundleFactory = ImmutableListBundleFactory.create(); + @Test public void testFlattenInMemoryEvaluator() throws Exception { TestPipeline p = TestPipeline.create(); @@ -69,10 +70,11 @@ public class FlattenEvaluatorFactoryTest { when(context.createBundle(flattened)).thenReturn(flattenedLeftBundle, flattenedRightBundle); FlattenEvaluatorFactory factory = new FlattenEvaluatorFactory(context); + AppliedPTransform<?, ?, ?> flattenedProducer = DirectGraphs.getProducer(flattened); TransformEvaluator<Integer> leftSideEvaluator = - factory.forApplication(flattened.getProducingTransformInternal(), leftBundle); + factory.forApplication(flattenedProducer, leftBundle); TransformEvaluator<Integer> rightSideEvaluator = - factory.forApplication(flattened.getProducingTransformInternal(), rightBundle); + factory.forApplication(flattenedProducer, rightBundle); leftSideEvaluator.processElement(WindowedValue.valueInGlobalWindow(1)); rightSideEvaluator.processElement(WindowedValue.valueInGlobalWindow(-1)); @@ -92,13 +94,13 @@ public class FlattenEvaluatorFactoryTest { Matchers.<UncommittedBundle<?>>contains(flattenedRightBundle)); assertThat( rightSideResult.getTransform(), - Matchers.<AppliedPTransform<?, ?, ?>>equalTo(flattened.getProducingTransformInternal())); + Matchers.<AppliedPTransform<?, ?, ?>>equalTo(flattenedProducer)); assertThat( leftSideResult.getOutputBundles(), Matchers.<UncommittedBundle<?>>contains(flattenedLeftBundle)); assertThat( leftSideResult.getTransform(), - Matchers.<AppliedPTransform<?, ?, ?>>equalTo(flattened.getProducingTransformInternal())); + Matchers.<AppliedPTransform<?, ?, ?>>equalTo(flattenedProducer)); assertThat( flattenedLeftBundle.commit(Instant.now()).getElements(), @@ -126,9 +128,10 @@ public class FlattenEvaluatorFactoryTest { .thenReturn(bundleFactory.createBundle(flattened)); FlattenEvaluatorFactory factory = new FlattenEvaluatorFactory(evaluationContext); + AppliedPTransform<?, ?, ?> flattendProducer = DirectGraphs.getProducer(flattened); TransformEvaluator<Integer> emptyEvaluator = factory.forApplication( - flattened.getProducingTransformInternal(), + flattendProducer, bundleFactory.createRootBundle().commit(BoundedWindow.TIMESTAMP_MAX_VALUE)); TransformResult<Integer> leftSideResult = emptyEvaluator.finishBundle(); @@ -138,7 +141,7 @@ public class FlattenEvaluatorFactoryTest { assertThat(outputBundle.getElements(), emptyIterable()); assertThat( leftSideResult.getTransform(), - Matchers.<AppliedPTransform<?, ?, ?>>equalTo(flattened.getProducingTransformInternal())); + Matchers.<AppliedPTransform<?, ?, ?>>equalTo(flattendProducer)); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d6c6ad37/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java index 7ba38ce..f0b29f0 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java @@ -97,7 +97,7 @@ public class GroupByKeyEvaluatorFactoryTest { ((KvCoder<String, Integer>) values.getCoder()).getKeyCoder(); TransformEvaluator<KV<String, Integer>> evaluator = new GroupByKeyOnlyEvaluatorFactory(evaluationContext) - .forApplication(groupedKvs.getProducingTransformInternal(), inputBundle); + .forApplication(DirectGraphs.getProducer(groupedKvs), inputBundle); evaluator.processElement(WindowedValue.valueInGlobalWindow(firstFoo)); evaluator.processElement(WindowedValue.valueInGlobalWindow(secondFoo)); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d6c6ad37/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactoryTest.java index 23340c6..7efdb3d 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactoryTest.java @@ -90,8 +90,7 @@ public class GroupByKeyOnlyEvaluatorFactoryTest { ((KvCoder<String, Integer>) values.getCoder()).getKeyCoder(); TransformEvaluator<KV<String, Integer>> evaluator = new GroupByKeyOnlyEvaluatorFactory(evaluationContext) - .forApplication( - groupedKvs.getProducingTransformInternal(), inputBundle); + .forApplication(DirectGraphs.getProducer(groupedKvs), inputBundle); evaluator.processElement(WindowedValue.valueInGlobalWindow(firstFoo)); evaluator.processElement(WindowedValue.valueInGlobalWindow(secondFoo)); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d6c6ad37/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java index a65cd30..1ad6ba6 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java @@ -64,7 +64,7 @@ public class ImmutabilityEnforcementFactoryTest implements Serializable { c.element()[0] = 'b'; } })); - consumer = pcollection.apply(Count.<byte[]>globally()).getProducingTransformInternal(); + consumer = DirectGraphs.getProducer(pcollection.apply(Count.<byte[]>globally())); } @Test http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d6c6ad37/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java index 85e99c5..d48ac14 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java @@ -152,8 +152,9 @@ public class ParDoEvaluatorTest { when(evaluationContext.getAggregatorContainer()).thenReturn(container); when(evaluationContext.getAggregatorMutator()).thenReturn(mutator); + @SuppressWarnings("unchecked") AppliedPTransform<PCollection<Integer>, ?, ?> transform = - (AppliedPTransform<PCollection<Integer>, ?, ?>) output.getProducingTransformInternal(); + (AppliedPTransform<PCollection<Integer>, ?, ?>) DirectGraphs.getProducer(output); return ParDoEvaluator.create( evaluationContext, stepContext, http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d6c6ad37/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java index ecf11ed..06c85ef 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java @@ -129,7 +129,7 @@ public class StatefulParDoEvaluatorFactoryTest implements Serializable { AppliedPTransform< PCollection<? extends KV<String, Iterable<Integer>>>, PCollectionTuple, StatefulParDo<String, Integer, Integer>> - producingTransform = (AppliedPTransform) produced.getProducingTransformInternal(); + producingTransform = (AppliedPTransform) DirectGraphs.getProducer(produced); // Then there will be a digging down to the step context to get the state internals when(mockEvaluationContext.getExecutionContext( @@ -239,7 +239,7 @@ public class StatefulParDoEvaluatorFactoryTest implements Serializable { AppliedPTransform< PCollection<KV<String, Iterable<Integer>>>, PCollectionTuple, StatefulParDo<String, Integer, Integer>> - producingTransform = (AppliedPTransform) produced.getProducingTransformInternal(); + producingTransform = (AppliedPTransform) DirectGraphs.getProducer(produced); // Then there will be a digging down to the step context to get the state internals when(mockEvaluationContext.getExecutionContext( http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d6c6ad37/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StepTransformResultTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StepTransformResultTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StepTransformResultTest.java index a21d8f7..d3a2cca 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StepTransformResultTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StepTransformResultTest.java @@ -48,7 +48,7 @@ public class StepTransformResultTest { public void setup() { TestPipeline p = TestPipeline.create(); pc = p.apply(Create.of(1, 2, 3)); - transform = pc.getProducingTransformInternal(); + transform = DirectGraphs.getGraph(p).getProducer(pc); bundleFactory = ImmutableListBundleFactory.create(); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d6c6ad37/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java index 3d31df6..6bb8623 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java @@ -32,6 +32,7 @@ import org.apache.beam.runners.direct.TestStreamEvaluatorFactory.TestStreamIndex import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.testing.TestStream; +import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PCollection; @@ -80,15 +81,16 @@ public class TestStreamEvaluatorFactoryTest { when(context.createBundle(streamVals)) .thenReturn(bundleFactory.createBundle(streamVals), bundleFactory.createBundle(streamVals)); + AppliedPTransform<?, ?, ?> streamProducer = DirectGraphs.getProducer(streamVals); Collection<CommittedBundle<?>> initialInputs = new TestStreamEvaluatorFactory.InputProvider(context) - .getInitialInputs(streamVals.getProducingTransformInternal(), 1); + .getInitialInputs(streamProducer, 1); @SuppressWarnings("unchecked") CommittedBundle<TestStreamIndex<Integer>> initialBundle = (CommittedBundle<TestStreamIndex<Integer>>) Iterables.getOnlyElement(initialInputs); TransformEvaluator<TestStreamIndex<Integer>> firstEvaluator = - factory.forApplication(streamVals.getProducingTransformInternal(), initialBundle); + factory.forApplication(streamProducer, initialBundle); firstEvaluator.processElement(Iterables.getOnlyElement(initialBundle.getElements())); TransformResult<TestStreamIndex<Integer>> firstResult = firstEvaluator.finishBundle(); @@ -101,7 +103,7 @@ public class TestStreamEvaluatorFactoryTest { CommittedBundle<TestStreamIndex<Integer>> secondBundle = initialBundle.withElements(Collections.singleton(firstResidual)); TransformEvaluator<TestStreamIndex<Integer>> secondEvaluator = - factory.forApplication(streamVals.getProducingTransformInternal(), secondBundle); + factory.forApplication(streamProducer, secondBundle); secondEvaluator.processElement(firstResidual); TransformResult<TestStreamIndex<Integer>> secondResult = secondEvaluator.finishBundle(); @@ -114,7 +116,7 @@ public class TestStreamEvaluatorFactoryTest { CommittedBundle<TestStreamIndex<Integer>> thirdBundle = secondBundle.withElements(Collections.singleton(secondResidual)); TransformEvaluator<TestStreamIndex<Integer>> thirdEvaluator = - factory.forApplication(streamVals.getProducingTransformInternal(), thirdBundle); + factory.forApplication(streamProducer, thirdBundle); thirdEvaluator.processElement(secondResidual); TransformResult<TestStreamIndex<Integer>> thirdResult = thirdEvaluator.finishBundle(); @@ -128,7 +130,7 @@ public class TestStreamEvaluatorFactoryTest { CommittedBundle<TestStreamIndex<Integer>> fourthBundle = thirdBundle.withElements(Collections.singleton(thirdResidual)); TransformEvaluator<TestStreamIndex<Integer>> fourthEvaluator = - factory.forApplication(streamVals.getProducingTransformInternal(), fourthBundle); + factory.forApplication(streamProducer, fourthBundle); fourthEvaluator.processElement(thirdResidual); TransformResult<TestStreamIndex<Integer>> fourthResult = fourthEvaluator.finishBundle(); @@ -142,7 +144,7 @@ public class TestStreamEvaluatorFactoryTest { CommittedBundle<TestStreamIndex<Integer>> fifthBundle = thirdBundle.withElements(Collections.singleton(fourthResidual)); TransformEvaluator<TestStreamIndex<Integer>> fifthEvaluator = - factory.forApplication(streamVals.getProducingTransformInternal(), fifthBundle); + factory.forApplication(streamProducer, fifthBundle); fifthEvaluator.processElement(fourthResidual); TransformResult<TestStreamIndex<Integer>> fifthResult = fifthEvaluator.finishBundle(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d6c6ad37/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java index 08b1e18..4ad22bc 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java @@ -89,8 +89,9 @@ public class TransformExecutorTest { created = p.apply(Create.of("foo", "spam", "third")); PCollection<KV<Integer, String>> downstream = created.apply(WithKeys.<Integer, String>of(3)); - createdProducer = created.getProducingTransformInternal(); - downstreamProducer = downstream.getProducingTransformInternal(); + DirectGraph graph = DirectGraphs.getGraph(p); + createdProducer = graph.getProducer(created); + downstreamProducer = graph.getProducer(downstream); when(evaluationContext.getMetrics()).thenReturn(metrics); } @@ -317,7 +318,7 @@ public class TransformExecutorTest { @Test public void callWithEnforcementThrowsOnFinishPropagates() throws Exception { final TransformResult<Object> result = - StepTransformResult.withoutHold(created.getProducingTransformInternal()).build(); + StepTransformResult.withoutHold(createdProducer).build(); TransformEvaluator<Object> evaluator = new TransformEvaluator<Object>() { @@ -356,7 +357,7 @@ public class TransformExecutorTest { @Test public void callWithEnforcementThrowsOnElementPropagates() throws Exception { final TransformResult<Object> result = - StepTransformResult.withoutHold(created.getProducingTransformInternal()).build(); + StepTransformResult.withoutHold(createdProducer).build(); TransformEvaluator<Object> evaluator = new TransformEvaluator<Object>() { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d6c6ad37/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java index 5a10134..dd36a2f 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java @@ -17,6 +17,7 @@ */ package org.apache.beam.runners.direct; +import static org.apache.beam.runners.direct.DirectGraphs.getProducer; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasSize; @@ -90,6 +91,7 @@ public class UnboundedReadEvaluatorFactoryTest { private BundleFactory bundleFactory = ImmutableListBundleFactory.create(); private UnboundedSource<Long, ?> source; + private DirectGraph graph; @Before public void setup() { @@ -100,6 +102,7 @@ public class UnboundedReadEvaluatorFactoryTest { context = mock(EvaluationContext.class); factory = new UnboundedReadEvaluatorFactory(context); output = bundleFactory.createBundle(longs); + graph = DirectGraphs.getGraph(p); when(context.createBundle(longs)).thenReturn(output); } @@ -115,7 +118,7 @@ public class UnboundedReadEvaluatorFactoryTest { int numSplits = 5; Collection<CommittedBundle<?>> initialInputs = new UnboundedReadEvaluatorFactory.InputProvider(context) - .getInitialInputs(longs.getProducingTransformInternal(), numSplits); + .getInitialInputs(graph.getProducer(longs), numSplits); // CountingSource.unbounded has very good splitting behavior assertThat(initialInputs, hasSize(numSplits)); @@ -148,15 +151,14 @@ public class UnboundedReadEvaluatorFactoryTest { Collection<CommittedBundle<?>> initialInputs = new UnboundedReadEvaluatorFactory.InputProvider(context) - .getInitialInputs(longs.getProducingTransformInternal(), 1); + .getInitialInputs(graph.getProducer(longs), 1); CommittedBundle<?> inputShards = Iterables.getOnlyElement(initialInputs); UnboundedSourceShard<Long, ?> inputShard = (UnboundedSourceShard<Long, ?>) Iterables.getOnlyElement(inputShards.getElements()).getValue(); TransformEvaluator<? super UnboundedSourceShard<Long, ?>> evaluator = - factory.forApplication( - longs.getProducingTransformInternal(), inputShards); + factory.forApplication(graph.getProducer(longs), inputShards); evaluator.processElement((WindowedValue) Iterables.getOnlyElement(inputShards.getElements())); TransformResult<? super UnboundedSourceShard<Long, ?>> result = evaluator.finishBundle(); @@ -190,7 +192,7 @@ public class UnboundedReadEvaluatorFactoryTest { TestPipeline p = TestPipeline.create(); PCollection<Long> pcollection = p.apply(Read.from(source)); - AppliedPTransform<?, ?, ?> sourceTransform = pcollection.getProducingTransformInternal(); + AppliedPTransform<?, ?, ?> sourceTransform = getProducer(pcollection); when(context.createRootBundle()).thenReturn(bundleFactory.createRootBundle()); Collection<CommittedBundle<?>> initialInputs = @@ -233,7 +235,7 @@ public class UnboundedReadEvaluatorFactoryTest { // Read with a very slow rate so by the second read there are no more elements PCollection<Long> pcollection = p.apply(Read.from(new TestUnboundedSource<>(VarLongCoder.of(), 1L))); - AppliedPTransform<?, ?, ?> sourceTransform = pcollection.getProducingTransformInternal(); + AppliedPTransform<?, ?, ?> sourceTransform = DirectGraphs.getProducer(pcollection); when(context.createRootBundle()).thenReturn(bundleFactory.createRootBundle()); Collection<CommittedBundle<?>> initialInputs = @@ -291,7 +293,9 @@ public class UnboundedReadEvaluatorFactoryTest { TestPipeline p = TestPipeline.create(); PCollection<Long> pcollection = p.apply(Read.from(source)); - AppliedPTransform<?, ?, ?> sourceTransform = pcollection.getProducingTransformInternal(); + DirectGraph graph = DirectGraphs.getGraph(p); + AppliedPTransform<?, ?, ?> sourceTransform = + graph.getProducer(pcollection); when(context.createRootBundle()).thenReturn(bundleFactory.createRootBundle()); UncommittedBundle<Long> output = bundleFactory.createBundle(pcollection); @@ -307,8 +311,7 @@ public class UnboundedReadEvaluatorFactoryTest { .commit(Instant.now()); UnboundedReadEvaluatorFactory factory = new UnboundedReadEvaluatorFactory(context, 1.0 /* Always reuse */); - new UnboundedReadEvaluatorFactory.InputProvider(context) - .getInitialInputs(pcollection.getProducingTransformInternal(), 1); + new UnboundedReadEvaluatorFactory.InputProvider(context).getInitialInputs(sourceTransform, 1); TransformEvaluator<UnboundedSourceShard<Long, TestCheckpointMark>> evaluator = factory.forApplication(sourceTransform, inputBundle); evaluator.processElement(shard); @@ -336,7 +339,8 @@ public class UnboundedReadEvaluatorFactoryTest { TestPipeline p = TestPipeline.create(); PCollection<Long> pcollection = p.apply(Read.from(source)); - AppliedPTransform<?, ?, ?> sourceTransform = pcollection.getProducingTransformInternal(); + AppliedPTransform<?, ?, ?> sourceTransform = + DirectGraphs.getGraph(p).getProducer(pcollection); when(context.createRootBundle()).thenReturn(bundleFactory.createRootBundle()); UncommittedBundle<Long> output = bundleFactory.createBundle(pcollection); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d6c6ad37/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java index 7d14020..7c08009 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java @@ -30,6 +30,7 @@ import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.Values; @@ -73,9 +74,10 @@ public class ViewEvaluatorFactoryTest { CommittedBundle<String> inputBundle = bundleFactory.createBundle(input).commit(Instant.now()); + AppliedPTransform<?, ?, ?> producer = DirectGraphs.getProducer(view); TransformEvaluator<Iterable<String>> evaluator = new ViewEvaluatorFactory(context) - .forApplication(view.getProducingTransformInternal(), inputBundle); + .forApplication(producer, inputBundle); evaluator.processElement( WindowedValue.<Iterable<String>>valueInGlobalWindow(ImmutableList.of("foo", "bar"))); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d6c6ad37/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkCallbackExecutorTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkCallbackExecutorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkCallbackExecutorTest.java index 1be9a98..acdabb6 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkCallbackExecutorTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkCallbackExecutorTest.java @@ -55,8 +55,10 @@ public class WatermarkCallbackExecutorTest { public void setup() { TestPipeline p = TestPipeline.create(); PCollection<Integer> created = p.apply(Create.of(1, 2, 3)); - create = created.getProducingTransformInternal(); - sum = created.apply(Sum.integersGlobally()).getProducingTransformInternal(); + PCollection<Integer> summed = created.apply(Sum.integersGlobally()); + DirectGraph graph = DirectGraphs.getGraph(p); + create = graph.getProducer(created); + sum = graph.getProducer(summed); } @Test