[ https://issues.apache.org/jira/browse/BEAM-6929?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ismaël Mejía resolved BEAM-6929.
--------------------------------
    Resolution: Fixed

> Session Windows with lateness cause NullPointerException in Flink Runner
> ------------------------------------------------------------------------
>
>                 Key: BEAM-6929
>                 URL: https://issues.apache.org/jira/browse/BEAM-6929
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink
>            Reporter: Maximilian Michels
>            Assignee: Maximilian Michels
>            Priority: Critical
>             Fix For: 2.12.0
>
>          Time Spent: 2h
>  Remaining Estimate: 0h
>
> Reported on the mailing list:
> {noformat}
> I am using Beam 2.11.0, Runner - beam-runners-flink-1.7, Flink Cluster - 
> 1.7.2.
> I have this flow in my pipeline:
> KafkaSource(withCreateTime())  -->  ApplyWindow(SessionWindow with 
> gapDuration=1 Minute, lateness=3 Minutes, AccumulatingFiredPanes, default 
> trigger)  -->  BeamSQL(GroupBy query)  -->  Window.remerge()  -->  Enrichment 
>  -->  KafkaSink
> I am generating data so that the first two records belong to two different
> sessions. I then generate the third record before the first session expires,
> with a timestamp chosen so that the two sessions merge into a single session.
> For example, these are the sample input and output obtained when I ran the
> same pipeline in DirectRunner.
> Sample Input:
> {"Col1":{"string":"str1"}, "Timestamp":{"string":"2019-03-27 15-27-44"}}
> {"Col1":{"string":"str1"}, "Timestamp":{"string":"2019-03-27 15-28-51"}}
> {"Col1":{"string":"str1"}, "Timestamp":{"string":"2019-03-27 15-28-26"}}
> Sample Output:
> {"Col1":{"string":"str1"},"NumberOfRecords":{"long":1},"WST":{"string":"2019-03-27 15-27-44"},"WET":{"string":"2019-03-27 15-28-44"}}
> {"Col1":{"string":"str1"},"NumberOfRecords":{"long":1},"WST":{"string":"2019-03-27 15-28-51"},"WET":{"string":"2019-03-27 15-29-51"}}
> {"Col1":{"string":"str1"},"NumberOfRecords":{"long":3},"WST":{"string":"2019-03-27 15-27-44"},"WET":{"string":"2019-03-27 15-29-51"}}
> Here "NumberOfRecords" is the count, "WST" is the Avro field name that
> indicates the window start time of the session window, and "WET" similarly
> indicates the window end time. I get "WST" and "WET" after remerging and
> applying a ParDo (the Enrichment stage of the pipeline).
> The program runs successfully on DirectRunner, but on FlinkRunner I get
> this exception when the third record arrives:
> 2019-03-27 15:31:00,442 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: 
> DfleKafkaSource/KafkaIO.Read/Read(KafkaUnboundedSource) -> Flat Map -> 
> DfleKafkaSource/ParDo(ConvertKafkaRecordtoRow)/ParMultiDo(ConvertKafkaRecordtoRow)
>  -> (Window.Into()/Window.Assign.out -> 
> DfleSql/SqlTransform/BeamIOSourceRel_4/Convert.ConvertTransform/ParDo(Anonymous)/ParMultiDo(Anonymous)
>  -> 
> DfleSql/SqlTransform/BeamAggregationRel_45/Group.CombineFieldsByFields/Group.ByFields/Group
>  by fields/ParMultiDo(Anonymous) -> ToKeyedWorkItem, 
> DfleKafkaSink/ParDo(RowToGenericRecordConverter)/ParMultiDo(RowToGenericRecordConverter)
>  -> DfleKafkaSink/KafkaIO.KafkaValueWrite/Kafka values with default 
> key/Map/ParMultiDo(Anonymous) -> 
> DfleKafkaSink/KafkaIO.KafkaValueWrite/KafkaIO.Write/Kafka 
> ProducerRecord/Map/ParMultiDo(Anonymous) -> 
> DfleKafkaSink/KafkaIO.KafkaValueWrite/KafkaIO.Write/KafkaIO.WriteRecords/ParDo(KafkaWriter)/ParMultiDo(KafkaWriter))
>  (1/1) (d00be62e110cef00d9d772042f4b87a9) switched from DEPLOYING to RUNNING.
> 2019-03-27 15:33:25,427 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph        - 
> DfleSql/SqlTransform/BeamAggregationRel_45/Group.CombineFieldsByFields/Group.ByFields/GroupByKey
>  -> 
> DfleSql/SqlTransform/BeamAggregationRel_45/Group.CombineFieldsByFields/Combine.GroupedValues/ParDo(Anonymous)/ParMultiDo(Anonymous)
>  -> 
> DfleSql/SqlTransform/BeamAggregationRel_45/mergeRecord/ParMultiDo(Anonymous) 
> -> DfleSql/SqlTransform/BeamCalcRel_46/ParDo(Calc)/ParMultiDo(Calc) -> 
> DfleSql/Window.Remerge/Identity/Map/ParMultiDo(Anonymous) -> 
> DfleSql/ParDo(EnrichRecordWithWindowTimeInfo)/ParMultiDo(EnrichRecordWithWindowTimeInfo)
>  -> 
> DfleKafkaSink2/ParDo(RowToGenericRecordConverter)/ParMultiDo(RowToGenericRecordConverter)
>  -> DfleKafkaSink2/KafkaIO.KafkaValueWrite/Kafka values with default 
> key/Map/ParMultiDo(Anonymous) -> 
> DfleKafkaSink2/KafkaIO.KafkaValueWrite/KafkaIO.Write/Kafka 
> ProducerRecord/Map/ParMultiDo(Anonymous) -> 
> DfleKafkaSink2/KafkaIO.KafkaValueWrite/KafkaIO.Write/KafkaIO.WriteRecords/ParDo(KafkaWriter)/ParMultiDo(KafkaWriter)
>  (1/1) (d95af17b7457443c13bd327b46b282e6) switched from RUNNING to FAILED.
> org.apache.beam.sdk.util.UserCodeException: java.lang.NullPointerException
>         at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:34)
>         at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)
>         at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:275)
>         at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:240)
>         at org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:80)
>         at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:63)
>         at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:457)
>         at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
>         at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:104)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:712)
>         at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.NullPointerException
>         at org.apache.beam.sdk.transforms.Combine$BinaryCombineLongFn.mergeAccumulators(Combine.java:703)
>         at org.apache.beam.sdk.transforms.Combine$BinaryCombineLongFn.mergeAccumulators(Combine.java:674)
>         at org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals$FlinkCombiningState.mergeAccumulators(FlinkStateInternals.java:517)
>         at org.apache.beam.runners.core.StateMerging.mergeCombiningValues(StateMerging.java:192)
>         at org.apache.beam.runners.core.StateMerging.mergeCombiningValues(StateMerging.java:162)
>         at org.apache.beam.runners.core.NonEmptyPanes$GeneralNonEmptyPanes.onMerge(NonEmptyPanes.java:132)
>         at org.apache.beam.runners.core.ReduceFnRunner$OnMergeCallback.onMerge(ReduceFnRunner.java:507)
>         at org.apache.beam.runners.core.MergingActiveWindowSet$MergeContextImpl.recordMerges(MergingActiveWindowSet.java:211)
>         at org.apache.beam.runners.core.MergingActiveWindowSet.merge(MergingActiveWindowSet.java:229)
>         at org.apache.beam.runners.core.ReduceFnRunner.mergeWindows(ReduceFnRunner.java:436)
>         at org.apache.beam.runners.core.ReduceFnRunner.processElements(ReduceFnRunner.java:329)
>         at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn.processElement(GroupAlsoByWindowViaWindowSetNewDoFn.java:136)
> Is this a known issue with FlinkRunner? Are session windows with lateness
> @Experimental in FlinkRunner?
> I have also tried Runner beam-runners-flink_2.11 with Flink Cluster 1.5.3
> and hit the same exception.
> I have also tried generating data with lateness set to 0, and everything
> works as expected. There seems to be no problem merging the windows of
> records that belong to the same session.
> {noformat}
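The reporter's sample data can be checked against session-window semantics with a small standalone sketch. It is plain Java with no Beam dependency and assumes only what the report states: a 1-minute gap and the three sample timestamps. Each record gets its own window [timestamp, timestamp + gap), and overlapping windows are merged. The names `SessionMergeSketch` and `mergeSessions` are hypothetical, and this is a batch recomputation for illustration, not how Beam's `ReduceFnRunner` merges windows incrementally or fires accumulating panes.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class SessionMergeSketch {
    // Timestamp layout used by the sample records, e.g. "2019-03-27 15-27-44".
    static final DateTimeFormatter FMT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH-mm-ss");

    // A session window [start, end) and the number of records assigned to it.
    static final class Session {
        final LocalDateTime start;
        final LocalDateTime end;
        final long count;

        Session(LocalDateTime start, LocalDateTime end, long count) {
            this.start = start;
            this.end = end;
            this.count = count;
        }

        @Override
        public String toString() {
            return FMT.format(start) + " .. " + FMT.format(end) + " count=" + count;
        }
    }

    // Gives each record its own window [ts, ts + gap), then merges windows that overlap.
    static List<Session> mergeSessions(List<String> timestamps, Duration gap) {
        List<Session> windows = new ArrayList<>();
        for (String ts : timestamps) {
            LocalDateTime t = LocalDateTime.parse(ts, FMT);
            windows.add(new Session(t, t.plus(gap), 1));
        }
        // Sort by start so each window only needs to be compared with the last merged one.
        windows.sort(Comparator.comparing((Session s) -> s.start));

        List<Session> merged = new ArrayList<>();
        for (Session w : windows) {
            int lastIdx = merged.size() - 1;
            if (lastIdx >= 0 && w.start.isBefore(merged.get(lastIdx).end)) {
                // Overlap: extend the previous session and add up the record counts.
                Session last = merged.get(lastIdx);
                LocalDateTime end = w.end.isAfter(last.end) ? w.end : last.end;
                merged.set(lastIdx, new Session(last.start, end, last.count + w.count));
            } else {
                merged.add(w);
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        Duration gap = Duration.ofMinutes(1);
        // First two records only: two separate sessions.
        System.out.println(mergeSessions(
                Arrays.asList("2019-03-27 15-27-44", "2019-03-27 15-28-51"), gap));
        // The third record overlaps both sessions and merges them into one.
        System.out.println(mergeSessions(
                Arrays.asList("2019-03-27 15-27-44", "2019-03-27 15-28-51", "2019-03-27 15-28-26"), gap));
    }
}
```

Running `main` prints `[2019-03-27 15-27-44 .. 2019-03-27 15-28-44 count=1, 2019-03-27 15-28-51 .. 2019-03-27 15-29-51 count=1]` followed by `[2019-03-27 15-27-44 .. 2019-03-27 15-29-51 count=3]`, which lines up with the DirectRunner output in the report: the third record's window [15:28:26, 15:29:26) overlaps both earlier sessions, so the merged session spans 15:27:44 to 15:29:51.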
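The `Caused by` frames place the failure in `Combine$BinaryCombineLongFn.mergeAccumulators`, called from `FlinkCombiningState.mergeAccumulators` while `NonEmptyPanes` merges per-window state during `ReduceFnRunner.mergeWindows`. The sketch below is a simplified, stdlib-only re-creation of that merge shape, assuming (the report does not confirm this) that one of the per-window accumulators handed to the merge is `null`. It mimics a binary long combine with a one-element `long[]` accumulator and uses summation purely for illustration. `NullAccumulatorSketch` and both methods are hypothetical, and the null-skipping variant is not the fix that shipped in 2.12.0.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class NullAccumulatorSketch {
    // Hypothetical accumulator: a one-element long[] holding the running value.
    static long[] createAccumulator() {
        return new long[] {0L};
    }

    // Folds all accumulators into the first one. A null entry after the first
    // position makes iter.next()[0] throw a NullPointerException.
    static long[] mergeAccumulators(Iterable<long[]> accumulators) {
        Iterator<long[]> iter = accumulators.iterator();
        if (!iter.hasNext()) {
            return createAccumulator();
        }
        long[] running = iter.next();
        while (iter.hasNext()) {
            running[0] = running[0] + iter.next()[0];
        }
        return running;
    }

    // Hypothetical defensive variant: treats a null accumulator as "no state"
    // and skips it instead of dereferencing it.
    static long[] mergeAccumulatorsNullSafe(Iterable<long[]> accumulators) {
        long[] running = createAccumulator();
        for (long[] acc : accumulators) {
            if (acc != null) {
                running[0] += acc[0];
            }
        }
        return running;
    }

    public static void main(String[] args) {
        // Two windows with state and one (assumed) window whose accumulator is null.
        List<long[]> accs = Arrays.asList(new long[] {1L}, null, new long[] {1L});
        try {
            mergeAccumulators(accs);
        } catch (NullPointerException e) {
            System.out.println("naive merge: NullPointerException");
        }
        System.out.println("null-safe merge: " + mergeAccumulatorsNullSafe(accs)[0]);
    }
}
```

Running `main` prints `naive merge: NullPointerException` and then `null-safe merge: 2`. The sketch only shows why a `null` accumulator surfaces as an NPE inside `mergeAccumulators`; where such a value would come from in `FlinkStateInternals` is not established by this report.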



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
