gemini-code-assist[bot] commented on code in PR #38230:
URL: https://github.com/apache/beam/pull/38230#discussion_r3183822825
##########
runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java:
##########
@@ -107,6 +108,10 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
* <li>It uses discarding or accumulation mode according to the {@link WindowingStrategy}.
* </ul>
*/
+ static final StateTag<BagState<CombinedMetadata>> METADATA_TAG =
+ StateTags.makeSystemTagInternal(
+ StateTags.bag("combinedMetadata", CombinedMetadata.Coder.of()));
Review Comment:

Using `BagState` for metadata that is always combined into a single value is
inefficient. It leads to O(N) state growth (where N is the number of elements)
and O(N) read/merge time during firing. Since the metadata (currently
`causedByDrain`) combines with a simple associative rule (logical OR), a
`CombiningState` is a better fit. It lets the runner store only the aggregated
value and potentially combine updates eagerly, which reduces state I/O and
memory overhead for high-volume windows.
```suggestion
  static final StateTag<CombiningState<CombinedMetadata, CombinedMetadata, CombinedMetadata>>
      METADATA_TAG =
          StateTags.makeSystemTagInternal(
              StateTags.combiningValue(
                  "combinedMetadata", CombinedMetadata.Coder.of(), CombinedMetadataCombiner.of()));
```
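For context, the suggestion assumes `CombinedMetadataCombiner` is a `CombineFn` whose accumulator is `CombinedMetadata` and whose rule for `causedByDrain` is a logical OR with `NORMAL` as the identity. A minimal standalone sketch of that rule, without the Beam dependency, follows. `DrainOrCombiner` and `DrainFlag` are hypothetical stand-ins that only mirror the `CombineFn` method names (`createAccumulator`, `addInput`, `mergeAccumulators`).

```java
import java.util.List;

/**
 * Hypothetical, Beam-free sketch of an OR-combine over drain metadata.
 * DrainFlag stands in for CausedByDrain; the method names mirror CombineFn.
 */
final class DrainOrCombiner {
  enum DrainFlag { NORMAL, CAUSED_BY_DRAIN }

  // Identity element: combining NORMAL with x yields x.
  static DrainFlag createAccumulator() {
    return DrainFlag.NORMAL;
  }

  // Folds one input into the running accumulator (logical OR).
  static DrainFlag addInput(DrainFlag acc, DrainFlag input) {
    return (acc == DrainFlag.CAUSED_BY_DRAIN || input == DrainFlag.CAUSED_BY_DRAIN)
        ? DrainFlag.CAUSED_BY_DRAIN
        : DrainFlag.NORMAL;
  }

  // Merging partial accumulators is the same OR, so grouping and order do not matter.
  static DrainFlag mergeAccumulators(Iterable<DrainFlag> accs) {
    DrainFlag result = createAccumulator();
    for (DrainFlag acc : accs) {
      result = addInput(result, acc);
    }
    return result;
  }

  public static void main(String[] args) {
    // A CombiningState-style store keeps one accumulator instead of one bag entry per element.
    DrainFlag stored = createAccumulator();
    for (DrainFlag input : List.of(DrainFlag.NORMAL, DrainFlag.CAUSED_BY_DRAIN, DrainFlag.NORMAL)) {
      stored = addInput(stored, input);
    }
    System.out.println(stored); // CAUSED_BY_DRAIN
  }
}
```

Because OR is associative and commutative with an identity, reading an empty `CombiningState` yields `NORMAL`, and partial results can be merged in any order.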
##########
runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java:
##########
@@ -1009,8 +1025,21 @@ private void prefetchOnTrigger(
ReduceFn<K, InputT, OutputT, W>.Context renamedContext,
final boolean isFinished,
boolean isEndOfWindow,
- CausedByDrain causedByDrain)
+ CombinedMetadata metadata)
throws Exception {
+ BagState<CombinedMetadata> metadataState = directContext.state().access(METADATA_TAG);
+ Iterable<CombinedMetadata> allMetadata = metadataState.read();
+ CombinedMetadata aggregatedMetadata =
+ CombinedMetadataCombiner.of().mergeAccumulators(allMetadata);
Review Comment:

Update this to use the `CombiningState` API. This simplifies the logic as
the state already provides the aggregated value directly, avoiding the need to
manually call `mergeAccumulators` on an iterable.
```suggestion
    CombiningState<CombinedMetadata, CombinedMetadata, CombinedMetadata> metadataState =
        directContext.state().access(METADATA_TAG);
    CombinedMetadata aggregatedMetadata = metadataState.read();
```
##########
runners/core-java/src/main/java/org/apache/beam/runners/core/CombinedMetadata.java:
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import com.google.auto.value.AutoValue;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.NullableCoder;
+import org.apache.beam.sdk.values.CausedByDrain;
+
+/**
+ * Encapsulates metadata that propagates with elements in the pipeline.
+ *
+ * <p>This metadata is sent along with elements. It currently includes fields like {@link
+ * CausedByDrain}, and is designed to be extensible to support future metadata fields such as
+ * OpenTelemetry context or CDC (Change Data Capture) kind.
+ *
+ * <p>The purpose of this class is to group targeted metadata fields together. This makes it easier
+ * to define combination strategies (e.g., when accumulating state in {@code ReduceFnRunner}) when
+ * multiple elements are merged or grouped, without having to extend method signatures or state
+ * handling for every new metadata field.
+ */
+@AutoValue
+public abstract class CombinedMetadata {
+ public abstract CausedByDrain causedByDrain();
+
+ public static CombinedMetadata create(CausedByDrain causedByDrain) {
+ return new AutoValue_CombinedMetadata(causedByDrain);
+ }
+
+ public static CombinedMetadata createDefault() {
+ return create(CausedByDrain.NORMAL);
+ }
+
+ public static class Coder extends AtomicCoder<CombinedMetadata> {
+ private static final Coder INSTANCE = new Coder();
+
+ public static Coder of() {
+ return INSTANCE;
+ }
+
+ @Override
+ public void encode(CombinedMetadata value, OutputStream outStream) throws IOException {
+ if (value == null) {
+ NullableCoder.of(ByteArrayCoder.of()).encode(null, outStream);
+ return;
+ }
+ BeamFnApi.Elements.ElementMetadata.Builder builder =
+ BeamFnApi.Elements.ElementMetadata.newBuilder();
+ builder.setDrain(
+ value.causedByDrain() == CausedByDrain.CAUSED_BY_DRAIN
+ ? BeamFnApi.Elements.DrainMode.Enum.DRAINING
+ : BeamFnApi.Elements.DrainMode.Enum.NOT_DRAINING);
+
+ NullableCoder.of(ByteArrayCoder.of()).encode(builder.build().toByteArray(), outStream);
+ }
+
+ @Override
+ public CombinedMetadata decode(InputStream inStream) throws IOException {
+ byte[] bytes = NullableCoder.of(ByteArrayCoder.of()).decode(inStream);
+ if (bytes == null) {
+ return CombinedMetadata.createDefault();
+ }
+ BeamFnApi.Elements.ElementMetadata proto =
+ BeamFnApi.Elements.ElementMetadata.parseFrom(bytes);
+
+ CausedByDrain causedByDrain =
+ proto.getDrain() == BeamFnApi.Elements.DrainMode.Enum.DRAINING
+ ? CausedByDrain.CAUSED_BY_DRAIN
+ : CausedByDrain.NORMAL;
+
+ return CombinedMetadata.create(causedByDrain);
+ }
Review Comment:

If `encode` switches to the proto's delimited encoding (see the comment on
`encode`), the decoding logic can be simplified to match. `parseDelimitedFrom`
returns `null` if the stream is already at EOF, which can be mapped to the
default metadata.
```suggestion
    @Override
    public CombinedMetadata decode(InputStream inStream) throws IOException {
      BeamFnApi.Elements.ElementMetadata proto =
          BeamFnApi.Elements.ElementMetadata.parseDelimitedFrom(inStream);
      if (proto == null) {
        return CombinedMetadata.createDefault();
      }
      CausedByDrain causedByDrain =
          proto.getDrain() == BeamFnApi.Elements.DrainMode.Enum.DRAINING
              ? CausedByDrain.CAUSED_BY_DRAIN
              : CausedByDrain.NORMAL;
      return CombinedMetadata.create(causedByDrain);
    }
```
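For reference, protobuf's `writeDelimitedTo` prefixes the serialized message with its size as a base-128 varint, and the static `parseDelimitedFrom` returns `null` only when the stream is at EOF before a new message starts. The sketch below reproduces that framing with the JDK alone so the EOF behavior the suggestion relies on is easy to check. `DelimitedFraming` is a hypothetical helper, not the protobuf implementation, and payloads are treated as opaque bytes. Note that an empty payload is still framed by a single zero length byte, so it decodes to an empty message rather than `null`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

/** Hypothetical stdlib-only sketch of varint length-delimited framing. */
final class DelimitedFraming {
  // Writes the payload length as an unsigned base-128 varint, then the payload bytes.
  static void writeDelimited(byte[] payload, OutputStream out) throws IOException {
    int n = payload.length;
    while ((n & ~0x7F) != 0) {
      out.write((n & 0x7F) | 0x80); // low 7 bits with the continuation bit set
      n >>>= 7;
    }
    out.write(n);
    out.write(payload);
  }

  // Returns null on a clean EOF (no frame started); a truncated frame is an error.
  static byte[] readDelimited(InputStream in) throws IOException {
    int b = in.read();
    if (b == -1) {
      return null;
    }
    int length = 0;
    int shift = 0;
    while (true) {
      length |= (b & 0x7F) << shift;
      if ((b & 0x80) == 0) {
        break;
      }
      shift += 7;
      if (shift > 28) {
        throw new IOException("malformed varint length prefix");
      }
      b = in.read();
      if (b == -1) {
        throw new EOFException("truncated length prefix");
      }
    }
    if (length < 0) {
      throw new IOException("negative frame length");
    }
    byte[] payload = in.readNBytes(length);
    if (payload.length != length) {
      throw new EOFException("truncated payload");
    }
    return payload;
  }

  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    writeDelimited(new byte[] {8, 1}, out); // two arbitrary payload bytes
    writeDelimited(new byte[0], out);       // an empty payload is still framed
    InputStream in = new ByteArrayInputStream(out.toByteArray());
    System.out.println(readDelimited(in).length); // 2
    System.out.println(readDelimited(in).length); // 0
    System.out.println(readDelimited(in));        // null (EOF)
  }
}
```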
##########
runners/core-java/src/main/java/org/apache/beam/runners/core/CombinedMetadata.java:
##########
@@ -0,0 +1,94 @@
+@AutoValue
+public abstract class CombinedMetadata {
+ public abstract CausedByDrain causedByDrain();
+
+ public static CombinedMetadata create(CausedByDrain causedByDrain) {
+ return new AutoValue_CombinedMetadata(causedByDrain);
+ }
+
+ public static CombinedMetadata createDefault() {
+ return create(CausedByDrain.NORMAL);
+ }
+
+ public static class Coder extends AtomicCoder<CombinedMetadata> {
+ private static final Coder INSTANCE = new Coder();
+
+ public static Coder of() {
+ return INSTANCE;
+ }
+
+ @Override
+ public void encode(CombinedMetadata value, OutputStream outStream) throws IOException {
+ if (value == null) {
+ NullableCoder.of(ByteArrayCoder.of()).encode(null, outStream);
+ return;
+ }
+ BeamFnApi.Elements.ElementMetadata.Builder builder =
+ BeamFnApi.Elements.ElementMetadata.newBuilder();
+ builder.setDrain(
+ value.causedByDrain() == CausedByDrain.CAUSED_BY_DRAIN
+ ? BeamFnApi.Elements.DrainMode.Enum.DRAINING
+ : BeamFnApi.Elements.DrainMode.Enum.NOT_DRAINING);
+
+ NullableCoder.of(ByteArrayCoder.of()).encode(builder.build().toByteArray(), outStream);
+ }
Review Comment:

The coder implementation is more complex than it needs to be. Wrapping
`ByteArrayCoder` in `NullableCoder` inside an `AtomicCoder` adds a one-byte
null marker and an extra coder indirection on top of the varint length prefix
that `ByteArrayCoder` already writes. Since `CombinedMetadata` is backed by a
proto, we can use the proto's built-in delimited encoding instead:
`writeDelimitedTo` writes a single varint length prefix followed by the message
bytes. Additionally, `AtomicCoder` implementations should generally be
deterministic and should not handle nulls unless they are explicitly intended
for a context where nulls are expected (which is not the case for state
elements here).
```java
    @Override
    public void encode(CombinedMetadata value, OutputStream outStream) throws IOException {
      BeamFnApi.Elements.ElementMetadata proto =
          BeamFnApi.Elements.ElementMetadata.newBuilder()
              .setDrain(
                  value.causedByDrain() == CausedByDrain.CAUSED_BY_DRAIN
                      ? BeamFnApi.Elements.DrainMode.Enum.DRAINING
                      : BeamFnApi.Elements.DrainMode.Enum.NOT_DRAINING)
              .build();
      proto.writeDelimitedTo(outStream);
    }
```
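To make the size difference concrete: assuming `NullableCoder` writes a one-byte presence marker and `ByteArrayCoder` in a nested context writes a varint length prefix before the bytes, the current encoding costs exactly one byte per value more than `writeDelimitedTo`, which writes only the varint prefix. The hypothetical `FramingOverhead` helper below only does that arithmetic; it does not call Beam or protobuf.

```java
/** Hypothetical arithmetic sketch comparing per-value framing overhead. */
final class FramingOverhead {
  // Number of bytes needed to encode n as an unsigned base-128 varint.
  static int varintSize(int n) {
    int size = 1;
    while ((n & ~0x7F) != 0) {
      n >>>= 7;
      size++;
    }
    return size;
  }

  // NullableCoder(ByteArrayCoder): presence marker + varint length + payload (assumed layout).
  static int nullableByteArraySize(int payloadLength) {
    return 1 + varintSize(payloadLength) + payloadLength;
  }

  // writeDelimitedTo: varint length + payload.
  static int delimitedSize(int payloadLength) {
    return varintSize(payloadLength) + payloadLength;
  }

  public static void main(String[] args) {
    for (int len : new int[] {0, 2, 200}) {
      System.out.println(len + " -> " + nullableByteArraySize(len) + " vs " + delimitedSize(len));
    }
    // 0 -> 2 vs 1
    // 2 -> 4 vs 3
    // 200 -> 203 vs 202
  }
}
```

The saving per value is small; the stronger argument for the change is the simpler code path and the removal of null handling from a coder that never sees nulls.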
##########
runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java:
##########
@@ -1009,8 +1025,21 @@ private void prefetchOnTrigger(
ReduceFn<K, InputT, OutputT, W>.Context renamedContext,
final boolean isFinished,
boolean isEndOfWindow,
- CausedByDrain causedByDrain)
+ CombinedMetadata metadata)
throws Exception {
+ BagState<CombinedMetadata> metadataState = directContext.state().access(METADATA_TAG);
+ Iterable<CombinedMetadata> allMetadata = metadataState.read();
+ CombinedMetadata aggregatedMetadata =
+ CombinedMetadataCombiner.of().mergeAccumulators(allMetadata);
+
+ CombinedMetadata fullyAggregatedMetadata =
+ CombinedMetadataCombiner.of().addInput(aggregatedMetadata, metadata);
+ final CausedByDrain aggregatedCausedByDrain = fullyAggregatedMetadata.causedByDrain();
+ if (isFinished) {
+ metadataState.clear();
+ } else {
+ metadataState.add(metadata);
+ }
Review Comment:

As with the per-element write in `processElement`, avoid adding default
metadata to the state when the trigger fires without finishing.
```suggestion
    } else if (!metadata.equals(CombinedMetadata.createDefault())) {
      metadataState.add(metadata);
    }
```
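The firing-time flow in this hunk (merge the stored metadata, fold in the incoming value, then clear on the final firing or otherwise persist the incoming value) can be modeled in a few lines. This is a minimal sketch, not `ReduceFnRunner` code: `TriggerMetadataModel` is hypothetical, a `boolean` stands in for `CausedByDrain` (`true` meaning `CAUSED_BY_DRAIN`), a plain list stands in for `BagState`, and it applies the suggested skip of default values.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical in-memory model of the metadata handling during a trigger firing. */
final class TriggerMetadataModel {
  static final boolean DEFAULT = false; // stands in for CausedByDrain.NORMAL

  final List<Boolean> bag = new ArrayList<>(); // stands in for BagState<CombinedMetadata>

  // Returns the drain flag the firing would emit.
  boolean onTrigger(boolean incoming, boolean isFinished) {
    boolean aggregated = DEFAULT;
    for (boolean stored : bag) {
      aggregated |= stored; // like mergeAccumulators: OR over all stored values
    }
    boolean fullyAggregated = aggregated | incoming; // like addInput
    if (isFinished) {
      bag.clear(); // final firing: drop all metadata state for the window
    } else if (incoming != DEFAULT) {
      bag.add(incoming); // persist only non-identity values
    }
    return fullyAggregated;
  }

  public static void main(String[] args) {
    TriggerMetadataModel state = new TriggerMetadataModel();
    System.out.println(state.onTrigger(false, false) + " " + state.bag.size()); // false 0
    System.out.println(state.onTrigger(true, false) + " " + state.bag.size());  // true 1
    System.out.println(state.onTrigger(false, true) + " " + state.bag.size());  // true 0
  }
}
```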
##########
runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java:
##########
@@ -604,6 +610,11 @@ private void processElement(Map<W, W> windowToMergeResult, WindowedValue<InputT>
continue;
}
+ directContext
+ .state()
+ .access(METADATA_TAG)
+ .add(CombinedMetadata.create(value.causedByDrain()));
Review Comment:

To avoid a state write for every element, we should only add metadata to the
state if it differs from the default (identity) value. Since
`CombinedMetadata.createDefault()` represents the neutral element for
combination (`NORMAL` for drain status), skipping it does not affect the final
result, and it saves one state write per element in the common case where
elements are not marked as "caused by drain".
```suggestion
      CombinedMetadata metadata = CombinedMetadata.create(value.causedByDrain());
      if (!metadata.equals(CombinedMetadata.createDefault())) {
        directContext.state().access(METADATA_TAG).add(metadata);
      }
```
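A quick way to check the identity argument: because `NORMAL` is the identity for the OR-combine, filtering it out before the write changes how many values reach state but not the combined result. The sketch below is hypothetical (`SkipIdentityWrites` is not part of the PR) and models `CausedByDrain` as a `boolean`, with `true` meaning `CAUSED_BY_DRAIN`.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: skipping identity values before a write preserves the OR result. */
final class SkipIdentityWrites {
  // Returns every value that would be added to state.
  static List<Boolean> writtenValues(List<Boolean> elements, boolean skipDefault) {
    List<Boolean> written = new ArrayList<>();
    for (boolean drained : elements) {
      // NORMAL (false) is the identity for OR, so skipping it cannot change the combined result.
      if (skipDefault && !drained) {
        continue;
      }
      written.add(drained);
    }
    return written;
  }

  // OR-combines the stored values; an empty list yields the identity (false).
  static boolean combine(List<Boolean> values) {
    boolean result = false;
    for (boolean v : values) {
      result |= v;
    }
    return result;
  }

  public static void main(String[] args) {
    List<Boolean> elements = List.of(false, false, true, false);
    List<Boolean> all = writtenValues(elements, false);
    List<Boolean> skipped = writtenValues(elements, true);
    System.out.println(combine(all) + " " + all.size());         // true 4
    System.out.println(combine(skipped) + " " + skipped.size()); // true 1
  }
}
```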
--
This is an automated message from the Apache Git Service.