nehsyc commented on a change in pull request #14852:
URL: https://github.com/apache/beam/pull/14852#discussion_r637620464



##########
File path: sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupIntoBatchesTest.java
##########
@@ -126,6 +127,99 @@ public Void apply(Iterable<KV<String, Iterable<String>>> input) {
     pipeline.run();
   }
 
+  @Test
+  @Category({NeedsRunner.class, UsesTimersInParDo.class, UsesStatefulParDo.class})
+  public void testInGlobalWindowBatchSizeByteSize() {
+    PCollection<KV<String, Iterable<String>>> collection =
+        pipeline
+            .apply("Input data", Create.of(data))
+            .apply(GroupIntoBatches.ofByteSize(BATCH_SIZE_BYTES))
+            // set output coder
+            .setCoder(KvCoder.of(StringUtf8Coder.of(), IterableCoder.of(StringUtf8Coder.of())));
+    PAssert.that("Incorrect batch size in one or more elements", collection)
+        .satisfies(
+            new SerializableFunction<Iterable<KV<String, Iterable<String>>>, Void>() {
+
+              private boolean checkBatchSizes(Iterable<KV<String, Iterable<String>>> listToCheck) {
+                for (KV<String, Iterable<String>> element : listToCheck) {
+                  long byteSize = 0;
+                  for (String str : element.getValue()) {
+                    if (byteSize >= BATCH_SIZE_BYTES) {
+                      // We already reached the batch size, so extra elements are not expected.
+                      return false;
+                    }
+                    try {
+                      byteSize += StringUtf8Coder.of().getEncodedElementByteSize(str);
+                    } catch (Exception e) {
+                      throw new RuntimeException(e);
+                    }
+                  }
+                }
+                return true;
+              }
+
+              @Override
+              public Void apply(Iterable<KV<String, Iterable<String>>> input) {
+                assertTrue(checkBatchSizes(input));
+                return null;
+              }
+            });
+    PAssert.thatSingleton("Incorrect collection size", collection.apply("Count", Count.globally()))
+        .isEqualTo(3L);
+    pipeline.run();
+  }
+
+  @Test
+  @Category({NeedsRunner.class, UsesTimersInParDo.class, UsesStatefulParDo.class})
+  public void testInGlobalWindowBatchSizeByteSizeFn() {
+    PCollection<KV<String, Iterable<String>>> collection =
+        pipeline
+            .apply("Input data", Create.of(data))
+            .apply(
+                GroupIntoBatches.ofByteSize(
+                    BATCH_SIZE_BYTES,
+                    s -> {
+                      try {
+                        return StringUtf8Coder.of().getEncodedElementByteSize(s);
+                      } catch (Exception e) {
+                        throw new RuntimeException(e);
+                      }
+                    }))

Review comment:
       Maybe use a weigher that differs from the default here? Otherwise this
       test looks like it covers the same path as the one above.
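
A minimal sketch of what a non-default weigher could look like, written with plain JDK types instead of Beam's `SerializableFunction` so that it stands alone. The class and constant names are hypothetical, and it assumes the default weigher is based on the coder-encoded size, as in the lambda quoted above. Counting two bytes per UTF-16 code unit gives a different weight than the UTF-8 size for ASCII input, so the expected batch boundaries would differ from the default case:

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

// Hypothetical helper for illustration only; not part of Beam.
final class WeigherSketch {
  // Alternative weigher: two bytes per UTF-16 code unit.
  static final Function<String, Long> UTF16_WEIGHER = s -> 2L * s.length();

  // Reference weigher: size of the UTF-8 payload, without any length prefix.
  static final Function<String, Long> UTF8_PAYLOAD_WEIGHER =
      s -> (long) s.getBytes(StandardCharsets.UTF_8).length;

  private WeigherSketch() {}
}
```

In the test, a lambda of this shape could be passed as the second argument to `GroupIntoBatches.ofByteSize(BATCH_SIZE_BYTES, ...)`, with the expected batch count adjusted to match.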

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
##########
@@ -101,29 +111,69 @@
    * GroupIntoBatches} and {@link GroupIntoBatches.WithShardedKey}.
    */
   @AutoValue
-  public abstract static class BatchingParams implements Serializable {
-    public static BatchingParams create(long batchSize, Duration maxBufferingDuration) {
-      return new AutoValue_GroupIntoBatches_BatchingParams(batchSize, maxBufferingDuration);
+  public abstract static class BatchingParams<InputT> implements Serializable {
+    public static <InputT> BatchingParams<InputT> create(
+        long batchSize,
+        long batchSizeByes,

Review comment:
       typo: batchSizeBytes

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
##########
@@ -101,29 +111,69 @@
    * GroupIntoBatches} and {@link GroupIntoBatches.WithShardedKey}.
    */
   @AutoValue
-  public abstract static class BatchingParams implements Serializable {
-    public static BatchingParams create(long batchSize, Duration maxBufferingDuration) {
-      return new AutoValue_GroupIntoBatches_BatchingParams(batchSize, maxBufferingDuration);
+  public abstract static class BatchingParams<InputT> implements Serializable {

Review comment:
       Dataflow also overrides the batch-mode implementation here: 
https://github.com/apache/beam/blob/06dbe4f59592efcf0c68c943281987819588ce87/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/GroupIntoBatchesOverride.java#L71
   
   That override doesn't use a stateful DoFn; it uses `Iterators.partition` to split the input into batches by element count. Not sure how byteSize fits there.
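
One way byte size could fit a non-stateful path is a weight-aware partition in place of the count-based `Iterators.partition`. The sketch below is an assumption, not the Dataflow override's code: the class and method names are hypothetical, it builds the batches eagerly rather than lazily, and it closes a batch as soon as the accumulated weight reaches the limit, which mirrors the check in the test for this PR.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.ToLongFunction;

// Hypothetical helper for illustration only; not part of Beam or Guava.
final class ByteSizePartitionSketch {
  // Splits the input into batches, closing a batch once its total weight
  // reaches maxWeight. A single element heavier than maxWeight forms its own batch
  // when it is the first element added to an empty batch.
  static <T> List<List<T>> partitionByWeight(
      Iterator<T> input, long maxWeight, ToLongFunction<T> weigher) {
    List<List<T>> batches = new ArrayList<>();
    List<T> current = new ArrayList<>();
    long weight = 0;
    while (input.hasNext()) {
      T element = input.next();
      current.add(element);
      weight += weigher.applyAsLong(element);
      if (weight >= maxWeight) {
        // Limit reached: emit the batch and start a new one.
        batches.add(current);
        current = new ArrayList<>();
        weight = 0;
      }
    }
    if (!current.isEmpty()) {
      // Emit the leftover elements that never reached the limit.
      batches.add(current);
    }
    return batches;
  }

  private ByteSizePartitionSketch() {}
}
```

Combining this with an element-count limit would need a second condition in the same loop; whether the override should support both limits at once is left open here.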

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
##########
@@ -101,29 +111,69 @@
    * GroupIntoBatches} and {@link GroupIntoBatches.WithShardedKey}.
    */
   @AutoValue
-  public abstract static class BatchingParams implements Serializable {
-    public static BatchingParams create(long batchSize, Duration maxBufferingDuration) {
-      return new AutoValue_GroupIntoBatches_BatchingParams(batchSize, maxBufferingDuration);
+  public abstract static class BatchingParams<InputT> implements Serializable {

Review comment:
       The Runner API probably needs to be updated accordingly: 
https://github.com/apache/beam/blob/06dbe4f59592efcf0c68c943281987819588ce87/model/pipeline/src/main/proto/beam_runner_api.proto#L755
   
   Translation happens here:
   
https://github.com/apache/beam/blob/06dbe4f59592efcf0c68c943281987819588ce87/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/GroupIntoBatchesTranslation.java#L34



