robertwb commented on a change in pull request #15665:
URL: https://github.com/apache/beam/pull/15665#discussion_r742406333



##########
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java
##########
@@ -36,102 +36,121 @@
 /**
  * <b>For internal use only; no backwards compatibility guarantees.</b>
  *
- * <p>A {@link PTransform} that returns a {@link PCollection} equivalent to 
its input but
+ * <p>{@link PTransform}(s) that returns a {@link PCollection} equivalent to 
its input but
  * operationally provides some of the side effects of a {@link GroupByKey}, in 
particular
  * checkpointing, and preventing fusion of the surrounding transforms.
  *
  * <p>Performs a {@link GroupByKey} so that the data is key-partitioned. 
Configures the {@link
  * WindowingStrategy} so that no data is dropped, but doesn't affect the need 
for the user to
  * specify allowed lateness and accumulation mode before a user-inserted 
GroupByKey.
- *
- * @param <K> The type of key being reshuffled on.
- * @param <V> The type of value being reshuffled.
- * @deprecated this transform's intended side effects are not portable; it 
will likely be removed
  */
 @Internal
-@Deprecated
-public class Reshuffle<K, V> extends PTransform<PCollection<KV<K, V>>, 
PCollection<KV<K, V>>> {
+public class Reshuffle {
 
   private Reshuffle() {}
 
-  public static <K, V> Reshuffle<K, V> of() {
-    return new Reshuffle<>();
+  /** @deprecated use {@link #perKey()}. */
+  @Deprecated
+  public static <K, V> PerKey<K, V> of() {
+    return new PerKey<>();
+  }
+
+  public static <K, V> PerKey<K, V> perKey() {
+    return new PerKey<>();
+  }
+
+  /** @deprecated use {@link #perRandomKey()}. */
+  @Deprecated
+  public static <T> PerRandomKey<T> viaRandomKey() {
+    return perRandomKey();
   }
 
   /**
-   * Encapsulates the sequence "pair input with unique key, apply {@link 
Reshuffle#of}, drop the
-   * key" commonly used to break fusion.
+   * Encapsulates the sequence "pair input with unique key, apply {@link 
Reshuffle#perKey()}, drop
+   * the key" commonly used to break fusion.
    */
   @Experimental
-  public static <T> ViaRandomKey<T> viaRandomKey() {
-    return new ViaRandomKey<>();
-  }
-
-  @Override
-  public PCollection<KV<K, V>> expand(PCollection<KV<K, V>> input) {
-    WindowingStrategy<?, ?> originalStrategy = input.getWindowingStrategy();
-    // If the input has already had its windows merged, then the GBK that 
performed the merge
-    // will have set originalStrategy.getWindowFn() to InvalidWindows, causing 
the GBK contained
-    // here to fail. Instead, we install a valid WindowFn that leaves all 
windows unchanged.
-    // The TimestampCombiner is set to ensure the GroupByKey does not shift 
elements forwards in
-    // time.
-    // Because this outputs as fast as possible, this should not hold the 
watermark.
-    Window<KV<K, V>> rewindow =
-        Window.<KV<K, V>>into(new 
IdentityWindowFn<>(originalStrategy.getWindowFn().windowCoder()))
-            .triggering(new ReshuffleTrigger<>())
-            .discardingFiredPanes()
-            .withTimestampCombiner(TimestampCombiner.EARLIEST)
-            
.withAllowedLateness(Duration.millis(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()));
-
-    return input
-        .apply(rewindow)
-        .apply("ReifyOriginalTimestamps", Reify.timestampsInValue())
-        .apply(GroupByKey.create())
-        // Set the windowing strategy directly, so that it doesn't get counted 
as the user having
-        // set allowed lateness.
-        .setWindowingStrategyInternal(originalStrategy)
-        .apply(
-            "ExpandIterable",
-            ParDo.of(
-                new DoFn<KV<K, Iterable<TimestampedValue<V>>>, KV<K, 
TimestampedValue<V>>>() {
-                  @ProcessElement
-                  public void processElement(
-                      @Element KV<K, Iterable<TimestampedValue<V>>> element,
-                      OutputReceiver<KV<K, TimestampedValue<V>>> r) {
-                    K key = element.getKey();
-                    for (TimestampedValue<V> value : element.getValue()) {
-                      r.output(KV.of(key, value));
-                    }
-                  }
-                }))
-        .apply("RestoreOriginalTimestamps", 
ReifyTimestamps.extractFromValues());
+  public static <T> PerRandomKey<T> perRandomKey() {
+    return new PerRandomKey<>();
   }
 
   /** Implementation of {@link #viaRandomKey()}. */
-  public static class ViaRandomKey<T> extends PTransform<PCollection<T>, 
PCollection<T>> {
-    private ViaRandomKey() {}
+  public static class PerRandomKey<T> extends PTransform<PCollection<T>, 
PCollection<T>> {
+    private PerRandomKey() {}
 
-    private ViaRandomKey(@Nullable Integer numBuckets) {
+    private PerRandomKey(@Nullable Integer numBuckets) {
       this.numBuckets = numBuckets;
     }
 
     // The number of buckets to shard into. This is a performance optimization 
to prevent having
     // unit sized bundles on the output. If unset, uses a random integer key.
     private @Nullable Integer numBuckets;
 
-    public ViaRandomKey<T> withNumBuckets(@Nullable Integer numBuckets) {
-      return new ViaRandomKey<>(numBuckets);
+    public PerRandomKey<T> withNumBuckets(@Nullable Integer numBuckets) {
+      return new PerRandomKey<>(numBuckets);
     }
 
     @Override
     public PCollection<T> expand(PCollection<T> input) {
       return input
           .apply("Pair with random key", ParDo.of(new 
AssignShardFn<>(numBuckets)))
-          .apply(Reshuffle.of())
+          .apply(Reshuffle.perKey())
           .apply(Values.create());
     }
   }
 
+  /**
+   * Implementation of {@link #perKey*()}.
+   *
+   * @param <K> The type of key being reshuffled on.
+   * @param <V> The type of value being reshuffled.
+   */
+  public static class PerKey<K, V>

Review comment:
       No, we don't need to change this anymore, but we may need to implement 
`getKindString` for `PerRandomKey`, which was renamed.

##########
File path: model/pipeline/src/main/proto/beam_runner_api.proto
##########
@@ -365,6 +365,12 @@ message StandardPTransforms {
     // Represents the GroupIntoBatches.WithShardedKey operation.
     // Payload: GroupIntoBatchesPayload
     GROUP_INTO_BATCHES_WITH_SHARDED_KEY = 6 [(beam_urn) = 
"beam:transform:group_into_batches_with_sharded_key:v1"];
+
+    // Represents the Reshuffle.perKey() operation.
+    RESHUFFLE_PER_KEY = 7 [(beam_urn) = "beam:transform:reshuffle_per_key:v1"];
+
+    // Represents the Reshuffle.perRandomKey() operation.
+    RESHUFFLE_PER_RANDOM_KEY = 8 [(beam_urn) = 
"beam:transform:reshuffle_per_random_key:v1"];

Review comment:
       Yes. We could rename `Reshuffle.perRandomKey()` to 
`Reshuffle.elements()` as well.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to