Maximilian Michels created BEAM-6929:
----------------------------------------
Summary: Session Windows with lateness cause NullPointerException in Flink Runner
Key: BEAM-6929
URL: https://issues.apache.org/jira/browse/BEAM-6929
Project: Beam
Issue Type: Bug
Components: runner-flink
Reporter: Maximilian Michels
Assignee: Maximilian Michels
Fix For: 2.12.0
Reported on the mailing list:
{noformat}
I am using Beam 2.11.0, Runner - beam-runners-flink-1.7, Flink Cluster - 1.7.2.
I have this flow in my pipeline:
KafkaSource(withCreateTime())
  --> ApplyWindow(SessionWindow with gapDuration=1 Minute, lateness=3 Minutes, AccumulatingFiredPanes, default trigger)
  --> BeamSQL(GroupBy query)
  --> Window.remerge()
  --> Enrichment
  --> KafkaSink
I am generating data in such a way that the first two records belong to two
different sessions. The third record is generated before the first session
expires, with a timestamp chosen so that the two sessions merge into a single
session.
For example, these are the sample input and output obtained when I ran the same
pipeline in the DirectRunner.
Sample Input:
{"Col1":{"string":"str1"}, "Timestamp":{"string":"2019-03-27 15-27-44"}}
{"Col1":{"string":"str1"}, "Timestamp":{"string":"2019-03-27 15-28-51"}}
{"Col1":{"string":"str1"}, "Timestamp":{"string":"2019-03-27 15-28-26"}}
Sample Output:
{"Col1":{"string":"str1"},"NumberOfRecords":{"long":1},"WST":{"string":"2019-03-27
15-27-44"},"WET":{"string":"2019-03-27 15-28-44"}}
{"Col1":{"string":"str1"},"NumberOfRecords":{"long":1},"WST":{"string":"2019-03-27
15-28-51"},"WET":{"string":"2019-03-27 15-29-51"}}
{"Col1":{"string":"str1"},"NumberOfRecords":{"long":3},"WST":{"string":"2019-03-27
15-27-44"},"WET":{"string":"2019-03-27 15-29-51"}}
Where "NumberOfRecords" is the count, "WST" is the Avro field Name which
indicates the window start time for the session window. Similarly "WET"
indicates the window End time of the session window. I am getting "WST" and
"WET" after remerging and applying ParDo(Enrichment stage of the pipeline).
The program ran successfully in DirectRunner. But, in FlinkRunner, I am getting
this exception when the third record arrives:
2019-03-27 15:31:00,442 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph - Source:
DfleKafkaSource/KafkaIO.Read/Read(KafkaUnboundedSource) -> Flat Map ->
DfleKafkaSource/ParDo(ConvertKafkaRecordtoRow)/ParMultiDo(ConvertKafkaRecordtoRow)
-> (Window.Into()/Window.Assign.out ->
DfleSql/SqlTransform/BeamIOSourceRel_4/Convert.ConvertTransform/ParDo(Anonymous)/ParMultiDo(Anonymous)
->
DfleSql/SqlTransform/BeamAggregationRel_45/Group.CombineFieldsByFields/Group.ByFields/Group
by fields/ParMultiDo(Anonymous) -> ToKeyedWorkItem,
DfleKafkaSink/ParDo(RowToGenericRecordConverter)/ParMultiDo(RowToGenericRecordConverter)
-> DfleKafkaSink/KafkaIO.KafkaValueWrite/Kafka values with default
key/Map/ParMultiDo(Anonymous) ->
DfleKafkaSink/KafkaIO.KafkaValueWrite/KafkaIO.Write/Kafka
ProducerRecord/Map/ParMultiDo(Anonymous) ->
DfleKafkaSink/KafkaIO.KafkaValueWrite/KafkaIO.Write/KafkaIO.WriteRecords/ParDo(KafkaWriter)/ParMultiDo(KafkaWriter))
(1/1) (d00be62e110cef00d9d772042f4b87a9) switched from DEPLOYING to RUNNING.
2019-03-27 15:33:25,427 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph -
DfleSql/SqlTransform/BeamAggregationRel_45/Group.CombineFieldsByFields/Group.ByFields/GroupByKey
->
DfleSql/SqlTransform/BeamAggregationRel_45/Group.CombineFieldsByFields/Combine.GroupedValues/ParDo(Anonymous)/ParMultiDo(Anonymous)
->
DfleSql/SqlTransform/BeamAggregationRel_45/mergeRecord/ParMultiDo(Anonymous) ->
DfleSql/SqlTransform/BeamCalcRel_46/ParDo(Calc)/ParMultiDo(Calc) ->
DfleSql/Window.Remerge/Identity/Map/ParMultiDo(Anonymous) ->
DfleSql/ParDo(EnrichRecordWithWindowTimeInfo)/ParMultiDo(EnrichRecordWithWindowTimeInfo)
->
DfleKafkaSink2/ParDo(RowToGenericRecordConverter)/ParMultiDo(RowToGenericRecordConverter)
-> DfleKafkaSink2/KafkaIO.KafkaValueWrite/Kafka values with default
key/Map/ParMultiDo(Anonymous) ->
DfleKafkaSink2/KafkaIO.KafkaValueWrite/KafkaIO.Write/Kafka
ProducerRecord/Map/ParMultiDo(Anonymous) ->
DfleKafkaSink2/KafkaIO.KafkaValueWrite/KafkaIO.Write/KafkaIO.WriteRecords/ParDo(KafkaWriter)/ParMultiDo(KafkaWriter)
(1/1) (d95af17b7457443c13bd327b46b282e6) switched from RUNNING to FAILED.
org.apache.beam.sdk.util.UserCodeException: java.lang.NullPointerException
    at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:34)
    at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)
    at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:275)
    at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:240)
    at org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:80)
    at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:63)
    at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:457)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:104)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:712)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
    at org.apache.beam.sdk.transforms.Combine$BinaryCombineLongFn.mergeAccumulators(Combine.java:703)
    at org.apache.beam.sdk.transforms.Combine$BinaryCombineLongFn.mergeAccumulators(Combine.java:674)
    at org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals$FlinkCombiningState.mergeAccumulators(FlinkStateInternals.java:517)
    at org.apache.beam.runners.core.StateMerging.mergeCombiningValues(StateMerging.java:192)
    at org.apache.beam.runners.core.StateMerging.mergeCombiningValues(StateMerging.java:162)
    at org.apache.beam.runners.core.NonEmptyPanes$GeneralNonEmptyPanes.onMerge(NonEmptyPanes.java:132)
    at org.apache.beam.runners.core.ReduceFnRunner$OnMergeCallback.onMerge(ReduceFnRunner.java:507)
    at org.apache.beam.runners.core.MergingActiveWindowSet$MergeContextImpl.recordMerges(MergingActiveWindowSet.java:211)
    at org.apache.beam.runners.core.MergingActiveWindowSet.merge(MergingActiveWindowSet.java:229)
    at org.apache.beam.runners.core.ReduceFnRunner.mergeWindows(ReduceFnRunner.java:436)
    at org.apache.beam.runners.core.ReduceFnRunner.processElements(ReduceFnRunner.java:329)
    at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn.processElement(GroupAlsoByWindowViaWindowSetNewDoFn.java:136)
Is this a known issue with the FlinkRunner? Are session windows with lateness
@Experimental in the FlinkRunner?
I have also tried with Runner - beam-runners-flink_2.11, Flink Cluster - 1.5.3,
and ran into the same exception.
I have also tried generating the data with lateness set to 0, and everything
works as expected. So there does not seem to be a problem with merging the
windows of records that belong to the same session.
{noformat}
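
For reference, a minimal sketch of the windowing setup described in the report, written against the Beam Java SDK. This is not the reporter's code: the class and method names (SessionLatenessRepro, applySessionWindow, EnrichWithWindowTimes) are hypothetical stand-ins, and the enrichment DoFn only illustrates reading the merged session bounds (the report's WST/WET) from the IntervalWindow after Window.remerge().

{code:java}
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.Sessions;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.joda.time.Duration;

public class SessionLatenessRepro {

  // Windowing as described in the report: 1 minute session gap, 3 minutes
  // allowed lateness, accumulating fired panes, default trigger.
  static PCollection<Row> applySessionWindow(PCollection<Row> rows) {
    return rows.apply(
        "ApplyWindow",
        Window.<Row>into(Sessions.withGapDuration(Duration.standardMinutes(1)))
            .triggering(DefaultTrigger.of())
            .withAllowedLateness(Duration.standardMinutes(3))
            .accumulatingFiredPanes());
  }

  // Hypothetical enrichment step, standing in for the
  // EnrichRecordWithWindowTimeInfo ParDo that runs after Window.remerge():
  // read the merged session's start/end from the IntervalWindow.
  static class EnrichWithWindowTimes extends DoFn<Row, String> {
    @ProcessElement
    public void processElement(ProcessContext ctx, IntervalWindow window) {
      ctx.output(
          String.format("WST=%s WET=%s row=%s", window.start(), window.end(), ctx.element()));
    }
  }
}
{code}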
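The innermost frames show Combine$BinaryCombineLongFn.mergeAccumulators being called through FlinkStateInternals$FlinkCombiningState while session windows are merged. As a hedged illustration only (not the reporter's code, and not a statement about the eventual fix), the same NullPointerException can be reproduced in isolation by handing a null accumulator to a BinaryCombineLongFn, which is presumably what an empty per-window combining state read would amount to during the merge:

{code:java}
import java.util.Arrays;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Sum;

public class MergeNullAccumulator {
  public static void main(String[] args) {
    // Sum.ofLongs() is a Combine.BinaryCombineLongFn, the class named in the
    // bottom "Caused by" frame of the stack trace above.
    Combine.BinaryCombineLongFn fn = Sum.ofLongs();
    long[] accumulator = fn.createAccumulator();
    // A null element among the accumulators being merged (e.g. state read
    // for a window that has no accumulator yet) gets dereferenced during
    // the merge and results in a NullPointerException like the one reported.
    fn.mergeAccumulators(Arrays.asList(accumulator, null));
  }
}
{code}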