[ 
https://issues.apache.org/jira/browse/BEAM-6929?focusedWorklogId=220278&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-220278
 ]

ASF GitHub Bot logged work on BEAM-6929:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 28/Mar/19 22:00
            Start Date: 28/Mar/19 22:00
    Worklog Time Spent: 10m 
      Work Description: mxm commented on issue #8162: [BEAM-6929] Prevent 
NullPointerException in Flink's CombiningState
URL: https://github.com/apache/beam/pull/8162#issuecomment-477787100
 
 
   It appears this is a bug in the Spark Runner as well.
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 220278)
    Time Spent: 0.5h  (was: 20m)

> Session Windows with lateness cause NullPointerException in Flink Runner
> ------------------------------------------------------------------------
>
>                 Key: BEAM-6929
>                 URL: https://issues.apache.org/jira/browse/BEAM-6929
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink
>            Reporter: Maximilian Michels
>            Assignee: Maximilian Michels
>            Priority: Major
>             Fix For: 2.12.0
>
>          Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> Reported on the mailing list:
> {noformat}
> I am using Beam 2.11.0, Runner - beam-runners-flink-1.7, Flink Cluster - 
> 1.7.2.
> I have this flow in my pipeline:
> KafkaSource(withCreateTime())  -->  ApplyWindow(SessionWindow with 
> gapDuration=1 Minute, lateness=3 Minutes, AccumulatingFiredPanes, default 
> trigger)  -->  BeamSQL(GroupBy query)  -->  Window.remerge()  -->  Enrichment 
>  -->  KafkaSink
> I am generating data in such a way that the first two records belong to two 
> different sessions. I then generate the third record before the first session 
> expires, with a timestamp chosen so that the two sessions are merged into a 
> single session.
> For example, these are the sample input and output obtained when I ran the 
> same pipeline in DirectRunner.
> Sample Input:
> {"Col1":{"string":"str1"}, "Timestamp":{"string":"2019-03-27 15-27-44"}}
> {"Col1":{"string":"str1"}, "Timestamp":{"string":"2019-03-27 15-28-51"}}
> {"Col1":{"string":"str1"}, "Timestamp":{"string":"2019-03-27 15-28-26"}}
> Sample Output:
> {"Col1":{"string":"str1"},"NumberOfRecords":{"long":1},"WST":{"string":"2019-03-27 15-27-44"},"WET":{"string":"2019-03-27 15-28-44"}}
> {"Col1":{"string":"str1"},"NumberOfRecords":{"long":1},"WST":{"string":"2019-03-27 15-28-51"},"WET":{"string":"2019-03-27 15-29-51"}}
> {"Col1":{"string":"str1"},"NumberOfRecords":{"long":3},"WST":{"string":"2019-03-27 15-27-44"},"WET":{"string":"2019-03-27 15-29-51"}}
> Here "NumberOfRecords" is the count, "WST" is the Avro field name for the 
> window start time of the session window, and "WET" is the Avro field name 
> for its window end time. I obtain "WST" and "WET" after remerging and 
> applying a ParDo (the Enrichment stage of the pipeline).
> The program ran successfully in DirectRunner. In FlinkRunner, however, I am 
> getting this exception when the third record arrives:
> 2019-03-27 15:31:00,442 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: DfleKafkaSource/KafkaIO.Read/Read(KafkaUnboundedSource) -> Flat Map -> DfleKafkaSource/ParDo(ConvertKafkaRecordtoRow)/ParMultiDo(ConvertKafkaRecordtoRow) -> (Window.Into()/Window.Assign.out -> DfleSql/SqlTransform/BeamIOSourceRel_4/Convert.ConvertTransform/ParDo(Anonymous)/ParMultiDo(Anonymous) -> DfleSql/SqlTransform/BeamAggregationRel_45/Group.CombineFieldsByFields/Group.ByFields/Group by fields/ParMultiDo(Anonymous) -> ToKeyedWorkItem, DfleKafkaSink/ParDo(RowToGenericRecordConverter)/ParMultiDo(RowToGenericRecordConverter) -> DfleKafkaSink/KafkaIO.KafkaValueWrite/Kafka values with default key/Map/ParMultiDo(Anonymous) -> DfleKafkaSink/KafkaIO.KafkaValueWrite/KafkaIO.Write/Kafka ProducerRecord/Map/ParMultiDo(Anonymous) -> DfleKafkaSink/KafkaIO.KafkaValueWrite/KafkaIO.Write/KafkaIO.WriteRecords/ParDo(KafkaWriter)/ParMultiDo(KafkaWriter)) (1/1) (d00be62e110cef00d9d772042f4b87a9) switched from DEPLOYING to RUNNING.
> 2019-03-27 15:33:25,427 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - DfleSql/SqlTransform/BeamAggregationRel_45/Group.CombineFieldsByFields/Group.ByFields/GroupByKey -> DfleSql/SqlTransform/BeamAggregationRel_45/Group.CombineFieldsByFields/Combine.GroupedValues/ParDo(Anonymous)/ParMultiDo(Anonymous) -> DfleSql/SqlTransform/BeamAggregationRel_45/mergeRecord/ParMultiDo(Anonymous) -> DfleSql/SqlTransform/BeamCalcRel_46/ParDo(Calc)/ParMultiDo(Calc) -> DfleSql/Window.Remerge/Identity/Map/ParMultiDo(Anonymous) -> DfleSql/ParDo(EnrichRecordWithWindowTimeInfo)/ParMultiDo(EnrichRecordWithWindowTimeInfo) -> DfleKafkaSink2/ParDo(RowToGenericRecordConverter)/ParMultiDo(RowToGenericRecordConverter) -> DfleKafkaSink2/KafkaIO.KafkaValueWrite/Kafka values with default key/Map/ParMultiDo(Anonymous) -> DfleKafkaSink2/KafkaIO.KafkaValueWrite/KafkaIO.Write/Kafka ProducerRecord/Map/ParMultiDo(Anonymous) -> DfleKafkaSink2/KafkaIO.KafkaValueWrite/KafkaIO.Write/KafkaIO.WriteRecords/ParDo(KafkaWriter)/ParMultiDo(KafkaWriter) (1/1) (d95af17b7457443c13bd327b46b282e6) switched from RUNNING to FAILED.
> org.apache.beam.sdk.util.UserCodeException: java.lang.NullPointerException
>         at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:34)
>         at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)
>         at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:275)
>         at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:240)
>         at org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:80)
>         at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:63)
>         at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:457)
>         at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
>         at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:104)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:712)
>         at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.NullPointerException
>         at org.apache.beam.sdk.transforms.Combine$BinaryCombineLongFn.mergeAccumulators(Combine.java:703)
>         at org.apache.beam.sdk.transforms.Combine$BinaryCombineLongFn.mergeAccumulators(Combine.java:674)
>         at org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals$FlinkCombiningState.mergeAccumulators(FlinkStateInternals.java:517)
>         at org.apache.beam.runners.core.StateMerging.mergeCombiningValues(StateMerging.java:192)
>         at org.apache.beam.runners.core.StateMerging.mergeCombiningValues(StateMerging.java:162)
>         at org.apache.beam.runners.core.NonEmptyPanes$GeneralNonEmptyPanes.onMerge(NonEmptyPanes.java:132)
>         at org.apache.beam.runners.core.ReduceFnRunner$OnMergeCallback.onMerge(ReduceFnRunner.java:507)
>         at org.apache.beam.runners.core.MergingActiveWindowSet$MergeContextImpl.recordMerges(MergingActiveWindowSet.java:211)
>         at org.apache.beam.runners.core.MergingActiveWindowSet.merge(MergingActiveWindowSet.java:229)
>         at org.apache.beam.runners.core.ReduceFnRunner.mergeWindows(ReduceFnRunner.java:436)
>         at org.apache.beam.runners.core.ReduceFnRunner.processElements(ReduceFnRunner.java:329)
>         at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn.processElement(GroupAlsoByWindowViaWindowSetNewDoFn.java:136)
> Is this a known issue with FlinkRunner? Are session windows with lateness 
> @experimental in FlinkRunner?
> I have also tried with Runner - beam-runners-flink_2.11, Flink Cluster - 
> 1.5.3 and came across the same exception.
> I have also tried generating data with lateness set to 0, and everything 
> works as expected. It seems there is no problem in merging the windows of 
> records which belong to the same session.
> {noformat}
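
The session merge that the report expects can be reproduced outside Beam. The following is a minimal sketch in plain Java, not Beam's Sessions implementation and not the change made in pull request #8162. It assumes each record opens a window of one gap duration starting at its timestamp and that windows are merged when they overlap; allowed lateness and triggers are not modeled. The class SessionMergeSketch and its members are hypothetical names introduced only for this illustration.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration class; not part of Beam or Flink.
final class SessionMergeSketch {
    // Timestamp layout used by the sample records, e.g. "2019-03-27 15-27-44".
    static final DateTimeFormatter FMT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH-mm-ss");

    // Half-open session window [start, end) plus the number of records in it.
    static final class Session {
        final LocalDateTime start;
        final LocalDateTime end;
        final long count;

        Session(LocalDateTime start, LocalDateTime end, long count) {
            this.start = start;
            this.end = end;
            this.count = count;
        }

        @Override
        public String toString() {
            return FMT.format(start) + " .. " + FMT.format(end) + " count=" + count;
        }
    }

    // Gives every timestamp a window [t, t + gap) and merges overlapping windows.
    static List<Session> mergeSessions(List<String> timestamps, Duration gap) {
        List<Session> windows = new ArrayList<>();
        for (String ts : timestamps) {
            LocalDateTime t = LocalDateTime.parse(ts, FMT);
            windows.add(new Session(t, t.plus(gap), 1));
        }
        // Sort by start time so that overlapping windows become neighbours.
        windows.sort((a, b) -> a.start.compareTo(b.start));

        List<Session> merged = new ArrayList<>();
        for (Session w : windows) {
            int last = merged.size() - 1;
            if (last >= 0 && w.start.isBefore(merged.get(last).end)) {
                // Overlap: extend the previous session and add up the counts.
                Session m = merged.get(last);
                LocalDateTime end = w.end.isAfter(m.end) ? w.end : m.end;
                merged.set(last, new Session(m.start, end, m.count + w.count));
            } else {
                merged.add(w);
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        List<String> sample = Arrays.asList(
                "2019-03-27 15-27-44", "2019-03-27 15-28-51", "2019-03-27 15-28-26");
        for (Session s : mergeSessions(sample, Duration.ofMinutes(1))) {
            System.out.println(s);
        }
    }
}
```

Run on the three sample timestamps with a gap of 1 minute, the sketch yields a single session from 15-27-44 to 15-29-51 holding 3 records, which matches the last line of the sample output. With only the first two records it yields two separate sessions, matching the first two output lines.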



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
