robertwb commented on code in PR #30545:
URL: https://github.com/apache/beam/pull/30545#discussion_r1581366814


##########
runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java:
##########
@@ -917,6 +919,41 @@ private <K1, K2, V> void groupByKeyAndSortValuesHelper(
           }
         });
 
+    registerTransformTranslator(
+        RedistributeByKey.class,

Review Comment:
   I guess I was thinking in terms of portable runners. Here we have to have a special translator to set the `PropertyNames.ALLOW_DUPLICATES` property, right?
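   A minimal sketch of how such a special translator could still share its body with an existing one, so that setting the allow-duplicates property is the only divergence. It assumes hypothetical stand-in types: `Step`, `translateShuffleStep`, and the `"allow_duplicates"` key are illustrative only, not the Dataflow runner's real step builder or the actual value of `PropertyNames.ALLOW_DUPLICATES`.
   
   ```java
   import java.util.LinkedHashMap;
   import java.util.Map;
   
   public class AllowDuplicatesSketch {
     // Hypothetical key standing in for PropertyNames.ALLOW_DUPLICATES;
     // the real constant's string value is not assumed here.
     static final String ALLOW_DUPLICATES = "allow_duplicates";
   
     // Hypothetical, simplified stand-in for a translated Dataflow step.
     static final class Step {
       final String name;
       final Map<String, Object> properties = new LinkedHashMap<>();
   
       Step(String name) {
         this.name = name;
       }
     }
   
     // Shared translation body: both transforms produce the same kind of step,
     // differing only in the recorded allow-duplicates flag.
     static Step translateShuffleStep(String name, boolean allowDuplicates) {
       Step step = new Step(name);
       // This sketch always records the flag so the property is explicit in both cases.
       step.properties.put(ALLOW_DUPLICATES, allowDuplicates);
       return step;
     }
   
     // Hypothetical per-transform translators that delegate to the shared body.
     static Step translateReshuffle(String name) {
       return translateShuffleStep(name, false);
     }
   
     static Step translateRedistributeByKey(String name, boolean allowDuplicates) {
       return translateShuffleStep(name, allowDuplicates);
     }
   
     public static void main(String[] args) {
       Step step = translateRedistributeByKey("Redistribute", true);
       // Prints: Redistribute {allow_duplicates=true}
       System.out.println(step.name + " " + step.properties);
     }
   }
   ```
   
   Keeping the per-transform translators as one-line delegations makes it obvious that the flag is the only difference between them.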



##########
runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java:
##########
@@ -788,6 +852,8 @@ public String toNativeString() {
     EVALUATORS.put(PTransformTranslation.CREATE_VIEW_TRANSFORM_URN, createPCollView());
     EVALUATORS.put(PTransformTranslation.ASSIGN_WINDOWS_TRANSFORM_URN, window());
     EVALUATORS.put(PTransformTranslation.RESHUFFLE_URN, reshuffle());
+    EVALUATORS.put(PTransformTranslation.REDISTRIBUTE_ARBITRARILY_URN, redistributeArbitrarily());

Review Comment:
   Same comment as for Flink (and below).



##########
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java:
##########
@@ -301,6 +305,24 @@ private <K, V> void translateReshuffle(
         Iterables.getOnlyElement(transform.getOutputsMap().values()), inputDataStream.rebalance());
   }
 
+  private <K, V> void translateRedistributeByKey(
+      String id, RunnerApi.Pipeline pipeline, StreamingTranslationContext context) {
+    RunnerApi.PTransform transform = pipeline.getComponents().getTransformsOrThrow(id);
+    DataStream<WindowedValue<KV<K, V>>> inputDataStream =
+        context.getDataStreamOrThrow(Iterables.getOnlyElement(transform.getInputsMap().values()));
+    context.addDataStream(
+        Iterables.getOnlyElement(transform.getOutputsMap().values()), inputDataStream.rebalance());

Review Comment:
   Looks identical to the code above as well. I suppose my preference is to not duplicate code unless there's a good reason for divergence or it's difficult to avoid. E.g., above, why not just write
   
   ```
   translatorMap.put(PTransformTranslation.REDISTRIBUTE_BY_KEY_URN, this::translateReshuffle);
   translatorMap.put(PTransformTranslation.REDISTRIBUTE_ARBITRARILY_URN, this::translateReshuffle);
   ```
   
   which makes it clear that these are all implemented the same way, rather than leaving readers to pattern-match to see what the differences, if any, are.
   
   It's not a logical necessity, but it says "this (same) implementation is good enough for this use case as well."
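   A self-contained sketch of the suggestion, assuming a simplified translator registry: the `Translator` interface and the example URN strings are hypothetical stand-ins for the runner's real translator type and `PTransformTranslation` constants. It shows one method reference registered under several URNs, so dispatch for any of them reaches the same implementation.
   
   ```java
   import java.util.HashMap;
   import java.util.Map;
   
   public class SharedTranslatorSketch {
     // Hypothetical stand-in for a runner's per-transform translation callback.
     @FunctionalInterface
     interface Translator {
       String translate(String transformId);
     }
   
     // Hypothetical URN strings; the real constants live in PTransformTranslation.
     static final String RESHUFFLE_URN = "example:reshuffle";
     static final String REDISTRIBUTE_BY_KEY_URN = "example:redistribute_by_key";
     static final String REDISTRIBUTE_ARBITRARILY_URN = "example:redistribute_arbitrarily";
   
     private final Map<String, Translator> translatorMap = new HashMap<>();
   
     SharedTranslatorSketch() {
       // One implementation registered under three URNs: a reader sees at a
       // glance that the runner treats all three transforms identically.
       translatorMap.put(RESHUFFLE_URN, this::translateReshuffle);
       translatorMap.put(REDISTRIBUTE_BY_KEY_URN, this::translateReshuffle);
       translatorMap.put(REDISTRIBUTE_ARBITRARILY_URN, this::translateReshuffle);
     }
   
     private String translateReshuffle(String transformId) {
       // Stand-in for inputDataStream.rebalance() in the Flink translator.
       return "rebalance(" + transformId + ")";
     }
   
     String translate(String urn, String transformId) {
       Translator translator = translatorMap.get(urn);
       if (translator == null) {
         throw new IllegalArgumentException("No translator for URN " + urn);
       }
       return translator.translate(transformId);
     }
   
     public static void main(String[] args) {
       SharedTranslatorSketch sketch = new SharedTranslatorSketch();
       // Prints: rebalance(t1)
       System.out.println(sketch.translate(REDISTRIBUTE_BY_KEY_URN, "t1"));
     }
   }
   ```
   
   Registering the method reference rather than copying its body also means a later fix to `translateReshuffle` applies to all three transforms at once.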



Review Comment:
   But if you feel strongly, no need for this to be a blocker.



##########
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java:
##########
@@ -162,6 +162,11 @@ class FlinkStreamingTransformTranslators {
         new CreateViewStreamingTranslator());
 
     TRANSLATORS.put(PTransformTranslation.RESHUFFLE_URN, new ReshuffleTranslatorStreaming());
+    TRANSLATORS.put(

Review Comment:
   Same exact comment here.


