reuvenlax commented on a change in pull request #14852:
URL: https://github.com/apache/beam/pull/14852#discussion_r638092467



##########
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
##########
@@ -101,29 +111,69 @@
    * GroupIntoBatches} and {@link GroupIntoBatches.WithShardedKey}.
    */
   @AutoValue
-  public abstract static class BatchingParams implements Serializable {
-    public static BatchingParams create(long batchSize, Duration 
maxBufferingDuration) {
-      return new AutoValue_GroupIntoBatches_BatchingParams(batchSize, 
maxBufferingDuration);
+  public abstract static class BatchingParams<InputT> implements Serializable {
+    public static <InputT> BatchingParams<InputT> create(
+        long batchSize,
+        long batchSizeByes,

Review comment:
       done

##########
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
##########
@@ -101,29 +111,69 @@
    * GroupIntoBatches} and {@link GroupIntoBatches.WithShardedKey}.
    */
   @AutoValue
-  public abstract static class BatchingParams implements Serializable {
-    public static BatchingParams create(long batchSize, Duration 
maxBufferingDuration) {
-      return new AutoValue_GroupIntoBatches_BatchingParams(batchSize, 
maxBufferingDuration);
+  public abstract static class BatchingParams<InputT> implements Serializable {

Review comment:
       added

##########
File path: 
sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupIntoBatchesTest.java
##########
@@ -126,6 +127,99 @@ public Void apply(Iterable<KV<String, Iterable<String>>> 
input) {
     pipeline.run();
   }
 
+  @Test
+  @Category({NeedsRunner.class, UsesTimersInParDo.class, 
UsesStatefulParDo.class})
+  public void testInGlobalWindowBatchSizeByteSize() {
+    PCollection<KV<String, Iterable<String>>> collection =
+        pipeline
+            .apply("Input data", Create.of(data))
+            .apply(GroupIntoBatches.ofByteSize(BATCH_SIZE_BYTES))
+            // set output coder
+            .setCoder(KvCoder.of(StringUtf8Coder.of(), 
IterableCoder.of(StringUtf8Coder.of())));
+    PAssert.that("Incorrect batch size in one or more elements", collection)
+        .satisfies(
+            new SerializableFunction<Iterable<KV<String, Iterable<String>>>, 
Void>() {
+
+              private boolean checkBatchSizes(Iterable<KV<String, 
Iterable<String>>> listToCheck) {
+                for (KV<String, Iterable<String>> element : listToCheck) {
+                  long byteSize = 0;
+                  for (String str : element.getValue()) {
+                    if (byteSize >= BATCH_SIZE_BYTES) {
+                      // We already reached the batch size, so extra elements 
are not expected.
+                      return false;
+                    }
+                    try {
+                      byteSize += 
StringUtf8Coder.of().getEncodedElementByteSize(str);
+                    } catch (Exception e) {
+                      throw new RuntimeException(e);
+                    }
+                  }
+                }
+                return true;
+              }
+
+              @Override
+              public Void apply(Iterable<KV<String, Iterable<String>>> input) {
+                assertTrue(checkBatchSizes(input));
+                return null;
+              }
+            });
+    PAssert.thatSingleton("Incorrect collection size", 
collection.apply("Count", Count.globally()))
+        .isEqualTo(3L);
+    pipeline.run();
+  }
+
+  @Test
+  @Category({NeedsRunner.class, UsesTimersInParDo.class, 
UsesStatefulParDo.class})
+  public void testInGlobalWindowBatchSizeByteSizeFn() {
+    PCollection<KV<String, Iterable<String>>> collection =
+        pipeline
+            .apply("Input data", Create.of(data))
+            .apply(
+                GroupIntoBatches.ofByteSize(
+                    BATCH_SIZE_BYTES,
+                    s -> {
+                      try {
+                        return 
StringUtf8Coder.of().getEncodedElementByteSize(s);
+                      } catch (Exception e) {
+                        throw new RuntimeException(e);
+                      }
+                    }))

Review comment:
       done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to