[ 
https://issues.apache.org/jira/browse/BEAM-12378?focusedWorklogId=601262&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-601262
 ]

ASF GitHub Bot logged work on BEAM-12378:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 24/May/21 16:18
            Start Date: 24/May/21 16:18
    Worklog Time Spent: 10m 
      Work Description: reuvenlax commented on a change in pull request #14852:
URL: https://github.com/apache/beam/pull/14852#discussion_r638092467



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
##########
@@ -101,29 +111,69 @@
    * GroupIntoBatches} and {@link GroupIntoBatches.WithShardedKey}.
    */
   @AutoValue
-  public abstract static class BatchingParams implements Serializable {
-    public static BatchingParams create(long batchSize, Duration maxBufferingDuration) {
-      return new AutoValue_GroupIntoBatches_BatchingParams(batchSize, maxBufferingDuration);
+  public abstract static class BatchingParams<InputT> implements Serializable {
+    public static <InputT> BatchingParams<InputT> create(
+        long batchSize,
+        long batchSizeByes,

Review comment:
       done

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
##########
@@ -101,29 +111,69 @@
    * GroupIntoBatches} and {@link GroupIntoBatches.WithShardedKey}.
    */
   @AutoValue
-  public abstract static class BatchingParams implements Serializable {
-    public static BatchingParams create(long batchSize, Duration maxBufferingDuration) {
-      return new AutoValue_GroupIntoBatches_BatchingParams(batchSize, maxBufferingDuration);
+  public abstract static class BatchingParams<InputT> implements Serializable {

Review comment:
       added
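
For context, here is a minimal, self-contained sketch of what the generified params in the hunk quoted above appear to carry. The quoted hunk is truncated (and spells its second parameter "batchSizeByes", presumably "batchSizeBytes"), so the weigher parameter, the buffering-duration field, and every name below are assumptions for illustration, not the merged Beam API; AutoValue, Beam's SerializableFunction, and Joda Duration are replaced with plain Java so the sketch compiles on its own.

    import java.io.Serializable;
    import java.util.function.Function;

    // Illustrative stand-in only: the real class is an @AutoValue nested in
    // GroupIntoBatches and uses Beam's SerializableFunction and Joda Duration.
    public final class BatchingParamsSketch<InputT> implements Serializable {
      private final long batchSize;                         // max elements per batch
      private final long batchSizeBytes;                    // max encoded bytes per batch
      private final Function<InputT, Long> elementByteSize; // assumed per-element weigher
      private final long maxBufferingDurationMillis;        // assumed buffering limit

      private BatchingParamsSketch(
          long batchSize,
          long batchSizeBytes,
          Function<InputT, Long> elementByteSize,
          long maxBufferingDurationMillis) {
        this.batchSize = batchSize;
        this.batchSizeBytes = batchSizeBytes;
        this.elementByteSize = elementByteSize;
        this.maxBufferingDurationMillis = maxBufferingDurationMillis;
      }

      public static <InputT> BatchingParamsSketch<InputT> create(
          long batchSize,
          long batchSizeBytes,
          Function<InputT, Long> elementByteSize,
          long maxBufferingDurationMillis) {
        return new BatchingParamsSketch<>(
            batchSize, batchSizeBytes, elementByteSize, maxBufferingDurationMillis);
      }

      public long getBatchSize() { return batchSize; }
      public long getBatchSizeBytes() { return batchSizeBytes; }
      public Function<InputT, Long> getElementByteSize() { return elementByteSize; }
      public long getMaxBufferingDurationMillis() { return maxBufferingDurationMillis; }
    }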

##########
File path: sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupIntoBatchesTest.java
##########
@@ -126,6 +127,99 @@ public Void apply(Iterable<KV<String, Iterable<String>>> input) {
     pipeline.run();
   }
 
+  @Test
+  @Category({NeedsRunner.class, UsesTimersInParDo.class, UsesStatefulParDo.class})
+  public void testInGlobalWindowBatchSizeByteSize() {
+    PCollection<KV<String, Iterable<String>>> collection =
+        pipeline
+            .apply("Input data", Create.of(data))
+            .apply(GroupIntoBatches.ofByteSize(BATCH_SIZE_BYTES))
+            // set output coder
+            .setCoder(KvCoder.of(StringUtf8Coder.of(), IterableCoder.of(StringUtf8Coder.of())));
+    PAssert.that("Incorrect batch size in one or more elements", collection)
+        .satisfies(
+            new SerializableFunction<Iterable<KV<String, Iterable<String>>>, Void>() {
+
+              private boolean checkBatchSizes(Iterable<KV<String, Iterable<String>>> listToCheck) {
+                for (KV<String, Iterable<String>> element : listToCheck) {
+                  long byteSize = 0;
+                  for (String str : element.getValue()) {
+                    if (byteSize >= BATCH_SIZE_BYTES) {
+                      // We already reached the batch size, so extra elements are not expected.
+                      return false;
+                    }
+                    try {
+                      byteSize += StringUtf8Coder.of().getEncodedElementByteSize(str);
+                    } catch (Exception e) {
+                      throw new RuntimeException(e);
+                    }
+                  }
+                }
+                return true;
+              }
+
+              @Override
+              public Void apply(Iterable<KV<String, Iterable<String>>> input) {
+                assertTrue(checkBatchSizes(input));
+                return null;
+              }
+            });
+    PAssert.thatSingleton("Incorrect collection size", collection.apply("Count", Count.globally()))
+        .isEqualTo(3L);
+    pipeline.run();
+  }
+
+  @Test
+  @Category({NeedsRunner.class, UsesTimersInParDo.class, UsesStatefulParDo.class})
+  public void testInGlobalWindowBatchSizeByteSizeFn() {
+    PCollection<KV<String, Iterable<String>>> collection =
+        pipeline
+            .apply("Input data", Create.of(data))
+            .apply(
+                GroupIntoBatches.ofByteSize(
+                    BATCH_SIZE_BYTES,
+                    s -> {
+                      try {
+                        return StringUtf8Coder.of().getEncodedElementByteSize(s);
+                      } catch (Exception e) {
+                        throw new RuntimeException(e);
+                      }
+                    }))

Review comment:
       done
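
Putting the two tests above together, a rough usage sketch of the new entry point. It only assumes the overloads the tests exercise, ofByteSize(long) and ofByteSize(long, SerializableFunction); the explicit type witness, the 1 KiB limit, and the sample data are illustrative, not taken from the PR.

    import java.nio.charset.StandardCharsets;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.coders.IterableCoder;
    import org.apache.beam.sdk.coders.KvCoder;
    import org.apache.beam.sdk.coders.StringUtf8Coder;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.GroupIntoBatches;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;

    public class GroupIntoBatchesByteSizeExample {
      public static void main(String[] args) {
        Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        PCollection<KV<String, Iterable<String>>> batched =
            pipeline
                .apply(
                    "Input data",
                    Create.of(
                        KV.of("key", "some value"),
                        KV.of("key", "another value"),
                        KV.of("key", "yet another value")))
                // Flush a batch once the accumulated weight reaches the byte limit,
                // using a caller-supplied weigher rather than a coder-based estimate.
                .apply(
                    GroupIntoBatches.<String, String>ofByteSize(
                        1024L, s -> (long) s.getBytes(StandardCharsets.UTF_8).length))
                // Set the output coder explicitly, as the tests above do.
                .setCoder(KvCoder.of(StringUtf8Coder.of(), IterableCoder.of(StringUtf8Coder.of())));

        pipeline.run().waitUntilFinish();
      }
    }

The variant without a weigher, GroupIntoBatches.ofByteSize(BATCH_SIZE_BYTES), is exercised by the first test and appears to leave size estimation to the element coder, judging by that test's use of StringUtf8Coder to verify batch sizes.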




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 601262)
    Time Spent: 2h 10m  (was: 2h)

> GroupIntoBatches should support byte-size batches
> -------------------------------------------------
>
>                 Key: BEAM-12378
>                 URL: https://issues.apache.org/jira/browse/BEAM-12378
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-java-core
>            Reporter: Reuven Lax
>            Priority: P2
>          Time Spent: 2h 10m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)
