gemini-code-assist[bot] commented on code in PR #37232:
URL: https://github.com/apache/beam/pull/37232#discussion_r2667026625
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/UngroupedWindmillReader.java:
##########
@@ -129,25 +131,46 @@ protected WindowedValue<T> decodeMessage(Windmill.Message message) throws IOExce
drainingValueFromUpstream =
elementMetadata.getDrain() ==
BeamFnApi.Elements.DrainMode.Enum.DRAINING;
}
+
+    // Propagate record ID and offset
+    String recordId = null;
+    Long recordOffset = null;
+    if (context.offsetBasedDeduplicationSupported()) {
+      byte[] rawId = context.getCurrentRecordId();
+      if (rawId != null && rawId.length > 0) {
+        recordId = new String(rawId, StandardCharsets.UTF_8);
+      }
+      byte[] rawOffset = context.getCurrentRecordOffset();
+      if (rawOffset != null && rawOffset.length == Longs.BYTES) {
+        recordOffset = Longs.fromByteArray(rawOffset);
+      }
+    }
Review Comment:

This logic for retrieving the record ID and offset from
`StreamingModeExecutionContext` may be flawed. The
`UngroupedWindmillReaderIterator` iterates over messages within a `WorkItem`,
but the underlying `activeReader` in the context (from which
`getCurrentRecordId` and `getCurrentRecordOffset` get their values) does not
appear to be advanced per message. This will likely result in all elements from
the same `WorkItem` receiving the same record ID and offset, which would be
incorrect for element-level metadata and could break features like
deduplication.
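
A minimal, self-contained sketch of the pitfall described above. It uses a hypothetical `WorkItemContext` as a stand-in for `StreamingModeExecutionContext`, under the assumption (taken from this comment, not verified against the worker code) that the context exposes only the current record's ID and that this value is fixed for the whole `WorkItem`:

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class SharedContextPitfall {
  // Hypothetical stand-in for a per-WorkItem context: it only knows the
  // record ID of the reader's *current* record, set once per WorkItem.
  static final class WorkItemContext {
    private final byte[] currentRecordId;

    WorkItemContext(byte[] currentRecordId) {
      this.currentRecordId = currentRecordId;
    }

    byte[] getCurrentRecordId() {
      return currentRecordId;
    }
  }

  // Decodes every message in a WorkItem but reads the ID from the shared
  // context (mirrors the pattern in the diff, not the real worker code).
  static List<String> decodeAll(List<String> messages, WorkItemContext context) {
    List<String> ids = new ArrayList<>();
    for (String ignored : messages) {
      byte[] rawId = context.getCurrentRecordId();
      ids.add(rawId == null ? null : new String(rawId, StandardCharsets.UTF_8));
    }
    return ids;
  }

  public static void main(String[] args) {
    WorkItemContext ctx = new WorkItemContext("id-0".getBytes(StandardCharsets.UTF_8));
    // Three distinct messages, but every one is tagged with the same ID.
    System.out.println(decodeAll(List.of("a", "b", "c"), ctx)); // [id-0, id-0, id-0]
  }
}
```

Because nothing advances the context between iterations, every element ends up with the same ID, which is exactly what would defeat per-element deduplication.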

A more robust approach would be to extract the record ID and offset directly
from the `Windmill.InputMessageBundle` for each message, if that information is
available there. This would ensure that each element gets its own metadata.
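
A rough sketch of the per-message alternative, assuming each message carries its own raw ID and offset bytes. The `Message` and `ElementMetadata` records are hypothetical; whether `Windmill.InputMessageBundle` actually exposes such fields is not confirmed here. The decoding keeps the diff's guards (non-empty ID, exactly 8 offset bytes) and reads the offset big-endian via `ByteBuffer`, which matches Guava's `Longs.fromByteArray`:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class PerMessageMetadata {
  // Hypothetical per-message carrier; the real proto fields may differ or not exist.
  record Message(byte[] data, byte[] recordId, byte[] recordOffset) {}

  // Decoded metadata for one element; null means "not available".
  record ElementMetadata(String recordId, Long recordOffset) {}

  // Reads metadata from the message itself rather than from shared context state.
  static ElementMetadata extract(Message message) {
    String recordId = null;
    Long recordOffset = null;
    byte[] rawId = message.recordId();
    if (rawId != null && rawId.length > 0) {
      recordId = new String(rawId, StandardCharsets.UTF_8);
    }
    byte[] rawOffset = message.recordOffset();
    if (rawOffset != null && rawOffset.length == Long.BYTES) {
      // ByteBuffer defaults to big-endian, like Longs.fromByteArray.
      recordOffset = ByteBuffer.wrap(rawOffset).getLong();
    }
    return new ElementMetadata(recordId, recordOffset);
  }

  // Helper to build an 8-byte big-endian offset for the example.
  static byte[] offsetBytes(long offset) {
    return ByteBuffer.allocate(Long.BYTES).putLong(offset).array();
  }

  public static void main(String[] args) {
    List<Message> bundle = List.of(
        new Message(new byte[0], "id-0".getBytes(StandardCharsets.UTF_8), offsetBytes(100L)),
        new Message(new byte[0], "id-1".getBytes(StandardCharsets.UTF_8), offsetBytes(101L)));
    List<ElementMetadata> decoded = new ArrayList<>();
    for (Message m : bundle) {
      decoded.add(extract(m)); // each element gets its own ID and offset
    }
    System.out.println(decoded);
  }
}
```

Keying the lookup on the message being decoded, rather than on reader state, makes the result independent of how many messages a `WorkItem` contains.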
--
This is an automated message from the Apache Git Service.