kellen commented on code in PR #34902:
URL: https://github.com/apache/beam/pull/34902#discussion_r2529343284


##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java:
##########
@@ -1924,6 +1932,17 @@ public TimerMap timerFamily(String timerFamilyId) {
   private class NonWindowObservingProcessBundleContext
       extends NonWindowObservingProcessBundleContextBase {
 
+    @Override
+    public OutputBuilder<OutputT> builder(OutputT value) {
+      return WindowedValues.builder(currentElement)
+          .withValue(value)
+          .setReceiver(
+              windowedValue -> {
+                checkTimestamp(windowedValue.getTimestamp());

Review Comment:
   Thanks! For context, one of the failing tests covers our 
[AsyncDoFn](https://github.com/spotify/scio/blob/main/scio-core/src/main/java/com/spotify/scio/transforms/BaseAsyncDoFn.java),
 which starts a future for each element it processes, awaits the responses, and 
then outputs each element back to its original window with its original 
timestamp. 
   
   The corresponding failing test is 
[here](https://github.com/spotify/scio/blob/main/scio-core/src/test/scala/com/spotify/scio/transforms/AsyncLookupDoFnTest.scala#L98-L113);
 it fails with the following exception:
   
   ```
   [info] - should propagate element metadata *** FAILED *** (45 milliseconds)
   [info]   org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalArgumentException: Cannot output with timestamp 1970-01-01T00:00:00.001Z. Output timestamps must be no earlier than the timestamp of the current input or timer (1970-01-01T00:00:00.008Z) minus the allowed skew (0 milliseconds) and no later than 294247-01-10T04:00:54.775Z. See the DoFn#getAllowedTimestampSkew() Javadoc for details on changing the allowed skew.
   [info]   at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:377)
   [info]   at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:345)
   [info]   at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:218)
   [info]   at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
   [info]   at org.apache.beam.sdk.Pipeline.run(Pipeline.java:325)
   [info]   at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:442)
   [info]   at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:381)
   [info]   at com.spotify.scio.ScioContext.execute(ScioContext.scala:671)
   [info]   at com.spotify.scio.ScioContext$$anonfun$run$1.apply(ScioContext.scala:658)
   [info]   at com.spotify.scio.ScioContext$$anonfun$run$1.apply(ScioContext.scala:646)
   [info]   ...
   [info]   Cause: java.lang.IllegalArgumentException: Cannot output with timestamp 1970-01-01T00:00:00.001Z. Output timestamps must be no earlier than the timestamp of the current input or timer (1970-01-01T00:00:00.008Z) minus the allowed skew (0 milliseconds) and no later than 294247-01-10T04:00:54.775Z. See the DoFn#getAllowedTimestampSkew() Javadoc for details on changing the allowed skew.
   [info]   at org.apache.beam.runners.core.SimpleDoFnRunner.checkTimestamp(SimpleDoFnRunner.java:263)
   [info]   at org.apache.beam.runners.core.SimpleDoFnRunner.access$1300(SimpleDoFnRunner.java:89)
   [info]   at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.lambda$outputWindowedValue$0(SimpleDoFnRunner.java:462)
   [info]   at org.apache.beam.sdk.values.WindowedValues$Builder.output(WindowedValues.java:210)
   [info]   at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.outputWindowedValue(SimpleDoFnRunner.java:465)
   [info]   at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:123)
   [info]   at org.apache.beam.sdk.values.WindowedValues$Builder.output(WindowedValues.java:210)
   [info]   at org.apache.beam.sdk.transforms.DoFn$OutputReceiver.outputWindowedValue(DoFn.java:416)
   [info]   at com.spotify.scio.transforms.BaseAsyncLookupDoFn.lambda$processElement$0(BaseAsyncLookupDoFn.java:190)
   [info]   at com.spotify.scio.transforms.BaseAsyncLookupDoFn.flush(BaseAsyncLookupDoFn.java:310)
   [info]   ...
   ```
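   
   To make the numbers in the exception concrete, here is a minimal plain-Java 
sketch of the lower-bound rule as the message states it. It is only an 
illustrative model, not Beam's `checkTimestamp` implementation: the class 
`TimestampSkewSketch` and the method `isAllowed` are hypothetical names, and 
the upper bound mentioned in the message is ignored.

```java
import java.time.Duration;
import java.time.Instant;

/** Simplified model of the output-timestamp lower bound; NOT Beam's actual code. */
public class TimestampSkewSketch {

  /**
   * Returns true if an output stamped outputTs would pass the lower-bound rule
   * while the element (or timer) stamped currentTs is being processed.
   * Hypothetical helper: it mirrors only the wording of the exception message.
   */
  public static boolean isAllowed(Instant outputTs, Instant currentTs, Duration allowedSkew) {
    // Earliest acceptable output timestamp: current input timestamp minus the skew.
    Instant lowerBound = currentTs.minus(allowedSkew);
    return !outputTs.isBefore(lowerBound);
  }

  public static void main(String[] args) {
    // Values taken from the exception above.
    Instant buffered = Instant.ofEpochMilli(1); // element whose future completed late
    Instant current = Instant.ofEpochMilli(8);  // element currently being processed

    // Zero skew, as reported in the message: the buffered output is rejected.
    System.out.println(isAllowed(buffered, current, Duration.ZERO)); // false

    // Under this model, a skew of 7 ms moves the lower bound down to 1 ms.
    System.out.println(isAllowed(buffered, current, Duration.ofMillis(7))); // true
  }
}
```

   Under this model, an async DoFn that emits a buffered element while a later 
element is current trips the check whenever the gap between the two timestamps 
exceeds the allowed skew.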


