kellen commented on code in PR #34902:
URL: https://github.com/apache/beam/pull/34902#discussion_r2529343284
##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java:
##########
@@ -1924,6 +1932,17 @@ public TimerMap timerFamily(String timerFamilyId) {
private class NonWindowObservingProcessBundleContext
extends NonWindowObservingProcessBundleContextBase {
+ @Override
+ public OutputBuilder<OutputT> builder(OutputT value) {
+ return WindowedValues.builder(currentElement)
+ .withValue(value)
+ .setReceiver(
+ windowedValue -> {
+ checkTimestamp(windowedValue.getTimestamp());
Review Comment:
Thanks! For context, one of the failing tests covers our
[AsyncDoFn](https://github.com/spotify/scio/blob/main/scio-core/src/main/java/com/spotify/scio/transforms/BaseAsyncDoFn.java),
which processes elements by spinning off a set of futures, awaiting their
responses, and then outputting the elements back to their original windows
with their original timestamps.
The corresponding failing test is
[here](https://github.com/spotify/scio/blob/main/scio-core/src/test/scala/com/spotify/scio/transforms/AsyncLookupDoFnTest.scala#L98-L113),
and it fails with the following exception:
```
[info] - should propagate element metadata *** FAILED *** (45 milliseconds)
[info] org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalArgumentException: Cannot output with timestamp 1970-01-01T00:00:00.001Z. Output timestamps must be no earlier than the timestamp of the current input or timer (1970-01-01T00:00:00.008Z) minus the allowed skew (0 milliseconds) and no later than 294247-01-10T04:00:54.775Z. See the DoFn#getAllowedTimestampSkew() Javadoc for details on changing the allowed skew.
[info] at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:377)
[info] at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:345)
[info] at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:218)
[info] at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
[info] at org.apache.beam.sdk.Pipeline.run(Pipeline.java:325)
[info] at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:442)
[info] at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:381)
[info] at com.spotify.scio.ScioContext.execute(ScioContext.scala:671)
[info] at com.spotify.scio.ScioContext$$anonfun$run$1.apply(ScioContext.scala:658)
[info] at com.spotify.scio.ScioContext$$anonfun$run$1.apply(ScioContext.scala:646)
[info] ...
[info] Cause: java.lang.IllegalArgumentException: Cannot output with timestamp 1970-01-01T00:00:00.001Z. Output timestamps must be no earlier than the timestamp of the current input or timer (1970-01-01T00:00:00.008Z) minus the allowed skew (0 milliseconds) and no later than 294247-01-10T04:00:54.775Z. See the DoFn#getAllowedTimestampSkew() Javadoc for details on changing the allowed skew.
[info] at org.apache.beam.runners.core.SimpleDoFnRunner.checkTimestamp(SimpleDoFnRunner.java:263)
[info] at org.apache.beam.runners.core.SimpleDoFnRunner.access$1300(SimpleDoFnRunner.java:89)
[info] at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.lambda$outputWindowedValue$0(SimpleDoFnRunner.java:462)
[info] at org.apache.beam.sdk.values.WindowedValues$Builder.output(WindowedValues.java:210)
[info] at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.outputWindowedValue(SimpleDoFnRunner.java:465)
[info] at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:123)
[info] at org.apache.beam.sdk.values.WindowedValues$Builder.output(WindowedValues.java:210)
[info] at org.apache.beam.sdk.transforms.DoFn$OutputReceiver.outputWindowedValue(DoFn.java:416)
[info] at com.spotify.scio.transforms.BaseAsyncLookupDoFn.lambda$processElement$0(BaseAsyncLookupDoFn.java:190)
[info] at com.spotify.scio.transforms.BaseAsyncLookupDoFn.flush(BaseAsyncLookupDoFn.java:310)
[info] ...
```
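To make the rule in that message concrete, here is a minimal sketch of the bound it describes. It assumes only what the exception text states: an output timestamp must be no earlier than the current input's timestamp minus the allowed skew, and no later than some maximum timestamp. `TimestampSkewSketch` and `isOutputTimestampAllowed` are hypothetical names for illustration, not Beam's actual `checkTimestamp` implementation, and the sketch uses `java.time` rather than the timestamp types Beam uses.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical illustration of the rule stated in the exception message.
// This is NOT Beam's checkTimestamp; names and types are assumptions.
final class TimestampSkewSketch {

    // Returns true when `output` lies within
    // [currentInput - allowedSkew, maxTimestamp], both ends inclusive.
    static boolean isOutputTimestampAllowed(
            Instant output, Instant currentInput, Duration allowedSkew, Instant maxTimestamp) {
        Instant lowerBound = currentInput.minus(allowedSkew);
        return !output.isBefore(lowerBound) && !output.isAfter(maxTimestamp);
    }

    public static void main(String[] args) {
        // Values taken from the failing test's message: a buffered element
        // stamped at 1 ms is emitted while the current input is at 8 ms.
        Instant buffered = Instant.ofEpochMilli(1);
        Instant current = Instant.ofEpochMilli(8);
        // A far-future upper bound, chosen here purely for illustration.
        Instant max = Instant.ofEpochMilli(Long.MAX_VALUE / 1000);

        // Zero skew, as in the trace: 1 ms is earlier than 8 ms, so rejected.
        System.out.println(
                isOutputTimestampAllowed(buffered, current, Duration.ZERO, max)); // false
        // A skew of 7 ms lowers the bound to 1 ms, so the same output passes.
        System.out.println(
                isOutputTimestampAllowed(buffered, current, Duration.ofMillis(7), max)); // true
    }
}
```

With the values from the trace (output at 1 ms, current input at 8 ms, zero skew) the check fails, and it would pass only if the allowed skew were at least 7 ms.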