stankiewicz commented on code in PR #36534:
URL: https://github.com/apache/beam/pull/36534#discussion_r2549897577


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/UngroupedWindmillReader.java:
##########
@@ -129,11 +134,19 @@ protected WindowedValue<T> decodeMessage(Windmill.Message message) throws IOExce
         T result =
             (T) KV.of(decode(kvCoder.getKeyCoder(), key), decode(kvCoder.getValueCoder(), data));
         // todo #33176 propagate metadata to windowed value
-        return WindowedValues.of(result, timestampMillis, windows, paneInfo);
+        return WindowedValues.of(
+            result, timestampMillis, windows, paneInfo, null, null, drainingValueFromUpstream);

Review Comment:
   @kennknowles 
   Per https://s.apache.org/beam-drain-mode, the point of exposing drain was to 
signal that a partial aggregation or a timer firing, and eventually the 
downstream value derived from it, was generated because of the drain. 
   
   If an aggregation happened before the drain, then goes through stateful 
processing and is unpacked, we should just propagate what was sent via 
WindowedValue. If we take the information from the context instead, it will 
change the meaning: a value produced before the drain would be reported as 
caused by the drain. 
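   A minimal sketch of the distinction, not the actual Beam or Dataflow worker 
code. `Element`, `unpackPropagating`, and `unpackFromContext` are hypothetical 
names used only for illustration; the real flag travels on `WindowedValue`.

```java
public class DrainPropagationSketch {

  /** Hypothetical stand-in for a windowed value that carries a drain flag. */
  static final class Element<T> {
    final T value;
    final boolean causedByDrain;

    Element(T value, boolean causedByDrain) {
      this.value = value;
      this.causedByDrain = causedByDrain;
    }
  }

  /** Keeps whatever the upstream element said about drain. */
  static <T> Element<T> unpackPropagating(Element<T> upstream) {
    return new Element<>(upstream.value, upstream.causedByDrain);
  }

  /** Overwrites the flag with the current pipeline state (the approach argued against). */
  static <T> Element<T> unpackFromContext(Element<T> upstream, boolean pipelineIsDraining) {
    return new Element<>(upstream.value, pipelineIsDraining);
  }

  public static void main(String[] args) {
    // Aggregation completed normally, before any drain was requested.
    Element<String> aggregatedBeforeDrain = new Element<>("sum=3", false);
    // By the time the value is unpacked downstream, the pipeline is draining.
    boolean pipelineIsDraining = true;

    // Propagation preserves the original meaning: not caused by drain.
    System.out.println(unpackPropagating(aggregatedBeforeDrain).causedByDrain);
    // Reading from context relabels a complete result as drain-induced.
    System.out.println(
        unpackFromContext(aggregatedBeforeDrain, pipelineIsDraining).causedByDrain);
  }
}
```

   With propagation the flag stays `false` for the pre-drain aggregate; reading 
it from context flips it to `true`, which is the change in meaning described 
above.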
   



