taegeonum commented on a change in pull request #159: [NEMO-216,251,259] 
Support side inputs and windowing
URL: https://github.com/apache/incubator-nemo/pull/159#discussion_r234858450
 
 

 ##########
 File path: 
compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslationContext.java
 ##########
 @@ -91,69 +92,83 @@ void addVertex(final IRVertex vertex) {
     builder.addVertex(vertex, loopVertexStack);
   }
 
+  /**
+   * Say the dstIRVertex consumes three views: view0, view1, and view2.
+   *
+   * We translate that as the following:
+   * view0 -> SideInputTransform(index=0) ->
+   * view1 -> SideInputTransform(index=1) -> dstIRVertex(with a map from 
indices to PCollectionViews)
+   * view2 -> SideInputTransform(index=2) ->
+   *
+   * @param dstVertex vertex.
+   * @param sideInputs of the vertex.
+   */
+  void addSideInputEdges(final IRVertex dstVertex, final Map<Integer, 
PCollectionView<?>> sideInputs) {
+    for (final Map.Entry<Integer, PCollectionView<?>> entry : 
sideInputs.entrySet()) {
+      final int index = entry.getKey();
+      final PCollectionView view = entry.getValue();
+
+      final IRVertex srcVertex = pValueToProducerVertex.get(view);
+      final IRVertex sideInputTransformVertex = new OperatorVertex(new 
SideInputTransform(index));
+      addVertex(sideInputTransformVertex);
+      final Coder viewCoder = getCoderForView(view, this);
+      final Coder windowCoder = 
view.getPCollection().getWindowingStrategy().getWindowFn().windowCoder();
+
+      // First edge: view to transform
+      final IREdge firstEdge =
+        new IREdge(CommunicationPatternProperty.Value.OneToOne, srcVertex, 
sideInputTransformVertex);
+      addEdge(firstEdge, viewCoder, windowCoder);
+
+      // Second edge: transform to the dstIRVertex
+      final IREdge secondEdge =
+        new IREdge(CommunicationPatternProperty.Value.OneToOne, 
sideInputTransformVertex, dstVertex);
+      final Coder sideInputElementCoder = 
SideInputCoder.of(WindowedValue.getFullCoder(viewCoder, windowCoder));
+      secondEdge.setProperty(EncoderProperty.of(new 
BeamEncoderFactory(sideInputElementCoder)));
+      secondEdge.setProperty(DecoderProperty.of(new 
BeamDecoderFactory(sideInputElementCoder)));
+      builder.connectVertices(secondEdge);
+    }
+  }
+
   /**
    * Add IR edge to the builder.
    *
    * @param dst the destination IR vertex.
    * @param input the {@link PValue} {@code dst} consumes
    */
   void addEdgeTo(final IRVertex dst, final PValue input) {
-    final Coder coder;
     if (input instanceof PCollection) {
-      coder = ((PCollection) input).getCoder();
-    } else if (input instanceof PCollectionView) {
-      coder = getCoderForView((PCollectionView) input, this);
-    } else {
-      throw new RuntimeException(String.format("While adding an edge to %s, 
coder for PValue %s cannot "
-        + "be determined", dst, input));
-    }
-    addEdgeTo(dst, input, coder);
-  }
+      final Coder elementCoder = ((PCollection) input).getCoder();
+      final Coder windowCoder = ((PCollection) 
input).getWindowingStrategy().getWindowFn().windowCoder();
+      final IRVertex src = pValueToProducerVertex.get(input);
+      if (src == null) {
+        throw new IllegalStateException(String.format("Cannot find a vertex 
that emits pValue %s", input));
+      }
 
-  void addEdgeTo(final IRVertex dst, final PValue input, final Coder 
elementCoder) {
-    final IRVertex src = pValueToProducerVertex.get(input);
-    if (src == null) {
-      throw new IllegalStateException(String.format("Cannot find a vertex that 
emits pValue %s", input));
-    }
+      final CommunicationPatternProperty.Value communicationPattern = 
getCommPattern(src, dst);
+      final IREdge edge = new IREdge(communicationPattern, src, dst);
 
-    final Coder windowCoder;
-    final CommunicationPatternProperty.Value communicationPattern = 
getCommPattern(src, dst);
-    final IREdge edge = new IREdge(communicationPattern, src, dst);
+      if (pValueToTag.containsKey(input)) {
+        
edge.setProperty(AdditionalOutputTagProperty.of(pValueToTag.get(input).getId()));
+      }
 
-    if (pValueToTag.containsKey(input)) {
-      
edge.setProperty(AdditionalOutputTagProperty.of(pValueToTag.get(input).getId()));
-    }
-    if (input instanceof PCollectionView) {
-      edge.setProperty(BroadcastVariableIdProperty.of((PCollectionView) 
input));
-    }
-    if (input instanceof PCollection) {
-      windowCoder = ((PCollection) 
input).getWindowingStrategy().getWindowFn().windowCoder();
-    } else if (input instanceof PCollectionView) {
-      windowCoder = ((PCollectionView) input).getPCollection()
-        .getWindowingStrategy().getWindowFn().windowCoder();
+      addEdge(edge, elementCoder, windowCoder);
     } else {
-      throw new RuntimeException(String.format("While adding an edge from %s, 
to %s, coder for PValue %s cannot "
-        + "be determined", src, dst, input));
+      throw new IllegalStateException(input.toString());
     }
-
-    addEdgeTo(edge, elementCoder, windowCoder);
   }
 
-  void addEdgeTo(final IREdge edge,
-                 final Coder elementCoder,
-                 final Coder windowCoder) {
+  void addEdge(final IREdge edge, final Coder elementCoder, final Coder 
windowCoder) {
+    // TODO key extractor only when many to many
 
 Review comment:
   Please add the JIRA issue number to this TODO comment.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to