Migrate TransformTreeNode to an Inner Class

TransformTreeNode requires access to the hierarchy it is contained
within, and generally cannot be separated from TransformHierarchy. It is
primarily an implementation detail of TransformHierarchy, so can be
relocated to within it.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/569e8d70
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/569e8d70
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/569e8d70

Branch: refs/heads/master
Commit: 569e8d7085cf4e6effd379f23716202c6c5daf52
Parents: 24fab9f
Author: Thomas Groh <tg...@google.com>
Authored: Thu Dec 1 13:19:14 2016 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Thu Dec 1 14:34:21 2016 -0800

----------------------------------------------------------------------
 .../translation/ApexPipelineTranslator.java     |  12 +-
 .../apex/translation/TranslationContext.java    |   6 +-
 .../direct/ConsumerTrackingPipelineVisitor.java |  12 +-
 .../runners/direct/DisplayDataValidator.java    |   6 +-
 .../direct/KeyedPValueTrackingVisitor.java      |  10 +-
 .../apache/beam/runners/flink/FlinkRunner.java  |  12 +-
 .../FlinkBatchPipelineTranslator.java           |  14 +-
 .../FlinkStreamingPipelineTranslator.java       |  16 +-
 .../PipelineTranslationOptimizer.java           |  10 +-
 .../dataflow/DataflowPipelineTranslator.java    |   8 +-
 .../beam/runners/dataflow/DataflowRunner.java   |  10 +-
 .../runners/dataflow/DataflowRunnerTest.java    |   4 +-
 .../dataflow/RecordingPipelineVisitor.java      |   6 +-
 .../apache/beam/runners/spark/SparkRunner.java  |  21 +-
 .../beam/sdk/AggregatorPipelineExtractor.java   |   6 +-
 .../main/java/org/apache/beam/sdk/Pipeline.java |  17 +-
 .../beam/sdk/runners/TransformHierarchy.java    | 243 +++++++++++++++-
 .../beam/sdk/runners/TransformTreeNode.java     | 282 -------------------
 .../sdk/AggregatorPipelineExtractorTest.java    |  20 +-
 .../sdk/runners/TransformHierarchyTest.java     |  26 +-
 .../beam/sdk/runners/TransformTreeTest.java     |   8 +-
 .../display/DisplayDataEvaluator.java           |   8 +-
 22 files changed, 343 insertions(+), 414 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/569e8d70/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java
----------------------------------------------------------------------
diff --git 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java
 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java
index d38faf7..8d6db84 100644
--- 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java
+++ 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java
@@ -19,17 +19,15 @@
 package org.apache.beam.runners.apex.translation;
 
 import com.datatorrent.api.DAG;
-
 import java.util.HashMap;
 import java.util.Map;
-
 import org.apache.beam.runners.apex.ApexPipelineOptions;
 import org.apache.beam.runners.apex.ApexRunner.CreateApexPCollectionView;
 import 
org.apache.beam.runners.apex.translation.operators.ApexReadUnboundedInputOperator;
 import 
org.apache.beam.runners.core.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.runners.TransformTreeNode;
+import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.GroupByKey;
@@ -84,18 +82,18 @@ public class ApexPipelineTranslator implements 
Pipeline.PipelineVisitor {
   }
 
   @Override
-  public CompositeBehavior enterCompositeTransform(TransformTreeNode node) {
+  public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node 
node) {
     LOG.debug("entering composite transform {}", node.getTransform());
     return CompositeBehavior.ENTER_TRANSFORM;
   }
 
   @Override
