nehsyc commented on a change in pull request #13208:
URL: https://github.com/apache/beam/pull/13208#discussion_r513072595



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
##########
@@ -105,23 +108,83 @@ public long getBatchSize() {
   }
 
   /**
-   * Set a time limit (in processing time) on how long an incomplete batch of elements is allowed to
-   * be buffered. Once a batch is flushed to output, the timer is reset.
+   * Sets a time limit (in processing time) on how long an incomplete batch of elements is allowed
+   * to be buffered. Once a batch is flushed to output, the timer is reset.
    */
   public GroupIntoBatches<K, InputT> withMaxBufferingDuration(Duration duration) {
     checkArgument(
         duration.isLongerThan(Duration.ZERO), "max buffering duration should be a positive value");
     return new GroupIntoBatches<>(batchSize, duration);
   }
 
+  /**
+   * Outputs batched elements associated with sharded input keys. The sharding is determined by the
+   * runner to balance the load during the execution time. By default, apply no sharding so each key
+   * has one shard.
+   */
+  @Experimental
+  public WithShardedKey withShardedKey() {
+    return new WithShardedKey();
+  }
+
+  public class WithShardedKey

Review comment:
       Currently, `WithShardedKey` is applied only when the `withShardedKey()`
option is used. Alternatively, we could always apply `WithShardedKey` and strip
the shard id when sharded output is not requested. I'm not sure that is safe,
though, since it would introduce coder changes for all existing uses.
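
To make the alternative concrete, here is a rough sketch in plain Java (16+, for records). It is a toy model, not Beam code: `ShardedKey`, `Batch`, `batchSharded`, and `batchUnsharded` are hypothetical names invented for illustration, and it ignores timers, windowing, and state. The unsharded path wraps every key with a single shard id, reuses the sharded batching, and strips the shard id on output.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ShardStripSketch {
  /** Hypothetical stand-in for a sharded key: the user key plus a shard id. */
  record ShardedKey<K>(K key, int shardId) {}

  /** Hypothetical stand-in for one output batch. */
  record Batch<K, V>(K key, List<V> values) {}

  /** Sharded path: buffer per (key, shard) and emit a batch once it is full. */
  static <K, V> List<Batch<ShardedKey<K>, V>> batchSharded(
      List<Map.Entry<ShardedKey<K>, V>> input, int batchSize) {
    Map<ShardedKey<K>, List<V>> buffers = new LinkedHashMap<>();
    List<Batch<ShardedKey<K>, V>> out = new ArrayList<>();
    for (Map.Entry<ShardedKey<K>, V> e : input) {
      List<V> buf = buffers.computeIfAbsent(e.getKey(), k -> new ArrayList<>());
      buf.add(e.getValue());
      if (buf.size() == batchSize) {
        out.add(new Batch<>(e.getKey(), new ArrayList<>(buf)));
        buf.clear();
      }
    }
    // Flush incomplete batches at end of input (the real transform uses timers).
    for (var e : buffers.entrySet()) {
      if (!e.getValue().isEmpty()) {
        out.add(new Batch<>(e.getKey(), e.getValue()));
      }
    }
    return out;
  }

  /** Unsharded path: always shard internally with one shard, then strip the id. */
  static <K, V> List<Batch<K, V>> batchUnsharded(
      List<Map.Entry<K, V>> input, int batchSize) {
    List<Map.Entry<ShardedKey<K>, V>> wrapped = new ArrayList<>();
    for (Map.Entry<K, V> e : input) {
      // Assumption: shard id 0 stands for "no sharding", one shard per key.
      wrapped.add(Map.entry(new ShardedKey<>(e.getKey(), 0), e.getValue()));
    }
    List<Batch<K, V>> out = new ArrayList<>();
    for (Batch<ShardedKey<K>, V> b : batchSharded(wrapped, batchSize)) {
      out.add(new Batch<>(b.key().key(), b.values()));
    }
    return out;
  }
}
```

Even in this toy version the intermediate buffers are keyed by `ShardedKey<K>` rather than `K`. In the real transform that would presumably mean a different key coder for existing pipelines, which is the safety concern above.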



