Maximilian Michels created BEAM-6227:
----------------------------------------

Summary: FlinkRunner errors if GroupByKey contains null values (streaming mode only)
Key: BEAM-6227
URL: https://issues.apache.org/jira/browse/BEAM-6227
Project: Beam
Issue Type: Bug
Components: runner-flink
Affects Versions: 2.9.0
Reporter: Maximilian Michels
Assignee: Maximilian Michels
Fix For: 2.10.0

Apparently this passed ValidatesRunner because the problem is only present in streaming mode:

{noformat}
FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
options.setRunner(FlinkRunner.class);
// force streaming mode
options.setStreaming(true);

Pipeline pipeline = Pipeline.create(options);

pipeline
    .apply(GenerateSequence.from(0).to(100))
    .apply(Window.into(FixedWindows.of(Duration.millis(10))))
    .apply(ParDo.of(
        new DoFn<Long, KV<String, Void>>() {
          @ProcessElement
          public void processElement(ProcessContext pc) {
            // emit a null value for every element
            pc.output(KV.of("hello", null));
          }
        }
    ))
    .apply(GroupByKey.create());

pipeline.run();
{noformat}

Throws:

{noformat}
Caused by: java.lang.RuntimeException: Error adding to bag state.
	at org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals$FlinkBagState.add(FlinkStateInternals.java:299)
	at org.apache.beam.runners.core.SystemReduceFn.processValue(SystemReduceFn.java:115)
	at org.apache.beam.runners.core.ReduceFnRunner.processElement(ReduceFnRunner.java:608)
	at org.apache.beam.runners.core.ReduceFnRunner.processElements(ReduceFnRunner.java:349)
	at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn.processElement(GroupAlsoByWindowViaWindowSetNewDoFn.java:136)
Caused by: java.lang.NullPointerException: You cannot add null to a ListState.
	at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:75)
	at org.apache.flink.runtime.state.heap.HeapListState.add(HeapListState.java:89)
	at org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals$FlinkBagState.add(FlinkStateInternals.java:297)
	at org.apache.beam.runners.core.SystemReduceFn.processValue(SystemReduceFn.java:115)
	at org.apache.beam.runners.core.ReduceFnRunner.processElement(ReduceFnRunner.java:608)
	at org.apache.beam.runners.core.ReduceFnRunner.processElements(ReduceFnRunner.java:349)
	at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn.processElement(GroupAlsoByWindowViaWindowSetNewDoFn.java:136)
	at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)
	at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:275)
	at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:240)
	at org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:80)
	at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:63)
	at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:460)
	at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:104)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:712)
	at java.lang.Thread.run(Thread.java:745)
{noformat}

Will do a follow-up for running ValidatesRunner in streaming mode.
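
For readers unfamiliar with the failure, the mechanism in the trace can be reproduced without Beam or Flink. The sketch below is only an illustration under stated assumptions: `StrictListState`, `Holder`, and `NullTolerantBag` are hypothetical names, not Beam or Flink classes, and boxing each value in a non-null holder is just one conceivable way to tolerate nulls. It is not necessarily the fix applied for this issue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

class NullBagStateSketch {

    /** Hypothetical stand-in for a list state that rejects null elements. */
    static final class StrictListState<T> {
        private final List<T> values = new ArrayList<>();

        void add(T value) {
            // Mirrors the null check seen in the stack trace.
            Objects.requireNonNull(value, "You cannot add null to a ListState.");
            values.add(value);
        }

        List<T> get() {
            return values;
        }
    }

    /** Hypothetical box: the holder is never null, even if its payload is. */
    static final class Holder<T> {
        final T value; // may be null

        Holder(T value) {
            this.value = value;
        }
    }

    /** Bag that accepts null elements by boxing them before delegating. */
    static final class NullTolerantBag<T> {
        private final StrictListState<Holder<T>> delegate = new StrictListState<>();

        void add(T value) {
            delegate.add(new Holder<>(value));
        }

        List<T> read() {
            List<T> out = new ArrayList<>();
            for (Holder<T> holder : delegate.get()) {
                out.add(holder.value);
            }
            return out;
        }
    }

    /** Returns true if adding null directly to the strict state throws. */
    static boolean strictRejectsNull() {
        try {
            new StrictListState<Void>().add(null);
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    /** Adds n null values through the boxing bag and returns how many come back. */
    static int nullTolerantSize(int n) {
        NullTolerantBag<Void> bag = new NullTolerantBag<>();
        for (int i = 0; i < n; i++) {
            bag.add(null);
        }
        return bag.read().size();
    }

    public static void main(String[] args) {
        System.out.println(strictRejectsNull());  // prints true
        System.out.println(nullTolerantSize(3));  // prints 3
    }
}
```

The direct `add(null)` fails in the same way as the `GroupByKey` over `KV<String, Void>` above, while the boxed variant stores and returns all three null values.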