Hi,

I would like to clarify that TextIO itself writes all the data: every element ends up in the files (shards). The loss happens later, when the file names emitted by getPerDestinationOutputFilenames are processed by a window.
I have created a pipeline that reproduces the scenario in which some file names are lost after getPerDestinationOutputFilenames. Please note that I tried to simplify the code as much as possible, but the scenario is not easy to reproduce.

Please check this project: https://github.com/kiuby88/windowing-textio
See the README to build and run it (https://github.com/kiuby88/windowing-textio#build-and-run). The project contains only one class with the pipeline (PipelineWithTextIo), a log4j2.xml file in the resources, and the pom.

The pipeline in PipelineWithTextIo generates unbounded data using a sequence. It adds a small delay (10s) per data entry, applies a distinct (just to apply the window), and then writes the data using TextIO. The window for the distinct is fixed (5 seconds) and does not use allowed lateness. The generated files can be found in windowing-textio/pipe_with_lateness_0s/files. To write the files, the FileNamePolicy uses window + timing + shards (see https://github.com/kiuby88/windowing-textio/blob/master/src/main/java/org/kby/PipelineWithTextIo.java#L135).

The files are emitted using getPerDestinationOutputFilenames() (see the code here: https://github.com/kiuby88/windowing-textio/blob/master/src/main/java/org/kby/PipelineWithTextIo.java#L71-L78). Then the file names in the PCollection are extracted and logged. Please note that the file names do not have pane information at that point.

To apply a window, a distinct is used again. Here several file names are discarded as late and are not processed by this second distinct.
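To make the "discarded as late" condition concrete, here is a minimal stand-alone sketch in plain java.time, with no Beam dependency. It assumes the rule that an element can be dropped once the maximum timestamp of its window plus the allowed lateness is behind the input watermark; I believe this is what the LateDataFilter checks, but please treat it as an assumption. The class and method names (LateDataCheck, maxTimestamp, isDroppable) are hypothetical and only for illustration. The timestamps are the ones from the WindowTracing message in this mail.

```java
import java.time.Duration;
import java.time.Instant;

/** Stand-alone illustration of the assumed late-data rule; this is not Beam code. */
final class LateDataCheck {

    /** Last timestamp that still belongs to a window whose (exclusive) end is windowEnd. */
    static Instant maxTimestamp(Instant windowEnd) {
        return windowEnd.minusMillis(1);
    }

    /**
     * Assumed rule: the element is droppable when the window's maximum timestamp
     * plus the allowed lateness is strictly before the input watermark.
     */
    static boolean isDroppable(Instant windowEnd, Duration allowedLateness, Instant inputWatermark) {
        return maxTimestamp(windowEnd).plus(allowedLateness).isBefore(inputWatermark);
    }

    public static void main(String[] args) {
        // Window [14:05:10.000Z..14:05:15.000Z) and watermark taken from the log message.
        Instant windowEnd = Instant.parse("2020-05-12T14:05:15.000Z");
        Instant inputWatermark = Instant.parse("2020-05-12T14:05:19.799Z");

        // No allowed lateness: 14:05:14.999Z is behind the watermark.
        System.out.println(isDroppable(windowEnd, Duration.ZERO, inputWatermark)); // true

        // One minute of allowed lateness: 14:06:14.999Z is not behind the watermark.
        System.out.println(isDroppable(windowEnd, Duration.ofSeconds(60), inputWatermark)); // false
    }
}
```

Under this assumption, the file name for window [14:05:10..14:05:15) arrives when the watermark is already at 14:05:19.799Z, so with zero lateness it is dropped, while one minute of lateness keeps it.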
The second distinct is applied here: https://github.com/kiuby88/windowing-textio/blob/master/src/main/java/org/kby/PipelineWithTextIo.java#L80-L83

Debug is enabled for WindowTracing, so you will find several messages like the following in the terminal:

DEBUG org.apache.beam.sdk.util.WindowTracing - LateDataFilter: Dropping element at 2020-05-12T14:05:14.999Z for key:path/pipe_with_lateness_0s/files/[2020-05-12T14:05:10.000Z..2020-05-12T14:05:15.000Z)-ON_TIME-0-of-1.txt; window:[2020-05-12T14:05:10.000Z..2020-05-12T14:05:15.000Z) since too far behind inputWatermark:2020-05-12T14:05:19.799Z; outputWatermark:2020-05-12T14:05:19.799Z

What happens here? I think that messages are generated every second and a 5-second window groups them. Then a delay is added, and finally the data are written to a file. Meanwhile the pipeline reads more data, which advances the watermark. Then the file names are emitted without pane information (see "Emitted File" in the logs). The window in the second distinct compares the timestamp of each file name with the pipeline watermark and discards the file names as late.

Bonus
-----
You can add an allowed lateness to the pipeline. See https://github.com/kiuby88/windowing-textio/blob/master/README.md#run-with-lateness

If a lateness of one minute is added to the window, the file names are processed as late data instead of being dropped. As a result, the LateDataFilter traces disappear.

Moreover, to better illustrate that the file names reach the second distinct as late, I added a second TextIO that writes the file names to other files. The same FileNamePolicy as before is used (window + timing + shards). You can find the files that contain the original file names in windowing-textio/pipe_with_lateness_60s/files-after-distinct. This is the interesting part, because you will find several files with LATE in their names.

Please let me know if you need more information or if the example is not enough to check the expected scenarios.

Kby.

On Sun, May 10, 2020 at 17:04, Reuven Lax (<[email protected]>) wrote:

> Pane info is supposed to be preserved across transforms. If the Flink
> runner does not, then I believe that is a bug.
>
> On Sat, May 9, 2020 at 11:22 PM Jozef Vilcek <[email protected]> wrote:
>
>> I am using FileIO and I do observe the drop of pane info on the
>> Flink runner too. It was mentioned in this thread:
>> https://www.mail-archive.com/[email protected]/msg20186.html
>>
>> It is a result of different reshuffle expansion for optimisation reasons.
>> However, I did not observe a data loss in my case. Windowing and watermark
>> info should be preserved. Pane info is not, which brings a question how
>> reliable pane info should be in terms of SDK and runner.
>>
>> If you do observe a data loss, it would be great to share a test case
>> which replicates the problem.
>>
>> On Sun, May 10, 2020 at 8:03 AM Reuven Lax <[email protected]> wrote:
>>
>>> Ah, I think I see the problem.
>>>
>>> It appears that for some reason, the Flink runner loses windowing
>>> information when a Reshuffle is applied. I'm not entirely sure why, because
>>> windowing information should be maintained across a Reshuffle.
>>>
>>> Reuven
>>>
>>> On Sat, May 9, 2020 at 9:50 AM Jose Manuel <[email protected]> wrote:
>>>
>>>> Hi,
>>>>
>>>> I have added some logs to the pipeline as follows (you can find the
>>>> log function in the Appendix):
>>>>
>>>> //STREAM + processing time.
>>>> pipeline.apply(KafkaIO.read())
>>>>         .apply(...) //mappers, a window and a combine
>>>>         .apply(logBeforeWrite())
>>>>
>>>>         .apply("WriteFiles",
>>>>                 TextIO.<String>writeCustomType().to(policy).withShards(4).withWindowedWrites())
>>>>         .getPerDestinationOutputFilenames()
>>>>
>>>>         .apply(logAfterWrite())
>>>>         .apply("CombineFileNames", Combine.perKey(...))
>>>>
>>>> I have run the pipeline using DirectRunner (local), SparkRunner and
>>>> FlinkRunner, the latter two on a cluster.
>>>> Below you can see the timing and pane information before/after (you can
>>>> see traces in detail with window and timestamp information in the
>>>> Appendix).
>>>>
>>>> DirectRunner:
>>>> [Before Write] timing=ON_TIME, pane=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}
>>>> [After Write] timing=EARLY, pane=PaneInfo{isFirst=true, timing=EARLY, index=0}
>>>>
>>>> FlinkRunner:
>>>> [Before Write] timing=ON_TIME, pane=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}
>>>> [After Write] timing=UNKNOWN, pane=PaneInfo.NO_FIRING
>>>>
>>>> SparkRunner:
>>>> [Before Write] timing=ON_TIME, pane=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}
>>>> [After Write] timing=UNKNOWN, pane=PaneInfo.NO_FIRING
>>>>
>>>> It seems DirectRunner propagates the windowing information as expected.
>>>> I am not sure if TextIO really propagates or it just emits a window
>>>> pane, because the timing before TextIO is ON_TIME and after TextIO is
>>>> EARLY.
>>>> In any case using FlinkRunner and SparkRunner the timing and the pane
>>>> are not set.
>>>>
>>>> I thought the problem was in GatherBundlesPerWindowFn, but now, after
>>>> seeing that the DirectRunner filled windowing data... I am not sure.
>>>>
>>>> https://github.com/apache/beam/blob/6a4ef33607572569ea08b9e10654d1755cfba846/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L406
>>>>
>>>>
>>>> Appendix
>>>> -----------
>>>> Here you can see the log function and traces for different runners in
>>>> detail.
>>>>
>>>> private SingleOutput<String, String> logBeforeWrite() {
>>>>     return ParDo.of(new DoFn<String, String>() {
>>>>         @ProcessElement
>>>>         public void processElement(ProcessContext context, BoundedWindow boundedWindow) {
>>>>             String value = context.element();
>>>>             // Log the window, timestamp and pane of the element before the write.
>>>>             log.info("[Before Write] Element=data window={}, timestamp={}, timing={}, index ={}, isFirst ={}, isLast={}, pane={}",
>>>>                     boundedWindow,
>>>>                     context.timestamp(),
>>>>                     context.pane().getTiming(),
>>>>                     context.pane().getIndex(),
>>>>                     context.pane().isFirst(),
>>>>                     context.pane().isLast(),
>>>>                     context.pane()
>>>>             );
>>>>             // Pass the element through unchanged.
>>>>             context.output(value);
>>>>         }
>>>>     });
>>>> }
>>>>
>>>> The logAfterWrite function logs the same information.
>>>>
>>>> Traces in detail.
>>>>
>>>> DirectRunner (local):
>>>> [Before Write] Element=data window=[2020-05-09T13:39:00.000Z..2020-05-09T13:40:00.000Z), timestamp=2020-05-09T13:39:59.999Z, timing=ON_TIME, index =0, isFirst =true, isLast=true pane=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}
>>>> [After Write] Element=file window=[2020-05-09T13:39:00.000Z..2020-05-09T13:40:00.000Z), timestamp=2020-05-09T13:39:59.999Z, timing=EARLY, index =0, isFirst =true, isLast=false pane=PaneInfo{isFirst=true, timing=EARLY, index=0}
>>>>
>>>>
>>>> FlinkRunner (cluster):
>>>> [Before Write] Element=data window=[2020-05-09T15:13:00.000Z..2020-05-09T15:14:00.000Z), timestamp=2020-05-09T15:13:59.999Z, timing=ON_TIME, index =0, isFirst =true, isLast=true pane=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}
>>>> [After Write] Element=file window=[2020-05-09T15:13:00.000Z..2020-05-09T15:14:00.000Z), timestamp=2020-05-09T15:13:59.999Z, timing=UNKNOWN, index =0, isFirst =true, isLast=true pane=PaneInfo.NO_FIRING
>>>>
>>>> SparkRunner (cluster):
>>>> [Before Write] Element=data window=[2020-05-09T15:34:00.000Z..2020-05-09T15:35:00.000Z), timestamp=2020-05-09T15:34:59.999Z, timing=ON_TIME, index =0, isFirst =true, isLast=true pane=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}
>>>> [After Write] Element=file window=[2020-05-09T15:34:00.000Z..2020-05-09T15:35:00.000Z), timestamp=2020-05-09T15:34:59.999Z, timing=UNKNOWN, index =0, isFirst =true, isLast=true pane=PaneInfo.NO_FIRING
>>>>
>>>>
>>>> On Fri, May 8, 2020 at 19:01, Reuven Lax (<[email protected]>) wrote:
>>>>
>>>>> The window information should still be there. Beam propagates windows
>>>>> through PCollection, and I don't think WriteFiles does anything explicit
>>>>> to stop that.
>>>>>
>>>>> Can you try this with the direct runner to see what happens there?
>>>>> What is your windowing on this PCollection?
>>>>>
>>>>> Reuven
>>>>>
>>>>> On Fri, May 8, 2020 at 3:49 AM Jose Manuel <[email protected]> wrote:
>>>>>
>>>>>> I got the same behavior using the Spark runner (with Spark 2.4.3): the
>>>>>> window information was missing.
>>>>>>
>>>>>> Just to clarify, the combiner after TextIO had different results. In
>>>>>> the Flink runner the file names were dropped, and in Spark the
>>>>>> combination process happened twice, duplicating data. I think it is
>>>>>> because different runners handle data without windowing information
>>>>>> in different ways.
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Fri, May 8, 2020 at 0:45, Luke Cwik (<[email protected]>) wrote:
>>>>>>
>>>>>>> +dev <[email protected]>
>>>>>>>
>>>>>>> On Mon, May 4, 2020 at 3:56 AM Jose Manuel <[email protected]> wrote:
>>>>>>>
>>>>>>>> Hi guys,
>>>>>>>>
>>>>>>>> I think I have found something interesting about windowing.
>>>>>>>>
>>>>>>>> I have a pipeline that gets data from Kafka and writes to HDFS by
>>>>>>>> means of TextIO.
>>>>>>>> Once written, the generated files are combined to apply some custom
>>>>>>>> operations.
>>>>>>>> However, the combine does not receive data.
>>>>>>>> Below you can find an outline of my pipeline.
>>>>>>>>
>>>>>>>> //STREAM + processing time.
>>>>>>>> pipeline.apply(KafkaIO.read())
>>>>>>>>         .apply(...) //mappers, a window and a combine
>>>>>>>>
>>>>>>>>         .apply("WriteFiles",
>>>>>>>>                 TextIO.<String>writeCustomType().to(policy).withShards(4).withWindowedWrites())
>>>>>>>>         .getPerDestinationOutputFilenames()
>>>>>>>>         .apply("CombineFileNames", Combine.perKey(...))
>>>>>>>>
>>>>>>>> Running the pipeline with Flink, I have found a log trace that says
>>>>>>>> data are discarded as late in the combine CombineFileNames. Then I
>>>>>>>> added AllowedLateness to the pipeline window strategy as a workaround.
>>>>>>>> It works for now, but this raises several questions for me.
>>>>>>>>
>>>>>>>> I think the problem is that getPerDestinationOutputFilenames generates
>>>>>>>> files, but it does not maintain the windowing information. Then
>>>>>>>> CombineFileNames compares the file names with the watermark of the
>>>>>>>> pipeline and discards them as late.
>>>>>>>>
>>>>>>>> Is there any issue with getPerDestinationOutputFilenames? Maybe I
>>>>>>>> am doing something wrong and using
>>>>>>>> getPerDestinationOutputFilenames + combine does not make sense.
>>>>>>>> What do you think?
>>>>>>>>
>>>>>>>> Please note I am using Beam 2.17.0 with Flink 1.9.1.
>>>>>>>>
>>>>>>>> Many thanks,
>>>>>>>> Jose
>>>>>>>>
>>>>>>>>
