Add DirectGraphs to DirectRunner Tests

Add the DirectGraphs test utility. getGraph(Pipeline) builds a
DirectGraph with a DirectGraphVisitor, and getProducer(PValue) uses
that graph to return the AppliedPTransform that produced the value.

Remove uses of getProducingTransformInternal everywhere except in
DirectGraphVisitorTest.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d6c6ad37
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d6c6ad37
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d6c6ad37

Branch: refs/heads/master
Commit: d6c6ad37149622e4d35af39727cdf774e6263d1e
Parents: 077d911
Author: Thomas Groh <tg...@google.com>
Authored: Fri Dec 2 10:56:36 2016 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Dec 6 10:46:39 2016 -0800

----------------------------------------------------------------------
 .../direct/BoundedReadEvaluatorFactoryTest.java |  18 +-
 .../runners/direct/DirectGraphVisitorTest.java  |   1 +
 .../beam/runners/direct/DirectGraphs.java       |  35 +++
 .../runners/direct/EvaluationContextTest.java   |  82 ++++---
 .../direct/FlattenEvaluatorFactoryTest.java     |  15 +-
 .../direct/GroupByKeyEvaluatorFactoryTest.java  |   2 +-
 .../GroupByKeyOnlyEvaluatorFactoryTest.java     |   3 +-
 .../ImmutabilityEnforcementFactoryTest.java     |   2 +-
 .../beam/runners/direct/ParDoEvaluatorTest.java |   3 +-
 .../StatefulParDoEvaluatorFactoryTest.java      |   4 +-
 .../runners/direct/StepTransformResultTest.java |   2 +-
 .../direct/TestStreamEvaluatorFactoryTest.java  |  14 +-
 .../runners/direct/TransformExecutorTest.java   |   9 +-
 .../UnboundedReadEvaluatorFactoryTest.java      |  24 +-
 .../direct/ViewEvaluatorFactoryTest.java        |   4 +-
 .../direct/WatermarkCallbackExecutorTest.java   |   6 +-
 .../runners/direct/WatermarkManagerTest.java    | 237 ++++++++-----------
 17 files changed, 246 insertions(+), 215 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d6c6ad37/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
