[ https://issues.apache.org/jira/browse/BEAM-6929?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ismaël Mejía resolved BEAM-6929.
--------------------------------
Resolution: Fixed
> Session Windows with lateness cause NullPointerException in Flink Runner
> ------------------------------------------------------------------------
>
> Key: BEAM-6929
> URL: https://issues.apache.org/jira/browse/BEAM-6929
> Project: Beam
> Issue Type: Bug
> Components: runner-flink
> Reporter: Maximilian Michels
> Assignee: Maximilian Michels
> Priority: Critical
> Fix For: 2.12.0
>
> Time Spent: 2h
> Remaining Estimate: 0h
>
> Reported on the mailing list:
> {noformat}
> I am using Beam 2.11.0, Runner - beam-runners-flink-1.7, Flink Cluster -
> 1.7.2.
> I have this flow in my pipeline:
> KafkaSource(withCreateTime()) --> ApplyWindow(SessionWindow with
> gapDuration=1 Minute, lateness=3 Minutes, AccumulatingFiredPanes, default
> trigger) --> BeamSQL(GroupBy query) --> Window.remerge() --> Enrichment
> --> KafkaSink
> I am generating data so that the first two records belong to two different
> sessions. The third record is generated before the first session expires,
> with a timestamp that causes the two sessions to merge into a single session.
> For example, these are the sample input and output obtained when I ran the
> same pipeline on the DirectRunner.
> Sample Input:
> {"Col1":{"string":"str1"}, "Timestamp":{"string":"2019-03-27 15-27-44"}}
> {"Col1":{"string":"str1"}, "Timestamp":{"string":"2019-03-27 15-28-51"}}
> {"Col1":{"string":"str1"}, "Timestamp":{"string":"2019-03-27 15-28-26"}}
> Sample Output:
> {"Col1":{"string":"str1"},"NumberOfRecords":{"long":1},"WST":{"string":"2019-03-27 15-27-44"},"WET":{"string":"2019-03-27 15-28-44"}}
> {"Col1":{"string":"str1"},"NumberOfRecords":{"long":1},"WST":{"string":"2019-03-27 15-28-51"},"WET":{"string":"2019-03-27 15-29-51"}}
> {"Col1":{"string":"str1"},"NumberOfRecords":{"long":3},"WST":{"string":"2019-03-27 15-27-44"},"WET":{"string":"2019-03-27 15-29-51"}}
> "NumberOfRecords" is the count, "WST" is the Avro field name that holds the
> session window's start time, and "WET" holds the session window's end time.
> I obtain "WST" and "WET" after remerging, in the ParDo of the enrichment
> stage of the pipeline.
> The program runs successfully on the DirectRunner, but on the FlinkRunner I
> get this exception when the third record arrives:
> 2019-03-27 15:31:00,442 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: DfleKafkaSource/KafkaIO.Read/Read(KafkaUnboundedSource)
> -> Flat Map
> -> DfleKafkaSource/ParDo(ConvertKafkaRecordtoRow)/ParMultiDo(ConvertKafkaRecordtoRow)
> -> (Window.Into()/Window.Assign.out
> -> DfleSql/SqlTransform/BeamIOSourceRel_4/Convert.ConvertTransform/ParDo(Anonymous)/ParMultiDo(Anonymous)
> -> DfleSql/SqlTransform/BeamAggregationRel_45/Group.CombineFieldsByFields/Group.ByFields/Group by fields/ParMultiDo(Anonymous)
> -> ToKeyedWorkItem,
> DfleKafkaSink/ParDo(RowToGenericRecordConverter)/ParMultiDo(RowToGenericRecordConverter)
> -> DfleKafkaSink/KafkaIO.KafkaValueWrite/Kafka values with default key/Map/ParMultiDo(Anonymous)
> -> DfleKafkaSink/KafkaIO.KafkaValueWrite/KafkaIO.Write/Kafka ProducerRecord/Map/ParMultiDo(Anonymous)
> -> DfleKafkaSink/KafkaIO.KafkaValueWrite/KafkaIO.Write/KafkaIO.WriteRecords/ParDo(KafkaWriter)/ParMultiDo(KafkaWriter))
> (1/1) (d00be62e110cef00d9d772042f4b87a9) switched from DEPLOYING to RUNNING.
> 2019-03-27 15:33:25,427 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - DfleSql/SqlTransform/BeamAggregationRel_45/Group.CombineFieldsByFields/Group.ByFields/GroupByKey
> -> DfleSql/SqlTransform/BeamAggregationRel_45/Group.CombineFieldsByFields/Combine.GroupedValues/ParDo(Anonymous)/ParMultiDo(Anonymous)
> -> DfleSql/SqlTransform/BeamAggregationRel_45/mergeRecord/ParMultiDo(Anonymous)
> -> DfleSql/SqlTransform/BeamCalcRel_46/ParDo(Calc)/ParMultiDo(Calc)
> -> DfleSql/Window.Remerge/Identity/Map/ParMultiDo(Anonymous)
> -> DfleSql/ParDo(EnrichRecordWithWindowTimeInfo)/ParMultiDo(EnrichRecordWithWindowTimeInfo)
> -> DfleKafkaSink2/ParDo(RowToGenericRecordConverter)/ParMultiDo(RowToGenericRecordConverter)
> -> DfleKafkaSink2/KafkaIO.KafkaValueWrite/Kafka values with default key/Map/ParMultiDo(Anonymous)
> -> DfleKafkaSink2/KafkaIO.KafkaValueWrite/KafkaIO.Write/Kafka ProducerRecord/Map/ParMultiDo(Anonymous)
> -> DfleKafkaSink2/KafkaIO.KafkaValueWrite/KafkaIO.Write/KafkaIO.WriteRecords/ParDo(KafkaWriter)/ParMultiDo(KafkaWriter)
> (1/1) (d95af17b7457443c13bd327b46b282e6) switched from RUNNING to FAILED.
> org.apache.beam.sdk.util.UserCodeException: java.lang.NullPointerException
>     at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:34)
>     at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)
>     at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:275)
>     at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:240)
>     at org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:80)
>     at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:63)
>     at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:457)
>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:104)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:712)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.NullPointerException
>     at org.apache.beam.sdk.transforms.Combine$BinaryCombineLongFn.mergeAccumulators(Combine.java:703)
>     at org.apache.beam.sdk.transforms.Combine$BinaryCombineLongFn.mergeAccumulators(Combine.java:674)
>     at org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals$FlinkCombiningState.mergeAccumulators(FlinkStateInternals.java:517)
>     at org.apache.beam.runners.core.StateMerging.mergeCombiningValues(StateMerging.java:192)
>     at org.apache.beam.runners.core.StateMerging.mergeCombiningValues(StateMerging.java:162)
>     at org.apache.beam.runners.core.NonEmptyPanes$GeneralNonEmptyPanes.onMerge(NonEmptyPanes.java:132)
>     at org.apache.beam.runners.core.ReduceFnRunner$OnMergeCallback.onMerge(ReduceFnRunner.java:507)
>     at org.apache.beam.runners.core.MergingActiveWindowSet$MergeContextImpl.recordMerges(MergingActiveWindowSet.java:211)
>     at org.apache.beam.runners.core.MergingActiveWindowSet.merge(MergingActiveWindowSet.java:229)
>     at org.apache.beam.runners.core.ReduceFnRunner.mergeWindows(ReduceFnRunner.java:436)
>     at org.apache.beam.runners.core.ReduceFnRunner.processElements(ReduceFnRunner.java:329)
>     at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn.processElement(GroupAlsoByWindowViaWindowSetNewDoFn.java:136)
> Is this a known issue with the FlinkRunner? Are session windows with lateness
> @Experimental in the FlinkRunner?
> I have also tried with runner beam-runners-flink_2.11 and Flink cluster
> 1.5.3, and hit the same exception.
> I have also tried generating data with lateness set to 0, and everything
> works as expected. There seems to be no problem merging the windows of
> records that belong to the same session.
> {noformat}
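The report turns on two mechanisms. First, a late record bridges two sessions, so their windows merge. Second, the runner merges per-window combining state when that happens; the trace fails inside Combine$BinaryCombineLongFn.mergeAccumulators.

The following plain-Java sketch models both without any Beam dependency. SessionMergeSketch and all of its members are hypothetical names. It rests on these assumptions:
- timestamps are seconds of the day;
- each record gets the window [t, t + gap);
- overlapping windows merge;
- a window whose state is missing contributes a null accumulator.

That last point only illustrates how such an NPE can arise. It is not a confirmed root cause, nor a description of the fix shipped in 2.12.0.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class SessionMergeSketch {

    /** Hypothetical half-open interval [start, end) in seconds of the day. */
    static final class Window {
        final long start;
        final long end;

        Window(long start, long end) {
            this.start = start;
            this.end = end;
        }

        boolean overlaps(Window other) {
            return start < other.end && other.start < end;
        }

        Window span(Window other) {
            return new Window(Math.min(start, other.start), Math.max(end, other.end));
        }

        @Override
        public String toString() {
            return "[" + fmt(start) + ", " + fmt(end) + ")";
        }
    }

    static long hms(int h, int m, int s) {
        return h * 3600L + m * 60L + s;
    }

    /** Formats seconds of the day like the report's timestamps, e.g. 15-27-44. */
    static String fmt(long secs) {
        return String.format("%02d-%02d-%02d", secs / 3600, (secs / 60) % 60, secs % 60);
    }

    /** Gives each record the window [t, t + gap), sorts by start, then merges overlaps. */
    static List<Window> mergeSessions(List<Long> timestamps, long gapSeconds) {
        List<Window> windows = new ArrayList<>();
        for (long t : timestamps) {
            windows.add(new Window(t, t + gapSeconds));
        }
        windows.sort((a, b) -> Long.compare(a.start, b.start));
        List<Window> merged = new ArrayList<>();
        for (Window w : windows) {
            int last = merged.size() - 1;
            if (last >= 0 && merged.get(last).overlaps(w)) {
                merged.set(last, merged.get(last).span(w));
            } else {
                merged.add(w);
            }
        }
        return merged;
    }

    /** Sum-style merge over long[] accumulators; updates and returns the first one. */
    static long[] mergeSums(Iterable<long[]> accumulators) {
        Iterator<long[]> iter = accumulators.iterator();
        if (!iter.hasNext()) {
            return new long[] {0L};
        }
        long[] running = iter.next();
        while (iter.hasNext()) {
            running[0] += iter.next()[0]; // NullPointerException if an accumulator is null
        }
        return running;
    }

    /** Hypothetical null-tolerant variant: a missing accumulator counts as 0. */
    static long[] mergeSumsSkippingNull(Iterable<long[]> accumulators) {
        long total = 0L;
        for (long[] acc : accumulators) {
            if (acc != null) {
                total += acc[0];
            }
        }
        return new long[] {total};
    }

    public static void main(String[] args) {
        long gap = 60; // gapDuration = 1 minute
        List<Long> firstTwo = Arrays.asList(hms(15, 27, 44), hms(15, 28, 51));
        System.out.println(mergeSessions(firstTwo, gap)); // [[15-27-44, 15-28-44), [15-28-51, 15-29-51)]
        List<Long> all = Arrays.asList(hms(15, 27, 44), hms(15, 28, 51), hms(15, 28, 26));
        System.out.println(mergeSessions(all, gap)); // [[15-27-44, 15-29-51)]

        List<long[]> withMissing = Arrays.asList(new long[] {1L}, null, new long[] {1L});
        System.out.println(mergeSumsSkippingNull(withMissing)[0]); // 2
        try {
            mergeSums(withMissing);
        } catch (NullPointerException e) {
            System.out.println("NullPointerException while merging accumulators");
        }
    }
}
```

With a one-minute gap, the first two sample timestamps stay in separate sessions. Adding 15:28:26 yields the single window [15-27-44, 15-29-51), matching the third line of the DirectRunner output. The trace frame NonEmptyPanes$GeneralNonEmptyPanes.onMerge suggests the merged state is the runner's internal pane bookkeeping rather than the SQL COUNT itself.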
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)