wonook closed pull request #23: [NEMO-67] Fix Communication Pattern for Beam CoGroupByKey transform
URL: https://github.com/apache/incubator-nemo/pull/23
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), GitHub no longer shows its
diff once it has been merged, so the diff is supplied below:

diff --git a/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/NemoPipelineVisitor.java b/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/NemoPipelineVisitor.java
index a649d72f..f61adc67 100644
--- a/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/NemoPipelineVisitor.java
+++ b/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/NemoPipelineVisitor.java
@@ -15,6 +15,7 @@
  */
 package edu.snu.nemo.compiler.frontend.beam;
 
+import edu.snu.nemo.common.ir.vertex.transform.Transform;
 import edu.snu.nemo.compiler.frontend.beam.coder.BeamCoder;
 import edu.snu.nemo.common.dag.DAGBuilder;
 import edu.snu.nemo.common.ir.edge.IREdge;
@@ -241,13 +242,29 @@ private static BeamCoder getCoderForView(final ViewFn 
viewFn, final Coder beamIn
    */
   private static DataCommunicationPatternProperty.Value 
getEdgeCommunicationPattern(final IRVertex src,
                                                                                
     final IRVertex dst) {
-    if (dst instanceof OperatorVertex && ((OperatorVertex) dst).getTransform() 
instanceof GroupByKeyTransform) {
+    final Class<?> constructUnionTableFn;
+    try {
+      constructUnionTableFn = 
Class.forName("org.apache.beam.sdk.transforms.join.CoGroupByKey$ConstructUnionTableFn");
+    } catch (final ClassNotFoundException e) {
+      throw new RuntimeException(e);
+    }
+
+    final Transform srcTransform = src instanceof OperatorVertex ? 
((OperatorVertex) src).getTransform() : null;
+    final Transform dstTransform = dst instanceof OperatorVertex ? 
((OperatorVertex) dst).getTransform() : null;
+    final DoFn srcDoFn = srcTransform instanceof DoTransform ? ((DoTransform) 
srcTransform).getDoFn() : null;
+
+    if (srcDoFn != null && srcDoFn.getClass().equals(constructUnionTableFn)) {
       return DataCommunicationPatternProperty.Value.Shuffle;
-    } else if (dst instanceof OperatorVertex && ((OperatorVertex) 
dst).getTransform() instanceof CreateViewTransform
-        || src instanceof OperatorVertex && ((OperatorVertex) 
src).getTransform() instanceof CreateViewTransform) {
-      return DataCommunicationPatternProperty.Value.BroadCast;
-    } else {
+    }
+    if (srcTransform instanceof FlattenTransform) {
       return DataCommunicationPatternProperty.Value.OneToOne;
     }
+    if (dstTransform instanceof GroupByKeyTransform) {
+      return DataCommunicationPatternProperty.Value.Shuffle;
+    }
+    if (srcTransform instanceof CreateViewTransform || dstTransform instanceof 
CreateViewTransform) {
+      return DataCommunicationPatternProperty.Value.BroadCast;
+    }
+    return DataCommunicationPatternProperty.Value.OneToOne;
   }
 }
diff --git a/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/transform/DoTransform.java b/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/transform/DoTransform.java
index e099253c..883d401e 100644
--- a/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/transform/DoTransform.java
+++ b/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/transform/DoTransform.java
@@ -328,4 +328,11 @@ public Timer timer(final String timerId) {
       throw new UnsupportedOperationException("timer() in ProcessContext under 
DoTransform");
     }
   }
+
+  /**
+   * @return {@link DoFn} for this transform.
+   */
+  public DoFn getDoFn() {
+    return doFn;
+  }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to