kennknowles commented on code in PR #34902:
URL: https://github.com/apache/beam/pull/34902#discussion_r2534601223
##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java:
##########
@@ -1924,6 +1932,17 @@ public TimerMap timerFamily(String timerFamilyId) {
private class NonWindowObservingProcessBundleContext
extends NonWindowObservingProcessBundleContextBase {
+ @Override
+ public OutputBuilder<OutputT> builder(OutputT value) {
+ return WindowedValues.builder(currentElement)
+ .withValue(value)
+ .setReceiver(
+ windowedValue -> {
+ checkTimestamp(windowedValue.getTimestamp());
Review Comment:
Interesting. This is actually a red herring. The error is not coming from
this `checkTimestamp`. This is the portable / Dataflow v2 path.
The `SimpleDoFnRunner` is the v1 codepath and I see it now:
- The failure is thrown from
https://github.com/apache/beam/pull/34902/files/2cff4cc48abb7be6e7f91d3ab76dd6235c87938e#diff-560313cd06556c758d162abea624cab2646038cdabf483cdad69d392bf720a0bL252
- The added check is in this block
https://github.com/apache/beam/pull/34902/files/2cff4cc48abb7be6e7f91d3ab76dd6235c87938e#diff-560313cd06556c758d162abea624cab2646038cdabf483cdad69d392bf720a0bR502
I've updated https://github.com/apache/beam/pull/36822 to remove that check.
On the other hand, it looks to be correct. I wonder if the problem is that
`elem` is not captured by the lambda and is being accessed as a field later
when the local element has changed when the AsyncDoFn is outputting and it is
validating the timestamp against a later element. SimpleDoFnRunner is not
designed for concurrent or async use, so that could be the problem. A simple
fix that would preserve safety would be to capture it in a local variable that
ends up in the closure of the lambda.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]