gemini-code-assist[bot] commented on code in PR #38230:
URL: https://github.com/apache/beam/pull/38230#discussion_r3183822825


##########
runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java:
##########
@@ -107,6 +108,10 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
    *   <li>It uses discarding or accumulation mode according to the {@link WindowingStrategy}.
    * </ul>
    */
+  static final StateTag<BagState<CombinedMetadata>> METADATA_TAG =
+      StateTags.makeSystemTagInternal(
+          StateTags.bag("combinedMetadata", CombinedMetadata.Coder.of()));

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   Using `BagState` for metadata that is always combined into a single value is 
inefficient. It leads to O(N) state growth (where N is the number of elements) 
and O(N) read/merge time during firing. Since the metadata (like 
`causedByDrain`) follows simple combination rules (e.g., OR logic), it is much 
more efficient to use a `CombiningState`. This allows the runner to store only 
the aggregated value and potentially combine updates eagerly, significantly 
reducing I/O and memory overhead for high-volume windows.
   
   ```suggestion
     static final StateTag<CombiningState<CombinedMetadata, CombinedMetadata, CombinedMetadata>>
         METADATA_TAG =
             StateTags.makeSystemTagInternal(
                 StateTags.combiningValue(
                     "combinedMetadata",
                     CombinedMetadata.Coder.of(),
                     CombinedMetadataCombiner.of()));
   ```
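
A minimal, Beam-free sketch of why a single accumulator is enough when the combine rule is an OR. The `Drain` enum, `readFromBag`, and `Accumulator` below are hypothetical stand-ins for illustration only; they are not the PR's `CombinedMetadata`, Beam's `BagState`, or Beam's `CombiningState`.

```java
import java.util.List;

/** Hypothetical stand-in types; none of these are Beam classes. */
final class DrainCombineSketch {
  enum Drain { NORMAL, CAUSED_BY_DRAIN }

  /** OR-style combine: the result is CAUSED_BY_DRAIN if either side is. */
  static Drain combine(Drain a, Drain b) {
    return (a == Drain.CAUSED_BY_DRAIN || b == Drain.CAUSED_BY_DRAIN)
        ? Drain.CAUSED_BY_DRAIN
        : Drain.NORMAL;
  }

  /** Bag-style read: every input was stored, so all N are folded at read time. */
  static Drain readFromBag(List<Drain> bag) {
    Drain result = Drain.NORMAL;
    for (Drain d : bag) {
      result = combine(result, d);
    }
    return result;
  }

  /** Combining-style state: only one value is stored and updated on each add. */
  static final class Accumulator {
    private Drain value = Drain.NORMAL;

    void add(Drain d) {
      value = combine(value, d);
    }

    Drain read() {
      return value;
    }
  }
}
```

Because the combine is associative and `NORMAL` acts as its identity, folding eagerly on each `add` gives the same answer as folding the whole bag at read time, while storing one value instead of N.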



##########
runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java:
##########
@@ -1009,8 +1025,21 @@ private void prefetchOnTrigger(
       ReduceFn<K, InputT, OutputT, W>.Context renamedContext,
       final boolean isFinished,
       boolean isEndOfWindow,
-      CausedByDrain causedByDrain)
+      CombinedMetadata metadata)
       throws Exception {
+    BagState<CombinedMetadata> metadataState = directContext.state().access(METADATA_TAG);
+    Iterable<CombinedMetadata> allMetadata = metadataState.read();
+    CombinedMetadata aggregatedMetadata =
+        CombinedMetadataCombiner.of().mergeAccumulators(allMetadata);

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   Update this to use the `CombiningState` API. This simplifies the logic as 
the state already provides the aggregated value directly, avoiding the need to 
manually call `mergeAccumulators` on an iterable.
   
   ```suggestion
       CombiningState<CombinedMetadata, CombinedMetadata, CombinedMetadata> metadataState =
           directContext.state().access(METADATA_TAG);
       CombinedMetadata aggregatedMetadata = metadataState.read();
   ```



##########
runners/core-java/src/main/java/org/apache/beam/runners/core/CombinedMetadata.java:
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import com.google.auto.value.AutoValue;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.NullableCoder;
+import org.apache.beam.sdk.values.CausedByDrain;
+
+/**
+ * Encapsulates metadata that propagates with elements in the pipeline.
+ *
+ * <p>This metadata is sent along with elements. It currently includes fields like {@link
+ * CausedByDrain}, and is designed to be extensible to support future metadata fields such as
+ * OpenTelemetry context or CDC (Change Data Capture) kind.
+ *
+ * <p>The purpose of this class is to group targeted metadata fields together. This makes it easier
+ * to define combination strategies (e.g., when accumulating state in {@code ReduceFnRunner}) when
+ * multiple elements are merged or grouped, without having to extend method signatures or state
+ * handling for every new metadata field.
+ */
+@AutoValue
+public abstract class CombinedMetadata {
+  public abstract CausedByDrain causedByDrain();
+
+  public static CombinedMetadata create(CausedByDrain causedByDrain) {
+    return new AutoValue_CombinedMetadata(causedByDrain);
+  }
+
+  public static CombinedMetadata createDefault() {
+    return create(CausedByDrain.NORMAL);
+  }
+
+  public static class Coder extends AtomicCoder<CombinedMetadata> {
+    private static final Coder INSTANCE = new Coder();
+
+    public static Coder of() {
+      return INSTANCE;
+    }
+
+    @Override
+    public void encode(CombinedMetadata value, OutputStream outStream) throws IOException {
+      if (value == null) {
+        NullableCoder.of(ByteArrayCoder.of()).encode(null, outStream);
+        return;
+      }
+      BeamFnApi.Elements.ElementMetadata.Builder builder =
+          BeamFnApi.Elements.ElementMetadata.newBuilder();
+      builder.setDrain(
+          value.causedByDrain() == CausedByDrain.CAUSED_BY_DRAIN
+              ? BeamFnApi.Elements.DrainMode.Enum.DRAINING
+              : BeamFnApi.Elements.DrainMode.Enum.NOT_DRAINING);
+
+      NullableCoder.of(ByteArrayCoder.of()).encode(builder.build().toByteArray(), outStream);
+    }
+
+    @Override
+    public CombinedMetadata decode(InputStream inStream) throws IOException {
+      byte[] bytes = NullableCoder.of(ByteArrayCoder.of()).decode(inStream);
+      if (bytes == null) {
+        return CombinedMetadata.createDefault();
+      }
+      BeamFnApi.Elements.ElementMetadata proto =
+          BeamFnApi.Elements.ElementMetadata.parseFrom(bytes);
+
+      CausedByDrain causedByDrain =
+          proto.getDrain() == BeamFnApi.Elements.DrainMode.Enum.DRAINING
+              ? CausedByDrain.CAUSED_BY_DRAIN
+              : CausedByDrain.NORMAL;
+
+      return CombinedMetadata.create(causedByDrain);
+    }

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   If `encode` is switched to the proto's delimited format (`writeDelimitedTo`), 
`decode` has to read with `parseDelimitedFrom` so that the two stay symmetric. 
`parseDelimitedFrom` returns `null` if the stream is already at EOF, which can 
be mapped to the default metadata.
   
   ```suggestion
       @Override
       public CombinedMetadata decode(InputStream inStream) throws IOException {
         BeamFnApi.Elements.ElementMetadata proto =
             BeamFnApi.Elements.ElementMetadata.parseDelimitedFrom(inStream);
         if (proto == null) {
           return CombinedMetadata.createDefault();
         }
   
         CausedByDrain causedByDrain =
             proto.getDrain() == BeamFnApi.Elements.DrainMode.Enum.DRAINING
                 ? CausedByDrain.CAUSED_BY_DRAIN
                 : CausedByDrain.NORMAL;
   
         return CombinedMetadata.create(causedByDrain);
       }
   ```



##########
runners/core-java/src/main/java/org/apache/beam/runners/core/CombinedMetadata.java:
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import com.google.auto.value.AutoValue;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.NullableCoder;
+import org.apache.beam.sdk.values.CausedByDrain;
+
+/**
+ * Encapsulates metadata that propagates with elements in the pipeline.
+ *
+ * <p>This metadata is sent along with elements. It currently includes fields like {@link
+ * CausedByDrain}, and is designed to be extensible to support future metadata fields such as
+ * OpenTelemetry context or CDC (Change Data Capture) kind.
+ *
+ * <p>The purpose of this class is to group targeted metadata fields together. This makes it easier
+ * to define combination strategies (e.g., when accumulating state in {@code ReduceFnRunner}) when
+ * multiple elements are merged or grouped, without having to extend method signatures or state
+ * handling for every new metadata field.
+ */
+@AutoValue
+public abstract class CombinedMetadata {
+  public abstract CausedByDrain causedByDrain();
+
+  public static CombinedMetadata create(CausedByDrain causedByDrain) {
+    return new AutoValue_CombinedMetadata(causedByDrain);
+  }
+
+  public static CombinedMetadata createDefault() {
+    return create(CausedByDrain.NORMAL);
+  }
+
+  public static class Coder extends AtomicCoder<CombinedMetadata> {
+    private static final Coder INSTANCE = new Coder();
+
+    public static Coder of() {
+      return INSTANCE;
+    }
+
+    @Override
+    public void encode(CombinedMetadata value, OutputStream outStream) throws IOException {
+      if (value == null) {
+        NullableCoder.of(ByteArrayCoder.of()).encode(null, outStream);
+        return;
+      }
+      BeamFnApi.Elements.ElementMetadata.Builder builder =
+          BeamFnApi.Elements.ElementMetadata.newBuilder();
+      builder.setDrain(
+          value.causedByDrain() == CausedByDrain.CAUSED_BY_DRAIN
+              ? BeamFnApi.Elements.DrainMode.Enum.DRAINING
+              : BeamFnApi.Elements.DrainMode.Enum.NOT_DRAINING);
+
+      NullableCoder.of(ByteArrayCoder.of()).encode(builder.build().toByteArray(), outStream);
+    }

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   The coder implementation is more complex than it needs to be. Wrapping 
`NullableCoder` and `ByteArrayCoder` inside an `AtomicCoder` adds a null marker 
and an intermediate `byte[]` copy on top of the length prefix. Since 
`CombinedMetadata` is serialized as a proto, we can use the proto's built-in 
delimited encoding, which writes the length prefix itself. Additionally, 
`AtomicCoder` implementations should generally be deterministic and should not 
handle nulls unless they are meant to be used where nulls are expected (which 
is not the case for state elements here), so the version below drops the null 
branch.
   
   ```java
       @Override
       public void encode(CombinedMetadata value, OutputStream outStream) throws IOException {
         BeamFnApi.Elements.ElementMetadata proto =
             BeamFnApi.Elements.ElementMetadata.newBuilder()
                 .setDrain(
                     value.causedByDrain() == CausedByDrain.CAUSED_BY_DRAIN
                         ? BeamFnApi.Elements.DrainMode.Enum.DRAINING
                         : BeamFnApi.Elements.DrainMode.Enum.NOT_DRAINING)
                 .build();
         proto.writeDelimitedTo(outStream);
       }
   ```



##########
runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java:
##########
@@ -1009,8 +1025,21 @@ private void prefetchOnTrigger(
       ReduceFn<K, InputT, OutputT, W>.Context renamedContext,
       final boolean isFinished,
       boolean isEndOfWindow,
-      CausedByDrain causedByDrain)
+      CombinedMetadata metadata)
       throws Exception {
+    BagState<CombinedMetadata> metadataState = directContext.state().access(METADATA_TAG);
+    Iterable<CombinedMetadata> allMetadata = metadataState.read();
+    CombinedMetadata aggregatedMetadata =
+        CombinedMetadataCombiner.of().mergeAccumulators(allMetadata);
+
+    CombinedMetadata fullyAggregatedMetadata =
+        CombinedMetadataCombiner.of().addInput(aggregatedMetadata, metadata);
+    final CausedByDrain aggregatedCausedByDrain = fullyAggregatedMetadata.causedByDrain();
+    if (isFinished) {
+      metadataState.clear();
+    } else {
+      metadataState.add(metadata);
+    }

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   As with the per-element write in `processElement`, skip the state write 
during trigger firing when `metadata` equals the default (identity) value, 
since adding it cannot change the aggregated result.
   
   ```suggestion
       } else if (!metadata.equals(CombinedMetadata.createDefault())) {
         metadataState.add(metadata);
       }
   ```



##########
runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java:
##########
@@ -604,6 +610,11 @@ private void processElement(Map<W, W> windowToMergeResult, WindowedValue<InputT>
         continue;
       }
 
+      directContext
+          .state()
+          .access(METADATA_TAG)
+          .add(CombinedMetadata.create(value.causedByDrain()));

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   To avoid unnecessary state writes for every element, we should only add 
metadata to the state if it differs from the default (identity) value. Since 
`CombinedMetadata.createDefault()` represents the neutral element for 
combination (e.g., `NORMAL` for drain status), skipping it does not affect the 
final result but significantly improves performance in the common case where 
elements are not marked as "caused by drain".
   
   ```suggestion
         CombinedMetadata metadata = CombinedMetadata.create(value.causedByDrain());
         if (!metadata.equals(CombinedMetadata.createDefault())) {
           directContext.state().access(METADATA_TAG).add(metadata);
         }
   ```
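
To illustrate the claim that skipping the identity value leaves the result unchanged, here is a small Beam-free sketch. `CountingState` is a hypothetical in-memory stand-in that counts writes; it is not a Beam state API, and the `Drain` enum only mirrors the two values discussed above.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration only; none of these types are Beam classes. */
final class SkipDefaultWriteSketch {
  enum Drain { NORMAL, CAUSED_BY_DRAIN }

  /** In-memory stand-in for a state cell that records how often it is written. */
  static final class CountingState {
    private final List<Drain> stored = new ArrayList<>();
    private int writes = 0;

    void add(Drain d) {
      stored.add(d);
      writes++;
    }

    /** OR over everything stored; an empty state reads as NORMAL. */
    Drain read() {
      return stored.contains(Drain.CAUSED_BY_DRAIN) ? Drain.CAUSED_BY_DRAIN : Drain.NORMAL;
    }

    int writes() {
      return writes;
    }
  }

  /** Writes every input, including the identity value. */
  static CountingState writeAll(List<Drain> inputs) {
    CountingState state = new CountingState();
    for (Drain d : inputs) {
      state.add(d);
    }
    return state;
  }

  /** Writes only inputs that differ from the identity value NORMAL. */
  static CountingState writeNonDefault(List<Drain> inputs) {
    CountingState state = new CountingState();
    for (Drain d : inputs) {
      if (d != Drain.NORMAL) {
        state.add(d);
      }
    }
    return state;
  }
}
```

Both strategies read back the same aggregated value; the second one simply performs no writes at all for a window whose elements are all `NORMAL`.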



