Repository: incubator-beam
Updated Branches:
  refs/heads/master 56c52773e -> 9adbecb6a


Use native Flink rebalance for Reshuffle Transform


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b58a19ce
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b58a19ce
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b58a19ce

Branch: refs/heads/master
Commit: b58a19cebebddbfb5bc8ebe9cc2766c5eccbda59
Parents: 56c5277
Author: Aljoscha Krettek <[email protected]>
Authored: Fri Jun 17 15:16:19 2016 +0200
Committer: Aljoscha Krettek <[email protected]>
Committed: Mon Jul 4 16:27:51 2016 +0200

----------------------------------------------------------------------
 .../FlinkBatchTransformTranslators.java          | 19 +++++++++++++++++++
 1 file changed, 19 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b58a19ce/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
index ac058b2..0bba0d0 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
@@ -52,6 +52,7 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.Reshuffle;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.KV;
@@ -102,6 +103,7 @@ class FlinkBatchTransformTranslators {
 
     TRANSLATORS.put(Combine.PerKey.class, new CombinePerKeyTranslatorBatch());
     TRANSLATORS.put(GroupByKey.class, new GroupByKeyTranslatorBatch());
+    TRANSLATORS.put(Reshuffle.class, new ReshuffleTranslatorBatch());
 
     TRANSLATORS.put(Flatten.FlattenPCollectionList.class, new FlattenPCollectionTranslatorBatch());
 
@@ -283,6 +285,23 @@ class FlinkBatchTransformTranslators {
 
   }
 
+  private static class ReshuffleTranslatorBatch<K, InputT>
+      implements FlinkBatchPipelineTranslator.BatchTransformTranslator<Reshuffle<K, InputT>> {
+
+    @Override
+    public void translateNode(
+        Reshuffle<K, InputT> transform,
+        FlinkBatchTranslationContext context) {
+
+      DataSet<WindowedValue<KV<K, InputT>>> inputDataSet =
+          context.getInputDataSet(context.getInput(transform));
+
+      context.setOutputDataSet(context.getOutput(transform), inputDataSet.rebalance());
+
+    }
+
+  }
+
   /**
    * Combiner that combines {@code T}s into a single {@code List<T>} containing all inputs.
    *

Reply via email to