gemini-code-assist[bot] commented on code in PR #38230:
URL: https://github.com/apache/beam/pull/38230#discussion_r3187939128
##########
runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java:
##########
@@ -1009,8 +1026,22 @@ private void prefetchOnTrigger(
ReduceFn<K, InputT, OutputT, W>.Context renamedContext,
final boolean isFinished,
boolean isEndOfWindow,
- CausedByDrain causedByDrain)
+ CombinedMetadata metadata)
throws Exception {
+ CombiningState<CombinedMetadata, CombinedMetadata, CombinedMetadata> metadataState =
+ directContext.state().access(METADATA_TAG);
+ CombinedMetadata aggregatedMetadata = metadataState.read();
+ if (aggregatedMetadata == null) {
+ aggregatedMetadata = CombinedMetadata.createDefault();
+ }
+ CombinedMetadata fullyAggregatedMetadata =
+ CombinedMetadataCombiner.of().addInput(aggregatedMetadata, metadata);
+ final CausedByDrain aggregatedCausedByDrain = fullyAggregatedMetadata.causedByDrain();
+ if (isFinished) {
+ metadataState.clear();
+ } else if (!metadata.equals(CombinedMetadata.createDefault())) {
+ metadataState.add(metadata);
+ }
Review Comment:

In `onTrigger`, the trigger's `metadata` is added to the state whenever
`isFinished` is false and the metadata differs from the default. Since
`METADATA_TAG` is a `CombiningState` using `CombinedMetadataCombiner` (which
performs a logical OR for `causedByDrain`), adding the same metadata multiple
times is idempotent in the current implementation. However, if non-idempotent
metadata fields are added in the future, repeated adds could produce incorrect
aggregates. Consider whether the trigger's metadata should apply only to the
current output pane or whether it truly represents a state change that must
persist for all future panes of this window.
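
To illustrate the idempotence concern, here is a minimal, self-contained
sketch. It does not use Beam's actual classes: `Meta`, `combine`,
`addRepeatedly`, and the `hypotheticalCount` field are all made up for
illustration, standing in for `CombinedMetadata` with an assumed future
sum-combined field next to the OR-combined `causedByDrain` flag.

```java
public class MetadataIdempotenceSketch {

  /** Hypothetical stand-in for CombinedMetadata; not Beam's actual class. */
  public static final class Meta {
    public final boolean causedByDrain; // OR-combined, so repeated adds are harmless
    public final int hypotheticalCount; // sum-combined (assumed future field), not idempotent

    public Meta(boolean causedByDrain, int hypotheticalCount) {
      this.causedByDrain = causedByDrain;
      this.hypotheticalCount = hypotheticalCount;
    }
  }

  /** Combines two values: logical OR for the flag, addition for the counter. */
  public static Meta combine(Meta a, Meta b) {
    return new Meta(
        a.causedByDrain || b.causedByDrain,
        a.hypotheticalCount + b.hypotheticalCount);
  }

  /** Adds the same input to the accumulator the given number of times. */
  public static Meta addRepeatedly(Meta accumulator, Meta input, int times) {
    Meta result = accumulator;
    for (int i = 0; i < times; i++) {
      result = combine(result, input);
    }
    return result;
  }

  public static void main(String[] args) {
    Meta empty = new Meta(false, 0);
    Meta triggerMetadata = new Meta(true, 1);

    // Simulates one trigger firing versus three firings carrying the same metadata.
    Meta once = addRepeatedly(empty, triggerMetadata, 1);
    Meta thrice = addRepeatedly(empty, triggerMetadata, 3);

    System.out.println(
        "causedByDrain: once=" + once.causedByDrain + ", thrice=" + thrice.causedByDrain);
    System.out.println(
        "hypotheticalCount: once=" + once.hypotheticalCount
            + ", thrice=" + thrice.hypotheticalCount);
  }
}
```

In this sketch the flag ends up the same after one add or three, while the
summed field grows with every repeated add. That is the kind of drift that
could appear if the same trigger metadata is persisted on each non-final
firing.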