Repository: incubator-beam Updated Branches: refs/heads/master 56c52773e -> 9adbecb6a
Use native Flink rebalance for Reshuffle Transform Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b58a19ce Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b58a19ce Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b58a19ce Branch: refs/heads/master Commit: b58a19cebebddbfb5bc8ebe9cc2766c5eccbda59 Parents: 56c5277 Author: Aljoscha Krettek <[email protected]> Authored: Fri Jun 17 15:16:19 2016 +0200 Committer: Aljoscha Krettek <[email protected]> Committed: Mon Jul 4 16:27:51 2016 +0200 ---------------------------------------------------------------------- .../FlinkBatchTransformTranslators.java | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b58a19ce/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java index ac058b2..0bba0d0 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java @@ -52,6 +52,7 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.transforms.windowing.WindowFn; +import org.apache.beam.sdk.util.Reshuffle; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.KV; @@ -102,6 +103,7 @@ class FlinkBatchTransformTranslators { TRANSLATORS.put(Combine.PerKey.class, new CombinePerKeyTranslatorBatch()); TRANSLATORS.put(GroupByKey.class, new GroupByKeyTranslatorBatch()); + TRANSLATORS.put(Reshuffle.class, new ReshuffleTranslatorBatch()); TRANSLATORS.put(Flatten.FlattenPCollectionList.class, new FlattenPCollectionTranslatorBatch()); @@ -283,6 +285,23 @@ class FlinkBatchTransformTranslators { } + private static class ReshuffleTranslatorBatch<K, InputT> + implements FlinkBatchPipelineTranslator.BatchTransformTranslator<Reshuffle<K, InputT>> { + + @Override + public void translateNode( + Reshuffle<K, InputT> transform, + FlinkBatchTranslationContext context) { + + DataSet<WindowedValue<KV<K, InputT>>> inputDataSet = + context.getInputDataSet(context.getInput(transform)); + + context.setOutputDataSet(context.getOutput(transform), inputDataSet.rebalance()); + + } + + } + /** * Combiner that combines {@code T}s into a single {@code List<T>} containing all inputs. *
