[ 
https://issues.apache.org/jira/browse/BEAM-12999?focusedWorklogId=683180&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-683180
 ]

ASF GitHub Bot logged work on BEAM-12999:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 18/Nov/21 09:35
            Start Date: 18/Nov/21 09:35
    Worklog Time Spent: 10m 
      Work Description: je-ik commented on a change in pull request #15665:
URL: https://github.com/apache/beam/pull/15665#discussion_r752053222



##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java
##########
@@ -422,6 +423,38 @@ public void translateNode(
     }
   }
 
+  private static class ReshuffleKeysTranslatorBatch<K, InputT>
+      implements FlinkBatchPipelineTranslator.BatchTransformTranslator<Reshuffle.Keys<K, InputT>> {
+
+    @Override
+    public void translateNode(

Review comment:
       Could we avoid the copy&paste here and wrap the code into a reusable 
utility class instead?

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java
##########
@@ -123,15 +95,88 @@ private ViaRandomKey(@Nullable Integer numBuckets) {
       return new ViaRandomKey<>(numBuckets);
     }
 
+    @Override
+    public PCollection<T> expand(PCollection<T> input) {
+      return input.apply(new Elements<>(numBuckets));
+    }
+  }
+
+  /** Implementation of {@link #elements()}. */
+  public static class Elements<T> extends PTransform<PCollection<T>, PCollection<T>> {
+    private Elements() {}
+
+    private Elements(@Nullable Integer numBuckets) {
+      this.numBuckets = numBuckets;
+    }
+
+    // The number of buckets to shard into. This is a performance optimization to prevent having
+    // unit sized bundles on the output. If unset, uses a random integer key.
+    private @Nullable Integer numBuckets;
+
+    public Elements<T> withNumBuckets(@Nullable Integer numBuckets) {
+      return new Elements<>(numBuckets);
+    }
+
     @Override
     public PCollection<T> expand(PCollection<T> input) {
       return input
           .apply("Pair with random key", ParDo.of(new AssignShardFn<>(numBuckets)))
-          .apply(Reshuffle.of())
+          .apply(Reshuffle.keys())
           .apply(Values.create());
     }
   }
 
+  /**
+   * Implementation of {@link #keys *()}.
+   *
+   * @param <K> The type of key being reshuffled on.
+   * @param <V> The type of value being reshuffled.
+   */
+  public static class Keys<K, V> extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, V>>> {

Review comment:
       I'm not 100% sure, but it seems to me that we still need to 
override `getKindString` to preserve the original state names. Maybe @robertwb 
can correct me if I'm wrong.

##########
File path: runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
##########
@@ -728,6 +728,36 @@ public String toNativeString() {
     };
   }
 
+  private static <K, V, W extends BoundedWindow> TransformEvaluator<Reshuffle.Keys<K, V>> reshuffleKeys() {

Review comment:
       Same here, this looks like a copy&paste as well.

##########
File path: runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
##########
@@ -530,6 +530,38 @@ public String toNativeString() {
     };
   }
 
+  private static <K, V, W extends BoundedWindow> TransformEvaluator<Reshuffle.Keys<K, V>> reshuffleKeys() {

Review comment:
       As well as here.






Issue Time Tracking
-------------------

    Worklog Id:     (was: 683180)
    Time Spent: 18h 10m  (was: 18h)

> Improve Reshuffle Transform
> ---------------------------
>
>                 Key: BEAM-12999
>                 URL: https://issues.apache.org/jira/browse/BEAM-12999
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-go, sdk-java-core, sdk-py-core
>            Reporter: Ke Wu
>            Assignee: Ke Wu
>            Priority: P2
>          Time Spent: 18h 10m
>  Remaining Estimate: 0h
>
> See discussion 
> [https://lists.apache.org/thread.html/r83adaad3a512ad186f2f9dc9dc4bec2a789070677c07cdcaad6fcfa5%40%3Cdev.beam.apache.org%3E]
>  
>  
> The "beam:transform:reshuffle:v1" transform has different semantics in 
> different SDKs. The proposal is to replace "beam:transform:reshuffle:v1" 
> with two new URNs: one that reshuffles a KV PCollection by its key K, and 
> another that reshuffles based on a random key.
>  
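
To make the "reshuffle based on a random key" variant concrete, here is a minimal, Beam-free sketch of the "pair with random key" step that precedes the keyed reshuffle. It is an illustration only: the class `ShardKeySketch`, its seed parameter, and the round-robin bucket assignment are assumptions made for this example and are not the logic of Beam's `AssignShardFn`. Only the `numBuckets` idea (a bounded key space when set, a random integer key when unset) comes from the diff above.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;

/** Hypothetical sketch: pairs elements with a shard key before a keyed reshuffle. */
final class ShardKeySketch {
  // Null means "no bucket limit": every element gets a random integer key.
  private final Integer numBuckets;
  private final Random random;
  // Running counter, started at a random offset (an assumption of this sketch).
  private int counter;

  ShardKeySketch(Integer numBuckets, long seed) {
    this.numBuckets = numBuckets;
    this.random = new Random(seed);
    this.counter = random.nextInt();
  }

  /** Returns the shard key for the next element. */
  int nextKey() {
    if (numBuckets == null) {
      return random.nextInt();
    }
    counter++;
    // floorMod keeps the key in [0, numBuckets) even if the counter is negative.
    return Math.floorMod(counter, numBuckets);
  }

  /** Pairs each element with a shard key, standing in for KV<Integer, T>. */
  <T> List<Map.Entry<Integer, T>> pair(List<T> elements) {
    List<Map.Entry<Integer, T>> out = new ArrayList<>();
    for (T element : elements) {
      out.add(new SimpleEntry<>(nextKey(), element));
    }
    return out;
  }
}
```

In this sketch, a keyed reshuffle would then group by the assigned key and the keys would be dropped afterwards, which corresponds to the `Reshuffle.keys()` and `Values.create()` steps in the diff.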



