je-ik commented on a change in pull request #15665:
URL: https://github.com/apache/beam/pull/15665#discussion_r752053222



##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
##########
@@ -422,6 +423,38 @@ public void translateNode(
     }
   }
 
+  private static class ReshuffleKeysTranslatorBatch<K, InputT>
+      implements FlinkBatchPipelineTranslator.BatchTransformTranslator<Reshuffle.Keys<K, InputT>> {
+
+    @Override
+    public void translateNode(

Review comment:
       Could we avoid the copy-and-paste here and extract the code into a
       reusable utility class instead?

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java
##########
@@ -123,15 +95,88 @@ private ViaRandomKey(@Nullable Integer numBuckets) {
       return new ViaRandomKey<>(numBuckets);
     }
 
+    @Override
+    public PCollection<T> expand(PCollection<T> input) {
+      return input.apply(new Elements<>(numBuckets));
+    }
+  }
+
+  /** Implementation of {@link #elements()}. */
+  public static class Elements<T> extends PTransform<PCollection<T>, PCollection<T>> {
+    private Elements() {}
+
+    private Elements(@Nullable Integer numBuckets) {
+      this.numBuckets = numBuckets;
+    }
+
+    // The number of buckets to shard into. This is a performance optimization to prevent having
+    // unit sized bundles on the output. If unset, uses a random integer key.
+    private @Nullable Integer numBuckets;
+
+    public Elements<T> withNumBuckets(@Nullable Integer numBuckets) {
+      return new Elements<>(numBuckets);
+    }
+
     @Override
     public PCollection<T> expand(PCollection<T> input) {
       return input
           .apply("Pair with random key", ParDo.of(new AssignShardFn<>(numBuckets)))
-          .apply(Reshuffle.of())
+          .apply(Reshuffle.keys())
           .apply(Values.create());
     }
   }
 
+  /**
+   * Implementation of {@link #keys *()}.
+   *
+   * @param <K> The type of key being reshuffled on.
+   * @param <V> The type of value being reshuffled.
+   */
+  public static class Keys<K, V> extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, V>>> {

Review comment:
       I'm not 100% sure, but it seems to me that we still need to override
       `getKindString` to preserve the original state names. Maybe @robertwb
       can correct me if I'm wrong.
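
       To illustrate the concern, here is a minimal standalone sketch. It
       does not use Beam; `Transform`, `Keys`, and `KeysWithStableName` are
       hypothetical stand-ins, and the pinned name "Reshuffle" is only an
       assumption about what the pre-refactoring name was. It shows how a
       name derived from the class changes once the code moves into a new
       nested class, unless the naming hook is overridden.

```java
// Standalone sketch with hypothetical stand-ins; these are not Beam classes.
public class KindStringSketch {

  /** Hypothetical base class whose default name is derived from the class. */
  abstract static class Transform {
    // Naming hook: by default, use the simple name of the runtime class.
    protected String getKindString() {
      return getClass().getSimpleName();
    }

    // The default transform name falls back to the kind string.
    public String getName() {
      return getKindString();
    }
  }

  /** Refactored class without an override: its name follows the new class. */
  static class Keys extends Transform {}

  /** Same class, pinning the kind string to the assumed original name. */
  static class KeysWithStableName extends Transform {
    @Override
    protected String getKindString() {
      return "Reshuffle"; // assumption: the name used before the refactoring
    }
  }

  public static void main(String[] args) {
    System.out.println(new Keys().getName());
    System.out.println(new KeysWithStableName().getName());
  }
}
```

       If names like these end up in state or step identifiers, the first
       variant would silently change them, which is why an override may
       still be needed.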

##########
File path: runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
##########
@@ -728,6 +728,36 @@ public String toNativeString() {
     };
   }
 
+  private static <K, V, W extends BoundedWindow> TransformEvaluator<Reshuffle.Keys<K, V>> reshuffleKeys() {

Review comment:
       Same here: this looks like copy-and-paste as well.

##########
File path: runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
##########
@@ -530,6 +530,38 @@ public String toNativeString() {
     };
   }
 
+  private static <K, V, W extends BoundedWindow> TransformEvaluator<Reshuffle.Keys<K, V>> reshuffleKeys() {

Review comment:
       The same applies here.
