Hi, Many thanks for your responses.
I agree with you, Reuven: the source is what should determine whether data are late or not.
Maximilian, I agree with you too. As I mentioned in previous emails, I saw the same behavior with Spark, and I guessed the problem was here:
https://github.com/apache/beam/blob/6a4ef33607572569ea08b9e10654d1755cfba846/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L406
I tried the solution you suggested locally, keeping the pane info so it can be emitted as part of the finish bundle, but I had some problems with Gradle. I would be happy to know where the problem is.
Thanks,
Jose

On Fri, May 15, 2020 at 6:34 PM Reuven Lax (<[email protected]>) wrote:

> Lateness should never be introduced inside a pipeline - generally late data can only come from a source. If data was not dropped as late earlier in the pipeline, it should not be dropped after the file write. I suspect that this is a bug in how the Flink runner handles the Reshuffle transform, but I'm not sure what the exact bug is.
>
> Reuven
>
> On Fri, May 15, 2020 at 2:23 AM Jozef Vilcek <[email protected]> wrote:
>
>> Hi Jose,
>>
>> thank you for putting in the effort to build an example which demonstrates your problem.
>>
>> You are using a streaming pipeline, and it seems that the watermark downstream has already advanced further, so when your file pane arrives, it is already late. Since you define that lateness is not tolerated, it is dropped.
>> I myself never had a requirement to specify zero allowed lateness for streaming. It feels dangerous. Do you have a specific use case? Also, in my cases, after windowed files are written, I usually collect them into a global window and specify a different triggering policy for collecting them. Both are reasons why I never came across this situation.
>>
>> I do not have an explanation as to whether it is a bug or not. I would guess that the watermark can advance further, e.g. because elements can be processed in arbitrary order. Not saying this is the case.
>> It needs someone with a better understanding of how watermark advancement is / should be handled within pipelines.
>>
>> P.S.: you can add `.withTimestampFn()` to your generate sequence to get more stable timing, which is also easier to reason about:
>>
>> Dropping element at 1970-01-01T00:00:19.999Z for key ... window:[1970-01-01T00:00:15.000Z..1970-01-01T00:00:20.000Z) since too far behind inputWatermark:1970-01-01T00:00:24.000Z; outputWatermark:1970-01-01T00:00:24.000Z
>>
>> instead of
>>
>> Dropping element at 2020-05-15T08:52:34.999Z for key ... window:[2020-05-15T08:52:30.000Z..2020-05-15T08:52:35.000Z) since too far behind inputWatermark:2020-05-15T08:52:39.318Z; outputWatermark:2020-05-15T08:52:39.318Z
>>
>> On Thu, May 14, 2020 at 10:47 AM Jose Manuel <[email protected]> wrote:
>>
>>> Hi again,
>>>
>>> I have simplified the example to reproduce the data loss. The scenario is the following:
>>>
>>> - TextIO writes files.
>>> - getPerDestinationOutputFilenames emits the file names.
>>> - The file names are processed by an aggregator (combine, distinct, groupByKey...) with a window **without allowed lateness**.
>>> - The file names are discarded as late.
>>>
>>> Here you can see the data loss in the picture at
>>> https://github.com/kiuby88/windowing-textio/blob/master/README.md#showing-data-loss
>>>
>>> Please follow the README to run the pipeline and find the log traces that say data are dropped as late.
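>>> For reference, the pipeline in the project has roughly this shape (a condensed sketch rather than the exact code; filenamePolicy and tempDirectory stand in for the values built in PipelineWithTextIo):
>>>
>>> PCollection<String> data = pipeline
>>>     // Unbounded source: one element per second.
>>>     // (Optionally .withTimestampFn(n -> new Instant(n * 1000)) for stable event times, as suggested above.)
>>>     .apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(1)))
>>>     .apply(MapElements.into(TypeDescriptors.strings()).via((Long n) -> "value-" + n))
>>>     // (The real pipeline also adds a small per-element delay here so the watermark can move ahead.)
>>>     // Fixed 5-second windows; the default allowed lateness is zero.
>>>     .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(5))))
>>>     .apply("FirstDistinct", Distinct.<String>create());
>>>
>>> data
>>>     .apply("WriteFiles",
>>>         TextIO.<String>writeCustomType()
>>>             .withFormatFunction(s -> s)        // elements are already strings
>>>             .to(filenamePolicy)                // file names carry window + timing + shard
>>>             .withTempDirectory(tempDirectory)
>>>             .withNumShards(1)
>>>             .withWindowedWrites())
>>>     .getPerDestinationOutputFilenames()        // PCollection of KV<destination, file name>
>>>     .apply(Values.<String>create())
>>>     // By the time the file names arrive here, the watermark has already passed the
>>>     // end of their window, so with zero allowed lateness they are dropped as late.
>>>     .apply("SecondDistinct", Distinct.<String>create());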
>>> Remember, you can run the pipeline with other values for the window's allowed lateness (check the README.md).
>>>
>>> Kby.
>>>
>>> On Tue, May 12, 2020 at 5:16 PM Jose Manuel (<[email protected]>) wrote:
>>>
>>>> Hi,
>>>>
>>>> I would like to clarify that while TextIO is writing, all the data are in the files (shards). The loss happens when the file names emitted by getPerDestinationOutputFilenames are processed by a window.
>>>>
>>>> I have created a pipeline to reproduce the scenario in which some filenames are lost after getPerDestinationOutputFilenames. Please note that I tried to simplify the code as much as possible, but the scenario is not easy to reproduce.
>>>>
>>>> Please check this project: https://github.com/kiuby88/windowing-textio
>>>> Check the readme to build and run it (https://github.com/kiuby88/windowing-textio#build-and-run).
>>>> The project contains only a class with the pipeline, PipelineWithTextIo, a log4j2.xml file in the resources, and the pom.
>>>>
>>>> The pipeline in PipelineWithTextIo generates unbounded data using a sequence. It adds a little delay (10s) per data entry, uses a distinct (just to apply the window), and then writes the data using TextIO.
>>>> The window for the distinct is fixed (5 seconds) and does not use lateness.
>>>> Generated files can be found in windowing-textio/pipe_with_lateness_0s/files. To write the files, the FileNamePolicy uses window + timing + shards (see
>>>> https://github.com/kiuby88/windowing-textio/blob/master/src/main/java/org/kby/PipelineWithTextIo.java#L135
>>>> )
>>>> File names are emitted using getPerDestinationOutputFilenames() (see the code here:
>>>> https://github.com/kiuby88/windowing-textio/blob/master/src/main/java/org/kby/PipelineWithTextIo.java#L71-L78
>>>> )
>>>>
>>>> Then, the file names in the PCollection are extracted and logged. Please note that the file names do not have pane information at that point.
>>>>
>>>> To apply a window, a distinct is used again. Here several file names are discarded as late and are not processed by this second distinct. Please see
>>>> https://github.com/kiuby88/windowing-textio/blob/master/src/main/java/org/kby/PipelineWithTextIo.java#L80-L83
>>>>
>>>> Debug is enabled for WindowTracing, so you can find several messages like the following in the terminal:
>>>> DEBUG org.apache.beam.sdk.util.WindowTracing - LateDataFilter: Dropping element at 2020-05-12T14:05:14.999Z for key:path/pipe_with_lateness_0s/files/[2020-05-12T14:05:10.000Z..2020-05-12T14:05:15.000Z)-ON_TIME-0-of-1.txt; window:[2020-05-12T14:05:10.000Z..2020-05-12T14:05:15.000Z) since too far behind inputWatermark:2020-05-12T14:05:19.799Z; outputWatermark:2020-05-12T14:05:19.799Z
>>>>
>>>> What happens here? I think messages are generated every second and a 5-second window groups them. Then a delay is added and finally the data are written to a file.
>>>> The pipeline reads more data, increasing the watermark.
>>>> Then, the file names are emitted without pane information (see "Emitted File" in the logs). The window in the second distinct compares the file names' timestamps against the pipeline watermark and discards the file names as late.
>>>>
>>>> Bonus
>>>> -----
>>>> You can add lateness to the pipeline. See
>>>> https://github.com/kiuby88/windowing-textio/blob/master/README.md#run-with-lateness
>>>>
>>>> If a minute of allowed lateness is added to the window, the file names are processed as late (see the sketch below).
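>>>> As a sketch of the idea (not the literal code from the repository; fileNames stands for the output of getPerDestinationOutputFilenames()), the run with lateness amounts to something like:
>>>>
>>>> fileNames
>>>>     .apply(Values.<String>create())
>>>>     .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(5)))
>>>>         // Tolerate file names arriving up to one minute behind the watermark.
>>>>         .withAllowedLateness(Duration.standardMinutes(1))
>>>>         .discardingFiredPanes())
>>>>     .apply("SecondDistinct", Distinct.<String>create());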
>>>> As a result, the traces from LateDataFilter disappear.
>>>>
>>>> Moreover, in order to better illustrate that the file names are emitted as late to the second distinct, I added a second TextIO to write the file names to other files.
>>>> The same FileNamePolicy as before was used (window + timing + shards). Then, you can find files that contain the original filenames in windowing-textio/pipe_with_lateness_60s/files-after-distinct. This is the interesting part, because you will find several files with LATE in their names.
>>>>
>>>> Please let me know if you need more information or if the example is not enough to check the expected scenarios.
>>>>
>>>> Kby.
>>>>
>>>> On Sun, May 10, 2020 at 5:04 PM Reuven Lax (<[email protected]>) wrote:
>>>>
>>>>> Pane info is supposed to be preserved across transforms. If the Flink runner does not preserve it, then I believe that is a bug.
>>>>>
>>>>> On Sat, May 9, 2020 at 11:22 PM Jozef Vilcek <[email protected]> wrote:
>>>>>
>>>>>> I am using FileIO and I do observe the drop of pane info on the Flink runner too. It was mentioned in this thread:
>>>>>> https://www.mail-archive.com/[email protected]/msg20186.html
>>>>>>
>>>>>> It is a result of a different reshuffle expansion, done for optimisation reasons. However, I did not observe data loss in my case. Windowing and watermark info should be preserved. Pane info is not, which raises the question of how reliable pane info should be in terms of the SDK and runners.
>>>>>>
>>>>>> If you do observe data loss, it would be great to share a test case which replicates the problem.
>>>>>>
>>>>>> On Sun, May 10, 2020 at 8:03 AM Reuven Lax <[email protected]> wrote:
>>>>>>
>>>>>>> Ah, I think I see the problem.
>>>>>>>
>>>>>>> It appears that for some reason, the Flink runner loses windowing information when a Reshuffle is applied. I'm not entirely sure why, because windowing information should be maintained across a Reshuffle.
>>>>>>>
>>>>>>> Reuven
>>>>>>>
>>>>>>> On Sat, May 9, 2020 at 9:50 AM Jose Manuel <[email protected]> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I have added some logs to the pipeline as follows (you can find the log function in the Appendix):
>>>>>>>>
>>>>>>>> //STREAM + processing time.
>>>>>>>> pipeline.apply(KafkaIO.read())
>>>>>>>>         .apply(...) //mappers, a window and a combine
>>>>>>>>         .apply(logBeforeWrite())
>>>>>>>>         .apply("WriteFiles", TextIO.<String>writeCustomType().to(policy).withNumShards(4).withWindowedWrites())
>>>>>>>>         .getPerDestinationOutputFilenames()
>>>>>>>>         .apply(logAfterWrite())
>>>>>>>>         .apply("CombineFileNames", Combine.perKey(...))
>>>>>>>>
>>>>>>>> I have run the pipeline using the DirectRunner (local) and the SparkRunner and FlinkRunner, the latter two on a cluster.
>>>>>>>> Below you can see the timing and pane information before/after the write (traces in detail, with window and timestamp information, are in the Appendix).
>>>>>>>>
>>>>>>>> DirectRunner:
>>>>>>>> [Before Write] timing=ON_TIME, pane=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}
>>>>>>>> [After Write]  timing=EARLY, pane=PaneInfo{isFirst=true, timing=EARLY, index=0}
>>>>>>>>
>>>>>>>> FlinkRunner:
>>>>>>>> [Before Write] timing=ON_TIME, pane=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}
>>>>>>>> [After Write]  timing=UNKNOWN, pane=PaneInfo.NO_FIRING
>>>>>>>>
>>>>>>>> SparkRunner:
>>>>>>>> [Before Write] timing=ON_TIME, pane=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}
>>>>>>>> [After Write]  timing=UNKNOWN, pane=PaneInfo.NO_FIRING
>>>>>>>>
>>>>>>>> It seems the DirectRunner propagates the windowing information as expected.
>>>>>>>> I am not sure whether TextIO really propagates it or just emits a new window pane, because the timing before TextIO is ON_TIME and after TextIO it is EARLY.
>>>>>>>> In any case, using the FlinkRunner and the SparkRunner the timing and the pane are not set.
>>>>>>>>
>>>>>>>> I thought the problem was in GatherBundlesPerWindowFn, but now, after seeing that the DirectRunner filled in the windowing data... I am not sure.
>>>>>>>>
>>>>>>>> https://github.com/apache/beam/blob/6a4ef33607572569ea08b9e10654d1755cfba846/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L406
>>>>>>>>
>>>>>>>> Appendix
>>>>>>>> -----------
>>>>>>>> Here you can see the log function and the traces for the different runners in detail.
>>>>>>>>
>>>>>>>> private SingleOutput<String, String> logBefore() {
>>>>>>>>     return ParDo.of(new DoFn<String, String>() {
>>>>>>>>         @ProcessElement
>>>>>>>>         public void processElement(ProcessContext context, BoundedWindow boundedWindow) {
>>>>>>>>             // Log the window, timestamp and pane info of each element, then pass the element through unchanged.
>>>>>>>>             String value = context.element();
>>>>>>>>             log.info("[Before Write] Element=data window={}, timestamp={}, timing={}, index ={}, isFirst ={}, isLast={}, pane={}",
>>>>>>>>                     boundedWindow,
>>>>>>>>                     context.timestamp(),
>>>>>>>>                     context.pane().getTiming(),
>>>>>>>>                     context.pane().getIndex(),
>>>>>>>>                     context.pane().isFirst(),
>>>>>>>>                     context.pane().isLast(),
>>>>>>>>                     context.pane()
>>>>>>>>             );
>>>>>>>>             context.output(context.element());
>>>>>>>>         }
>>>>>>>>     });
>>>>>>>> }
>>>>>>>>
>>>>>>>> The logAfter function logs the same information.
>>>>>>>>
>>>>>>>> Traces in detail.
>>>>>>>>
>>>>>>>> DirectRunner (local):
>>>>>>>> [Before Write] Element=data window=[2020-05-09T13:39:00.000Z..2020-05-09T13:40:00.000Z), timestamp=2020-05-09T13:39:59.999Z, timing=ON_TIME, index =0, isFirst =true, isLast=true pane=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}
>>>>>>>> [After Write] Element=file window=[2020-05-09T13:39:00.000Z..2020-05-09T13:40:00.000Z), timestamp=2020-05-09T13:39:59.999Z, timing=EARLY, index =0, isFirst =true, isLast=false pane=PaneInfo{isFirst=true, timing=EARLY, index=0}
>>>>>>>>
>>>>>>>> FlinkRunner (cluster):
>>>>>>>> [Before Write] Element=data window=[2020-05-09T15:13:00.000Z..2020-05-09T15:14:00.000Z), timestamp=2020-05-09T15:13:59.999Z, timing=ON_TIME, index =0, isFirst =true, isLast=true pane=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}
>>>>>>>> [After Write] Element=file window=[2020-05-09T15:13:00.000Z..2020-05-09T15:14:00.000Z), timestamp=2020-05-09T15:13:59.999Z, timing=UNKNOWN, index =0, isFirst =true, isLast=true pane=PaneInfo.NO_FIRING
>>>>>>>>
>>>>>>>> SparkRunner (cluster):
>>>>>>>> [Before Write] Element=data window=[2020-05-09T15:34:00.000Z..2020-05-09T15:35:00.000Z), timestamp=2020-05-09T15:34:59.999Z, timing=ON_TIME, index =0, isFirst =true, isLast=true pane=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}
>>>>>>>> [After Write] Element=file window=[2020-05-09T15:34:00.000Z..2020-05-09T15:35:00.000Z), timestamp=2020-05-09T15:34:59.999Z, timing=UNKNOWN, index =0, isFirst =true, isLast=true pane=PaneInfo.NO_FIRING
>>>>>>>>
>>>>>>>> On Fri, May 8, 2020 at 7:01 PM Reuven Lax (<[email protected]>) wrote:
>>>>>>>>
>>>>>>>>> The window information should still be there. Beam propagates windows through PCollections, and I don't think WriteFiles does anything explicit to stop that.
>>>>>>>>>
>>>>>>>>> Can you try this with the direct runner to see what happens there? What is your windowing on this PCollection?
>>>>>>>>>
>>>>>>>>> Reuven
>>>>>>>>>
>>>>>>>>> On Fri, May 8, 2020 at 3:49 AM Jose Manuel <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> I got the same behavior using the Spark runner (with Spark 2.4.3): the window information was missing.
>>>>>>>>>>
>>>>>>>>>> Just to clarify, the combiner after TextIO had different results. In the Flink runner the file names were dropped, and in Spark the combination process happened twice, duplicating data. I think this is because different runners handle data without windowing information in different ways.
>>>>>>>>>>
>>>>>>>>>> On Fri, May 8, 2020 at 12:45 AM Luke Cwik (<[email protected]>) wrote:
>>>>>>>>>>
>>>>>>>>>>> +dev <[email protected]>
>>>>>>>>>>>
>>>>>>>>>>> On Mon, May 4, 2020 at 3:56 AM Jose Manuel <[email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi guys,
>>>>>>>>>>>>
>>>>>>>>>>>> I think I have found something interesting about windowing.
>>>>>>>>>>>>
>>>>>>>>>>>> I have a pipeline that gets data from Kafka and writes to HDFS by means of TextIO.
>>>>>>>>>>>> Once written, the generated files are combined to apply some custom operations.
>>>>>>>>>>>> However, the combine does not receive any data. Below you can find the relevant part of my pipeline.
>>>>>>>>>>>>
>>>>>>>>>>>> //STREAM + processing time.
>>>>>>>>>>>> pipeline.apply(KafkaIO.read())
>>>>>>>>>>>>         .apply(...) //mappers, a window and a combine
>>>>>>>>>>>>         .apply("WriteFiles", TextIO.<String>writeCustomType().to(policy).withNumShards(4).withWindowedWrites())
>>>>>>>>>>>>         .getPerDestinationOutputFilenames()
>>>>>>>>>>>>         .apply("CombineFileNames", Combine.perKey(...))
>>>>>>>>>>>>
>>>>>>>>>>>> Running the pipeline with Flink, I found a log trace that says data are discarded as late in the CombineFileNames combine. Then, I added allowedLateness to the pipeline's window strategy as a workaround.
>>>>>>>>>>>> It works for now, but this raises several questions for me.
>>>>>>>>>>>>
>>>>>>>>>>>> I think the problem is that getPerDestinationOutputFilenames generates the file names, but it does not maintain the windowing information. Then, CombineFileNames compares the file names with the watermark of the pipeline and discards them as late.
>>>>>>>>>>>>
>>>>>>>>>>>> Is there any issue with getPerDestinationOutputFilenames? Maybe I am doing something wrong and using getPerDestinationOutputFilenames + combine does not make sense. What do you think?
>>>>>>>>>>>>
>>>>>>>>>>>> Please note I am using Beam 2.17.0 with Flink 1.9.1.
>>>>>>>>>>>>
>>>>>>>>>>>> Many thanks,
>>>>>>>>>>>> Jose