index b1ff689..acb1444 100644
--- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactoryTest.java
@@ -80,6 +80,7 @@ public class BoundedReadEvaluatorFactoryTest {
   private BoundedReadEvaluatorFactory factory;
   @Mock private EvaluationContext context;
   private BundleFactory bundleFactory;
+  private AppliedPTransform<?, ?, ?> longsProducer;
 
   @Before
   public void setup() {
@@ -92,6 +93,7 @@ public class BoundedReadEvaluatorFactoryTest {
         new BoundedReadEvaluatorFactory(
             context, Long.MAX_VALUE /* minimum size for dynamic splits */);
     bundleFactory = ImmutableListBundleFactory.create();
+    longsProducer = DirectGraphs.getProducer(longs);
   }
 
   @Test
@@ -102,11 +104,11 @@ public class BoundedReadEvaluatorFactoryTest {
 
     Collection<CommittedBundle<?>> initialInputs =
         new BoundedReadEvaluatorFactory.InputProvider(context)
-            .getInitialInputs(longs.getProducingTransformInternal(), 1);
+            .getInitialInputs(longsProducer, 1);
     List<WindowedValue<?>> outputs = new ArrayList<>();
     for (CommittedBundle<?> shardBundle : initialInputs) {
       TransformEvaluator<?> evaluator =
-          factory.forApplication(longs.getProducingTransformInternal(), null);
+          factory.forApplication(longsProducer, null);
       for (WindowedValue<?> shard : shardBundle.getElements()) {
         evaluator.processElement((WindowedValue) shard);
       }
@@ -141,7 +143,7 @@ public class BoundedReadEvaluatorFactoryTest {
     }
     PCollection<Long> read =
         TestPipeline.create().apply(Read.from(new 
TestSource<>(VarLongCoder.of(), 5, elems)));
-    AppliedPTransform<?, ?, ?> transform = 
read.getProducingTransformInternal();
+    AppliedPTransform<?, ?, ?> transform = DirectGraphs.getProducer(read);
     Collection<CommittedBundle<?>> unreadInputs =
         new 
BoundedReadEvaluatorFactory.InputProvider(context).getInitialInputs(transform, 
1);
 
@@ -191,7 +193,7 @@ public class BoundedReadEvaluatorFactoryTest {
     PCollection<Long> read =
         TestPipeline.create()
             
.apply(Read.from(SourceTestUtils.toUnsplittableSource(CountingSource.upTo(10L))));
-    AppliedPTransform<?, ?, ?> transform = 
read.getProducingTransformInternal();
+    AppliedPTransform<?, ?, ?> transform = DirectGraphs.getProducer(read);
 
     
when(context.createRootBundle()).thenReturn(bundleFactory.createRootBundle());
     
when(context.createRootBundle()).thenReturn(bundleFactory.createRootBundle());
@@ -238,7 +240,7 @@ public class BoundedReadEvaluatorFactoryTest {
             });
     Collection<CommittedBundle<?>> initialInputs =
         new BoundedReadEvaluatorFactory.InputProvider(context)
-            .getInitialInputs(longs.getProducingTransformInternal(), 3);
+            .getInitialInputs(longsProducer, 3);
 
     assertThat(initialInputs, hasSize(allOf(greaterThanOrEqualTo(3), 
lessThanOrEqualTo(4))));
 
@@ -271,7 +273,7 @@ public class BoundedReadEvaluatorFactoryTest {
     CommittedBundle<BoundedSourceShard<Long>> shards = 
rootBundle.commit(Instant.now());
 
     TransformEvaluator<BoundedSourceShard<Long>> evaluator =
-        factory.forApplication(longs.getProducingTransformInternal(), shards);
+        factory.forApplication(longsProducer, shards);
     for (WindowedValue<BoundedSourceShard<Long>> shard : shards.getElements()) 
{
       UncommittedBundle<Long> outputBundle = bundleFactory.createBundle(longs);
       when(context.createBundle(longs)).thenReturn(outputBundle);
@@ -299,7 +301,7 @@ public class BoundedReadEvaluatorFactoryTest {
 
     TestPipeline p = TestPipeline.create();
     PCollection<Long> pcollection = p.apply(Read.from(source));
-    AppliedPTransform<?, ?, ?> sourceTransform = 
pcollection.getProducingTransformInternal();
+    AppliedPTransform<?, ?, ?> sourceTransform = 
DirectGraphs.getProducer(pcollection);
 
     UncommittedBundle<Long> output = bundleFactory.createBundle(pcollection);
     when(context.createBundle(pcollection)).thenReturn(output);
@@ -320,7 +322,7 @@ public class BoundedReadEvaluatorFactoryTest {
 
     TestPipeline p = TestPipeline.create();
     PCollection<Long> pcollection = p.apply(Read.from(source));
-    AppliedPTransform<?, ?, ?> sourceTransform = 
pcollection.getProducingTransformInternal();
+    AppliedPTransform<?, ?, ?> sourceTransform = 
DirectGraphs.getProducer(pcollection);
 
     UncommittedBundle<Long> output = bundleFactory.createBundle(pcollection);
     when(context.createBundle(pcollection)).thenReturn(output);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d6c6ad37/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java
index d218a81..fb84de8 100644
--- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java
@@ -48,6 +48,7 @@ import org.junit.runners.JUnit4;
 /**
  * Tests for {@link DirectGraphVisitor}.
  */
+// TODO: Replace uses of getProducingTransformInternal
 @RunWith(JUnit4.class)
 public class DirectGraphVisitorTest implements Serializable {
   @Rule public transient ExpectedException thrown = ExpectedException.none();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d6c6ad37/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphs.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphs.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphs.java
new file mode 100644
index 0000000..73ada19
--- /dev/null
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphs.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.values.PValue;
+
+/** Test utilities for the {@link DirectRunner}. */
+final class DirectGraphs {
+  public static DirectGraph getGraph(Pipeline p) {
+    DirectGraphVisitor visitor = new DirectGraphVisitor();
+    p.traverseTopologically(visitor);
+    return visitor.getGraph();
+  }
+
+  public static AppliedPTransform<?, ?, ?> getProducer(PValue value) {
+    return getGraph(value.getPipeline()).getProducer(value);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d6c6ad37/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
index 17cdea1..a2bb15e 100644
--- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
@@ -86,9 +86,13 @@ public class EvaluationContextTest {
   private PCollectionView<Iterable<Integer>> view;
   private PCollection<Long> unbounded;
 
-  private BundleFactory bundleFactory;
   private DirectGraph graph;
 
+  private AppliedPTransform<?, ?, ?> createdProducer;
+  private AppliedPTransform<?, ?, ?> downstreamProducer;
+  private AppliedPTransform<?, ?, ?> viewProducer;
+  private AppliedPTransform<?, ?, ?> unboundedProducer;
+
   @Before
   public void setup() {
     DirectRunner runner =
@@ -101,14 +105,16 @@ public class EvaluationContextTest {
     view = created.apply(View.<Integer>asIterable());
     unbounded = p.apply(CountingInput.unbounded());
 
-    DirectGraphVisitor graphVisitor = new DirectGraphVisitor();
-    p.traverseTopologically(graphVisitor);
-
-    bundleFactory = ImmutableListBundleFactory.create();
-    graph = graphVisitor.getGraph();
+    BundleFactory bundleFactory = ImmutableListBundleFactory.create();
+    graph = DirectGraphs.getGraph(p);
     context =
         EvaluationContext.create(
             runner.getPipelineOptions(), NanosOffsetClock.create(), 
bundleFactory, graph);
+
+    createdProducer = graph.getProducer(created);
+    downstreamProducer = graph.getProducer(downstream);
+    viewProducer = graph.getProducer(view);
+    unboundedProducer = graph.getProducer(unbounded);
   }
 
   @Test
@@ -146,7 +152,7 @@ public class EvaluationContextTest {
   @Test
   public void getExecutionContextSameStepSameKeyState() {
     DirectExecutionContext fooContext =
-        context.getExecutionContext(created.getProducingTransformInternal(),
+        context.getExecutionContext(createdProducer,
             StructuralKey.of("foo", StringUtf8Coder.of()));
 
     StateTag<Object, BagState<Integer>> intBag = StateTags.bag("myBag", 
VarIntCoder.of());
@@ -159,12 +165,12 @@ public class EvaluationContextTest {
             .createKeyedBundle(StructuralKey.of("foo", StringUtf8Coder.of()), 
created)
             .commit(Instant.now()),
         ImmutableList.<TimerData>of(),
-        
StepTransformResult.withoutHold(created.getProducingTransformInternal())
+        StepTransformResult.withoutHold(createdProducer)
             .withState(stepContext.commitState())
             .build());
 
     DirectExecutionContext secondFooContext =
-        context.getExecutionContext(created.getProducingTransformInternal(),
+        context.getExecutionContext(createdProducer,
             StructuralKey.of("foo", StringUtf8Coder.of()));
     assertThat(
         secondFooContext
@@ -179,7 +185,7 @@ public class EvaluationContextTest {
   @Test
   public void getExecutionContextDifferentKeysIndependentState() {
     DirectExecutionContext fooContext =
-        context.getExecutionContext(created.getProducingTransformInternal(),
+        context.getExecutionContext(createdProducer,
             StructuralKey.of("foo", StringUtf8Coder.of()));
 
     StateTag<Object, BagState<Integer>> intBag = StateTags.bag("myBag", 
VarIntCoder.of());
@@ -191,7 +197,7 @@ public class EvaluationContextTest {
         .add(1);
 
     DirectExecutionContext barContext =
-        context.getExecutionContext(created.getProducingTransformInternal(),
+        context.getExecutionContext(createdProducer,
             StructuralKey.of("bar", StringUtf8Coder.of()));
     assertThat(barContext, not(equalTo(fooContext)));
     assertThat(
@@ -207,7 +213,7 @@ public class EvaluationContextTest {
   public void getExecutionContextDifferentStepsIndependentState() {
     StructuralKey<?> myKey = StructuralKey.of("foo", StringUtf8Coder.of());
     DirectExecutionContext fooContext =
-        context.getExecutionContext(created.getProducingTransformInternal(), 
myKey);
+        context.getExecutionContext(createdProducer, myKey);
 
     StateTag<Object, BagState<Integer>> intBag = StateTags.bag("myBag", 
VarIntCoder.of());
 
@@ -218,7 +224,7 @@ public class EvaluationContextTest {
         .add(1);
 
     DirectExecutionContext barContext =
-        
context.getExecutionContext(downstream.getProducingTransformInternal(), myKey);
+        context.getExecutionContext(downstreamProducer, myKey);
     assertThat(
         barContext
             .getOrCreateStepContext("s1", "s1")
@@ -232,15 +238,15 @@ public class EvaluationContextTest {
   public void handleResultCommitsAggregators() {
     Class<?> fn = getClass();
     DirectExecutionContext fooContext =
-        context.getExecutionContext(created.getProducingTransformInternal(), 
null);
+        context.getExecutionContext(createdProducer, null);
     DirectExecutionContext.StepContext stepContext = 
fooContext.createStepContext(
-        "STEP", 
created.getProducingTransformInternal().getTransform().getName());
+        "STEP", createdProducer.getTransform().getName());
     AggregatorContainer container = context.getAggregatorContainer();
     AggregatorContainer.Mutator mutator = container.createMutator();
     mutator.createAggregatorForDoFn(fn, stepContext, "foo", new 
SumLongFn()).addValue(4L);
 
     TransformResult<?> result =
-        
StepTransformResult.withoutHold(created.getProducingTransformInternal())
+        StepTransformResult.withoutHold(createdProducer)
             .withAggregatorChanges(mutator)
             .build();
     context.handleResult(null, ImmutableList.<TimerData>of(), result);
@@ -250,7 +256,7 @@ public class EvaluationContextTest {
     mutatorAgain.createAggregatorForDoFn(fn, stepContext, "foo", new 
SumLongFn()).addValue(12L);
 
     TransformResult<?> secondResult =
-        
StepTransformResult.withoutHold(downstream.getProducingTransformInternal())
+        StepTransformResult.withoutHold(downstreamProducer)
             .withAggregatorChanges(mutatorAgain)
             .build();
     context.handleResult(
@@ -264,7 +270,7 @@ public class EvaluationContextTest {
   public void handleResultStoresState() {
     StructuralKey<?> myKey = StructuralKey.of("foo".getBytes(), 
ByteArrayCoder.of());
     DirectExecutionContext fooContext =
-        
context.getExecutionContext(downstream.getProducingTransformInternal(), myKey);
+        context.getExecutionContext(downstreamProducer, myKey);
 
     StateTag<Object, BagState<Integer>> intBag = StateTags.bag("myBag", 
VarIntCoder.of());
 
@@ -276,7 +282,7 @@ public class EvaluationContextTest {
     bag.add(4);
 
     TransformResult<?> stateResult =
-        
StepTransformResult.withoutHold(downstream.getProducingTransformInternal())
+        StepTransformResult.withoutHold(downstreamProducer)
             .withState(state)
             .build();
 
@@ -286,7 +292,7 @@ public class EvaluationContextTest {
         stateResult);
 
     DirectExecutionContext afterResultContext =
-        
context.getExecutionContext(downstream.getProducingTransformInternal(), myKey);
+        context.getExecutionContext(downstreamProducer, myKey);
 
     CopyOnAccessInMemoryStateInternals<Object> afterResultState =
         afterResultContext.getOrCreateStepContext("s1", "s1").stateInternals();
@@ -309,7 +315,7 @@ public class EvaluationContextTest {
         downstream, GlobalWindow.INSTANCE, WindowingStrategy.globalDefault(), 
callback);
 
     TransformResult<?> result =
-        StepTransformResult.withHold(created.getProducingTransformInternal(), 
new Instant(0))
+        StepTransformResult.withHold(createdProducer, new Instant(0))
             .build();
 
     context.handleResult(null, ImmutableList.<TimerData>of(), result);
@@ -318,7 +324,7 @@ public class EvaluationContextTest {
     assertThat(callLatch.await(500L, TimeUnit.MILLISECONDS), is(false));
 
     TransformResult<?> finishedResult =
-        
StepTransformResult.withoutHold(created.getProducingTransformInternal()).build();
+        StepTransformResult.withoutHold(createdProducer).build();
     context.handleResult(null, ImmutableList.<TimerData>of(), finishedResult);
     context.forceRefresh();
     // Obtain the value via blocking call
@@ -328,7 +334,7 @@ public class EvaluationContextTest {
   @Test
   public void 
callAfterOutputMustHaveBeenProducedAlreadyAfterCallsImmediately() throws 
Exception {
     TransformResult<?> finishedResult =
-        
StepTransformResult.withoutHold(created.getProducingTransformInternal()).build();
+        StepTransformResult.withoutHold(createdProducer).build();
     context.handleResult(null, ImmutableList.<TimerData>of(), finishedResult);
 
     final CountDownLatch callLatch = new CountDownLatch(1);
@@ -348,7 +354,7 @@ public class EvaluationContextTest {
   @Test
   public void extractFiredTimersExtractsTimers() {
     TransformResult<?> holdResult =
-        StepTransformResult.withHold(created.getProducingTransformInternal(), 
new Instant(0))
+        StepTransformResult.withHold(createdProducer, new Instant(0))
             .build();
     context.handleResult(null, ImmutableList.<TimerData>of(), holdResult);
 
@@ -356,7 +362,7 @@ public class EvaluationContextTest {
     TimerData toFire =
         TimerData.of(StateNamespaces.global(), new Instant(100L), 
TimeDomain.EVENT_TIME);
     TransformResult<?> timerResult =
-        
StepTransformResult.withoutHold(downstream.getProducingTransformInternal())
+        StepTransformResult.withoutHold(downstreamProducer)
             .withState(CopyOnAccessInMemoryStateInternals.withUnderlying(key, 
null))
             .withTimerUpdate(TimerUpdate.builder(key).setTimer(toFire).build())
             .build();
@@ -372,7 +378,7 @@ public class EvaluationContextTest {
     assertThat(context.extractFiredTimers(), emptyIterable());
 
     TransformResult<?> advanceResult =
-        
StepTransformResult.withoutHold(created.getProducingTransformInternal()).build();
+        StepTransformResult.withoutHold(createdProducer).build();
     // Should cause the downstream timer to fire
     context.handleResult(null, ImmutableList.<TimerData>of(), advanceResult);
 
@@ -403,14 +409,14 @@ public class EvaluationContextTest {
   @Test
   public void isDoneWithUnboundedPCollectionAndShutdown() {
     
context.getPipelineOptions().setShutdownUnboundedProducersWithMaxWatermark(true);
-    assertThat(context.isDone(unbounded.getProducingTransformInternal()), 
is(false));
+    assertThat(context.isDone(unboundedProducer), is(false));
 
     context.handleResult(
         null,
         ImmutableList.<TimerData>of(),
-        
StepTransformResult.withoutHold(unbounded.getProducingTransformInternal()).build());
+        StepTransformResult.withoutHold(unboundedProducer).build());
     context.extractFiredTimers();
-    assertThat(context.isDone(unbounded.getProducingTransformInternal()), 
is(true));
+    assertThat(context.isDone(unboundedProducer), is(true));
   }
 
   @Test
@@ -428,14 +434,14 @@ public class EvaluationContextTest {
   @Test
   public void isDoneWithOnlyBoundedPCollections() {
     
context.getPipelineOptions().setShutdownUnboundedProducersWithMaxWatermark(false);
-    assertThat(context.isDone(created.getProducingTransformInternal()), 
is(false));
+    assertThat(context.isDone(createdProducer), is(false));
 
     context.handleResult(
         null,
         ImmutableList.<TimerData>of(),
-        
StepTransformResult.withoutHold(created.getProducingTransformInternal()).build());
+        StepTransformResult.withoutHold(createdProducer).build());
     context.extractFiredTimers();
-    assertThat(context.isDone(created.getProducingTransformInternal()), 
is(true));
+    assertThat(context.isDone(createdProducer), is(true));
   }
 
   @Test
@@ -449,7 +455,7 @@ public class EvaluationContextTest {
         context.handleResult(
             null,
             ImmutableList.<TimerData>of(),
-            
StepTransformResult.<Integer>withoutHold(created.getProducingTransformInternal())
+            StepTransformResult.<Integer>withoutHold(createdProducer)
                 .addOutput(rootBundle)
                 .build());
     @SuppressWarnings("unchecked")
@@ -458,7 +464,7 @@ public class EvaluationContextTest {
     context.handleResult(
         null,
         ImmutableList.<TimerData>of(),
-        
StepTransformResult.withoutHold(unbounded.getProducingTransformInternal()).build());
+        StepTransformResult.withoutHold(unboundedProducer).build());
     assertThat(context.isDone(), is(false));
 
     for (AppliedPTransform<?, ?, ?> consumers : 
graph.getPrimitiveConsumers(created)) {
@@ -479,22 +485,22 @@ public class EvaluationContextTest {
     context.handleResult(
         null,
         ImmutableList.<TimerData>of(),
-        
StepTransformResult.withoutHold(created.getProducingTransformInternal()).build());
+        StepTransformResult.withoutHold(createdProducer).build());
     context.handleResult(
         null,
         ImmutableList.<TimerData>of(),
-        
StepTransformResult.withoutHold(unbounded.getProducingTransformInternal()).build());
+        StepTransformResult.withoutHold(unboundedProducer).build());
     context.handleResult(
         context.createBundle(created).commit(Instant.now()),
         ImmutableList.<TimerData>of(),
-        
StepTransformResult.withoutHold(downstream.getProducingTransformInternal()).build());
+        StepTransformResult.withoutHold(downstreamProducer).build());
     context.extractFiredTimers();
     assertThat(context.isDone(), is(false));
 
     context.handleResult(
         context.createBundle(created).commit(Instant.now()),
         ImmutableList.<TimerData>of(),
-        
StepTransformResult.withoutHold(view.getProducingTransformInternal()).build());
+        StepTransformResult.withoutHold(viewProducer).build());
     context.extractFiredTimers();
     assertThat(context.isDone(), is(false));
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d6c6ad37/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java
index cb27fbc..9e22c36 100644
--- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/FlattenEvaluatorFactoryTest.java
@@ -47,6 +47,7 @@ import org.junit.runners.JUnit4;
 @RunWith(JUnit4.class)
 public class FlattenEvaluatorFactoryTest {
   private BundleFactory bundleFactory = ImmutableListBundleFactory.create();
+
   @Test
   public void testFlattenInMemoryEvaluator() throws Exception {
     TestPipeline p = TestPipeline.create();
@@ -69,10 +70,11 @@ public class FlattenEvaluatorFactoryTest {
     when(context.createBundle(flattened)).thenReturn(flattenedLeftBundle, 
flattenedRightBundle);
 
     FlattenEvaluatorFactory factory = new FlattenEvaluatorFactory(context);
+    AppliedPTransform<?, ?, ?> flattenedProducer = 
DirectGraphs.getProducer(flattened);
     TransformEvaluator<Integer> leftSideEvaluator =
-        factory.forApplication(flattened.getProducingTransformInternal(), 
leftBundle);
+        factory.forApplication(flattenedProducer, leftBundle);
     TransformEvaluator<Integer> rightSideEvaluator =
-        factory.forApplication(flattened.getProducingTransformInternal(), 
rightBundle);
+        factory.forApplication(flattenedProducer, rightBundle);
 
     leftSideEvaluator.processElement(WindowedValue.valueInGlobalWindow(1));
     rightSideEvaluator.processElement(WindowedValue.valueInGlobalWindow(-1));
@@ -92,13 +94,13 @@ public class FlattenEvaluatorFactoryTest {
         Matchers.<UncommittedBundle<?>>contains(flattenedRightBundle));
     assertThat(
         rightSideResult.getTransform(),
-        Matchers.<AppliedPTransform<?, ?, 
?>>equalTo(flattened.getProducingTransformInternal()));
+        Matchers.<AppliedPTransform<?, ?, ?>>equalTo(flattenedProducer));
     assertThat(
         leftSideResult.getOutputBundles(),
         Matchers.<UncommittedBundle<?>>contains(flattenedLeftBundle));
     assertThat(
         leftSideResult.getTransform(),
-        Matchers.<AppliedPTransform<?, ?, 
?>>equalTo(flattened.getProducingTransformInternal()));
+        Matchers.<AppliedPTransform<?, ?, ?>>equalTo(flattenedProducer));
 
     assertThat(
         flattenedLeftBundle.commit(Instant.now()).getElements(),
@@ -126,9 +128,10 @@ public class FlattenEvaluatorFactoryTest {
         .thenReturn(bundleFactory.createBundle(flattened));
 
     FlattenEvaluatorFactory factory = new 
FlattenEvaluatorFactory(evaluationContext);
+    AppliedPTransform<?, ?, ?> flattendProducer = 
DirectGraphs.getProducer(flattened);
     TransformEvaluator<Integer> emptyEvaluator =
         factory.forApplication(
-            flattened.getProducingTransformInternal(),
+            flattendProducer,
             
bundleFactory.createRootBundle().commit(BoundedWindow.TIMESTAMP_MAX_VALUE));
 
     TransformResult<Integer> leftSideResult = emptyEvaluator.finishBundle();
@@ -138,7 +141,7 @@ public class FlattenEvaluatorFactoryTest {
     assertThat(outputBundle.getElements(), emptyIterable());
     assertThat(
         leftSideResult.getTransform(),
-        Matchers.<AppliedPTransform<?, ?, 
?>>equalTo(flattened.getProducingTransformInternal()));
+        Matchers.<AppliedPTransform<?, ?, ?>>equalTo(flattendProducer));
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d6c6ad37/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java
index 7ba38ce..f0b29f0 100644
--- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java
@@ -97,7 +97,7 @@ public class GroupByKeyEvaluatorFactoryTest {
         ((KvCoder<String, Integer>) values.getCoder()).getKeyCoder();
     TransformEvaluator<KV<String, Integer>> evaluator =
         new GroupByKeyOnlyEvaluatorFactory(evaluationContext)
-            .forApplication(groupedKvs.getProducingTransformInternal(), 
inputBundle);
+            .forApplication(DirectGraphs.getProducer(groupedKvs), inputBundle);
 
     evaluator.processElement(WindowedValue.valueInGlobalWindow(firstFoo));
     evaluator.processElement(WindowedValue.valueInGlobalWindow(secondFoo));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d6c6ad37/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactoryTest.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactoryTest.java
index 23340c6..7efdb3d 100644
--- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactoryTest.java
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactoryTest.java
@@ -90,8 +90,7 @@ public class GroupByKeyOnlyEvaluatorFactoryTest {
         ((KvCoder<String, Integer>) values.getCoder()).getKeyCoder();
     TransformEvaluator<KV<String, Integer>> evaluator =
         new GroupByKeyOnlyEvaluatorFactory(evaluationContext)
-            .forApplication(
-                groupedKvs.getProducingTransformInternal(), inputBundle);
+            .forApplication(DirectGraphs.getProducer(groupedKvs), inputBundle);
 
     evaluator.processElement(WindowedValue.valueInGlobalWindow(firstFoo));
     evaluator.processElement(WindowedValue.valueInGlobalWindow(secondFoo));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d6c6ad37/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java
index a65cd30..1ad6ba6 100644
--- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactoryTest.java
@@ -64,7 +64,7 @@ public class ImmutabilityEnforcementFactoryTest implements 
Serializable {
                         c.element()[0] = 'b';
                       }
                     }));
-    consumer = 
pcollection.apply(Count.<byte[]>globally()).getProducingTransformInternal();
+    consumer = 
DirectGraphs.getProducer(pcollection.apply(Count.<byte[]>globally()));
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d6c6ad37/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
index 85e99c5..d48ac14 100644
--- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
@@ -152,8 +152,9 @@ public class ParDoEvaluatorTest {
     when(evaluationContext.getAggregatorContainer()).thenReturn(container);
     when(evaluationContext.getAggregatorMutator()).thenReturn(mutator);
 
+    @SuppressWarnings("unchecked")
     AppliedPTransform<PCollection<Integer>, ?, ?> transform =
-        (AppliedPTransform<PCollection<Integer>, ?, ?>) output.getProducingTransformInternal();
+        (AppliedPTransform<PCollection<Integer>, ?, ?>) DirectGraphs.getProducer(output);
     return ParDoEvaluator.create(
         evaluationContext,
         stepContext,

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d6c6ad37/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java
index ecf11ed..06c85ef 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java
@@ -129,7 +129,7 @@ public class StatefulParDoEvaluatorFactoryTest implements Serializable {
     AppliedPTransform<
             PCollection<? extends KV<String, Iterable<Integer>>>, PCollectionTuple,
             StatefulParDo<String, Integer, Integer>>
-        producingTransform = (AppliedPTransform) produced.getProducingTransformInternal();
+        producingTransform = (AppliedPTransform) DirectGraphs.getProducer(produced);
 
     // Then there will be a digging down to the step context to get the state internals
     when(mockEvaluationContext.getExecutionContext(
@@ -239,7 +239,7 @@ public class StatefulParDoEvaluatorFactoryTest implements Serializable {
     AppliedPTransform<
             PCollection<KV<String, Iterable<Integer>>>, PCollectionTuple,
             StatefulParDo<String, Integer, Integer>>
-        producingTransform = (AppliedPTransform) produced.getProducingTransformInternal();
+        producingTransform = (AppliedPTransform) DirectGraphs.getProducer(produced);
 
     // Then there will be a digging down to the step context to get the state internals
     when(mockEvaluationContext.getExecutionContext(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d6c6ad37/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StepTransformResultTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StepTransformResultTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StepTransformResultTest.java
index a21d8f7..d3a2cca 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StepTransformResultTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StepTransformResultTest.java
@@ -48,7 +48,7 @@ public class StepTransformResultTest {
   public void setup() {
     TestPipeline p = TestPipeline.create();
     pc = p.apply(Create.of(1, 2, 3));
-    transform = pc.getProducingTransformInternal();
+    transform = DirectGraphs.getGraph(p).getProducer(pc);
 
     bundleFactory = ImmutableListBundleFactory.create();
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d6c6ad37/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java
index 3d31df6..6bb8623 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactoryTest.java
@@ -32,6 +32,7 @@ import org.apache.beam.runners.direct.TestStreamEvaluatorFactory.TestStreamIndex
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.TestStream;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
@@ -80,15 +81,16 @@ public class TestStreamEvaluatorFactoryTest {
     when(context.createBundle(streamVals))
         .thenReturn(bundleFactory.createBundle(streamVals), bundleFactory.createBundle(streamVals));
 
+    AppliedPTransform<?, ?, ?> streamProducer = DirectGraphs.getProducer(streamVals);
     Collection<CommittedBundle<?>> initialInputs =
         new TestStreamEvaluatorFactory.InputProvider(context)
-            .getInitialInputs(streamVals.getProducingTransformInternal(), 1);
+            .getInitialInputs(streamProducer, 1);
     @SuppressWarnings("unchecked")
     CommittedBundle<TestStreamIndex<Integer>> initialBundle =
         (CommittedBundle<TestStreamIndex<Integer>>) Iterables.getOnlyElement(initialInputs);
 
     TransformEvaluator<TestStreamIndex<Integer>> firstEvaluator =
-        factory.forApplication(streamVals.getProducingTransformInternal(), initialBundle);
+        factory.forApplication(streamProducer, initialBundle);
     firstEvaluator.processElement(Iterables.getOnlyElement(initialBundle.getElements()));
     TransformResult<TestStreamIndex<Integer>> firstResult = firstEvaluator.finishBundle();
 
@@ -101,7 +103,7 @@ public class TestStreamEvaluatorFactoryTest {
     CommittedBundle<TestStreamIndex<Integer>> secondBundle =
         initialBundle.withElements(Collections.singleton(firstResidual));
     TransformEvaluator<TestStreamIndex<Integer>> secondEvaluator =
-        factory.forApplication(streamVals.getProducingTransformInternal(), secondBundle);
+        factory.forApplication(streamProducer, secondBundle);
     secondEvaluator.processElement(firstResidual);
     TransformResult<TestStreamIndex<Integer>> secondResult = secondEvaluator.finishBundle();
 
@@ -114,7 +116,7 @@ public class TestStreamEvaluatorFactoryTest {
     CommittedBundle<TestStreamIndex<Integer>> thirdBundle =
         secondBundle.withElements(Collections.singleton(secondResidual));
     TransformEvaluator<TestStreamIndex<Integer>> thirdEvaluator =
-        factory.forApplication(streamVals.getProducingTransformInternal(), thirdBundle);
+        factory.forApplication(streamProducer, thirdBundle);
     thirdEvaluator.processElement(secondResidual);
     TransformResult<TestStreamIndex<Integer>> thirdResult = thirdEvaluator.finishBundle();
 
@@ -128,7 +130,7 @@ public class TestStreamEvaluatorFactoryTest {
     CommittedBundle<TestStreamIndex<Integer>> fourthBundle =
         thirdBundle.withElements(Collections.singleton(thirdResidual));
     TransformEvaluator<TestStreamIndex<Integer>> fourthEvaluator =
-        factory.forApplication(streamVals.getProducingTransformInternal(), fourthBundle);
+        factory.forApplication(streamProducer, fourthBundle);
     fourthEvaluator.processElement(thirdResidual);
     TransformResult<TestStreamIndex<Integer>> fourthResult = fourthEvaluator.finishBundle();
 
@@ -142,7 +144,7 @@ public class TestStreamEvaluatorFactoryTest {
     CommittedBundle<TestStreamIndex<Integer>> fifthBundle =
         thirdBundle.withElements(Collections.singleton(fourthResidual));
     TransformEvaluator<TestStreamIndex<Integer>> fifthEvaluator =
-        factory.forApplication(streamVals.getProducingTransformInternal(), fifthBundle);
+        factory.forApplication(streamProducer, fifthBundle);
     fifthEvaluator.processElement(fourthResidual);
     TransformResult<TestStreamIndex<Integer>> fifthResult = fifthEvaluator.finishBundle();
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d6c6ad37/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
index 08b1e18..4ad22bc 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
@@ -89,8 +89,9 @@ public class TransformExecutorTest {
     created = p.apply(Create.of("foo", "spam", "third"));
     PCollection<KV<Integer, String>> downstream = created.apply(WithKeys.<Integer, String>of(3));
 
-    createdProducer = created.getProducingTransformInternal();
-    downstreamProducer = downstream.getProducingTransformInternal();
+    DirectGraph graph = DirectGraphs.getGraph(p);
+    createdProducer = graph.getProducer(created);
+    downstreamProducer = graph.getProducer(downstream);
 
     when(evaluationContext.getMetrics()).thenReturn(metrics);
   }
@@ -317,7 +318,7 @@ public class TransformExecutorTest {
   @Test
   public void callWithEnforcementThrowsOnFinishPropagates() throws Exception {
     final TransformResult<Object> result =
-        StepTransformResult.withoutHold(created.getProducingTransformInternal()).build();
+        StepTransformResult.withoutHold(createdProducer).build();
 
     TransformEvaluator<Object> evaluator =
         new TransformEvaluator<Object>() {
@@ -356,7 +357,7 @@ public class TransformExecutorTest {
   @Test
   public void callWithEnforcementThrowsOnElementPropagates() throws Exception {
     final TransformResult<Object> result =
-        StepTransformResult.withoutHold(created.getProducingTransformInternal()).build();
+        StepTransformResult.withoutHold(createdProducer).build();
 
     TransformEvaluator<Object> evaluator =
         new TransformEvaluator<Object>() {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d6c6ad37/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
index 5a10134..dd36a2f 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.runners.direct;
 
+import static org.apache.beam.runners.direct.DirectGraphs.getProducer;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasSize;
@@ -90,6 +91,7 @@ public class UnboundedReadEvaluatorFactoryTest {
   private BundleFactory bundleFactory = ImmutableListBundleFactory.create();
 
   private UnboundedSource<Long, ?> source;
+  private DirectGraph graph;
 
   @Before
   public void setup() {
@@ -100,6 +102,7 @@ public class UnboundedReadEvaluatorFactoryTest {
     context = mock(EvaluationContext.class);
     factory = new UnboundedReadEvaluatorFactory(context);
     output = bundleFactory.createBundle(longs);
+    graph = DirectGraphs.getGraph(p);
     when(context.createBundle(longs)).thenReturn(output);
   }
 
@@ -115,7 +118,7 @@ public class UnboundedReadEvaluatorFactoryTest {
     int numSplits = 5;
     Collection<CommittedBundle<?>> initialInputs =
         new UnboundedReadEvaluatorFactory.InputProvider(context)
-            .getInitialInputs(longs.getProducingTransformInternal(), numSplits);
+            .getInitialInputs(graph.getProducer(longs), numSplits);
     // CountingSource.unbounded has very good splitting behavior
     assertThat(initialInputs, hasSize(numSplits));
 
@@ -148,15 +151,14 @@ public class UnboundedReadEvaluatorFactoryTest {
 
     Collection<CommittedBundle<?>> initialInputs =
         new UnboundedReadEvaluatorFactory.InputProvider(context)
-            .getInitialInputs(longs.getProducingTransformInternal(), 1);
+            .getInitialInputs(graph.getProducer(longs), 1);
 
     CommittedBundle<?> inputShards = Iterables.getOnlyElement(initialInputs);
     UnboundedSourceShard<Long, ?> inputShard =
         (UnboundedSourceShard<Long, ?>)
             Iterables.getOnlyElement(inputShards.getElements()).getValue();
     TransformEvaluator<? super UnboundedSourceShard<Long, ?>> evaluator =
-        factory.forApplication(
-            longs.getProducingTransformInternal(), inputShards);
+        factory.forApplication(graph.getProducer(longs), inputShards);
 
     evaluator.processElement((WindowedValue) Iterables.getOnlyElement(inputShards.getElements()));
     TransformResult<? super UnboundedSourceShard<Long, ?>> result = evaluator.finishBundle();
@@ -190,7 +192,7 @@ public class UnboundedReadEvaluatorFactoryTest {
 
     TestPipeline p = TestPipeline.create();
     PCollection<Long> pcollection = p.apply(Read.from(source));
-    AppliedPTransform<?, ?, ?> sourceTransform = pcollection.getProducingTransformInternal();
+    AppliedPTransform<?, ?, ?> sourceTransform = getProducer(pcollection);
 
     when(context.createRootBundle()).thenReturn(bundleFactory.createRootBundle());
     Collection<CommittedBundle<?>> initialInputs =
@@ -233,7 +235,7 @@ public class UnboundedReadEvaluatorFactoryTest {
     // Read with a very slow rate so by the second read there are no more elements
     PCollection<Long> pcollection =
         p.apply(Read.from(new TestUnboundedSource<>(VarLongCoder.of(), 1L)));
-    AppliedPTransform<?, ?, ?> sourceTransform = pcollection.getProducingTransformInternal();
+    AppliedPTransform<?, ?, ?> sourceTransform = DirectGraphs.getProducer(pcollection);
 
     when(context.createRootBundle()).thenReturn(bundleFactory.createRootBundle());
     Collection<CommittedBundle<?>> initialInputs =
@@ -291,7 +293,9 @@ public class UnboundedReadEvaluatorFactoryTest {
 
     TestPipeline p = TestPipeline.create();
     PCollection<Long> pcollection = p.apply(Read.from(source));
-    AppliedPTransform<?, ?, ?> sourceTransform = pcollection.getProducingTransformInternal();
+    DirectGraph graph = DirectGraphs.getGraph(p);
+    AppliedPTransform<?, ?, ?> sourceTransform =
+        graph.getProducer(pcollection);
 
     when(context.createRootBundle()).thenReturn(bundleFactory.createRootBundle());
     UncommittedBundle<Long> output = bundleFactory.createBundle(pcollection);
@@ -307,8 +311,7 @@ public class UnboundedReadEvaluatorFactoryTest {
             .commit(Instant.now());
     UnboundedReadEvaluatorFactory factory =
         new UnboundedReadEvaluatorFactory(context, 1.0 /* Always reuse */);
-    new UnboundedReadEvaluatorFactory.InputProvider(context)
-        .getInitialInputs(pcollection.getProducingTransformInternal(), 1);
+    new UnboundedReadEvaluatorFactory.InputProvider(context).getInitialInputs(sourceTransform, 1);
     TransformEvaluator<UnboundedSourceShard<Long, TestCheckpointMark>> evaluator =
         factory.forApplication(sourceTransform, inputBundle);
     evaluator.processElement(shard);
@@ -336,7 +339,8 @@ public class UnboundedReadEvaluatorFactoryTest {
 
     TestPipeline p = TestPipeline.create();
     PCollection<Long> pcollection = p.apply(Read.from(source));
-    AppliedPTransform<?, ?, ?> sourceTransform = pcollection.getProducingTransformInternal();
+    AppliedPTransform<?, ?, ?> sourceTransform =
+        DirectGraphs.getGraph(p).getProducer(pcollection);
 
     when(context.createRootBundle()).thenReturn(bundleFactory.createRootBundle());
     UncommittedBundle<Long> output = bundleFactory.createBundle(pcollection);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d6c6ad37/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
index 7d14020..7c08009 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
@@ -30,6 +30,7 @@ import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.Values;
@@ -73,9 +74,10 @@ public class ViewEvaluatorFactoryTest {
 
     CommittedBundle<String> inputBundle =
         bundleFactory.createBundle(input).commit(Instant.now());
+    AppliedPTransform<?, ?, ?> producer = DirectGraphs.getProducer(view);
     TransformEvaluator<Iterable<String>> evaluator =
         new ViewEvaluatorFactory(context)
-            .forApplication(view.getProducingTransformInternal(), inputBundle);
+            .forApplication(producer, inputBundle);
 
     evaluator.processElement(
         WindowedValue.<Iterable<String>>valueInGlobalWindow(ImmutableList.of("foo", "bar")));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d6c6ad37/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkCallbackExecutorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkCallbackExecutorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkCallbackExecutorTest.java
index 1be9a98..acdabb6 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkCallbackExecutorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkCallbackExecutorTest.java
@@ -55,8 +55,10 @@ public class WatermarkCallbackExecutorTest {
   public void setup() {
     TestPipeline p = TestPipeline.create();
     PCollection<Integer> created = p.apply(Create.of(1, 2, 3));
-    create = created.getProducingTransformInternal();
-    sum = created.apply(Sum.integersGlobally()).getProducingTransformInternal();
+    PCollection<Integer> summed = created.apply(Sum.integersGlobally());
+    DirectGraph graph = DirectGraphs.getGraph(p);
+    create = graph.getProducer(created);
+    sum = graph.getProducer(summed);
   }
 
   @Test

Reply via email to