nehsyc commented on a change in pull request #13208:
URL: https://github.com/apache/beam/pull/13208#discussion_r513072595
##########
File path:
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
##########
@@ -105,23 +108,83 @@ public long getBatchSize() {
}
/**
- * Set a time limit (in processing time) on how long an incomplete batch of elements is allowed to
- * be buffered. Once a batch is flushed to output, the timer is reset.
+ * Sets a time limit (in processing time) on how long an incomplete batch of elements is allowed
+ * to be buffered. Once a batch is flushed to output, the timer is reset.
*/
public GroupIntoBatches<K, InputT> withMaxBufferingDuration(Duration duration) {
checkArgument(
duration.isLongerThan(Duration.ZERO), "max buffering duration should be a positive value");
return new GroupIntoBatches<>(batchSize, duration);
}
+ /**
+ * Outputs batched elements associated with sharded input keys. The sharding is determined by the
+ * runner to balance the load during the execution time. By default, apply no sharding so each key
+ * has one shard.
+ */
+ @Experimental
+ public WithShardedKey withShardedKey() {
+ return new WithShardedKey();
+ }
+
+ public class WithShardedKey
Review comment:
Currently `WithShardedKey` is only applied when the `withShardedKey` option is used.
Alternatively, we could always apply `WithShardedKey` and strip the shard id in
the non-sharded output case. I'm not sure that's safe, though, since it would
introduce coder changes for all existing uses.
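To make the trade-off concrete, here is a plain-Java sketch that models the two designs without Beam. `ShardedBatchingSketch`, `ShardedKey`, `batch` and `stripShardIds` are hypothetical names for illustration only, not Beam APIs, and the round-robin shard assignment is an assumption (in this PR the runner decides the sharding). With `numShards == 1` every key gets a single shard, matching the default described in the `withShardedKey()` Javadoc, and stripping the shard id recovers plain per-key batches.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of the design question in the review comment.
// ShardedKey, batch() and stripShardIds() are hypothetical, NOT Beam APIs.
class ShardedBatchingSketch {

  // Hypothetical stand-in for a user key paired with a shard id.
  record ShardedKey<K>(K key, int shardId) {}

  // Buffers values per sharded key and emits a batch whenever batchSize is reached.
  // Shards are assigned round-robin per key (an assumption; in the PR the runner
  // decides). With numShards == 1, every key has exactly one shard (id 0).
  static <K, V> List<Map.Entry<ShardedKey<K>, List<V>>> batch(
      List<Map.Entry<K, V>> input, int batchSize, int numShards) {
    Map<K, Integer> seen = new HashMap<>();
    Map<ShardedKey<K>, List<V>> buffers = new LinkedHashMap<>();
    List<Map.Entry<ShardedKey<K>, List<V>>> out = new ArrayList<>();
    for (Map.Entry<K, V> e : input) {
      int n = seen.merge(e.getKey(), 1, Integer::sum) - 1;
      ShardedKey<K> sk = new ShardedKey<>(e.getKey(), n % numShards);
      List<V> buf = buffers.computeIfAbsent(sk, unused -> new ArrayList<>());
      buf.add(e.getValue());
      if (buf.size() == batchSize) {
        out.add(Map.entry(sk, List.copyOf(buf)));
        buf.clear();
      }
    }
    // Flush incomplete batches at end of input (a real transform would also
    // flush when the max buffering duration expires).
    buffers.forEach((sk, buf) -> {
      if (!buf.isEmpty()) {
        out.add(Map.entry(sk, List.copyOf(buf)));
      }
    });
    return out;
  }

  // The "always shard, then strip" alternative: drop the shard id so each batch
  // is keyed by the original user key again.
  static <K, V> List<Map.Entry<K, List<V>>> stripShardIds(
      List<Map.Entry<ShardedKey<K>, List<V>>> batches) {
    List<Map.Entry<K, List<V>>> out = new ArrayList<>();
    for (Map.Entry<ShardedKey<K>, List<V>> b : batches) {
      out.add(Map.entry(b.getKey().key(), b.getValue()));
    }
    return out;
  }
}
```

Under this model the stripped output has the same shape as the unsharded output, but the keys buffered inside the transform are `ShardedKey` values rather than `K`, so their encoding changes even when there is only one shard; that is presumably the coder change the comment is worried about for existing pipelines.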