mxm commented on a change in pull request #11874:
URL: https://github.com/apache/beam/pull/11874#discussion_r433879615
##########
File path:
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
##########
@@ -105,40 +113,45 @@
// --------------------------------------------------------------------------------------------
@SuppressWarnings("rawtypes")
- private static final Map<String, FlinkBatchPipelineTranslator.BatchTransformTranslator>
- TRANSLATORS = new HashMap<>();
+ private static final Multimap<String, FlinkBatchPipelineTranslator.BatchTransformTranslator>
+ TRANSLATORS = MultimapBuilder.hashKeys().arrayListValues().build();
static {
TRANSLATORS.put(PTransformTranslation.IMPULSE_TRANSFORM_URN, new ImpulseTranslatorBatch());
-
TRANSLATORS.put(
PTransformTranslation.CREATE_VIEW_TRANSFORM_URN,
- new CreatePCollectionViewTranslatorBatch());
-
+ new CreatePCollectionViewTranslatorBatch<>());
TRANSLATORS.put(
- PTransformTranslation.COMBINE_PER_KEY_TRANSFORM_URN, new CombinePerKeyTranslatorBatch());
+ PTransformTranslation.COMBINE_PER_KEY_TRANSFORM_URN, new CombinePerKeyTranslatorBatch<>());
TRANSLATORS.put(
- PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN, new GroupByKeyTranslatorBatch());
- TRANSLATORS.put(PTransformTranslation.RESHUFFLE_URN, new ReshuffleTranslatorBatch());
-
+ PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN,
+ new NonMergingGroupByKeyTranslatorBatch<>());
TRANSLATORS.put(
- PTransformTranslation.FLATTEN_TRANSFORM_URN, new FlattenPCollectionTranslatorBatch());
-
+ PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN, new GroupByKeyTranslatorBatch<>());
+ TRANSLATORS.put(PTransformTranslation.RESHUFFLE_URN, new ReshuffleTranslatorBatch<>());
TRANSLATORS.put(
- PTransformTranslation.ASSIGN_WINDOWS_TRANSFORM_URN, new WindowAssignTranslatorBatch());
-
- TRANSLATORS.put(PTransformTranslation.PAR_DO_TRANSFORM_URN, new ParDoTranslatorBatch());
-
- TRANSLATORS.put(PTransformTranslation.READ_TRANSFORM_URN, new ReadSourceTranslatorBatch());
Review comment:
Could we keep the original order of the translators here?
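Some context on why ordering can matter here: the diff now registers two translators under GROUP_BY_KEY_TRANSFORM_URN, and Guava's MultimapBuilder.hashKeys().arrayListValues() keeps the values for each key in insertion order (key iteration order is still hash-based). The following is a hypothetical sketch, assuming the pipeline translator walks the candidates for a URN in that order and uses the first one that accepts the transform. The class name, the canTranslate check, and the string "windowing" label are illustrative stand-ins, not the PR's actual API, and plain java.util replaces Guava so the sketch runs on its own.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Predicate;

public class TranslatorRegistrySketch {

  // Illustrative URN value; the real code uses PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN.
  static final String GBK_URN = "beam:transform:group_by_key:v1";

  /** Hypothetical stand-in for FlinkBatchPipelineTranslator.BatchTransformTranslator. */
  interface Translator {
    String name();

    /** Hypothetical check; the transform is reduced to a windowing label for brevity. */
    boolean canTranslate(String windowing);
  }

  // URN -> translators in registration order, mimicking Guava's arrayListValues().
  private final Map<String, List<Translator>> translators = new LinkedHashMap<>();

  void put(String urn, Translator translator) {
    translators.computeIfAbsent(urn, k -> new ArrayList<>()).add(translator);
  }

  /** Returns the first translator registered for the URN that accepts the transform. */
  Optional<Translator> lookup(String urn, String windowing) {
    for (Translator t : translators.getOrDefault(urn, Collections.emptyList())) {
      if (t.canTranslate(windowing)) {
        return Optional.of(t);
      }
    }
    return Optional.empty();
  }

  /** Builds a named translator from a predicate (hypothetical helper). */
  static Translator translator(String name, Predicate<String> accepts) {
    return new Translator() {
      @Override
      public String name() {
        return name;
      }

      @Override
      public boolean canTranslate(String windowing) {
        return accepts.test(windowing);
      }
    };
  }

  public static void main(String[] args) {
    TranslatorRegistrySketch registry = new TranslatorRegistrySketch();
    // Same relative order as the diff: the specialized translator is registered first.
    registry.put(GBK_URN, translator("NonMergingGroupByKey", w -> w.equals("non-merging")));
    registry.put(GBK_URN, translator("GroupByKey", w -> true));

    // Prints "NonMergingGroupByKey", then "GroupByKey".
    System.out.println(registry.lookup(GBK_URN, "non-merging").map(Translator::name).orElse("none"));
    System.out.println(registry.lookup(GBK_URN, "merging").map(Translator::name).orElse("none"));
  }
}
```

Under that first-match assumption, swapping the two GROUP_BY_KEY registrations would let the general translator shadow the non-merging one, so the registration order is more than cosmetic.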
##########
File path:
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTranslationContext.java
##########
@@ -68,13 +71,18 @@ public FlinkBatchTranslationContext(ExecutionEnvironment env, PipelineOptions op
this.danglingDataSets = new HashMap<>();
}
+ void init(Pipeline pipeline) {
+ pipeline.traverseTopologically(countingPipelineVisitor);
+ pipeline.traverseTopologically(lookupPipelineVisitor);
+ }
+
// ------------------------------------------------------------------------
- public Map<PValue, DataSet<?>> getDanglingDataSets() {
+ Map<PValue, DataSet<?>> getDanglingDataSets() {
Review comment:
Didn't see that this was the case. It's still an unrelated change and it
draws attention away from the actual issue.