kennknowles commented on code in PR #30545:
URL: https://github.com/apache/beam/pull/30545#discussion_r1523440543
##########
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java:
##########
@@ -422,6 +428,73 @@ public void translateNode(
}
}
+ private static class RedistributeByKeyTranslatorBatch<K, InputT>
+ implements FlinkBatchPipelineTranslator.BatchTransformTranslator<
+ Redistribute.RedistributeByKey<K, InputT>> {
+
+ @Override
+ public void translateNode(
+ Redistribute.RedistributeByKey<K, InputT> transform, FlinkBatchTranslationContext context) {
+ final DataSet<WindowedValue<KV<K, InputT>>> inputDataSet =
+ context.getInputDataSet(context.getInput(transform));
+ // Construct an instance of CoderTypeInformation which contains the pipeline options.
+ // This will be used to initialize FileSystems.
+ final CoderTypeInformation<WindowedValue<KV<K, InputT>>> outputType =
+ ((CoderTypeInformation<WindowedValue<KV<K, InputT>>>) inputDataSet.getType())
+ .withPipelineOptions(context.getPipelineOptions());
+ // We insert a NOOP here to initialize the FileSystems via the above CoderTypeInformation.
+ // The output type coder may be relying on file system access. The shuffled data may have to
+ // be deserialized on a different machine using this coder where FileSystems has not been
+ // initialized.
+ final DataSet<WindowedValue<KV<K, InputT>>> retypedDataSet =
+ new MapOperator<>(
+ inputDataSet,
+ outputType,
+ FlinkIdentityFunction.of(),
+ getCurrentTransformName(context));
+ final Configuration partitionOptions = new Configuration();
+ partitionOptions.setString(
+ Optimizer.HINT_SHIP_STRATEGY, Optimizer.HINT_SHIP_STRATEGY_REPARTITION);
Review Comment:
Just reiterating that this is a copy-paste-modify of the Reshuffle logic. I
didn't have them share logic because I only like to do that when it is a
logical necessity, whereas these two transforms may diverge at some point. (But
it would be fine to make them share it for now.)
##########
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java:
##########
@@ -283,6 +287,24 @@ private <K, V> void translateReshuffle(
Iterables.getOnlyElement(transform.getOutputsMap().values()),
inputDataStream.rebalance());
}
+ private <K, V> void translateRedistributeByKey(
+ String id, RunnerApi.Pipeline pipeline, StreamingTranslationContext context) {
+ RunnerApi.PTransform transform = pipeline.getComponents().getTransformsOrThrow(id);
+ DataStream<WindowedValue<KV<K, V>>> inputDataStream =
+ context.getDataStreamOrThrow(Iterables.getOnlyElement(transform.getInputsMap().values()));
+ context.addDataStream(
+ Iterables.getOnlyElement(transform.getOutputsMap().values()), inputDataStream.rebalance());
Review Comment:
Would `keyBy` cause the `rebalance()` to use those keys when rebalancing?
The current `Reshuffle` doesn't seem to do that.
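For context on this question: the Flink documentation describes `rebalance()` as round-robin partitioning and `keyBy()` as hash partitioning on the key. The snippet below is a simplified, self-contained model of those two documented strategies, not Flink code. The class `PartitioningSketch` and its methods are hypothetical. The model assumes round-robin starts at channel 0 and that keys are hashed as `hashCode() mod parallelism`. Real Flink hashes keys into key groups first, and its round-robin partitioner does not necessarily start at channel 0. The model only shows why round-robin placement cannot depend on the key.

```java
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical, simplified model of two partitioning strategies (not Flink's implementation).
 * roundRobin: models rebalance(), where the channel depends only on arrival order.
 * byKeyHash: models keyBy(), where the channel depends only on the key.
 */
public class PartitioningSketch {

  /** Round-robin: assigns record i to channel i mod parallelism, ignoring its key. */
  static int[] roundRobin(List<String> keys, int parallelism) {
    int[] channels = new int[keys.size()];
    for (int i = 0; i < keys.size(); i++) {
      channels[i] = i % parallelism;
    }
    return channels;
  }

  /** Key hashing: records with equal keys always land on the same channel. */
  static int[] byKeyHash(List<String> keys, int parallelism) {
    int[] channels = new int[keys.size()];
    for (int i = 0; i < keys.size(); i++) {
      // floorMod keeps the channel non-negative even for negative hash codes.
      channels[i] = Math.floorMod(keys.get(i).hashCode(), parallelism);
    }
    return channels;
  }

  public static void main(String[] args) {
    List<String> keys = List.of("a", "a", "b", "a");
    // Round-robin spreads the three "a" records across channels 0 and 1.
    System.out.println("roundRobin: " + Arrays.toString(roundRobin(keys, 2)));
    // Key hashing puts all three "a" records on a single channel.
    System.out.println("byKeyHash:  " + Arrays.toString(byKeyHash(keys, 2)));
  }
}
```

In this model, the records with key "a" land on channels 0, 1 and 0 under round-robin, and on one shared channel under key hashing.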