mxm commented on a change in pull request #11874:
URL: https://github.com/apache/beam/pull/11874#discussion_r433879615



##########
File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
##########
@@ -105,40 +113,45 @@
   // 
--------------------------------------------------------------------------------------------
 
   @SuppressWarnings("rawtypes")
-  private static final Map<String, 
FlinkBatchPipelineTranslator.BatchTransformTranslator>
-      TRANSLATORS = new HashMap<>();
+  private static final Multimap<String, 
FlinkBatchPipelineTranslator.BatchTransformTranslator>
+      TRANSLATORS = MultimapBuilder.hashKeys().arrayListValues().build();
 
   static {
     TRANSLATORS.put(PTransformTranslation.IMPULSE_TRANSFORM_URN, new 
ImpulseTranslatorBatch());
-
     TRANSLATORS.put(
         PTransformTranslation.CREATE_VIEW_TRANSFORM_URN,
-        new CreatePCollectionViewTranslatorBatch());
-
+        new CreatePCollectionViewTranslatorBatch<>());
     TRANSLATORS.put(
-        PTransformTranslation.COMBINE_PER_KEY_TRANSFORM_URN, new 
CombinePerKeyTranslatorBatch());
+        PTransformTranslation.COMBINE_PER_KEY_TRANSFORM_URN, new 
CombinePerKeyTranslatorBatch<>());
     TRANSLATORS.put(
-        PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN, new 
GroupByKeyTranslatorBatch());
-    TRANSLATORS.put(PTransformTranslation.RESHUFFLE_URN, new 
ReshuffleTranslatorBatch());
-
+        PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN,
+        new NonMergingGroupByKeyTranslatorBatch<>());
     TRANSLATORS.put(
-        PTransformTranslation.FLATTEN_TRANSFORM_URN, new 
FlattenPCollectionTranslatorBatch());
-
+        PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN, new 
GroupByKeyTranslatorBatch<>());
+    TRANSLATORS.put(PTransformTranslation.RESHUFFLE_URN, new 
ReshuffleTranslatorBatch<>());
     TRANSLATORS.put(
-        PTransformTranslation.ASSIGN_WINDOWS_TRANSFORM_URN, new 
WindowAssignTranslatorBatch());
-
-    TRANSLATORS.put(PTransformTranslation.PAR_DO_TRANSFORM_URN, new 
ParDoTranslatorBatch());
-
-    TRANSLATORS.put(PTransformTranslation.READ_TRANSFORM_URN, new 
ReadSourceTranslatorBatch());

Review comment:
       Could we keep the original order of the translators here?

##########
File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTranslationContext.java
##########
@@ -68,13 +71,18 @@ public FlinkBatchTranslationContext(ExecutionEnvironment 
env, PipelineOptions op
     this.danglingDataSets = new HashMap<>();
   }
 
+  void init(Pipeline pipeline) {
+    pipeline.traverseTopologically(countingPipelineVisitor);
+    pipeline.traverseTopologically(lookupPipelineVisitor);
+  }
+
   // ------------------------------------------------------------------------
 
-  public Map<PValue, DataSet<?>> getDanglingDataSets() {
+  Map<PValue, DataSet<?>> getDanglingDataSets() {

Review comment:
       Didn't see that this was the case. It's still an unrelated change and it 
draws attention away from the actual issue.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to