Maximilian Michels created BEAM-6227:
----------------------------------------
Summary: FlinkRunner errors if GroupByKey contains null values (streaming mode only)
Key: BEAM-6227
URL: https://issues.apache.org/jira/browse/BEAM-6227
Project: Beam
Issue Type: Bug
Components: runner-flink
Affects Versions: 2.9.0
Reporter: Maximilian Michels
Assignee: Maximilian Michels
Fix For: 2.10.0
Apparently this passed ValidatesRunner because the error only occurs in
streaming mode:
{noformat}
FlinkPipelineOptions options =
    PipelineOptionsFactory.as(FlinkPipelineOptions.class);
options.setRunner(FlinkRunner.class);
// force streaming mode
options.setStreaming(true);

Pipeline pipeline = Pipeline.create(options);
pipeline
    .apply(GenerateSequence.from(0).to(100))
    .apply(Window.into(FixedWindows.of(Duration.millis(10))))
    .apply(ParDo.of(
        new DoFn<Long, KV<String, Void>>() {
          @ProcessElement
          public void processElement(ProcessContext pc) {
            // null value for every key; this is what triggers the failure
            pc.output(KV.of("hello", null));
          }
        }
    ))
    .apply(GroupByKey.create());
pipeline.run();
{noformat}
Throws:
{noformat}
Caused by: java.lang.RuntimeException: Error adding to bag state.
    at org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals$FlinkBagState.add(FlinkStateInternals.java:299)
    at org.apache.beam.runners.core.SystemReduceFn.processValue(SystemReduceFn.java:115)
    at org.apache.beam.runners.core.ReduceFnRunner.processElement(ReduceFnRunner.java:608)
    at org.apache.beam.runners.core.ReduceFnRunner.processElements(ReduceFnRunner.java:349)
    at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn.processElement(GroupAlsoByWindowViaWindowSetNewDoFn.java:136)
Caused by: java.lang.NullPointerException: You cannot add null to a ListState.
    at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:75)
    at org.apache.flink.runtime.state.heap.HeapListState.add(HeapListState.java:89)
    at org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals$FlinkBagState.add(FlinkStateInternals.java:297)
    at org.apache.beam.runners.core.SystemReduceFn.processValue(SystemReduceFn.java:115)
    at org.apache.beam.runners.core.ReduceFnRunner.processElement(ReduceFnRunner.java:608)
    at org.apache.beam.runners.core.ReduceFnRunner.processElements(ReduceFnRunner.java:349)
    at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn.processElement(GroupAlsoByWindowViaWindowSetNewDoFn.java:136)
    at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)
    at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:275)
    at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:240)
    at org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:80)
    at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:63)
    at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:460)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:104)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:712)
    at java.lang.Thread.run(Thread.java:745)
{noformat}
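The trace shows the null value reaching a list state that rejects nulls. As a rough illustration of one way a bag state could tolerate nulls, here is a minimal, dependency-free sketch. It is not the Beam or Flink code and not necessarily the fix that will be applied: StrictListState and NullTolerantBag are hypothetical stand-ins, and wrapping each value in an Optional is just one assumed strategy for keeping a bare null away from the backing state.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;

public class NullTolerantBagSketch {

  /** Hypothetical stand-in for a list state that rejects nulls, as in the trace above. */
  static final class StrictListState<T> {
    private final List<T> elements = new ArrayList<>();

    void add(T value) {
      // Mirrors the precondition that produces the NullPointerException.
      Objects.requireNonNull(value, "You cannot add null to a ListState.");
      elements.add(value);
    }

    List<T> get() {
      return new ArrayList<>(elements);
    }
  }

  /** Hypothetical bag wrapper: the backing state only ever sees non-null Optionals. */
  static final class NullTolerantBag<T> {
    private final StrictListState<Optional<T>> state = new StrictListState<>();

    void add(T value) {
      // Optional.ofNullable(null) yields Optional.empty(), which is itself non-null.
      state.add(Optional.ofNullable(value));
    }

    List<T> read() {
      List<T> result = new ArrayList<>();
      for (Optional<T> wrapped : state.get()) {
        // Unwrap, restoring null for values that were null when added.
        result.add(wrapped.orElse(null));
      }
      return result;
    }
  }

  public static void main(String[] args) {
    // Adding null directly to the strict state fails, as in the report.
    StrictListState<String> strict = new StrictListState<>();
    try {
      strict.add(null);
    } catch (NullPointerException e) {
      System.out.println("strict state: " + e.getMessage());
    }

    // The wrapper accepts nulls and returns them unchanged on read.
    NullTolerantBag<String> bag = new NullTolerantBag<>();
    bag.add(null);
    bag.add("value");
    System.out.println("tolerant bag: " + bag.read());
  }
}
```

In a real runner the equivalent step would more likely happen at the serialization boundary (for example, storing an encoded form of each value), but that depends on details not covered in this report.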
Will do a follow-up for running ValidatesRunner in streaming mode.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)