[
https://issues.apache.org/jira/browse/BEAM-11028?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17209754#comment-17209754
]
Luke Cwik commented on BEAM-11028:
----------------------------------
I figured out what the issue is.
Overall the problem is that the watermark is advancing to infinity
(BoundedWindow.TIMESTAMP_MAX_VALUE) in the Nexmark EventGenerator source when it
is "done". On other runners this causes all processing time timers to be
dropped as there is no more work to do. Flink clears the state but still fires
the timer.
I believe the fix is for UnboundedSourceAsSDFWrapperFn to return stop() when
the watermark advances to infinity.
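
A minimal sketch of that check, written in plain Java with no Beam dependency. The
class name, the Continuation enum and the decide() helper are hypothetical stand-ins
for illustration only; they are not the actual code in Read.java. The constant is
assumed to mirror how BoundedWindow.TIMESTAMP_MAX_VALUE is defined (Long.MAX_VALUE
microseconds expressed in milliseconds).

```java
import java.util.concurrent.TimeUnit;

// Hypothetical illustration; not the real UnboundedSourceAsSDFWrapperFn.
class WatermarkStopSketch {
  // Assumption: stand-in for BoundedWindow.TIMESTAMP_MAX_VALUE, in milliseconds.
  static final long TIMESTAMP_MAX_MILLIS = TimeUnit.MICROSECONDS.toMillis(Long.MAX_VALUE);

  // Hypothetical stand-in for the resume/stop decision returned after a read.
  enum Continuation { RESUME, STOP }

  // Once the source reports a watermark at (or beyond) the maximum timestamp,
  // there is no more work to do, so stop instead of scheduling a resume timer.
  static Continuation decide(long watermarkMillis) {
    if (watermarkMillis >= TIMESTAMP_MAX_MILLIS) {
      return Continuation.STOP;
    }
    return Continuation.RESUME;
  }

  public static void main(String[] args) {
    System.out.println(decide(System.currentTimeMillis()));
    System.out.println(decide(TIMESTAMP_MAX_MILLIS));
  }
}
```

With a check like this, no processing time timer would be left pending once the
source is "done", so the runner would have nothing to fire after the state is cleared.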
> NullPointerException when running Flink Nexmark tests on Streaming after
> switch to SDF based translation
> --------------------------------------------------------------------------------------------------------
>
> Key: BEAM-11028
> URL: https://issues.apache.org/jira/browse/BEAM-11028
> Project: Beam
> Issue Type: Test
> Components: runner-core, runner-flink, testing-nexmark
> Reporter: Ismaël Mejía
> Assignee: Luke Cwik
> Priority: P0
> Fix For: 2.25.0
>
>
> When running Nexmark in streaming mode with Flink locally via:
>
> ./gradlew :sdks:java:testing:nexmark:run \
> -Pnexmark.runner=":runners:flink:1.10" \
> -Pnexmark.args=" \
> --runner=FlinkRunner \
> --streaming=true \
> --suite=SMOKE \
> --manageResources=false \
> --monitorJobs=true \
> --enforceEncodability=true \
> --enforceImmutability=true"
>
> I see the following error and get no results (this works ok in Beam 2.24.0)
>
> Caused by: org.apache.beam.sdk.util.UserCodeException: java.lang.NullPointerException
> at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
> at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeNewWatermarkEstimator(Unknown Source)
> at org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems$ProcessFn.processElement(SplittableParDoViaKeyedWorkItems.java:439)
> at org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems$ProcessFn$DoFnInvoker.invokeProcessElement(Unknown Source)
> at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227)
> at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:183)
> at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
> at org.apache.beam.runners.flink.translation.wrappers.streaming.SplittableDoFnOperator.fireTimer(SplittableDoFnOperator.java:171)
> at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$FlinkTimerInternals.processPendingProcessingTimeTimers(DoFnOperator.java:1317)
> at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.close(DoFnOperator.java:575)
> at org.apache.beam.runners.flink.translation.wrappers.streaming.SplittableDoFnOperator.close(SplittableDoFnOperator.java:179)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.closeAllOperators(StreamTask.java:618)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$afterInvoke$1(StreamTask.java:498)
> at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:496)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:477)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.NullPointerException
> at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.ensureTimestampWithinBounds(Read.java:541)
> at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.newWatermarkEstimator(Read.java:552)
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)