kennknowles commented on code in PR #30545:
URL: https://github.com/apache/beam/pull/30545#discussion_r1578127727


##########
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java:
##########
@@ -423,6 +431,76 @@ public void translateNode(
     }
   }
 
+  private static class RedistributeByKeyTranslatorBatch<K, InputT>
+      implements FlinkBatchPipelineTranslator.BatchTransformTranslator<
+          Redistribute.RedistributeByKey<K, InputT>> {
+
+    @Override
+    public void translateNode(
+        Redistribute.RedistributeByKey<K, InputT> transform, FlinkBatchTranslationContext context) {
+      final DataSet<WindowedValue<KV<K, InputT>>> inputDataSet =
+          context.getInputDataSet(context.getInput(transform));
+      // Construct an instance of CoderTypeInformation which contains the pipeline options.
+      // This will be used to initialized FileSystems.
+      final CoderTypeInformation<WindowedValue<KV<K, InputT>>> outputType =
+          ((CoderTypeInformation<WindowedValue<KV<K, InputT>>>) inputDataSet.getType())
+              .withPipelineOptions(context.getPipelineOptions());
+      // We insert a NOOP here to initialize the FileSystems via the above CoderTypeInformation.

Review Comment:
   TBH, I'd rather handle this investigation separately so that we can get the
   Redistribute transform working first.



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/PTransformTranslation.java:
##########
@@ -119,6 +119,9 @@ public class PTransformTranslation {
   public static final String COMBINE_PER_KEY_TRANSFORM_URN = "beam:transform:combine_per_key:v1";
   public static final String COMBINE_GLOBALLY_TRANSFORM_URN = "beam:transform:combine_globally:v1";
   public static final String RESHUFFLE_URN = "beam:transform:reshuffle:v1";
+  public static final String REDISTRIBUTE_BY_KEY_URN = "beam:transform:redistribute_by_key:v1";

Review Comment:
   I'll do a separate change, perhaps migrating these hand-written constants to
   use the ones defined in the proto file.
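
   As a rough sketch of what such a migration might look like: the enum below is
   a hypothetical stand-in for whatever the proto file generates (the names
   `StandardTransform` and `getUrn()` are assumptions for illustration, not the
   actual Beam API), and the string constants are derived from it instead of
   being retyped as literals.

```java
public class UrnConstantsSketch {

  /** Hypothetical stand-in for an enum generated from the proto file. */
  enum StandardTransform {
    RESHUFFLE("beam:transform:reshuffle:v1"),
    REDISTRIBUTE_BY_KEY("beam:transform:redistribute_by_key:v1");

    private final String urn;

    StandardTransform(String urn) {
      this.urn = urn;
    }

    String getUrn() {
      return urn;
    }
  }

  // Each constant is read from the single source of truth, so the URN string
  // is written down in exactly one place.
  public static final String RESHUFFLE_URN = StandardTransform.RESHUFFLE.getUrn();
  public static final String REDISTRIBUTE_BY_KEY_URN =
      StandardTransform.REDISTRIBUTE_BY_KEY.getUrn();

  public static void main(String[] args) {
    System.out.println(RESHUFFLE_URN);
    System.out.println(REDISTRIBUTE_BY_KEY_URN);
  }
}
```

   One trade-off to keep in mind: constants initialized from a method call are
   no longer compile-time constant expressions in Java, so they cannot be used
   in `switch` case labels or annotation values.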



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
