taegeonum commented on a change in pull request #150: [NEMO-269] Direct translation from Beam DAG to Nemo DAG
URL: https://github.com/apache/incubator-nemo/pull/150#discussion_r231004276
 
 

 ##########
 File path: 
compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
 ##########
 @@ -258,429 +237,139 @@ private static void windowTranslator(final 
TranslationContext ctx,
     }
     final IRVertex vertex = new OperatorVertex(new 
WindowFnTransform(windowFn));
     ctx.addVertex(vertex);
-    transformVertex.getNode().getInputs().values().forEach(input -> 
ctx.addEdgeTo(vertex, input));
-    transformVertex.getNode().getOutputs().values().forEach(output -> 
ctx.registerMainOutputFrom(vertex, output));
+    beamNode.getInputs().values().forEach(input -> ctx.addEdgeTo(vertex, 
input));
+    beamNode.getOutputs().values().forEach(output -> 
ctx.registerMainOutputFrom(beamNode, vertex, output));
   }
 
   @PrimitiveTransformTranslator(View.CreatePCollectionView.class)
-  private static void createPCollectionViewTranslator(final TranslationContext 
ctx,
-                                                      final 
PrimitiveTransformVertex transformVertex,
+  private static void createPCollectionViewTranslator(final 
PipelineTranslationContext ctx,
+                                                      final 
TransformHierarchy.Node beamNode,
                                                       final 
View.CreatePCollectionView<?, ?> transform) {
     final IRVertex vertex = new OperatorVertex(new 
CreateViewTransform(transform.getView().getViewFn()));
     ctx.addVertex(vertex);
-    transformVertex.getNode().getInputs().values().forEach(input -> 
ctx.addEdgeTo(vertex, input));
-    ctx.registerMainOutputFrom(vertex, transform.getView());
-    transformVertex.getNode().getOutputs().values().forEach(output -> 
ctx.registerMainOutputFrom(vertex, output));
+    beamNode.getInputs().values().forEach(input -> ctx.addEdgeTo(vertex, 
input));
+    ctx.registerMainOutputFrom(beamNode, vertex, transform.getView());
+    beamNode.getOutputs().values().forEach(output -> 
ctx.registerMainOutputFrom(beamNode, vertex, output));
   }
 
   @PrimitiveTransformTranslator(Flatten.PCollections.class)
-  private static void flattenTranslator(final TranslationContext ctx,
-                                        final PrimitiveTransformVertex 
transformVertex,
+  private static void flattenTranslator(final PipelineTranslationContext ctx,
+                                        final TransformHierarchy.Node beamNode,
                                         final Flatten.PCollections<?> 
transform) {
     final IRVertex vertex = new OperatorVertex(new FlattenTransform());
     ctx.addVertex(vertex);
-    transformVertex.getNode().getInputs().values().forEach(input -> 
ctx.addEdgeTo(vertex, input));
-    transformVertex.getNode().getOutputs().values().forEach(output -> 
ctx.registerMainOutputFrom(vertex, output));
+    beamNode.getInputs().values().forEach(input -> ctx.addEdgeTo(vertex, 
input));
+    beamNode.getOutputs().values().forEach(output -> 
ctx.registerMainOutputFrom(beamNode, vertex, output));
   }
 
-  /**
-   * Default translator for CompositeTransforms. Translates inner DAG without 
modifying {@link TranslationContext}.
-   *
-   * @param ctx provides translation context
-   * @param transformVertex the given CompositeTransform to translate
-   * @param transform transform which can be obtained from {@code 
transformVertex}
-   */
-  @CompositeTransformTranslator(PTransform.class)
-  private static void topologicalTranslator(final TranslationContext ctx,
-                                            final CompositeTransformVertex 
transformVertex,
-                                            final PTransform<?, ?> transform) {
-    transformVertex.getDAG().topologicalDo(ctx::translate);
-  }
+  
////////////////////////////////////////////////////////////////////////////////////////////////////////
+  /////////////////////// COMPOSITE TRANSFORMS
 
   /**
-   * Translator for Combine transform. Implements local combining before 
shuffling key-value pairs.
+   * {@link Combine.PerKey} = {@link GroupByKey} + {@link 
Combine.GroupedValues}
+   * ({@link Combine.Globally} internally uses {@link Combine.PerKey} which 
will also be optimized by this translator)
+   * Here, we translate this composite transform as a whole, exploiting its 
accumulator semantics.
    *
    * @param ctx provides translation context
-   * @param transformVertex the given CompositeTransform to translate
-   * @param transform transform which can be obtained from {@code 
transformVertex}
+   * @param beamNode the given CompositeTransform to translate
+   * @param transform transform which can be obtained from {@code beamNode}
    */
-  @CompositeTransformTranslator({Combine.Globally.class, Combine.PerKey.class, 
Combine.GroupedValues.class})
-  private static void combineTranslator(final TranslationContext ctx,
-                                        final CompositeTransformVertex 
transformVertex,
-                                        final PTransform<?, ?> transform) {
-    // No optimization for BeamSQL that handles Beam 'Row's.
-    final boolean handlesBeamRow = Stream
-      .concat(transformVertex.getNode().getInputs().values().stream(),
-        transformVertex.getNode().getOutputs().values().stream())
-      .map(pValue -> (KvCoder) getCoder(pValue, ctx.root)) // Input and output 
of combine should be KV
-      .map(kvCoder -> kvCoder.getValueCoder().getEncodedTypeDescriptor()) // 
We're interested in the 'Value' of KV
-      .anyMatch(valueTypeDescriptor -> 
TypeDescriptor.of(Row.class).equals(valueTypeDescriptor));
-    if (handlesBeamRow) {
-      transformVertex.getDAG().topologicalDo(ctx::translate);
-      return; // return early and give up optimization - TODO #209: Enable 
Local Combiner for BeamSQL
-    }
-
-    // Local combiner optimization
-    final List<TransformVertex> topologicalOrdering = 
transformVertex.getDAG().getTopologicalSort();
-    final TransformVertex groupByKeyBeamTransform = topologicalOrdering.get(0);
-    final TransformVertex last = 
topologicalOrdering.get(topologicalOrdering.size() - 1);
-    if (groupByKeyBeamTransform.getNode().getTransform() instanceof 
GroupByKey) {
-      // Translate the given CompositeTransform under OneToOneEdge-enforced 
context.
-      final TranslationContext oneToOneEdgeContext = new 
TranslationContext(ctx,
-          OneToOneCommunicationPatternSelector.INSTANCE);
-      transformVertex.getDAG().topologicalDo(oneToOneEdgeContext::translate);
-
-      // Attempt to translate the CompositeTransform again.
-      // Add GroupByKey, which is the first transform in the given 
CompositeTransform.
-      // Make sure it consumes the output from the last vertex in 
OneToOneEdge-translated hierarchy.
-      final IRVertex groupByKeyIRVertex = new 
OperatorVertex(createGBKTransform(ctx, transformVertex));
-      ctx.addVertex(groupByKeyIRVertex);
-      last.getNode().getOutputs().values().forEach(outputFromCombiner
-          -> ctx.addEdgeTo(groupByKeyIRVertex, outputFromCombiner));
-      groupByKeyBeamTransform.getNode().getOutputs().values()
-          .forEach(outputFromGroupByKey -> 
ctx.registerMainOutputFrom(groupByKeyIRVertex, outputFromGroupByKey));
-
-      // Translate the remaining vertices.
-      topologicalOrdering.stream().skip(1).forEach(ctx::translate);
-    } else {
-      transformVertex.getDAG().topologicalDo(ctx::translate);
-    }
+  @CompositeTransformTranslator(Combine.PerKey.class)
+  private static Pipeline.PipelineVisitor.CompositeBehavior 
combinePerKeyTranslator(
+    final PipelineTranslationContext ctx,
+    final TransformHierarchy.Node beamNode,
+    final PTransform<?, ?> transform) {
+    // TODO #260: Beam Accumulator-based Partial Aggregation
+    return Pipeline.PipelineVisitor.CompositeBehavior.ENTER_TRANSFORM;
   }
 
   /**
-   * Pushes the loop vertex to the stack before translating the inner DAG, and 
pops it after the translation.
-   *
    * @param ctx provides translation context
-   * @param transformVertex the given CompositeTransform to translate
-   * @param transform transform which can be obtained from {@code 
transformVertex}
+   * @param beamNode the given CompositeTransform to translate
+   * @param transform transform which can be obtained from {@code beamNode}
+   * @
    */
   @CompositeTransformTranslator(LoopCompositeTransform.class)
-  private static void loopTranslator(final TranslationContext ctx,
-                                     final CompositeTransformVertex 
transformVertex,
-                                     final LoopCompositeTransform<?, ?> 
transform) {
-    final LoopVertex loopVertex = new 
LoopVertex(transformVertex.getNode().getFullName());
-    ctx.builder.addVertex(loopVertex, ctx.loopVertexStack);
-    ctx.builder.removeVertex(loopVertex);
-    ctx.loopVertexStack.push(loopVertex);
-    topologicalTranslator(ctx, transformVertex, transform);
-    ctx.loopVertexStack.pop();
+  private static Pipeline.PipelineVisitor.CompositeBehavior loopTranslator(
+    final PipelineTranslationContext ctx,
+    final TransformHierarchy.Node beamNode,
+    final LoopCompositeTransform<?, ?> transform) {
+    // Do nothing here, as the context handles the loop vertex stack.
+    // We just keep this method to signal that the loop vertex is acknowledged.
+    return Pipeline.PipelineVisitor.CompositeBehavior.ENTER_TRANSFORM;
   }
 
-  private DAG<IRVertex, IREdge> translateToIRDAG(final 
CompositeTransformVertex vertex,
-                                                 final Pipeline pipeline,
-                                                 final PipelineOptions 
pipelineOptions) {
-    final TranslationContext ctx = new TranslationContext(vertex, pipeline, 
primitiveTransformToTranslator,
-        compositeTransformToTranslator, 
DefaultCommunicationPatternSelector.INSTANCE, pipelineOptions);
-    ctx.translate(vertex);
-    return ctx.builder.build();
-  }
+  
////////////////////////////////////////////////////////////////////////////////////////////////////////
+  /////////////////////// HELPER METHODS
 
-  /**
-   * Annotates translator for PrimitiveTransform.
-   */
-  @Target(ElementType.METHOD)
-  @Retention(RetentionPolicy.RUNTIME)
-  private @interface PrimitiveTransformTranslator {
-    Class<? extends PTransform>[] value();
-  }
-
-  /**
-   * Annotates translator for CompositeTransform.
-   */
-  @Target(ElementType.METHOD)
-  @Retention(RetentionPolicy.RUNTIME)
-  private @interface CompositeTransformTranslator {
-    Class<? extends PTransform>[] value();
-  }
+  private static DoFnTransform createDoFnTransform(final 
PipelineTranslationContext ctx,
+                                                   final 
TransformHierarchy.Node beamNode) {
+    try {
+      final AppliedPTransform pTransform = 
beamNode.toAppliedPTransform(ctx.getPipeline());
+      final DoFn doFn = ParDoTranslation.getDoFn(pTransform);
+      final TupleTag mainOutputTag = 
ParDoTranslation.getMainOutputTag(pTransform);
+      final List<PCollectionView<?>> sideInputs = 
ParDoTranslation.getSideInputs(pTransform);
+      final TupleTagList additionalOutputTags = 
ParDoTranslation.getAdditionalOutputTags(pTransform);
 
-  private static Coder<?> getCoder(final PValue input, final 
CompositeTransformVertex pipeline) {
-    final Coder<?> coder;
-    if (input instanceof PCollection) {
-      coder = ((PCollection) input).getCoder();
-    } else if (input instanceof PCollectionView) {
-      coder = getCoderForView((PCollectionView) input, pipeline);
-    } else {
-      throw new RuntimeException(String.format("Coder for PValue %s cannot be 
determined", input));
-    }
-    return coder;
-  }
+      final PCollection<?> mainInput = (PCollection<?>)
+        
Iterables.getOnlyElement(TransformInputs.nonAdditionalInputs(pTransform));
 
-  /**
-   * Get appropriate coder for {@link PCollectionView}.
-   *
-   * @param view {@link PCollectionView} from the corresponding {@link 
View.CreatePCollectionView} transform
-   * @return appropriate {@link Coder} for {@link PCollectionView}
-   */
-  private static Coder<?> getCoderForView(final PCollectionView view, final 
CompositeTransformVertex pipeline) {
-    final PrimitiveTransformVertex src = pipeline.getPrimitiveProducerOf(view);
-    final Coder<?> baseCoder = src.getNode().getOutputs().values().stream()
-      .filter(v -> v instanceof PCollection)
-      .map(v -> (PCollection) v)
-      .findFirst()
-      .orElseThrow(() -> new RuntimeException(String.format("No incoming 
PCollection to %s", src)))
-      .getCoder();
-    final KvCoder<?, ?> inputKVCoder = (KvCoder) baseCoder;
-    final ViewFn viewFn = view.getViewFn();
-    if (viewFn instanceof PCollectionViews.IterableViewFn) {
-      return IterableCoder.of(inputKVCoder.getValueCoder());
-    } else if (viewFn instanceof PCollectionViews.ListViewFn) {
-      return ListCoder.of(inputKVCoder.getValueCoder());
-    } else if (viewFn instanceof PCollectionViews.MapViewFn) {
-      return MapCoder.of(inputKVCoder.getKeyCoder(), 
inputKVCoder.getValueCoder());
-    } else if (viewFn instanceof PCollectionViews.MultimapViewFn) {
-      return MapCoder.of(inputKVCoder.getKeyCoder(), 
IterableCoder.of(inputKVCoder.getValueCoder()));
-    } else if (viewFn instanceof PCollectionViews.SingletonViewFn) {
-      return baseCoder;
-    } else {
-      throw new UnsupportedOperationException(String.format("Unsupported 
viewFn %s", viewFn.getClass()));
+      return new DoFnTransform(
+        doFn,
+        mainInput.getCoder(),
+        getOutputCoders(pTransform),
+        mainOutputTag,
+        additionalOutputTags.getAll(),
+        mainInput.getWindowingStrategy(),
+        sideInputs,
+        ctx.getPipelineOptions());
+    } catch (final IOException e) {
+      throw new RuntimeException(e);
     }
   }
 
-  /**
-   * Translation context.
-   */
-  private static final class TranslationContext {
-    private final CompositeTransformVertex root;
-    private final PipelineOptions pipelineOptions;
-    private final DAGBuilder<IRVertex, IREdge> builder;
-    private final Map<PValue, IRVertex> pValueToProducer;
-    private final Map<PValue, TupleTag<?>> pValueToTag;
-    private final Stack<LoopVertex> loopVertexStack;
-    private final BiFunction<IRVertex, IRVertex, 
CommunicationPatternProperty.Value> communicationPatternSelector;
-    private final Pipeline pipeline;
-
-
-    private final Map<Class<? extends PTransform>, Method> 
primitiveTransformToTranslator;
-    private final Map<Class<? extends PTransform>, Method> 
compositeTransformToTranslator;
-
-    /**
-     * @param root the root to translate
-     * @param pipeline the pipeline to translate
-     * @param primitiveTransformToTranslator provides translators for 
PrimitiveTransform
-     * @param compositeTransformToTranslator provides translators for 
CompositeTransform
-     * @param selector provides {@link CommunicationPatternProperty.Value} for 
IR edges
-     * @param pipelineOptions {@link PipelineOptions}
-     */
-    private TranslationContext(final CompositeTransformVertex root,
-                               final Pipeline pipeline,
-                               final Map<Class<? extends PTransform>, Method> 
primitiveTransformToTranslator,
-                               final Map<Class<? extends PTransform>, Method> 
compositeTransformToTranslator,
-                               final BiFunction<IRVertex, IRVertex, 
CommunicationPatternProperty.Value> selector,
-                               final PipelineOptions pipelineOptions) {
-      this.root = root;
-      this.pipeline = pipeline;
-      this.builder = new DAGBuilder<>();
-      this.pValueToProducer = new HashMap<>();
-      this.pValueToTag = new HashMap<>();
-      this.loopVertexStack = new Stack<>();
-      this.primitiveTransformToTranslator = primitiveTransformToTranslator;
-      this.compositeTransformToTranslator = compositeTransformToTranslator;
-      this.communicationPatternSelector = selector;
-      this.pipelineOptions = pipelineOptions;
-    }
-
-    /**
-     * Copy constructor, except for setting different 
CommunicationPatternProperty selector.
-     *
-     * @param ctx the original {@link TranslationContext}
-     * @param selector provides {@link CommunicationPatternProperty.Value} for 
IR edges
-     */
-    private TranslationContext(final TranslationContext ctx,
-                               final BiFunction<IRVertex, IRVertex, 
CommunicationPatternProperty.Value> selector) {
-      this.root = ctx.root;
-      this.pipeline = ctx.pipeline;
-      this.pipelineOptions = ctx.pipelineOptions;
-      this.builder = ctx.builder;
-      this.pValueToProducer = ctx.pValueToProducer;
-      this.pValueToTag = ctx.pValueToTag;
-      this.loopVertexStack = ctx.loopVertexStack;
-      this.primitiveTransformToTranslator = ctx.primitiveTransformToTranslator;
-      this.compositeTransformToTranslator = ctx.compositeTransformToTranslator;
-
-      this.communicationPatternSelector = selector;
-    }
-
-    /**
-     * Selects appropriate translator to translate the given hierarchy.
-     *
-     * @param transformVertex the Beam transform hierarchy to translate
-     */
-    private void translate(final TransformVertex transformVertex) {
-      final boolean isComposite = transformVertex instanceof 
CompositeTransformVertex;
-      final PTransform<?, ?> transform = 
transformVertex.getNode().getTransform();
-      if (transform == null) {
-        // root node
-        topologicalTranslator(this, (CompositeTransformVertex) 
transformVertex, null);
-        return;
-      }
-
-      Class<?> clazz = transform.getClass();
-      while (true) {
-        final Method translator = (isComposite ? 
compositeTransformToTranslator : primitiveTransformToTranslator)
-            .get(clazz);
-        if (translator == null) {
-          if (clazz.getSuperclass() != null) {
-            clazz = clazz.getSuperclass();
-            continue;
-          }
-          throw new UnsupportedOperationException(String.format("%s transform 
%s is not supported",
-              isComposite ? "Composite" : "Primitive", 
transform.getClass().getCanonicalName()));
-        } else {
-          try {
-            translator.setAccessible(true);
-            translator.invoke(null, this, transformVertex, transform);
-            break;
-          } catch (final IllegalAccessException e) {
-            throw new RuntimeException(e);
-          } catch (final InvocationTargetException | RuntimeException e) {
-            throw new RuntimeException(String.format(
-                "Translator %s have failed to translate %s", translator, 
transform), e);
-          }
-        }
-      }
-    }
-
-    /**
-     * Add IR vertex to the builder.
-     *
-     * @param vertex IR vertex to add
-     */
-    private void addVertex(final IRVertex vertex) {
-      builder.addVertex(vertex, loopVertexStack);
-    }
-
-    /**
-     * Add IR edge to the builder.
-     *
-     * @param dst the destination IR vertex.
-     * @param input the {@link PValue} {@code dst} consumes
-     */
-    private void addEdgeTo(final IRVertex dst, final PValue input) {
-      final IRVertex src = pValueToProducer.get(input);
-      if (src == null) {
-        try {
-          throw new RuntimeException(String.format("Cannot find a vertex that 
emits pValue %s, "
-              + "while PTransform %s is known to produce it.", input, 
root.getPrimitiveProducerOf(input)));
-        } catch (final RuntimeException e) {
-          throw new RuntimeException(String.format("Cannot find a vertex that 
emits pValue %s, "
-              + "and the corresponding PTransform was not found", input));
-        }
-      }
-      final CommunicationPatternProperty.Value communicationPattern = 
communicationPatternSelector.apply(src, dst);
-      if (communicationPattern == null) {
-        throw new RuntimeException(String.format("%s have failed to determine 
communication pattern "
-            + "for an edge from %s to %s", communicationPatternSelector, src, 
dst));
-      }
-      final IREdge edge = new IREdge(communicationPattern, src, dst);
-      final Coder coder;
-      final Coder windowCoder;
-      if (input instanceof PCollection) {
-        coder = ((PCollection) input).getCoder();
-        windowCoder = ((PCollection) 
input).getWindowingStrategy().getWindowFn().windowCoder();
-      } else if (input instanceof PCollectionView) {
-        coder = getCoderForView((PCollectionView) input, root);
-        windowCoder = ((PCollectionView) input).getPCollection()
-          .getWindowingStrategy().getWindowFn().windowCoder();
-      } else {
-        throw new RuntimeException(String.format("While adding an edge from 
%s, to %s, coder for PValue %s cannot "
-          + "be determined", src, dst, input));
-      }
-
-      edge.setProperty(KeyExtractorProperty.of(new BeamKeyExtractor()));
-
-      if (coder instanceof KvCoder) {
-        Coder keyCoder = ((KvCoder) coder).getKeyCoder();
-        edge.setProperty(KeyEncoderProperty.of(new 
BeamEncoderFactory(keyCoder)));
-        edge.setProperty(KeyDecoderProperty.of(new 
BeamDecoderFactory(keyCoder)));
-      }
-
-      edge.setProperty(EncoderProperty.of(
-        new BeamEncoderFactory<>(WindowedValue.getFullCoder(coder, 
windowCoder))));
-      edge.setProperty(DecoderProperty.of(
-        new BeamDecoderFactory<>(WindowedValue.getFullCoder(coder, 
windowCoder))));
-
-      if (pValueToTag.containsKey(input)) {
-        
edge.setProperty(AdditionalOutputTagProperty.of(pValueToTag.get(input).getId()));
-      }
-
-      if (input instanceof PCollectionView) {
-        edge.setProperty(BroadcastVariableIdProperty.of((PCollectionView) 
input));
-      }
-
-      builder.connectVertices(edge);
-    }
-
-    /**
-     * Registers a {@link PValue} as a main output from the specified {@link 
IRVertex}.
-     *
-     * @param irVertex the IR vertex
-     * @param output the {@link PValue} {@code irVertex} emits as main output
-     */
-    private void registerMainOutputFrom(final IRVertex irVertex, final PValue 
output) {
-      pValueToProducer.put(output, irVertex);
-    }
-
-    /**
-     * Registers a {@link PValue} as an additional output from the specified 
{@link IRVertex}.
-     *
-     * @param irVertex the IR vertex
-     * @param output the {@link PValue} {@code irVertex} emits as additional 
output
-     * @param tag the {@link TupleTag} associated with this additional output
-     */
-    private void registerAdditionalOutputFrom(final IRVertex irVertex, final 
PValue output, final TupleTag<?> tag) {
-      pValueToTag.put(output, tag);
-      pValueToProducer.put(output, irVertex);
-    }
+  private static Map<TupleTag<?>, Coder<?>> getOutputCoders(final 
AppliedPTransform<?, ?, ?> ptransform) {
+    return ptransform
+      .getOutputs()
+      .entrySet()
+      .stream()
+      .filter(e -> e.getValue() instanceof PCollection)
+      .collect(Collectors.toMap(e -> e.getKey(), e -> ((PCollection) 
e.getValue()).getCoder()));
   }
 
   /**
-   * Default implementation for {@link CommunicationPatternProperty.Value} 
selector.
+   * Create a group by key transform.
+   * It returns GroupByKeyAndWindowDoFnTransform if window function is not 
default.
+   * @param ctx translation context
+   * @param beamNode transform vertex
+   * @return group by key transform
    */
-  private static final class DefaultCommunicationPatternSelector
-      implements BiFunction<IRVertex, IRVertex, 
CommunicationPatternProperty.Value> {
-
-    private static final DefaultCommunicationPatternSelector INSTANCE = new 
DefaultCommunicationPatternSelector();
-
-    @Override
-    public CommunicationPatternProperty.Value apply(final IRVertex src, final 
IRVertex dst) {
-      final Class<?> constructUnionTableFn;
-      try {
-        constructUnionTableFn = 
Class.forName("org.apache.beam.sdk.transforms.join.CoGroupByKey$ConstructUnionTableFn");
-      } catch (final ClassNotFoundException e) {
-        throw new RuntimeException(e);
-      }
-
-      final Transform srcTransform = src instanceof OperatorVertex ? 
((OperatorVertex) src).getTransform() : null;
-      final Transform dstTransform = dst instanceof OperatorVertex ? 
((OperatorVertex) dst).getTransform() : null;
-      final DoFn srcDoFn = srcTransform instanceof DoFnTransform ? 
((DoFnTransform) srcTransform).getDoFn() : null;
+  private static Transform createGBKTransform(
+    final PipelineTranslationContext ctx,
+    final TransformHierarchy.Node beamNode) {
+    final AppliedPTransform pTransform = 
beamNode.toAppliedPTransform(ctx.getPipeline());
+    final PCollection<?> mainInput = (PCollection<?>)
+      
Iterables.getOnlyElement(TransformInputs.nonAdditionalInputs(pTransform));
+    final TupleTag mainOutputTag = new TupleTag<>();
 
-      if (srcDoFn != null && srcDoFn.getClass().equals(constructUnionTableFn)) 
{
-        return CommunicationPatternProperty.Value.Shuffle;
-      }
-      if (srcTransform instanceof FlattenTransform) {
-        return CommunicationPatternProperty.Value.OneToOne;
-      }
-      if (dstTransform instanceof GroupByKeyAndWindowDoFnTransform
-        || dstTransform instanceof GroupByKeyTransform) {
-        return CommunicationPatternProperty.Value.Shuffle;
-      }
-      if (dstTransform instanceof CreateViewTransform) {
-        return CommunicationPatternProperty.Value.BroadCast;
-      }
-      return CommunicationPatternProperty.Value.OneToOne;
+    if (isBatch(beamNode, ctx.getPipeline())) {
+      return new GroupByKeyTransform();
+    } else {
+      return new GroupByKeyAndWindowDoFnTransform(
+        getOutputCoders(pTransform),
+        mainOutputTag,
+        Collections.emptyList(),  /*  GBK does not have additional outputs */
+        mainInput.getWindowingStrategy(),
+        Collections.emptyList(), /*  GBK does not have additional side inputs 
*/
+        ctx.getPipelineOptions(),
+        SystemReduceFn.buffering(mainInput.getCoder()));
     }
   }
 
-  /**
-   * A {@link CommunicationPatternProperty.Value} selector which always emits 
OneToOne.
-   */
-  private static final class OneToOneCommunicationPatternSelector
-      implements BiFunction<IRVertex, IRVertex, 
CommunicationPatternProperty.Value> {
-    private static final OneToOneCommunicationPatternSelector INSTANCE = new 
OneToOneCommunicationPatternSelector();
-
-    @Override
-    public CommunicationPatternProperty.Value apply(final IRVertex src, final 
IRVertex dst) {
-      return CommunicationPatternProperty.Value.OneToOne;
-    }
+  private static boolean isBatch(final TransformHierarchy.Node beamNode, final 
Pipeline pipeline) {
 
 Review comment:
   Suggestion: rename `isBatch` to `isGlobalWindow`. Batch pipelines can also use non-global windows, so `isBatch` is a misleading name for this check.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to