Maximilian Michels created BEAM-6227:
----------------------------------------

             Summary: FlinkRunner errors if GroupByKey contains null values (streaming mode only)
                 Key: BEAM-6227
                 URL: https://issues.apache.org/jira/browse/BEAM-6227
             Project: Beam
          Issue Type: Bug
          Components: runner-flink
    Affects Versions: 2.9.0
            Reporter: Maximilian Michels
            Assignee: Maximilian Michels
             Fix For: 2.10.0


Apparently this passed ValidatesRunner because the error only occurs in 
streaming mode:

{noformat}
    FlinkPipelineOptions options =
        PipelineOptionsFactory.as(FlinkPipelineOptions.class);

    options.setRunner(FlinkRunner.class);
    // force streaming mode
    options.setStreaming(true);

    Pipeline pipeline = Pipeline.create(options);
    pipeline
        .apply(GenerateSequence.from(0).to(100))
        .apply(Window.into(FixedWindows.of(Duration.millis(10))))
        .apply(ParDo.of(
            new DoFn<Long, KV<String, Void>>() {
              @ProcessElement
              public void processElement(ProcessContext pc) {
                pc.output(KV.of("hello", null));
              }
            }
        ))
        .apply(GroupByKey.create());

    pipeline.run();
{noformat}

Throws:

{noformat}
Caused by: java.lang.RuntimeException: Error adding to bag state.
        at org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals$FlinkBagState.add(FlinkStateInternals.java:299)
        at org.apache.beam.runners.core.SystemReduceFn.processValue(SystemReduceFn.java:115)
        at org.apache.beam.runners.core.ReduceFnRunner.processElement(ReduceFnRunner.java:608)
        at org.apache.beam.runners.core.ReduceFnRunner.processElements(ReduceFnRunner.java:349)
        at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn.processElement(GroupAlsoByWindowViaWindowSetNewDoFn.java:136)
Caused by: java.lang.NullPointerException: You cannot add null to a ListState.
        at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:75)
        at org.apache.flink.runtime.state.heap.HeapListState.add(HeapListState.java:89)
        at org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals$FlinkBagState.add(FlinkStateInternals.java:297)
        at org.apache.beam.runners.core.SystemReduceFn.processValue(SystemReduceFn.java:115)
        at org.apache.beam.runners.core.ReduceFnRunner.processElement(ReduceFnRunner.java:608)
        at org.apache.beam.runners.core.ReduceFnRunner.processElements(ReduceFnRunner.java:349)
        at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn.processElement(GroupAlsoByWindowViaWindowSetNewDoFn.java:136)
        at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)
        at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:275)
        at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:240)
        at org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:80)
        at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:63)
        at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:460)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:104)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:712)
        at java.lang.Thread.run(Thread.java:745)
{noformat}
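
The trace shows the null value reaching Flink's ListState through FlinkBagState.add, where the null check rejects it. Below is a minimal, dependency-free sketch of that failure mode and of one possible way around it: wrapping each value before it is handed to the backing state. StrictListState and NullSafeBag are hypothetical names invented for this illustration; they are not Beam or Flink classes, this is not necessarily how the issue gets fixed in the runner, and the sketch ignores serialization (a real state backend would also need a coder or serializer that can encode the wrapper).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;

public class NullSafeBagSketch {

  /** Hypothetical stand-in for a list state that rejects null on add. */
  static final class StrictListState<T> {
    private final List<T> elements = new ArrayList<>();

    void add(T value) {
      // Mirrors the precondition seen in the stack trace.
      Objects.requireNonNull(value, "You cannot add null to a ListState.");
      elements.add(value);
    }

    List<T> get() {
      // Return a copy so callers cannot mutate the stored elements.
      return new ArrayList<>(elements);
    }
  }

  /** Hypothetical bag that wraps values so the backing state never sees null. */
  static final class NullSafeBag<T> {
    private final StrictListState<Optional<T>> state = new StrictListState<>();

    void add(T value) {
      // Optional.ofNullable(null) yields Optional.empty(), which is non-null.
      state.add(Optional.ofNullable(value));
    }

    List<T> read() {
      List<T> result = new ArrayList<>();
      for (Optional<T> wrapped : state.get()) {
        // Unwrap, restoring null for values that were originally null.
        result.add(wrapped.orElse(null));
      }
      return result;
    }
  }

  public static void main(String[] args) {
    StrictListState<String> strict = new StrictListState<>();
    try {
      strict.add(null);
    } catch (NullPointerException e) {
      System.out.println("strict state rejected null: " + e.getMessage());
    }

    NullSafeBag<String> bag = new NullSafeBag<>();
    bag.add(null);
    bag.add("value");
    System.out.println("bag contents: " + bag.read());
  }
}
```

The wrapper keeps the null element in the bag, so a grouped result would still contain one entry per input value, which is what the pipeline above expects for KV<String, Void>.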

Will do a follow-up for running ValidatesRunner in streaming mode.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
