I am using FileIO and I observe the drop of pane info on the Flink runner too. It was mentioned in this thread: https://www.mail-archive.com/dev@beam.apache.org/msg20186.html
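The effect is easy to probe with a pass-through DoFn applied before and after a Reshuffle. This is a minimal sketch, not code from the pipelines discussed here; the class and step names are illustrative:

```java
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;

public class PaneProbe {
  /** Pass-through DoFn that logs the element's PaneInfo with the given label. */
  public static ParDo.SingleOutput<String, String> logPane(String label) {
    return ParDo.of(new DoFn<String, String>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        // PaneInfo carries timing (EARLY/ON_TIME/LATE/UNKNOWN), the pane index,
        // and the isFirst/isLast flags.
        System.out.printf("[%s] timing=%s pane=%s%n", label, c.pane().getTiming(), c.pane());
        c.output(c.element());
      }
    });
  }
}
```

Used around a Reshuffle, e.g. `input.apply(PaneProbe.logPane("before")).apply(Reshuffle.viaRandomKey()).apply(PaneProbe.logPane("after"))`, it shows whether the pane survives the expansion on a given runner.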
It is a result of a different reshuffle expansion, done for optimisation reasons. However, I did not observe data loss in my case. Windowing and watermark info should be preserved; pane info is not, which raises the question of how reliable pane info should be across SDKs and runners. If you do observe data loss, it would be great to share a test case that replicates the problem.

On Sun, May 10, 2020 at 8:03 AM Reuven Lax <re...@google.com> wrote:

> Ah, I think I see the problem.
>
> It appears that for some reason, the Flink runner loses windowing
> information when a Reshuffle is applied. I'm not entirely sure why, because
> windowing information should be maintained across a Reshuffle.
>
> Reuven
>
> On Sat, May 9, 2020 at 9:50 AM Jose Manuel <kiuby88....@gmail.com> wrote:
>
>> Hi,
>>
>> I have added some logs to the pipeline as follows (you can find the log
>> function in the Appendix):
>>
>>     //STREAM + processing time.
>>     pipeline.apply(KafkaIO.read())
>>             .apply(...) //mappers, a window and a combine
>>             .apply(logBeforeWrite())
>>             .apply("WriteFiles", TextIO.<String>writeCustomType()
>>                     .to(policy).withShards(4).withWindowedWrites())
>>             .getPerDestinationOutputFilenames()
>>             .apply(logAfterWrite())
>>             .apply("CombineFileNames", Combine.perKey(...))
>>
>> I have run the pipeline using the DirectRunner (local), the SparkRunner
>> and the FlinkRunner, the latter two on a cluster.
>> Below you can see the timing and pane information before/after the write
>> (full traces with window and timestamp information are in the Appendix).
>>
>> DirectRunner:
>>   [Before Write] timing=ON_TIME, pane=PaneInfo{isFirst=true, isLast=true,
>>                  timing=ON_TIME, index=0, onTimeIndex=0}
>>   [After Write]  timing=EARLY, pane=PaneInfo{isFirst=true, timing=EARLY, index=0}
>>
>> FlinkRunner:
>>   [Before Write] timing=ON_TIME, pane=PaneInfo{isFirst=true, isLast=true,
>>                  timing=ON_TIME, index=0, onTimeIndex=0}
>>   [After Write]  timing=UNKNOWN, pane=PaneInfo.NO_FIRING
>>
>> SparkRunner:
>>   [Before Write] timing=ON_TIME, pane=PaneInfo{isFirst=true, isLast=true,
>>                  timing=ON_TIME, index=0, onTimeIndex=0}
>>   [After Write]  timing=UNKNOWN, pane=PaneInfo.NO_FIRING
>>
>> It seems the DirectRunner propagates the windowing information as expected.
>> I am not sure whether TextIO really propagates the pane or just emits a new
>> one, because the timing before TextIO is ON_TIME and after TextIO it is EARLY.
>> In any case, with the FlinkRunner and the SparkRunner the timing and the pane
>> are not set.
>>
>> I thought the problem was in GatherBundlesPerWindowFn, but now, after seeing
>> that the DirectRunner filled in the windowing data... I am not sure.
>>
>> https://github.com/apache/beam/blob/6a4ef33607572569ea08b9e10654d1755cfba846/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L406
>>
>>
>> Appendix
>> -----------
>> Here you can see the log function and the traces for the different runners
>> in detail.
>>
>>     private SingleOutput<String, String> logBefore() {
>>         return ParDo.of(new DoFn<String, String>() {
>>             @ProcessElement
>>             public void processElement(ProcessContext context, BoundedWindow boundedWindow) {
>>                 String value = context.element();
>>                 log.info("[Before Write] Element={} window={}, timestamp={}, timing={}, index={}, isFirst={}, isLast={}, pane={}",
>>                         value,
>>                         boundedWindow,
>>                         context.timestamp(),
>>                         context.pane().getTiming(),
>>                         context.pane().getIndex(),
>>                         context.pane().isFirst(),
>>                         context.pane().isLast(),
>>                         context.pane());
>>                 context.output(value);
>>             }
>>         });
>>     }
>>
>> The logAfter function logs the same information with an "[After Write]" prefix.
>>
>> Traces in detail.
>>
>> DirectRunner (local):
>> [Before Write] Element=data window=[2020-05-09T13:39:00.000Z..2020-05-09T13:40:00.000Z),
>> timestamp=2020-05-09T13:39:59.999Z, timing=ON_TIME, index=0, isFirst=true,
>> isLast=true, pane=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}
>> [After Write] Element=file window=[2020-05-09T13:39:00.000Z..2020-05-09T13:40:00.000Z),
>> timestamp=2020-05-09T13:39:59.999Z, timing=EARLY, index=0, isFirst=true,
>> isLast=false, pane=PaneInfo{isFirst=true, timing=EARLY, index=0}
>>
>> FlinkRunner (cluster):
>> [Before Write] Element=data window=[2020-05-09T15:13:00.000Z..2020-05-09T15:14:00.000Z),
>> timestamp=2020-05-09T15:13:59.999Z, timing=ON_TIME, index=0, isFirst=true,
>> isLast=true, pane=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}
>> [After Write] Element=file window=[2020-05-09T15:13:00.000Z..2020-05-09T15:14:00.000Z),
>> timestamp=2020-05-09T15:13:59.999Z, timing=UNKNOWN, index=0, isFirst=true,
>> isLast=true, pane=PaneInfo.NO_FIRING
>>
>> SparkRunner (cluster):
>> [Before Write] Element=data window=[2020-05-09T15:34:00.000Z..2020-05-09T15:35:00.000Z),
>> timestamp=2020-05-09T15:34:59.999Z, timing=ON_TIME, index=0, isFirst=true,
>> isLast=true, pane=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}
>> [After Write] Element=file window=[2020-05-09T15:34:00.000Z..2020-05-09T15:35:00.000Z),
>> timestamp=2020-05-09T15:34:59.999Z, timing=UNKNOWN, index=0, isFirst=true,
>> isLast=true, pane=PaneInfo.NO_FIRING
>>
>>
>> On Fri, May 8, 2020 at 7:01 PM Reuven Lax (<re...@google.com>) wrote:
>>
>>> The window information should still be there. Beam propagates windows
>>> through PCollections, and I don't think WriteFiles does anything explicit
>>> to stop that.
>>>
>>> Can you try this with the direct runner to see what happens there?
>>> What is your windowing on this PCollection?
>>>
>>> Reuven
>>>
>>> On Fri, May 8, 2020 at 3:49 AM Jose Manuel <kiuby88....@gmail.com> wrote:
>>>
>>>> I got the same behavior using the Spark runner (with Spark 2.4.3):
>>>> the window information was missing.
>>>>
>>>> Just to clarify, the combiner after TextIO gave different results. With
>>>> the Flink runner the file names were dropped, and with Spark the combine
>>>> ran twice, duplicating data. I think this is because different runners
>>>> handle data without windowing information in different ways.
>>>>
>>>>
>>>> On Fri, May 8, 2020 at 12:45 AM Luke Cwik (<lc...@google.com>) wrote:
>>>>
>>>>> +dev <dev@beam.apache.org>
>>>>>
>>>>> On Mon, May 4, 2020 at 3:56 AM Jose Manuel <kiuby88....@gmail.com> wrote:
>>>>>
>>>>>> Hi guys,
>>>>>>
>>>>>> I think I have found something interesting about windowing.
>>>>>>
>>>>>> I have a pipeline that gets data from Kafka and writes to HDFS by
>>>>>> means of TextIO. Once written, the generated files are combined to
>>>>>> apply some custom operations. However, the combine does not receive
>>>>>> data. Below is the relevant part of my pipeline:
>>>>>>
>>>>>>     //STREAM + processing time.
>>>>>>     pipeline.apply(KafkaIO.read())
>>>>>>             .apply(...) //mappers, a window and a combine
>>>>>>             .apply("WriteFiles", TextIO.<String>writeCustomType()
>>>>>>                     .to(policy).withShards(4).withWindowedWrites())
>>>>>>             .getPerDestinationOutputFilenames()
>>>>>>             .apply("CombineFileNames", Combine.perKey(...))
>>>>>>
>>>>>> Running the pipeline with Flink, I found a log trace saying that data
>>>>>> is discarded as late in the CombineFileNames combine. As a workaround,
>>>>>> I added allowed lateness to the pipeline's window strategy.
>>>>>> That works for now, but it raises several questions for me.
>>>>>>
>>>>>> I think the problem is that getPerDestinationOutputFilenames generates
>>>>>> the file names but does not maintain the windowing information. Then
>>>>>> CombineFileNames compares the file names with the watermark of the
>>>>>> pipeline and discards them as late.
>>>>>>
>>>>>> Is there any issue with getPerDestinationOutputFilenames? Maybe I am
>>>>>> doing something wrong and using getPerDestinationOutputFilenames +
>>>>>> combine does not make sense. What do you think?
>>>>>>
>>>>>> Please note I am using Beam 2.17.0 with Flink 1.9.1.
>>>>>>
>>>>>> Many thanks,
>>>>>> Jose
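For completeness, the allowed-lateness workaround described in the quoted message can be sketched as a pipeline fragment like the following. This is not the original code: the variable `filenames` (assumed here to be the output of `getPerDestinationOutputFilenames()`, with a String destination type), the step name, and the window and lateness durations are all illustrative.

```java
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

// Re-window before the downstream combine so that file-name elements whose
// pane/timestamp information was degraded are not dropped as late.
// Durations are illustrative, not the values used in the original pipeline.
PCollection<KV<String, String>> rewindowed =
    filenames.apply("RewindowWithLateness",
        Window.<KV<String, String>>into(FixedWindows.of(Duration.standardMinutes(1)))
            .withAllowedLateness(Duration.standardMinutes(5)));
```

Widening the allowed lateness only masks the symptom; the underlying question of why the pane/windowing information is degraded after the write remains open.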