kennknowles commented on code in PR #30545:
URL: https://github.com/apache/beam/pull/30545#discussion_r1525097304


##########
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java:
##########
@@ -290,6 +296,26 @@ public FlinkPortablePipelineTranslator.Executor translate(
     return context;
   }
 
+  private static <K, V> void translateRedistributeByKey(
+      PTransformNode transform, RunnerApi.Pipeline pipeline, BatchTranslationContext context) {
+    DataSet<WindowedValue<KV<K, V>>> inputDataSet =
+        context.getDataSetOrThrow(
+            Iterables.getOnlyElement(transform.getTransform().getInputsMap().values()));
+    context.addDataSet(
+        Iterables.getOnlyElement(transform.getTransform().getOutputsMap().values()),
+        inputDataSet.rebalance());

Review Comment:
   Tried a rewrite. Still testing it, but I pushed the change up.
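
For reference, a minimal standalone sketch of what DataSet#rebalance() does in the Flink DataSet API, which is the mechanism the translation above relies on. The class name and input elements are hypothetical and not part of the PR:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class RebalanceSketch {
  public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // Hypothetical skewed input: every element carries the same key.
    DataSet<Tuple2<String, Integer>> input =
        env.fromElements(Tuple2.of("a", 1), Tuple2.of("a", 2), Tuple2.of("a", 3));

    // rebalance() redistributes elements round-robin across all parallel subtasks,
    // discarding any existing partitioning; keys are not consulted.
    DataSet<Tuple2<String, Integer>> redistributed = input.rebalance();

    redistributed.print();
  }
}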



##########
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java:
##########
@@ -422,6 +428,73 @@ public void translateNode(
     }
   }
 
+  private static class RedistributeByKeyTranslatorBatch<K, InputT>
+      implements FlinkBatchPipelineTranslator.BatchTransformTranslator<
+          Redistribute.RedistributeByKey<K, InputT>> {
+
+    @Override
+    public void translateNode(
+        Redistribute.RedistributeByKey<K, InputT> transform, FlinkBatchTranslationContext context) {
+      final DataSet<WindowedValue<KV<K, InputT>>> inputDataSet =
+          context.getInputDataSet(context.getInput(transform));
+      // Construct an instance of CoderTypeInformation which contains the pipeline options.
+      // This will be used to initialize FileSystems.
+      final CoderTypeInformation<WindowedValue<KV<K, InputT>>> outputType =
+          ((CoderTypeInformation<WindowedValue<KV<K, InputT>>>) inputDataSet.getType())
+              .withPipelineOptions(context.getPipelineOptions());
+      // We insert a NOOP here to initialize the FileSystems via the above CoderTypeInformation.
+      // The output type coder may be relying on file system access. The shuffled data may have to
+      // be deserialized on a different machine using this coder where FileSystems has not been
+      // initialized.
+      final DataSet<WindowedValue<KV<K, InputT>>> retypedDataSet =
+          new MapOperator<>(
+              inputDataSet,
+              outputType,
+              FlinkIdentityFunction.of(),
+              getCurrentTransformName(context));
+      final Configuration partitionOptions = new Configuration();
+      partitionOptions.setString(
+          Optimizer.HINT_SHIP_STRATEGY, Optimizer.HINT_SHIP_STRATEGY_REPARTITION);

Review Comment:
   Adjusted to match your Reshuffle implementation.
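
As context for the partition hint above, here is a minimal sketch of attaching a ship-strategy hint to an identity map via withParameters in the Flink DataSet API. This is not the Beam translator itself; the excerpt cuts off before the hint is used, so the wiring below (an anonymous identity MapFunction plus withParameters) is an assumption for illustration:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.optimizer.Optimizer;

public class ShipStrategyHintSketch {
  public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    DataSet<String> input = env.fromElements("a", "b", "c");

    // Identity map standing in for the FlinkIdentityFunction NOOP in the excerpt above.
    MapOperator<String, String> identity =
        input.map(
            new MapFunction<String, String>() {
              @Override
              public String map(String value) {
                return value;
              }
            });

    // Ask the optimizer to repartition the data shipped into this operator
    // (assumption: the hint is consumed when attached via withParameters).
    Configuration partitionOptions = new Configuration();
    partitionOptions.setString(
        Optimizer.HINT_SHIP_STRATEGY, Optimizer.HINT_SHIP_STRATEGY_REPARTITION);
    identity.withParameters(partitionOptions);

    identity.print();
  }
}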


