kennknowles commented on code in PR #30545: URL: https://github.com/apache/beam/pull/30545#discussion_r1525097304
########## runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java: ########## @@ -290,6 +296,26 @@ public FlinkPortablePipelineTranslator.Executor translate( return context; } + private static <K, V> void translateRedistributeByKey( + PTransformNode transform, RunnerApi.Pipeline pipeline, BatchTranslationContext context) { + DataSet<WindowedValue<KV<K, V>>> inputDataSet = + context.getDataSetOrThrow( + Iterables.getOnlyElement(transform.getTransform().getInputsMap().values())); + context.addDataSet( + Iterables.getOnlyElement(transform.getTransform().getOutputsMap().values()), + inputDataSet.rebalance()); Review Comment: Tried a rewrite. Still testing it, but I pushed the change up. ########## runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java: ########## @@ -422,6 +428,73 @@ public void translateNode( } } + private static class RedistributeByKeyTranslatorBatch<K, InputT> + implements FlinkBatchPipelineTranslator.BatchTransformTranslator< + Redistribute.RedistributeByKey<K, InputT>> { + + @Override + public void translateNode( + Redistribute.RedistributeByKey<K, InputT> transform, FlinkBatchTranslationContext context) { + final DataSet<WindowedValue<KV<K, InputT>>> inputDataSet = + context.getInputDataSet(context.getInput(transform)); + // Construct an instance of CoderTypeInformation which contains the pipeline options. + // This will be used to initialized FileSystems. + final CoderTypeInformation<WindowedValue<KV<K, InputT>>> outputType = + ((CoderTypeInformation<WindowedValue<KV<K, InputT>>>) inputDataSet.getType()) + .withPipelineOptions(context.getPipelineOptions()); + // We insert a NOOP here to initialize the FileSystems via the above CoderTypeInformation. + // The output type coder may be relying on file system access. The shuffled data may have to + // be deserialized on a different machine using this coder where FileSystems has not been + // initialized.
+ final DataSet<WindowedValue<KV<K, InputT>>> retypedDataSet = + new MapOperator<>( + inputDataSet, + outputType, + FlinkIdentityFunction.of(), + getCurrentTransformName(context)); + final Configuration partitionOptions = new Configuration(); + partitionOptions.setString( + Optimizer.HINT_SHIP_STRATEGY, Optimizer.HINT_SHIP_STRATEGY_REPARTITION); Review Comment: Adjusted to match your Reshuffle implementation. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@beam.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org