-  public void leaveCompositeTransform(TransformTreeNode node) {
+  public void leaveCompositeTransform(TransformHierarchy.Node node) {
     LOG.debug("leaving composite transform {}", node.getTransform());
   }
 
   @Override
-  public void visitPrimitiveTransform(TransformTreeNode node) {
+  public void visitPrimitiveTransform(TransformHierarchy.Node node) {
     LOG.debug("visiting transform {}", node.getTransform());
     PTransform transform = node.getTransform();
     TransformTranslator translator = 
getTransformTranslator(transform.getClass());
@@ -108,7 +106,7 @@ public class ApexPipelineTranslator implements 
Pipeline.PipelineVisitor {
   }
 
   @Override
-  public void visitValue(PValue value, TransformTreeNode producer) {
+  public void visitValue(PValue value, TransformHierarchy.Node producer) {
     LOG.debug("visiting value {}", value);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/569e8d70/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java
----------------------------------------------------------------------
diff --git 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java
 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java
index e016730..259afbd 100644
--- 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java
+++ 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java
@@ -24,18 +24,16 @@ import com.datatorrent.api.DAG;
 import com.datatorrent.api.Operator;
 import com.datatorrent.api.Operator.InputPort;
 import com.datatorrent.api.Operator.OutputPort;
-
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-
 import org.apache.beam.runners.apex.ApexPipelineOptions;
 import org.apache.beam.runners.apex.translation.utils.ApexStateInternals;
 import org.apache.beam.runners.apex.translation.utils.ApexStreamTuple;
 import org.apache.beam.runners.apex.translation.utils.CoderAdapterStreamCodec;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.runners.TransformTreeNode;
+import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
@@ -73,7 +71,7 @@ class TranslationContext {
     this.pipelineOptions = pipelineOptions;
   }
 
-  public void setCurrentTransform(TransformTreeNode treeNode) {
+  public void setCurrentTransform(TransformHierarchy.Node treeNode) {
     this.currentTransform = AppliedPTransform.of(treeNode.getFullName(),
         treeNode.getInput(), treeNode.getOutput(), (PTransform) 
treeNode.getTransform());
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/569e8d70/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitor.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitor.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitor.java
index 4fdfea0..acfad16 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitor.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ConsumerTrackingPipelineVisitor.java
@@ -28,7 +28,7 @@ import java.util.Set;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.Pipeline.PipelineVisitor;
 import org.apache.beam.sdk.runners.PipelineRunner;
-import org.apache.beam.sdk.runners.TransformTreeNode;
+import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PCollectionView;
@@ -50,7 +50,7 @@ public class ConsumerTrackingPipelineVisitor extends 
PipelineVisitor.Defaults {
   private boolean finalized = false;
 
   @Override
-  public CompositeBehavior enterCompositeTransform(TransformTreeNode node) {
+  public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node 
node) {
     checkState(
         !finalized,
         "Attempting to traverse a pipeline (node %s) with a %s "
@@ -61,7 +61,7 @@ public class ConsumerTrackingPipelineVisitor extends 
PipelineVisitor.Defaults {
   }
 
   @Override
-  public void leaveCompositeTransform(TransformTreeNode node) {
+  public void leaveCompositeTransform(TransformHierarchy.Node node) {
     checkState(
         !finalized,
         "Attempting to traverse a pipeline (node %s) with a %s which is 
already finalized",
@@ -73,7 +73,7 @@ public class ConsumerTrackingPipelineVisitor extends 
PipelineVisitor.Defaults {
   }
 
   @Override
-  public void visitPrimitiveTransform(TransformTreeNode node) {
+  public void visitPrimitiveTransform(TransformHierarchy.Node node) {
     toFinalize.removeAll(node.getInput().expand());
     AppliedPTransform<?, ?, ?> appliedTransform = getAppliedTransform(node);
     stepNames.put(appliedTransform, genStepName());
@@ -86,7 +86,7 @@ public class ConsumerTrackingPipelineVisitor extends 
PipelineVisitor.Defaults {
     }
   }
 
-  private AppliedPTransform<?, ?, ?> getAppliedTransform(TransformTreeNode 
node) {
+  private AppliedPTransform<?, ?, ?> 
getAppliedTransform(TransformHierarchy.Node node) {
     @SuppressWarnings({"rawtypes", "unchecked"})
     AppliedPTransform<?, ?, ?> application = AppliedPTransform.of(
         node.getFullName(), node.getInput(), node.getOutput(), (PTransform) 
node.getTransform());
@@ -94,7 +94,7 @@ public class ConsumerTrackingPipelineVisitor extends 
PipelineVisitor.Defaults {
   }
 
   @Override
-  public void visitValue(PValue value, TransformTreeNode producer) {
+  public void visitValue(PValue value, TransformHierarchy.Node producer) {
     toFinalize.add(value);
     for (PValue expandedValue : value.expand()) {
       valueToConsumers.put(expandedValue, new ArrayList<AppliedPTransform<?, 
?, ?>>());

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/569e8d70/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DisplayDataValidator.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DisplayDataValidator.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DisplayDataValidator.java
index e09fe62..c77cb48 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DisplayDataValidator.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DisplayDataValidator.java
@@ -18,7 +18,7 @@
 package org.apache.beam.runners.direct;
 
 import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.runners.TransformTreeNode;
+import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.HasDisplayData;
 
@@ -51,7 +51,7 @@ class DisplayDataValidator {
     private static final Visitor INSTANCE = new Visitor();
 
     @Override
-    public CompositeBehavior enterCompositeTransform(TransformTreeNode node) {
+    public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node 
node) {
       if (!node.isRootNode()) {
         evaluateDisplayData(node.getTransform());
       }
@@ -60,7 +60,7 @@ class DisplayDataValidator {
     }
 
     @Override
-    public void visitPrimitiveTransform(TransformTreeNode node) {
+    public void visitPrimitiveTransform(TransformHierarchy.Node node) {
       evaluateDisplayData(node.getTransform());
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/569e8d70/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java
index 47b0857..5dc24c2 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java
@@ -22,7 +22,7 @@ import static com.google.common.base.Preconditions.checkState;
 import java.util.HashSet;
 import java.util.Set;
 import org.apache.beam.sdk.Pipeline.PipelineVisitor;
-import org.apache.beam.sdk.runners.TransformTreeNode;
+import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PValue;
@@ -55,7 +55,7 @@ class KeyedPValueTrackingVisitor implements PipelineVisitor {
   }
 
   @Override
-  public CompositeBehavior enterCompositeTransform(TransformTreeNode node) {
+  public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node 
node) {
     checkState(
         !finalized,
         "Attempted to use a %s that has already been finalized on a pipeline 
(visiting node %s)",
@@ -65,7 +65,7 @@ class KeyedPValueTrackingVisitor implements PipelineVisitor {
   }
 
   @Override
-  public void leaveCompositeTransform(TransformTreeNode node) {
+  public void leaveCompositeTransform(TransformHierarchy.Node node) {
     checkState(
         !finalized,
         "Attempted to use a %s that has already been finalized on a pipeline 
(visiting node %s)",
@@ -79,10 +79,10 @@ class KeyedPValueTrackingVisitor implements PipelineVisitor 
{
   }
 
   @Override
-  public void visitPrimitiveTransform(TransformTreeNode node) {}
+  public void visitPrimitiveTransform(TransformHierarchy.Node node) {}
 
   @Override
-  public void visitValue(PValue value, TransformTreeNode producer) {
+  public void visitValue(PValue value, TransformHierarchy.Node producer) {
     if (producesKeyedOutputs.contains(producer.getTransform().getClass())) {
       keyedValues.addAll(value.expand());
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/569e8d70/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
index 488c170..0b92734 100644
--- 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
@@ -31,7 +31,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
-
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.coders.Coder;
@@ -41,7 +40,7 @@ import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsValidator;
 import org.apache.beam.sdk.runners.PipelineRunner;
-import org.apache.beam.sdk.runners.TransformTreeNode;
+import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -55,7 +54,6 @@ import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.PValue;
-
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.client.program.DetachedEnvironment;
 import org.slf4j.Logger;
@@ -259,18 +257,18 @@ public class FlinkRunner extends 
PipelineRunner<PipelineResult> {
       final SortedSet<String> ptransformViewNamesWithNonDeterministicKeyCoders 
= new TreeSet<>();
       pipeline.traverseTopologically(new Pipeline.PipelineVisitor() {
         @Override
-        public void visitValue(PValue value, TransformTreeNode producer) {
+        public void visitValue(PValue value, TransformHierarchy.Node producer) 
{
         }
 
         @Override
-        public void visitPrimitiveTransform(TransformTreeNode node) {
+        public void visitPrimitiveTransform(TransformHierarchy.Node node) {
           if 
(ptransformViewsWithNonDeterministicKeyCoders.contains(node.getTransform())) {
             
ptransformViewNamesWithNonDeterministicKeyCoders.add(node.getFullName());
           }
         }
 
         @Override
-        public CompositeBehavior enterCompositeTransform(TransformTreeNode 
node) {
+        public CompositeBehavior 
enterCompositeTransform(TransformHierarchy.Node node) {
           if 
(ptransformViewsWithNonDeterministicKeyCoders.contains(node.getTransform())) {
             
ptransformViewNamesWithNonDeterministicKeyCoders.add(node.getFullName());
           }
@@ -278,7 +276,7 @@ public class FlinkRunner extends 
PipelineRunner<PipelineResult> {
         }
 
         @Override
-        public void leaveCompositeTransform(TransformTreeNode node) {
+        public void leaveCompositeTransform(TransformHierarchy.Node node) {
         }
       });
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/569e8d70/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java
index f36be6b..805c41c 100644
--- 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java
@@ -19,7 +19,7 @@ package org.apache.beam.runners.flink.translation;
 
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.runners.TransformTreeNode;
+import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.flink.api.java.DataSet;
@@ -63,7 +63,7 @@ public class FlinkBatchPipelineTranslator extends 
FlinkPipelineTranslator {
   // 
--------------------------------------------------------------------------------------------
 
   @Override
-  public CompositeBehavior enterCompositeTransform(TransformTreeNode node) {
+  public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node 
node) {
     LOG.info(genSpaces(this.depth) + "enterCompositeTransform- " + 
formatNodeName(node));
     this.depth++;
 
@@ -79,13 +79,13 @@ public class FlinkBatchPipelineTranslator extends 
FlinkPipelineTranslator {
   }
 
   @Override
-  public void leaveCompositeTransform(TransformTreeNode node) {
+  public void leaveCompositeTransform(TransformHierarchy.Node node) {
     this.depth--;
     LOG.info(genSpaces(this.depth) + "leaveCompositeTransform- " + 
formatNodeName(node));
   }
 
   @Override
-  public void visitPrimitiveTransform(TransformTreeNode node) {
+  public void visitPrimitiveTransform(TransformHierarchy.Node node) {
     LOG.info(genSpaces(this.depth) + "visitPrimitiveTransform- " + 
formatNodeName(node));
 
     // get the transformation corresponding to the node we are
@@ -103,7 +103,7 @@ public class FlinkBatchPipelineTranslator extends 
FlinkPipelineTranslator {
 
   private <T extends PTransform<?, ?>> void applyBatchTransform(
       PTransform<?, ?> transform,
-      TransformTreeNode node,
+      TransformHierarchy.Node node,
       BatchTransformTranslator<?> translator) {
 
     @SuppressWarnings("unchecked")
@@ -128,7 +128,7 @@ public class FlinkBatchPipelineTranslator extends 
FlinkPipelineTranslator {
   /**
    * Returns a translator for the given node, if it is possible, otherwise 
null.
    */
-  private static BatchTransformTranslator<?> getTranslator(TransformTreeNode 
node) {
+  private static BatchTransformTranslator<?> 
getTranslator(TransformHierarchy.Node node) {
     PTransform<?, ?> transform = node.getTransform();
 
     // Root of the graph is null
@@ -139,7 +139,7 @@ public class FlinkBatchPipelineTranslator extends 
FlinkPipelineTranslator {
     return FlinkBatchTransformTranslators.getTranslator(transform);
   }
 
-  private static String formatNodeName(TransformTreeNode node) {
+  private static String formatNodeName(TransformHierarchy.Node node) {
     return node.toString().split("@")[1] + node.getTransform();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/569e8d70/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingPipelineTranslator.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingPipelineTranslator.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingPipelineTranslator.java
index e5c0d76..a07dc3d 100644
--- 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingPipelineTranslator.java
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingPipelineTranslator.java
@@ -18,7 +18,7 @@
 package org.apache.beam.runners.flink.translation;
 
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.runners.TransformTreeNode;
+import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PValue;
@@ -50,7 +50,7 @@ public class FlinkStreamingPipelineTranslator extends 
FlinkPipelineTranslator {
   // 
--------------------------------------------------------------------------------------------
 
   @Override
-  public CompositeBehavior enterCompositeTransform(TransformTreeNode node) {
+  public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node 
node) {
     LOG.info(genSpaces(this.depth) + "enterCompositeTransform- " + 
formatNodeName(node));
     this.depth++;
 
@@ -69,13 +69,13 @@ public class FlinkStreamingPipelineTranslator extends 
FlinkPipelineTranslator {
   }
 
   @Override
-  public void leaveCompositeTransform(TransformTreeNode node) {
+  public void leaveCompositeTransform(TransformHierarchy.Node node) {
     this.depth--;
     LOG.info(genSpaces(this.depth) + "leaveCompositeTransform- " + 
formatNodeName(node));
   }
 
   @Override
-  public void visitPrimitiveTransform(TransformTreeNode node) {
+  public void visitPrimitiveTransform(TransformHierarchy.Node node) {
     LOG.info(genSpaces(this.depth) + "visitPrimitiveTransform- " + 
formatNodeName(node));
     // get the transformation corresponding to hte node we are
     // currently visiting and translate it into its Flink alternative.
@@ -93,13 +93,13 @@ public class FlinkStreamingPipelineTranslator extends 
FlinkPipelineTranslator {
   }
 
   @Override
-  public void visitValue(PValue value, TransformTreeNode producer) {
+  public void visitValue(PValue value, TransformHierarchy.Node producer) {
     // do nothing here
   }
 
   private <T extends PTransform<?, ?>> void applyStreamingTransform(
       PTransform<?, ?> transform,
-      TransformTreeNode node,
+      TransformHierarchy.Node node,
       StreamTransformTranslator<?> translator) {
 
     @SuppressWarnings("unchecked")
@@ -116,7 +116,7 @@ public class FlinkStreamingPipelineTranslator extends 
FlinkPipelineTranslator {
 
   private <T extends PTransform<?, ?>> boolean applyCanTranslate(
       PTransform<?, ?> transform,
-      TransformTreeNode node,
+      TransformHierarchy.Node node,
       StreamTransformTranslator<?> translator) {
 
     @SuppressWarnings("unchecked")
@@ -151,7 +151,7 @@ public class FlinkStreamingPipelineTranslator extends 
FlinkPipelineTranslator {
     }
   }
 
-  private static String formatNodeName(TransformTreeNode node) {
+  private static String formatNodeName(TransformHierarchy.Node node) {
     return node.toString().split("@")[1] + node.getTransform();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/569e8d70/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/PipelineTranslationOptimizer.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/PipelineTranslationOptimizer.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/PipelineTranslationOptimizer.java
index 97d123c..99f7ceb 100644
--- 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/PipelineTranslationOptimizer.java
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/PipelineTranslationOptimizer.java
@@ -19,7 +19,7 @@ package org.apache.beam.runners.flink.translation;
 
 import org.apache.beam.runners.flink.FlinkPipelineOptions;
 import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.runners.TransformTreeNode;
+import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PValue;
 import org.slf4j.Logger;
@@ -52,15 +52,15 @@ public class PipelineTranslationOptimizer extends 
FlinkPipelineTranslator {
   }
 
   @Override
-  public CompositeBehavior enterCompositeTransform(TransformTreeNode node) {
+  public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node 
node) {
     return CompositeBehavior.ENTER_TRANSFORM;
   }
 
   @Override
-  public void leaveCompositeTransform(TransformTreeNode node) {}
+  public void leaveCompositeTransform(TransformHierarchy.Node node) {}
 
   @Override
-  public void visitPrimitiveTransform(TransformTreeNode node) {
+  public void visitPrimitiveTransform(TransformHierarchy.Node node) {
     Class<? extends PTransform> transformClass = 
node.getTransform().getClass();
     if (transformClass == Read.Unbounded.class) {
       LOG.info("Found {}. Switching to streaming execution.", transformClass);
@@ -69,5 +69,5 @@ public class PipelineTranslationOptimizer extends 
FlinkPipelineTranslator {
   }
 
   @Override
-  public void visitValue(PValue value, TransformTreeNode producer) {}
+  public void visitValue(PValue value, TransformHierarchy.Node producer) {}
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/569e8d70/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
----------------------------------------------------------------------
diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index 2af2cae..1cff42a 100644
--- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -66,7 +66,7 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.options.StreamingOptions;
-import org.apache.beam.sdk.runners.TransformTreeNode;
+import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -517,11 +517,11 @@ public class DataflowPipelineTranslator {
 
 
     @Override
-    public void leaveCompositeTransform(TransformTreeNode node) {
+    public void leaveCompositeTransform(TransformHierarchy.Node node) {
     }
 
     @Override
-    public void visitPrimitiveTransform(TransformTreeNode node) {
+    public void visitPrimitiveTransform(TransformHierarchy.Node node) {
       PTransform<?, ?> transform = node.getTransform();
       TransformTranslator translator =
           getTransformTranslator(transform.getClass());
@@ -537,7 +537,7 @@ public class DataflowPipelineTranslator {
     }
 
     @Override
-    public void visitValue(PValue value, TransformTreeNode producer) {
+    public void visitValue(PValue value, TransformHierarchy.Node producer) {
       LOG.debug("Checking translation of {}", value);
       if (value.getProducingTransformInternal() == null) {
         throw new RuntimeException(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/569e8d70/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 0099856..6ed386a 100644
--- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -119,7 +119,7 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsValidator;
 import org.apache.beam.sdk.options.StreamingOptions;
 import org.apache.beam.sdk.runners.PipelineRunner;
-import org.apache.beam.sdk.runners.TransformTreeNode;
+import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
@@ -684,18 +684,18 @@ public class DataflowRunner extends 
PipelineRunner<DataflowPipelineJob> {
       final SortedSet<String> ptransformViewNamesWithNonDeterministicKeyCoders 
= new TreeSet<>();
       pipeline.traverseTopologically(new PipelineVisitor() {
         @Override
-        public void visitValue(PValue value, TransformTreeNode producer) {
+        public void visitValue(PValue value, TransformHierarchy.Node producer) 
{
         }
 
         @Override
-        public void visitPrimitiveTransform(TransformTreeNode node) {
+        public void visitPrimitiveTransform(TransformHierarchy.Node node) {
           if 
(ptransformViewsWithNonDeterministicKeyCoders.contains(node.getTransform())) {
             
ptransformViewNamesWithNonDeterministicKeyCoders.add(node.getFullName());
           }
         }
 
         @Override
-        public CompositeBehavior enterCompositeTransform(TransformTreeNode 
node) {
+        public CompositeBehavior 
enterCompositeTransform(TransformHierarchy.Node node) {
           if 
(ptransformViewsWithNonDeterministicKeyCoders.contains(node.getTransform())) {
             
ptransformViewNamesWithNonDeterministicKeyCoders.add(node.getFullName());
           }
@@ -703,7 +703,7 @@ public class DataflowRunner extends 
PipelineRunner<DataflowPipelineJob> {
         }
 
         @Override
-        public void leaveCompositeTransform(TransformTreeNode node) {
+        public void leaveCompositeTransform(TransformHierarchy.Node node) {
         }
       });
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/569e8d70/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index 3925ed4..5375c95 100644
--- 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -81,7 +81,7 @@ import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptions.CheckEnabled;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.runners.TransformTreeNode;
+import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.runners.dataflow.TestCountingSource;
 import org.apache.beam.sdk.testing.ExpectedLogs;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -912,7 +912,7 @@ public class DataflowRunnerTest {
     private List<PTransform<?, ?>> transforms = new ArrayList<>();
 
     @Override
-    public CompositeBehavior enterCompositeTransform(TransformTreeNode node) {
+    public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node 
node) {
       if (node.getTransform() != null) {
         transforms.add(node.getTransform());
       }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/569e8d70/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/RecordingPipelineVisitor.java
----------------------------------------------------------------------
diff --git 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/RecordingPipelineVisitor.java
 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/RecordingPipelineVisitor.java
index 2090877..1d5a7f5 100644
--- 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/RecordingPipelineVisitor.java
+++ 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/RecordingPipelineVisitor.java
@@ -20,7 +20,7 @@ package org.apache.beam.runners.dataflow;
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.runners.TransformTreeNode;
+import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PValue;
 
@@ -34,12 +34,12 @@ class RecordingPipelineVisitor extends 
Pipeline.PipelineVisitor.Defaults {
   public final List<PValue> values = new ArrayList<>();
 
   @Override
-  public void visitPrimitiveTransform(TransformTreeNode node) {
+  public void visitPrimitiveTransform(TransformHierarchy.Node node) {
     transforms.add(node.getTransform());
   }
 
   @Override
-  public void visitValue(PValue value, TransformTreeNode producer) {
+  public void visitValue(PValue value, TransformHierarchy.Node producer) {
     values.add(value);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/569e8d70/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
index 49e0113..63f77c0 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
@@ -32,7 +32,7 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.PipelineOptionsValidator;
 import org.apache.beam.sdk.runners.PipelineRunner;
-import org.apache.beam.sdk.runners.TransformTreeNode;
+import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -213,7 +213,7 @@ public final class SparkRunner extends 
PipelineRunner<EvaluationResult> {
     }
 
     @Override
-    public void visitPrimitiveTransform(TransformTreeNode node) {
+    public void visitPrimitiveTransform(TransformHierarchy.Node node) {
       if (translationMode.equals(TranslationMode.BATCH)) {
         Class<? extends PTransform> transformClass = 
node.getTransform().getClass();
         if (transformClass == Read.Unbounded.class) {
@@ -239,7 +239,7 @@ public final class SparkRunner extends 
PipelineRunner<EvaluationResult> {
     }
 
     @Override
-    public CompositeBehavior enterCompositeTransform(TransformTreeNode node) {
+    public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node 
node) {
       if (node.getTransform() != null) {
         @SuppressWarnings("unchecked")
         Class<PTransform<?, ?>> transformClass =
@@ -254,7 +254,7 @@ public final class SparkRunner extends 
PipelineRunner<EvaluationResult> {
       return CompositeBehavior.ENTER_TRANSFORM;
     }
 
-    private boolean shouldDefer(TransformTreeNode node) {
+    private boolean shouldDefer(TransformHierarchy.Node node) {
       PInput input = node.getInput();
       // if the input is not a PCollection, or it is but with non merging 
windows, don't defer.
       if (!(input instanceof PCollection)
@@ -283,12 +283,12 @@ public final class SparkRunner extends 
PipelineRunner<EvaluationResult> {
     }
 
     @Override
-    public void visitPrimitiveTransform(TransformTreeNode node) {
+    public void visitPrimitiveTransform(TransformHierarchy.Node node) {
       doVisitTransform(node);
     }
 
     <TransformT extends PTransform<? super PInput, POutput>> void
-        doVisitTransform(TransformTreeNode node) {
+        doVisitTransform(TransformHierarchy.Node node) {
       @SuppressWarnings("unchecked")
       TransformT transform = (TransformT) node.getTransform();
       @SuppressWarnings("unchecked")
@@ -304,11 +304,12 @@ public final class SparkRunner extends 
PipelineRunner<EvaluationResult> {
     }
 
     /**
-     *  Determine if this Node belongs to a Bounded branch of the pipeline, or 
Unbounded, and
-     *  translate with the proper translator.
+     * Determine if this Node belongs to a Bounded branch of the pipeline, or 
Unbounded, and
+     * translate with the proper translator.
      */
-    private <TransformT extends PTransform<? super PInput, POutput>> 
TransformEvaluator<TransformT>
-        translate(TransformTreeNode node, TransformT transform, 
Class<TransformT> transformClass) {
+    private <TransformT extends PTransform<? super PInput, POutput>>
+        TransformEvaluator<TransformT> translate(
+            TransformHierarchy.Node node, TransformT transform, 
Class<TransformT> transformClass) {
       //--- determine if node is bounded/unbounded.
       // usually, the input determines if the PCollection to apply the next 
transformation to
       // is BOUNDED or UNBOUNDED, meaning RDD/DStream.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/569e8d70/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorPipelineExtractor.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorPipelineExtractor.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorPipelineExtractor.java
index 0e79abe..d2130d0 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorPipelineExtractor.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorPipelineExtractor.java
@@ -23,7 +23,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
 import org.apache.beam.sdk.Pipeline.PipelineVisitor;
-import org.apache.beam.sdk.runners.TransformTreeNode;
+import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.AggregatorRetriever;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -62,7 +62,7 @@ class AggregatorPipelineExtractor {
     }
 
     @Override
-    public void visitPrimitiveTransform(TransformTreeNode node) {
+    public void visitPrimitiveTransform(TransformHierarchy.Node node) {
       PTransform<?, ?> transform = node.getTransform();
       addStepToAggregators(transform, getAggregators(transform));
     }
@@ -86,6 +86,6 @@ class AggregatorPipelineExtractor {
     }
 
     @Override
-    public void visitValue(PValue value, TransformTreeNode producer) {}
+    public void visitValue(PValue value, TransformHierarchy.Node producer) {}
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/569e8d70/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
index c8a4439..7a16f9d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
@@ -30,7 +30,6 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.runners.TransformHierarchy;
-import org.apache.beam.sdk.runners.TransformTreeNode;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -218,25 +217,25 @@ public class Pipeline {
      *
      * <p>The return value controls whether or not child transforms are 
visited.
      */
-    CompositeBehavior enterCompositeTransform(TransformTreeNode node);
+    CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node);
 
     /**
      * Called for each composite transform after all of its component 
transforms and their outputs
      * have been visited.
      */
-    void leaveCompositeTransform(TransformTreeNode node);
+    void leaveCompositeTransform(TransformHierarchy.Node node);
 
     /**
      * Called for each primitive transform after all of its topological 
predecessors
      * and inputs have been visited.
      */
-    void visitPrimitiveTransform(TransformTreeNode node);
+    void visitPrimitiveTransform(TransformHierarchy.Node node);
 
     /**
      * Called for each value after the transform that produced the value has 
been
      * visited.
      */
-    void visitValue(PValue value, TransformTreeNode producer);
+    void visitValue(PValue value, TransformHierarchy.Node producer);
 
     /**
      * Control enum for indicating whether or not a traversal should process 
the contents of
@@ -253,18 +252,18 @@ public class Pipeline {
      */
     class Defaults implements PipelineVisitor {
       @Override
-      public CompositeBehavior enterCompositeTransform(TransformTreeNode node) 
{
+      public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node 
node) {
         return CompositeBehavior.ENTER_TRANSFORM;
       }
 
       @Override
-      public void leaveCompositeTransform(TransformTreeNode node) { }
+      public void leaveCompositeTransform(TransformHierarchy.Node node) { }
 
       @Override
-      public void visitPrimitiveTransform(TransformTreeNode node) { }
+      public void visitPrimitiveTransform(TransformHierarchy.Node node) { }
 
       @Override
-      public void visitValue(PValue value, TransformTreeNode producer) { }
+      public void visitValue(PValue value, TransformHierarchy.Node producer) { 
}
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/569e8d70/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
index d3fd497..662acc1 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
@@ -20,13 +20,18 @@ package org.apache.beam.sdk.runners;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 
+import com.google.common.annotations.VisibleForTesting;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import javax.annotation.Nullable;
 import org.apache.beam.sdk.Pipeline.PipelineVisitor;
+import org.apache.beam.sdk.Pipeline.PipelineVisitor.CompositeBehavior;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.POutput;
@@ -37,13 +42,13 @@ import org.apache.beam.sdk.values.PValue;
  * associated {@link PValue}s.
  */
 public class TransformHierarchy {
-  private final TransformTreeNode root;
-  private final Map<POutput, TransformTreeNode> producers;
+  private final Node root;
+  private final Map<POutput, Node> producers;
   // Maintain a stack based on the enclosing nodes
-  private TransformTreeNode current;
+  private Node current;
 
   public TransformHierarchy() {
-    root = TransformTreeNode.root(this);
+    root = new Node(null, null, "", null);
     current = root;
     producers = new HashMap<>();
   }
@@ -58,20 +63,22 @@ public class TransformHierarchy {
    *
    * @return the added node
    */
-  public TransformTreeNode pushNode(String name, PInput input, PTransform<?, 
?> transform) {
+  public Node pushNode(String name, PInput input, PTransform<?, ?> transform) {
     checkNotNull(
         transform, "A %s must be provided for all Nodes", 
PTransform.class.getSimpleName());
     checkNotNull(
         name, "A name must be provided for all %s Nodes", 
PTransform.class.getSimpleName());
     checkNotNull(
         input, "An input must be provided for all %s Nodes", 
PTransform.class.getSimpleName());
-    current = TransformTreeNode.subtransform(current, transform, name, input);
+    Node node = new Node(current, transform, name, input);
+    current.addComposite(node);
+    current = node;
     return current;
   }
 
   /**
    * Finish specifying all of the input {@link PValue PValues} of the current 
{@link
-   * TransformTreeNode}. Ensures that all of the inputs to the current node 
have been fully
+   * Node}. Ensures that all of the inputs to the current node have been fully
    * specified, and have been produced by a node in this graph.
    */
   public void finishSpecifyingInput() {
@@ -84,7 +91,7 @@ public class TransformHierarchy {
   }
 
   /**
-   * Set the output of the current {@link TransformTreeNode}. If the output is 
new (setOutput has
+   * Set the output of the current {@link Node}. If the output is new 
(setOutput has
    * not previously been called with it as the parameter), the current node is 
set as the producer
    * of that {@link POutput}.
    *
@@ -114,7 +121,7 @@ public class TransformHierarchy {
     checkState(current != null, "Can't pop the root node of a 
TransformHierarchy");
   }
 
-  TransformTreeNode getProducer(PValue produced) {
+  Node getProducer(PValue produced) {
     return producers.get(produced);
   }
 
@@ -122,10 +129,10 @@ public class TransformHierarchy {
    * Returns all producing transforms for the {@link PValue PValues} contained
    * in {@code output}.
    */
-  List<TransformTreeNode> getProducingTransforms(POutput output) {
-    List<TransformTreeNode> producingTransforms = new ArrayList<>();
+  List<Node> getProducingTransforms(POutput output) {
+    List<Node> producingTransforms = new ArrayList<>();
     for (PValue value : output.expand()) {
-      TransformTreeNode producer = getProducer(value);
+      Node producer = getProducer(value);
       if (producer != null) {
         producingTransforms.add(producer);
       }
@@ -139,7 +146,217 @@ public class TransformHierarchy {
     return visitedValues;
   }
 
-  public TransformTreeNode getCurrent() {
+  public Node getCurrent() {
     return current;
   }
+
+  /**
+   * Provides internal tracking of transform relationships with helper methods
+   * for initialization and ordered visitation.
+   */
+  public class Node {
+    private final Node enclosingNode;
+    // The PTransform for this node, which may be a composite PTransform.
+    // The root of a TransformHierarchy is represented as a Node
+    // with a null transform field.
+    private final PTransform<?, ?> transform;
+
+    private final String fullName;
+
+    // Nodes for sub-transforms of a composite transform.
+    private final Collection<Node> parts = new ArrayList<>();
+
+    // Input to the transform, in unexpanded form.
+    private final PInput input;
+
+    // TODO: track which outputs need to be exported to parent.
+    // Output of the transform, in unexpanded form.
+    private POutput output;
+
+    @VisibleForTesting
+    boolean finishedSpecifying = false;
+
+    /**
+     * Creates a new Node with the given parent and transform.
+     *
+     * <p>EnclosingNode and transform may both be null for a root-level node, 
which holds all other
+     * nodes.
+     *
+     * @param enclosingNode the composite node containing this node
+     * @param transform the PTransform tracked by this node
+     * @param fullName the fully qualified name of the transform
+     * @param input the unexpanded input to the transform
+     */
+    private Node(
+        @Nullable Node enclosingNode,
+        @Nullable PTransform<?, ?> transform,
+        String fullName,
+        @Nullable PInput input) {
+      this.enclosingNode = enclosingNode;
+      this.transform = transform;
+      this.fullName = fullName;
+      this.input = input;
+    }
+
+    /**
+     * Returns the transform associated with this transform node.
+     */
+    public PTransform<?, ?> getTransform() {
+      return transform;
+    }
+
+    /**
+     * Returns the enclosing composite transform node, or null if there is 
none.
+     */
+    public Node getEnclosingNode() {
+      return enclosingNode;
+    }
+
+    /**
+     * Adds a composite operation to the transform node.
+     *
+     * <p>As soon as a node is added, the transform node is considered a
+     * composite operation instead of a primitive transform.
+     */
+    public void addComposite(Node node) {
+      parts.add(node);
+    }
+
+    /**
+     * Returns true if this node represents a composite transform that does 
not perform processing
+     * of its own, but merely encapsulates a sub-pipeline (which may be empty).
+     *
+     * <p>Note that a node may be composite with no sub-transforms if it 
returns its input directly
+     * extracts a component of a tuple, or other operations that occur at 
pipeline assembly time.
+     */
+    public boolean isCompositeNode() {
+      return !parts.isEmpty() || isRootNode() || returnsOthersOutput();
+    }
+
+    private boolean returnsOthersOutput() {
+      PTransform<?, ?> transform = getTransform();
+      if (output != null) {
+        for (PValue outputValue : output.expand()) {
+          if (!getProducer(outputValue).getTransform().equals(transform)) {
+            return true;
+          }
+        }
+      }
+      return false;
+    }
+
+    public boolean isRootNode() {
+      return transform == null;
+    }
+
+    public String getFullName() {
+      return fullName;
+    }
+
+    /**
+     * Returns the transform input, in unexpanded form.
+     */
+    public PInput getInput() {
+      return input;
+    }
+
+    /**
+     * Adds an output to the transform node.
+     */
+    public void setOutput(POutput output) {
+      checkState(!finishedSpecifying);
+      checkState(
+          this.output == null, "Tried to specify more than one output for %s", 
getFullName());
+      checkNotNull(output, "Tried to set the output of %s to null", 
getFullName());
+      this.output = output;
+
+      // Validate that a primitive transform produces only primitive output, 
and a composite
+      // transform does not produce primitive output.
+      Set<Node> outputProducers = new HashSet<>();
+      for (PValue outputValue : output.expand()) {
+        outputProducers.add(getProducer(outputValue));
+      }
+      if (outputProducers.contains(this) && outputProducers.size() != 1) {
+        Set<String> otherProducerNames = new HashSet<>();
+        for (Node outputProducer : outputProducers) {
+          if (outputProducer != this) {
+            otherProducerNames.add(outputProducer.getFullName());
+          }
+        }
+        throw new IllegalArgumentException(
+            String.format(
+                "Output of transform [%s] contains a %s produced by it as well 
as other "
+                    + "Transforms. A primitive transform must produce all of 
its outputs, and "
+                    + "outputs of a composite transform must be produced by a 
component transform "
+                    + "or be part of the input."
+                    + "%n    Other Outputs: %s"
+                    + "%n    Other Producers: %s",
+                getFullName(), POutput.class.getSimpleName(), output.expand(), 
otherProducerNames));
+      }
+    }
+
+    /** Returns the transform output, in unexpanded form. */
+    public POutput getOutput() {
+      return output;
+    }
+
+    AppliedPTransform<?, ?, ?> toAppliedPTransform() {
+      return AppliedPTransform.of(
+          getFullName(), getInput(), getOutput(), (PTransform) getTransform());
+    }
+    /**
+     * Visit the transform node.
+     *
+     * <p>Provides an ordered visit of the input values, the primitive 
transform (or child nodes for
+     * composite transforms), then the output values.
+     */
+    public void visit(PipelineVisitor visitor, Set<PValue> visitedValues) {
+      if (!finishedSpecifying) {
+        finishSpecifying();
+      }
+
+      if (!isRootNode()) {
+        // Visit inputs.
+        for (PValue inputValue : input.expand()) {
+          if (visitedValues.add(inputValue)) {
+            visitor.visitValue(inputValue, getProducer(inputValue));
+          }
+        }
+      }
+
+      if (isCompositeNode()) {
+        PipelineVisitor.CompositeBehavior recurse = 
visitor.enterCompositeTransform(this);
+
+        if (recurse.equals(CompositeBehavior.ENTER_TRANSFORM)) {
+          for (Node child : parts) {
+            child.visit(visitor, visitedValues);
+          }
+        }
+        visitor.leaveCompositeTransform(this);
+      } else {
+        visitor.visitPrimitiveTransform(this);
+      }
+
+      if (!isRootNode()) {
+        // Visit outputs.
+        for (PValue pValue : output.expand()) {
+          if (visitedValues.add(pValue)) {
+            visitor.visitValue(pValue, this);
+          }
+        }
+      }
+    }
+
+    /**
+     * Finish specifying a transform.
+     *
+     * <p>All inputs are finished first, then the transform, then all outputs.
+     */
+    public void finishSpecifying() {
+      if (finishedSpecifying) {
+        return;
+      }
+      finishedSpecifying = true;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/569e8d70/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformTreeNode.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformTreeNode.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformTreeNode.java
deleted file mode 100644
index ea94bd9..0000000
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformTreeNode.java
+++ /dev/null
@@ -1,282 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
-
-import com.google.common.annotations.VisibleForTesting;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Set;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.Pipeline.PipelineVisitor;
-import org.apache.beam.sdk.Pipeline.PipelineVisitor.CompositeBehavior;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PInput;
-import org.apache.beam.sdk.values.POutput;
-import org.apache.beam.sdk.values.PValue;
-
-/**
- * Provides internal tracking of transform relationships with helper methods
- * for initialization and ordered visitation.
- */
-public class TransformTreeNode {
-  private final TransformHierarchy hierarchy;
-  private final TransformTreeNode enclosingNode;
-
-  // The PTransform for this node, which may be a composite PTransform.
-  // The root of a TransformHierarchy is represented as a TransformTreeNode
-  // with a null transform field.
-  private final PTransform<?, ?> transform;
-
-  private final String fullName;
-
-  // Nodes for sub-transforms of a composite transform.
-  private final Collection<TransformTreeNode> parts = new ArrayList<>();
-
-  // Input to the transform, in unexpanded form.
-  private final PInput input;
-
-  // TODO: track which outputs need to be exported to parent.
-  // Output of the transform, in unexpanded form.
-  private POutput output;
-
-  @VisibleForTesting
-  boolean finishedSpecifying = false;
-
-  /**
-   * Create a root {@link TransformTreeNode}. This transform is the root of 
the provided {@link
-   * TransformHierarchy} - it has no enclosing node, no {@link PTransform}, no 
{@link PInput input},
-   * no {@link POutput output}, and an empty name. It contains all {@link 
PTransform transforms}
-   * within a {@link Pipeline} as component transforms.
-   */
-  public static TransformTreeNode root(TransformHierarchy hierarchy) {
-    return new TransformTreeNode(hierarchy, null, null, "", null);
-  }
-
-  /**
-   * Create a subtransform of the provided {@link TransformTreeNode node}. The 
enclosing node is a
-   * composite that contains this transform.
-   *
-   * <p>The returned node is a component node of the enclosing node.
-   */
-  public static TransformTreeNode subtransform(
-      TransformTreeNode enclosing, PTransform<?, ?> transform, String 
fullName, PInput input) {
-    checkNotNull(enclosing);
-    checkNotNull(transform);
-    checkNotNull(fullName);
-    checkNotNull(input);
-    TransformTreeNode node =
-        new TransformTreeNode(enclosing.hierarchy, enclosing, transform, 
fullName, input);
-    enclosing.addComposite(node);
-    return node;
-  }
-
-  /**
-   * Creates a new TransformTreeNode with the given parent and transform.
-   *
-   * <p>EnclosingNode and transform may both be null for a root-level node, 
which holds all other
-   * nodes.
-   *
-   * @param enclosingNode the composite node containing this node
-   * @param transform the PTransform tracked by this node
-   * @param fullName the fully qualified name of the transform
-   * @param input the unexpanded input to the transform
-   */
-  private TransformTreeNode(
-      TransformHierarchy hierarchy,
-      @Nullable TransformTreeNode enclosingNode,
-      @Nullable PTransform<?, ?> transform,
-      String fullName,
-      @Nullable PInput input) {
-    this.hierarchy = hierarchy;
-    this.enclosingNode = enclosingNode;
-    this.transform = transform;
-    this.fullName = fullName;
-    this.input = input;
-  }
-
-  /**
-   * Returns the transform associated with this transform node.
-   */
-  public PTransform<?, ?> getTransform() {
-    return transform;
-  }
-
-  /**
-   * Returns the enclosing composite transform node, or null if there is none.
-   */
-  public TransformTreeNode getEnclosingNode() {
-    return enclosingNode;
-  }
-
-  /**
-   * Adds a composite operation to the transform node.
-   *
-   * <p>As soon as a node is added, the transform node is considered a
-   * composite operation instead of a primitive transform.
-   */
-  public void addComposite(TransformTreeNode node) {
-    parts.add(node);
-  }
-
-  /**
-   * Returns true if this node represents a composite transform that does not 
perform processing of
-   * its own, but merely encapsulates a sub-pipeline (which may be empty).
-   *
-   * <p>Note that a node may be composite with no sub-transforms if it returns 
its input directly
-   * extracts a component of a tuple, or other operations that occur at 
pipeline assembly time.
-   */
-  public boolean isCompositeNode() {
-    return !parts.isEmpty() || isRootNode() || returnsOthersOutput();
-  }
-
-  private boolean returnsOthersOutput() {
-    PTransform<?, ?> transform = getTransform();
-    if (output != null) {
-      for (PValue outputValue : output.expand()) {
-        if 
(!hierarchy.getProducer(outputValue).getTransform().equals(transform)) {
-          return true;
-        }
-      }
-    }
-    return false;
-  }
-
-  public boolean isRootNode() {
-    return transform == null;
-  }
-
-  public String getFullName() {
-    return fullName;
-  }
-
-  /**
-   * Returns the transform input, in unexpanded form.
-   */
-  public PInput getInput() {
-    return input;
-  }
-
-  /**
-   * Adds an output to the transform node.
-   */
-  public void setOutput(POutput output) {
-    checkState(!finishedSpecifying);
-    checkState(this.output == null, "Tried to specify more than one output for 
%s", getFullName());
-    checkNotNull(output, "Tried to set the output of %s to null", 
getFullName());
-    this.output = output;
-
-    // Validate that a primitive transform produces only primitive output, and 
a composite transform
-    // does not produce primitive output.
-    Set<TransformTreeNode> outputProducers = new HashSet<>();
-    for (PValue outputValue : output.expand()) {
-      outputProducers.add(hierarchy.getProducer(outputValue));
-    }
-    if (outputProducers.contains(this) && outputProducers.size() != 1) {
-      Set<String> otherProducerNames = new HashSet<>();
-      for (TransformTreeNode outputProducer : outputProducers) {
-        if (outputProducer != this) {
-          otherProducerNames.add(outputProducer.getFullName());
-        }
-      }
-      throw new IllegalArgumentException(
-          String.format(
-              "Output of transform [%s] contains a %s produced by it as well 
as other Transforms. "
-                  + "A primitive transform must produce all of its outputs, 
and outputs of a "
-                  + "composite transform must be produced by a component 
transform or be part of"
-                  + "the input."
-                  + "%n    Other Outputs: %s"
-                  + "%n    Other Producers: %s",
-              getFullName(), POutput.class.getSimpleName(), output.expand(), 
otherProducerNames));
-    }
-  }
-
-  /**
-   * Returns the transform output, in unexpanded form.
-   */
-  public POutput getOutput() {
-    return output;
-  }
-
-  AppliedPTransform<?, ?, ?> toAppliedPTransform() {
-    return AppliedPTransform.of(
-        getFullName(), getInput(), getOutput(), (PTransform) getTransform());
-  }
-  /**
-   * Visit the transform node.
-   *
-   * <p>Provides an ordered visit of the input values, the primitive
-   * transform (or child nodes for composite transforms), then the
-   * output values.
-   */
-  public void visit(PipelineVisitor visitor,
-                    Set<PValue> visitedValues) {
-    if (!finishedSpecifying) {
-      finishSpecifying();
-    }
-
-    if (!isRootNode()) {
-      // Visit inputs.
-      for (PValue inputValue : input.expand()) {
-        if (visitedValues.add(inputValue)) {
-          visitor.visitValue(inputValue, hierarchy.getProducer(inputValue));
-        }
-      }
-    }
-
-    if (isCompositeNode()) {
-      PipelineVisitor.CompositeBehavior recurse = 
visitor.enterCompositeTransform(this);
-
-      if (recurse.equals(CompositeBehavior.ENTER_TRANSFORM)) {
-        for (TransformTreeNode child : parts) {
-          child.visit(visitor, visitedValues);
-        }
-      }
-      visitor.leaveCompositeTransform(this);
-    } else {
-      visitor.visitPrimitiveTransform(this);
-    }
-
-    if (!isRootNode()) {
-      // Visit outputs.
-      for (PValue pValue : output.expand()) {
-        if (visitedValues.add(pValue)) {
-          visitor.visitValue(pValue, this);
-        }
-      }
-    }
-  }
-
-  /**
-   * Finish specifying a transform.
-   *
-   * <p>All inputs are finished first, then the transform, then
-   * all outputs.
-   */
-  public void finishSpecifying() {
-    if (finishedSpecifying) {
-      return;
-    }
-    finishedSpecifying = true;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/569e8d70/sdks/java/core/src/test/java/org/apache/beam/sdk/AggregatorPipelineExtractorTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/AggregatorPipelineExtractorTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/AggregatorPipelineExtractorTest.java
index 335d81f..b4de768 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/AggregatorPipelineExtractorTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/AggregatorPipelineExtractorTest.java
@@ -30,7 +30,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import org.apache.beam.sdk.Pipeline.PipelineVisitor;
-import org.apache.beam.sdk.runners.TransformTreeNode;
+import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.Max;
@@ -73,7 +73,7 @@ public class AggregatorPipelineExtractorTest {
     Aggregator<Long, Long> aggregatorOne = fn.addAggregator(new 
Sum.SumLongFn());
     Aggregator<Integer, Integer> aggregatorTwo = fn.addAggregator(new 
Min.MinIntegerFn());
 
-    TransformTreeNode transformNode = mock(TransformTreeNode.class);
+    TransformHierarchy.Node transformNode = 
mock(TransformHierarchy.Node.class);
     when(transformNode.getTransform()).thenReturn(bound);
 
     doAnswer(new VisitNodesAnswer(ImmutableList.of(transformNode)))
@@ -101,7 +101,7 @@ public class AggregatorPipelineExtractorTest {
     Aggregator<Long, Long> aggregatorOne = fn.addAggregator(new 
Max.MaxLongFn());
     Aggregator<Double, Double> aggregatorTwo = fn.addAggregator(new 
Min.MinDoubleFn());
 
-    TransformTreeNode transformNode = mock(TransformTreeNode.class);
+    TransformHierarchy.Node transformNode = 
mock(TransformHierarchy.Node.class);
     when(transformNode.getTransform()).thenReturn(bound);
 
     doAnswer(new VisitNodesAnswer(ImmutableList.of(transformNode)))
@@ -132,9 +132,9 @@ public class AggregatorPipelineExtractorTest {
     Aggregator<Long, Long> aggregatorOne = fn.addAggregator(new 
Sum.SumLongFn());
     Aggregator<Double, Double> aggregatorTwo = fn.addAggregator(new 
Min.MinDoubleFn());
 
-    TransformTreeNode transformNode = mock(TransformTreeNode.class);
+    TransformHierarchy.Node transformNode = 
mock(TransformHierarchy.Node.class);
     when(transformNode.getTransform()).thenReturn(bound);
-    TransformTreeNode otherTransformNode = mock(TransformTreeNode.class);
+    TransformHierarchy.Node otherTransformNode = 
mock(TransformHierarchy.Node.class);
     when(otherTransformNode.getTransform()).thenReturn(otherBound);
 
     doAnswer(new VisitNodesAnswer(ImmutableList.of(transformNode, 
otherTransformNode)))
@@ -172,9 +172,9 @@ public class AggregatorPipelineExtractorTest {
 
     when(otherBound.getFn()).thenReturn(otherFn);
 
-    TransformTreeNode transformNode = mock(TransformTreeNode.class);
+    TransformHierarchy.Node transformNode = 
mock(TransformHierarchy.Node.class);
     when(transformNode.getTransform()).thenReturn(bound);
-    TransformTreeNode otherTransformNode = mock(TransformTreeNode.class);
+    TransformHierarchy.Node otherTransformNode = 
mock(TransformHierarchy.Node.class);
     when(otherTransformNode.getTransform()).thenReturn(otherBound);
 
     doAnswer(new VisitNodesAnswer(ImmutableList.of(transformNode, 
otherTransformNode)))
@@ -192,16 +192,16 @@ public class AggregatorPipelineExtractorTest {
   }
 
   private static class VisitNodesAnswer implements Answer<Object> {
-    private final List<TransformTreeNode> nodes;
+    private final List<TransformHierarchy.Node> nodes;
 
-    public VisitNodesAnswer(List<TransformTreeNode> nodes) {
+    public VisitNodesAnswer(List<TransformHierarchy.Node> nodes) {
       this.nodes = nodes;
     }
 
     @Override
     public Object answer(InvocationOnMock invocation) throws Throwable {
       PipelineVisitor visitor = (PipelineVisitor) invocation.getArguments()[0];
-      for (TransformTreeNode node : nodes) {
+      for (TransformHierarchy.Node node : nodes) {
         visitor.visitPrimitiveTransform(node);
       }
       return null;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/569e8d70/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
index 3bf6d64..f4488f4 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
@@ -77,8 +77,8 @@ public class TransformHierarchyTest {
 
   @Test
   public void pushThenPopSucceeds() {
-    TransformTreeNode root = hierarchy.getCurrent();
-    TransformTreeNode node = hierarchy.pushNode("Create", PBegin.in(pipeline), 
Create.of(1));
+    TransformHierarchy.Node root = hierarchy.getCurrent();
+    TransformHierarchy.Node node = hierarchy.pushNode("Create", 
PBegin.in(pipeline), Create.of(1));
     assertThat(hierarchy.getCurrent(), equalTo(node));
     hierarchy.popNode();
     assertThat(node.finishedSpecifying, is(true));
@@ -90,12 +90,12 @@ public class TransformHierarchyTest {
     PCollection<Long> created =
         PCollection.createPrimitiveOutputInternal(
             pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
-    TransformTreeNode node = hierarchy.pushNode("Create", PBegin.in(pipeline), 
Create.of(1));
+    TransformHierarchy.Node node = hierarchy.pushNode("Create", 
PBegin.in(pipeline), Create.of(1));
     hierarchy.setOutput(created);
     hierarchy.popNode();
     PCollectionList<Long> pcList = PCollectionList.of(created);
 
-    TransformTreeNode emptyTransform =
+    TransformHierarchy.Node emptyTransform =
         hierarchy.pushNode(
             "Extract",
             pcList,
@@ -149,7 +149,7 @@ public class TransformHierarchyTest {
 
   @Test
   public void visitVisitsAllPushed() {
-    TransformTreeNode root = hierarchy.getCurrent();
+    TransformHierarchy.Node root = hierarchy.getCurrent();
     PBegin begin = PBegin.in(pipeline);
 
     Create.Values<Long> create = Create.of(1L);
@@ -170,7 +170,7 @@ public class TransformHierarchyTest {
         PCollection.createPrimitiveOutputInternal(
             pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
 
-    TransformTreeNode compositeNode = hierarchy.pushNode("Create", begin, 
create);
+    TransformHierarchy.Node compositeNode = hierarchy.pushNode("Create", 
begin, create);
     assertThat(hierarchy.getCurrent(), equalTo(compositeNode));
     assertThat(compositeNode.getInput(), Matchers.<PInput>equalTo(begin));
     assertThat(compositeNode.getTransform(), Matchers.<PTransform<?, 
?>>equalTo(create));
@@ -178,7 +178,7 @@ public class TransformHierarchyTest {
     assertThat(compositeNode.getOutput(), nullValue());
     assertThat(compositeNode.getEnclosingNode().isRootNode(), is(true));
 
-    TransformTreeNode primitiveNode = hierarchy.pushNode("Create/Read", begin, 
read);
+    TransformHierarchy.Node primitiveNode = hierarchy.pushNode("Create/Read", 
begin, read);
     assertThat(hierarchy.getCurrent(), equalTo(primitiveNode));
     hierarchy.setOutput(created);
     hierarchy.popNode();
@@ -194,30 +194,30 @@ public class TransformHierarchyTest {
     assertThat(hierarchy.getProducer(created), equalTo(primitiveNode));
     hierarchy.popNode();
 
-    TransformTreeNode otherPrimitive = hierarchy.pushNode("ParDo", created, 
map);
+    TransformHierarchy.Node otherPrimitive = hierarchy.pushNode("ParDo", 
created, map);
     hierarchy.setOutput(mapped);
     hierarchy.popNode();
 
-    final Set<TransformTreeNode> visitedCompositeNodes = new HashSet<>();
-    final Set<TransformTreeNode> visitedPrimitiveNodes = new HashSet<>();
+    final Set<TransformHierarchy.Node> visitedCompositeNodes = new HashSet<>();
+    final Set<TransformHierarchy.Node> visitedPrimitiveNodes = new HashSet<>();
     final Set<PValue> visitedValuesInVisitor = new HashSet<>();
 
     Set<PValue> visitedValues =
         hierarchy.visit(
             new PipelineVisitor.Defaults() {
               @Override
-              public CompositeBehavior 
enterCompositeTransform(TransformTreeNode node) {
+              public CompositeBehavior 
enterCompositeTransform(TransformHierarchy.Node node) {
                 visitedCompositeNodes.add(node);
                 return CompositeBehavior.ENTER_TRANSFORM;
               }
 
               @Override
-              public void visitPrimitiveTransform(TransformTreeNode node) {
+              public void visitPrimitiveTransform(TransformHierarchy.Node 
node) {
                 visitedPrimitiveNodes.add(node);
               }
 
               @Override
-              public void visitValue(PValue value, TransformTreeNode producer) 
{
+              public void visitValue(PValue value, TransformHierarchy.Node 
producer) {
                 visitedValuesInVisitor.add(value);
               }
             });

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/569e8d70/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
index b95fa70..a81fb1a 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
@@ -51,7 +51,7 @@ import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
 /**
- * Tests for {@link TransformTreeNode} and {@link TransformHierarchy}.
+ * Tests for {@link TransformHierarchy.Node} and {@link TransformHierarchy}.
  */
 @RunWith(JUnit4.class)
 public class TransformTreeTest {
@@ -128,7 +128,7 @@ public class TransformTreeTest {
 
     p.traverseTopologically(new Pipeline.PipelineVisitor.Defaults() {
       @Override
-      public CompositeBehavior enterCompositeTransform(TransformTreeNode node) 
{
+      public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node 
node) {
         PTransform<?, ?> transform = node.getTransform();
         if (transform instanceof Sample.SampleAny) {
           assertTrue(visited.add(TransformsSeen.SAMPLE_ANY));
@@ -144,7 +144,7 @@ public class TransformTreeTest {
       }
 
       @Override
-      public void leaveCompositeTransform(TransformTreeNode node) {
+      public void leaveCompositeTransform(TransformHierarchy.Node node) {
         PTransform<?, ?> transform = node.getTransform();
         if (transform instanceof Sample.SampleAny) {
           assertTrue(left.add(TransformsSeen.SAMPLE_ANY));
@@ -152,7 +152,7 @@ public class TransformTreeTest {
       }
 
       @Override
-      public void visitPrimitiveTransform(TransformTreeNode node) {
+      public void visitPrimitiveTransform(TransformHierarchy.Node node) {
         PTransform<?, ?> transform = node.getTransform();
         // Pick is a composite, should not be visited here.
         assertThat(transform, not(instanceOf(Sample.SampleAny.class)));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/569e8d70/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluator.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluator.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluator.java
index b758ed6..31ac913 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluator.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluator.java
@@ -24,7 +24,7 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.runners.PipelineRunner;
-import org.apache.beam.sdk.runners.TransformTreeNode;
+import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -143,7 +143,7 @@ public class DisplayDataEvaluator {
     }
 
     @Override
-    public CompositeBehavior enterCompositeTransform(TransformTreeNode node) {
+    public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node 
node) {
       if (Objects.equals(root, node.getTransform())) {
         inCompositeRoot = true;
       }
@@ -152,14 +152,14 @@ public class DisplayDataEvaluator {
     }
 
     @Override
-    public void leaveCompositeTransform(TransformTreeNode node) {
+    public void leaveCompositeTransform(TransformHierarchy.Node node) {
       if (Objects.equals(root, node.getTransform())) {
         inCompositeRoot = false;
       }
     }
 
     @Override
-    public void visitPrimitiveTransform(TransformTreeNode node) {
+    public void visitPrimitiveTransform(TransformHierarchy.Node node) {
       if (inCompositeRoot || Objects.equals(root, node.getTransform())) {
         displayData.add(DisplayData.from(node.getTransform()));
       }

Reply via email to