This is an automated email from the ASF dual-hosted git repository.
ahmedabu98 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 0c9b272c56b Update transform catalogue docs (#38457)
0c9b272c56b is described below
commit 0c9b272c56bd088ad23532466e51e8af32a647af
Author: Ganesh Sivakumar <[email protected]>
AuthorDate: Thu May 21 22:19:44 2026 +0530
Update transform catalogue docs (#38457)
* update transform catalogue
* spotless
* fix for window bug
* Update
website/www/site/content/en/documentation/transforms/java/aggregation/batchelements.md
Co-authored-by: gemini-code-assist[bot]
<176961590+gemini-code-assist[bot]@users.noreply.github.com>
* update website sidebar
* update doc
---------
Co-authored-by: Ganeshsivakumar <[email protected]>
Co-authored-by: gemini-code-assist[bot]
<176961590+gemini-code-assist[bot]@users.noreply.github.com>
---
.../apache/beam/examples/BatchElementsExample.java | 81 ++++++++++++++++++++++
.../apache/beam/sdk/transforms/BatchElements.java | 9 ++-
.../transforms/java/aggregation/batchelements.md | 31 +++++++++
.../en/documentation/transforms/java/overview.md | 1 +
.../partials/section-menu/en/documentation.html | 1 +
5 files changed, 122 insertions(+), 1 deletion(-)
diff --git
a/examples/java/src/main/java/org/apache/beam/examples/BatchElementsExample.java
b/examples/java/src/main/java/org/apache/beam/examples/BatchElementsExample.java
new file mode 100644
index 00000000000..79130d7ac13
--- /dev/null
+++
b/examples/java/src/main/java/org/apache/beam/examples/BatchElementsExample.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples;
+
+import java.util.List;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.BatchElements;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// beam-playground:
+// name: BatchElements
+// description: Demonstration of BatchElements transform usage.
+// multifile: false
+// default_example: false
+// context_line: 47
+// categories:
+// - Core Transforms
+// complexity: BASIC
+// tags:
+// - transforms
+// - batch
+
+public class BatchElementsExample {
+ public static void main(String[] args) {
+ PipelineOptions options = PipelineOptionsFactory.create();
+
+ Pipeline pipeline = Pipeline.create(options);
+
+ // [START main_section]
+ // Create input
+
+ PCollection<String> inputs =
+ pipeline.apply(Create.of("apple", "strawberry", "orange", "peach",
"cherry", "pear"));
+
+ // Create Batch Config
+ BatchElements.BatchConfig config =
+
BatchElements.BatchConfig.builder().withMinBatchSize(2).withMaxBatchSize(4).build();
+ // Batch Elements
+ PCollection<List<String>> result =
inputs.apply(BatchElements.withConfig(config));
+ // [END main_section]
+ result.apply(ParDo.of(new LogOutput()));
+ pipeline.run();
+ }
+
+ static class LogOutput extends DoFn<List<String>, String> {
+ private static final Logger LOG = LoggerFactory.getLogger(LogOutput.class);
+
+ @ProcessElement
+ public void processElement(ProcessContext c) throws Exception {
+ List<String> batch = c.element();
+
+ LOG.info("Batch Contents: {}", batch);
+
+ for (String element : batch) {
+ c.output(element);
+ }
+ }
+ }
+}
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/BatchElements.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/BatchElements.java
index 35796d1b138..d410c5be600 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/BatchElements.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/BatchElements.java
@@ -17,6 +17,8 @@
*/
package org.apache.beam.sdk.transforms;
+import static java.util.Collections.singleton;
+
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Comparator;
@@ -27,6 +29,7 @@ import java.util.Random;
import javax.annotation.Nullable;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.WindowingStrategy;
@@ -568,7 +571,11 @@ public class BatchElements<T> extends
PTransform<PCollection<T>, PCollection<Lis
try (BatchSizeEstimator.Stopwatch sw =
estimator.recordTime(targetBatch.size)) {
- receiver.outputWithTimestamp(targetBatch.elements,
targetWindow.maxTimestamp());
+ receiver.outputWindowedValue(
+ targetBatch.elements,
+ targetWindow.maxTimestamp(),
+ singleton(targetWindow),
+ PaneInfo.NO_FIRING);
}
batches.remove(targetWindow);
diff --git
a/website/www/site/content/en/documentation/transforms/java/aggregation/batchelements.md
b/website/www/site/content/en/documentation/transforms/java/aggregation/batchelements.md
new file mode 100644
index 00000000000..f842d303c36
--- /dev/null
+++
b/website/www/site/content/en/documentation/transforms/java/aggregation/batchelements.md
@@ -0,0 +1,31 @@
+---
+title: "BatchElements"
+---
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# BatchElements
+
+The `BatchElements` transform groups individual elements into batches before
processing them downstream.
+It is designed for operations where each call has a fixed overhead regardless
of how many elements are processed; the transform amortizes that cost across
multiple elements at once.
+The transform takes a `PCollection<T>` as input and produces a
`PCollection<List<T>>`, where each output element is a batch containing
multiple input elements.
+Batch sizes are chosen dynamically between the configured minimum and maximum
values by measuring the execution time of downstream operations.
+
+Batching is performed per window. Each emitted batch belongs to the same
window as its input elements.
+
+## Examples
+
+{{< playground height="700px" >}}
+{{< playground_snippet language="java" path="SDK_JAVA_BatchElements"
show="main_section" >}}
+{{< /playground >}}
diff --git
a/website/www/site/content/en/documentation/transforms/java/overview.md
b/website/www/site/content/en/documentation/transforms/java/overview.md
index 59aa93930fb..adb4da51a83 100644
--- a/website/www/site/content/en/documentation/transforms/java/overview.md
+++ b/website/www/site/content/en/documentation/transforms/java/overview.md
@@ -55,6 +55,7 @@ limitations under the License.
<tr><td><a
href="/documentation/transforms/java/aggregation/groupbykey">GroupByKey</a></td><td>Takes
a keyed collection of elements and produces a collection where each element
consists of a key and all values associated with that key.</td></tr>
<tr><td><a
href="/documentation/transforms/java/aggregation/groupintobatches">GroupIntoBatches</a></td><td>Batches
values associated with keys into <code>Iterable</code> batches of some size.
Each batch contains elements associated with a specific key.</td></tr>
+ <tr><td><a
href="/documentation/transforms/java/aggregation/batchelements">BatchElements</a></td><td>Groups
individual elements into batches to amortize fixed processing costs, using
dynamically estimated batch sizes.</td></tr>
<tr><td><a
href="/documentation/transforms/java/aggregation/hllcount">HllCount</a></td><td>Estimates
the number of distinct elements and creates re-aggregatable sketches using the
HyperLogLog++ algorithm.</td></tr>
<tr><td><a
href="/documentation/transforms/java/aggregation/latest">Latest</a></td><td>Selects
the latest element within each aggregation according to the implicit
timestamp.</td></tr>
<tr><td><a
href="/documentation/transforms/java/aggregation/max">Max</a></td><td>Outputs
the maximum element within each aggregation.</td></tr>
diff --git
a/website/www/site/layouts/partials/section-menu/en/documentation.html
b/website/www/site/layouts/partials/section-menu/en/documentation.html
index 57514935825..41100270d82 100755
--- a/website/www/site/layouts/partials/section-menu/en/documentation.html
+++ b/website/www/site/layouts/partials/section-menu/en/documentation.html
@@ -404,6 +404,7 @@
<li><a
href="/documentation/transforms/java/aggregation/distinct/">Distinct</a></li>
<li><a
href="/documentation/transforms/java/aggregation/groupbykey/">GroupByKey</a></li>
<li><a
href="/documentation/transforms/java/aggregation/groupintobatches/">GroupIntoBatches</a></li>
+ <li><a
href="/documentation/transforms/java/aggregation/batchelements/">BatchElements</a></li>
<li><a
href="/documentation/transforms/java/aggregation/hllcount/">HllCount</a></li>
<li><a
href="/documentation/transforms/java/aggregation/latest/">Latest</a></li>
<li><a
href="/documentation/transforms/java/aggregation/max/">Max</a></li>