[
https://issues.apache.org/jira/browse/BEAM-6227?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16720186#comment-16720186
]
Maximilian Michels commented on BEAM-6227:
------------------------------------------
It appears ValidatesRunner is correctly set up to run in both batch and
streaming mode, and it passes in both. The existing tests don't seem to catch
this bug.
> FlinkRunner errors if GroupByKey contains null values (streaming mode only)
> ---------------------------------------------------------------------------
>
> Key: BEAM-6227
> URL: https://issues.apache.org/jira/browse/BEAM-6227
> Project: Beam
> Issue Type: Bug
> Components: runner-flink
> Affects Versions: 2.9.0
> Reporter: Maximilian Michels
> Assignee: Maximilian Michels
> Priority: Major
> Fix For: 2.10.0
>
>
> Apparently this passed ValidatesRunner because it is only present in
> streaming mode:
> {noformat}
> FlinkPipelineOptions options =
>     PipelineOptionsFactory.as(FlinkPipelineOptions.class);
> options.setRunner(FlinkRunner.class);
> // force streaming mode
> options.setStreaming(true);
> Pipeline pipeline = Pipeline.create(options);
> pipeline
>     .apply(GenerateSequence.from(0).to(100))
>     .apply(Window.into(FixedWindows.of(Duration.millis(10))))
>     .apply(ParDo.of(
>         new DoFn<Long, KV<String, Void>>() {
>           @ProcessElement
>           public void processElement(ProcessContext pc) {
>             pc.output(KV.of("hello", null));
>           }
>         }
>     ))
>     .apply(GroupByKey.create());
> pipeline.run();
> {noformat}
> Throws:
> {noformat}
> Caused by: java.lang.RuntimeException: Error adding to bag state.
> at org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals$FlinkBagState.add(FlinkStateInternals.java:299)
> at org.apache.beam.runners.core.SystemReduceFn.processValue(SystemReduceFn.java:115)
> at org.apache.beam.runners.core.ReduceFnRunner.processElement(ReduceFnRunner.java:608)
> at org.apache.beam.runners.core.ReduceFnRunner.processElements(ReduceFnRunner.java:349)
> at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn.processElement(GroupAlsoByWindowViaWindowSetNewDoFn.java:136)
> Caused by: java.lang.NullPointerException: You cannot add null to a ListState.
> at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:75)
> at org.apache.flink.runtime.state.heap.HeapListState.add(HeapListState.java:89)
> at org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals$FlinkBagState.add(FlinkStateInternals.java:297)
> at org.apache.beam.runners.core.SystemReduceFn.processValue(SystemReduceFn.java:115)
> at org.apache.beam.runners.core.ReduceFnRunner.processElement(ReduceFnRunner.java:608)
> at org.apache.beam.runners.core.ReduceFnRunner.processElements(ReduceFnRunner.java:349)
> at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn.processElement(GroupAlsoByWindowViaWindowSetNewDoFn.java:136)
> at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)
> at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:275)
> at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:240)
> at org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:80)
> at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:63)
> at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:460)
> at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
> at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:104)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:712)
> at java.lang.Thread.run(Thread.java:745)
> {noformat}
> Will do a follow-up for running ValidatesRunner in streaming mode.
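
The stack trace in the quoted description shows the bag state handing a null value straight to a list state whose add method rejects null. The following is a minimal, dependency-free sketch of that mechanism and of one possible way around it (wrapping every value before it reaches the state). It is an illustration only and is not claimed to be the fix that went into Beam. StrictListState and NullTolerantBag are hypothetical names and are not Beam or Flink classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;

/** Sketch only: none of these types are Beam or Flink classes. */
final class NullValueBagSketch {

    /** Hypothetical stand-in for a list-backed state that rejects null values. */
    static final class StrictListState<T> {
        private final List<T> elements = new ArrayList<>();

        void add(T value) {
            // Same message as in the quoted trace; the real check lives in Flink.
            Objects.requireNonNull(value, "You cannot add null to a ListState.");
            elements.add(value);
        }

        List<T> get() {
            return elements;
        }
    }

    /** Hypothetical bag that wraps each value so null never reaches the strict state. */
    static final class NullTolerantBag<T> {
        private final StrictListState<Optional<T>> state = new StrictListState<>();

        void add(T value) {
            // Optional.empty() stands in for null inside the state.
            state.add(Optional.ofNullable(value));
        }

        List<T> read() {
            List<T> out = new ArrayList<>();
            for (Optional<T> wrapped : state.get()) {
                // Unwrap again so callers see the original (possibly null) value.
                out.add(wrapped.orElse(null));
            }
            return out;
        }
    }

    public static void main(String[] args) {
        StrictListState<Void> strict = new StrictListState<>();
        try {
            strict.add(null);
        } catch (NullPointerException e) {
            System.out.println("strict state rejected null: " + e.getMessage());
        }

        NullTolerantBag<Void> bag = new NullTolerantBag<>();
        bag.add(null);
        bag.add(null);
        System.out.println("tolerant bag size: " + bag.read().size());
    }
}
```

Wrapping keeps the element count intact, which matters for a GroupByKey over KV<String, Void>: the grouped output should still contain one (null) value per input element.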