taegeonum closed pull request #145: [Nemo-220] Move PIPELINE variable to 
TranslationContext in PipelineTranslator
URL: https://github.com/apache/incubator-nemo/pull/145
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, GitHub does not display its
diff after the merge, so the diff is reproduced below:

diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
index ee10b21f4..3fd9d2be7 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
@@ -55,7 +55,6 @@
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.util.*;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiFunction;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -66,8 +65,7 @@
  * For a {@link CompositeTransformVertex}, it defines how to setup and clear 
{@link TranslationContext}
  * before start translating inner Beam transform hierarchy.
  */
-public final class PipelineTranslator
-  implements BiFunction<CompositeTransformVertex, PipelineOptions, 
DAG<IRVertex, IREdge>> {
+public final class PipelineTranslator {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(PipelineTranslator.class.getName());
 
@@ -76,9 +74,6 @@
   private final Map<Class<? extends PTransform>, Method> 
primitiveTransformToTranslator = new HashMap<>();
   private final Map<Class<? extends PTransform>, Method> 
compositeTransformToTranslator = new HashMap<>();
 
-  // TODO #220: Move this variable to TranslationContext
-  private static final AtomicReference<Pipeline> PIPELINE = new 
AtomicReference<>();
-
   /**
    * Static translator method.
    * @param pipeline the original root
@@ -89,8 +84,7 @@
   public static DAG<IRVertex, IREdge> translate(final Pipeline pipeline,
                                                 final CompositeTransformVertex 
root,
                                                 final PipelineOptions 
pipelineOptions) {
-    PIPELINE.set(pipeline);
-    return INSTANCE.apply(root, pipelineOptions);
+    return INSTANCE.translateToIRDAG(root, pipeline, pipelineOptions);
   }
 
   /**
@@ -144,7 +138,7 @@ private static void boundedReadTranslator(final 
TranslationContext ctx,
   private static DoFnTransform createDoFnTransform(final TranslationContext 
ctx,
                                                    final 
PrimitiveTransformVertex transformVertex) {
     try {
-      final AppliedPTransform pTransform = 
transformVertex.getNode().toAppliedPTransform(PIPELINE.get());
+      final AppliedPTransform pTransform = 
transformVertex.getNode().toAppliedPTransform(ctx.pipeline);
       final DoFn doFn = ParDoTranslation.getDoFn(pTransform);
       final TupleTag mainOutputTag = 
ParDoTranslation.getMainOutputTag(pTransform);
       final List<PCollectionView<?>> sideInputs = 
ParDoTranslation.getSideInputs(pTransform);
@@ -221,7 +215,7 @@ private static void parDoMultiOutputTranslator(final 
TranslationContext ctx,
   private static Transform createGBKTransform(
     final TranslationContext ctx,
     final TransformVertex transformVertex) {
-    final AppliedPTransform pTransform = 
transformVertex.getNode().toAppliedPTransform(PIPELINE.get());
+    final AppliedPTransform pTransform = 
transformVertex.getNode().toAppliedPTransform(ctx.pipeline);
     final PCollection<?> mainInput = (PCollection<?>)
       
Iterables.getOnlyElement(TransformInputs.nonAdditionalInputs(pTransform));
     final TupleTag mainOutputTag = new TupleTag<>();
@@ -372,12 +366,12 @@ private static void loopTranslator(final 
TranslationContext ctx,
     ctx.loopVertexStack.pop();
   }
 
-  @Override
-  public DAG<IRVertex, IREdge> apply(final CompositeTransformVertex pipeline,
-                                     final PipelineOptions pipelineOptions) {
-    final TranslationContext ctx = new TranslationContext(pipeline, 
primitiveTransformToTranslator,
+  private DAG<IRVertex, IREdge> translateToIRDAG(final 
CompositeTransformVertex vertex,
+                                                 final Pipeline pipeline,
+                                                 final PipelineOptions 
pipelineOptions) {
+    final TranslationContext ctx = new TranslationContext(vertex, pipeline, 
primitiveTransformToTranslator,
         compositeTransformToTranslator, 
DefaultCommunicationPatternSelector.INSTANCE, pipelineOptions);
-    ctx.translate(pipeline);
+    ctx.translate(vertex);
     return ctx.builder.build();
   }
 
@@ -453,23 +447,28 @@ private static void loopTranslator(final 
TranslationContext ctx,
     private final Map<PValue, TupleTag<?>> pValueToTag;
     private final Stack<LoopVertex> loopVertexStack;
     private final BiFunction<IRVertex, IRVertex, 
CommunicationPatternProperty.Value> communicationPatternSelector;
+    private final Pipeline pipeline;
+
 
     private final Map<Class<? extends PTransform>, Method> 
primitiveTransformToTranslator;
     private final Map<Class<? extends PTransform>, Method> 
compositeTransformToTranslator;
 
     /**
      * @param root the root to translate
+     * @param pipeline the pipeline to translate
      * @param primitiveTransformToTranslator provides translators for 
PrimitiveTransform
      * @param compositeTransformToTranslator provides translators for 
CompositeTransform
      * @param selector provides {@link CommunicationPatternProperty.Value} for 
IR edges
      * @param pipelineOptions {@link PipelineOptions}
      */
     private TranslationContext(final CompositeTransformVertex root,
+                               final Pipeline pipeline,
                                final Map<Class<? extends PTransform>, Method> 
primitiveTransformToTranslator,
                                final Map<Class<? extends PTransform>, Method> 
compositeTransformToTranslator,
                                final BiFunction<IRVertex, IRVertex, 
CommunicationPatternProperty.Value> selector,
                                final PipelineOptions pipelineOptions) {
       this.root = root;
+      this.pipeline = pipeline;
       this.builder = new DAGBuilder<>();
       this.pValueToProducer = new HashMap<>();
       this.pValueToTag = new HashMap<>();
@@ -489,6 +488,7 @@ private TranslationContext(final CompositeTransformVertex 
root,
     private TranslationContext(final TranslationContext ctx,
                                final BiFunction<IRVertex, IRVertex, 
CommunicationPatternProperty.Value> selector) {
       this.root = ctx.root;
+      this.pipeline = ctx.pipeline;
       this.pipelineOptions = ctx.pipelineOptions;
       this.builder = ctx.builder;
       this.pValueToProducer = ctx.pValueToProducer;


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to