This is an automated email from the ASF dual-hosted git repository.
scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 7b33e1cb50b [Java] Improve GroupIntoBatches display data to show batching parameters (#37865)
7b33e1cb50b is described below
commit 7b33e1cb50b608f486601a850fe078357cdd29e5
Author: Sam Whittle <[email protected]>
AuthorDate: Wed Mar 18 14:16:43 2026 +0000
[Java] Improve GroupIntoBatches display data to show batching parameters (#37865)
---
.../runners/dataflow/GroupIntoBatchesOverride.java | 12 ++++++++++++
.../apache/beam/sdk/transforms/GroupIntoBatches.java | 18 +++++++++++++++---
2 files changed, 27 insertions(+), 3 deletions(-)
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/GroupIntoBatchesOverride.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/GroupIntoBatchesOverride.java
index 8d300fc8bb6..a21df8a900e 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/GroupIntoBatchesOverride.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/GroupIntoBatchesOverride.java
@@ -34,6 +34,7 @@ import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.util.ShardedKey;
import org.apache.beam.sdk.util.construction.PTransformReplacements;
import org.apache.beam.sdk.util.construction.ReplacementOutputs;
@@ -142,6 +143,17 @@ public class GroupIntoBatchesOverride {
c.output(KV.of(c.element().getKey(), currentBatch));
}
}
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ super.populateDisplayData(builder);
+ if (maxBatchSizeElements < Long.MAX_VALUE) {
+ builder.add(DisplayData.item("batchSize", maxBatchSizeElements));
+ }
+ if (maxBatchSizeBytes < Long.MAX_VALUE) {
+ builder.add(DisplayData.item("batchSizeBytes", maxBatchSizeBytes));
+ }
+ }
}));
}
}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
index 88c63d48de1..773cdea2abc 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
@@ -35,6 +35,7 @@ import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.state.ValueState;
+import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.ShardedKey;
import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
@@ -103,9 +104,6 @@ import org.slf4j.LoggerFactory;
@SuppressWarnings({
"nullness", // TODO(https://github.com/apache/beam/issues/20497)
"rawtypes",
- // TODO(https://github.com/apache/beam/issues/21230): Remove when new version of
- // errorprone is released (2.11.0)
- "unused"
})
public class GroupIntoBatches<K, InputT>
extends PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, Iterable<InputT>>>> {
@@ -675,5 +673,19 @@ public class GroupIntoBatches<K, InputT>
timerTs.clear();
minBufferedTs.clear();
}
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ super.populateDisplayData(builder);
+ if (batchSize < Long.MAX_VALUE) {
+ builder.add(DisplayData.item("batchSize", batchSize));
+ }
+ if (batchSizeBytes < Long.MAX_VALUE) {
+ builder.add(DisplayData.item("batchSizeBytes", batchSizeBytes));
+ }
+ if (maxBufferingDuration.isLongerThan(Duration.ZERO)) {
+ builder.add(DisplayData.item("maxBufferingDuration", maxBufferingDuration));
+ }
+ }
}
}