Hi Reuven, I can try to explain what I think is happening.
- There is a source which is reading data entries and updating the watermark.
- Then, data entries are grouped and stored in files.
- The window information of these data entries is used to emit filenames, with the data entries' window and timestamp. PaneInfo is empty.
- When a second window is applied to the filenames, if the allowed lateness is zero or lower than the time spent in the previous reading/writing, the filenames are discarded as late.

I guess the key is in
https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java#L168

My assumption is that the global watermark (or source watermark, I am not sure about the name) is used to evaluate the filenames, which are in an already emitted window.

Thanks
Jose

On Mon, May 18, 2020 at 18:37, Reuven Lax (<[email protected]>) wrote:
> This is still confusing to me - why would the messages be dropped as late
> in this case?
>
> On Mon, May 18, 2020 at 6:14 AM Maximilian Michels <[email protected]> wrote:
>
>> All runners which use the Beam reference implementation drop the
>> PaneInfo for WriteFilesResult#getPerDestinationOutputFilenames(). That's
>> why we can observe this behavior not only in Flink but also in Spark.
>>
>> The WriteFilesResult is returned here:
>> https://github.com/apache/beam/blob/d773f8ca7a4d63d01472b5eaef8b67157d60f40e/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L363
>>
>> GatherBundlesPerWindow will discard the pane information because all
>> buffered elements are emitted in the FinishBundle method, which always
>> has a NO_FIRING (unknown) pane info:
>> https://github.com/d773f8ca7a4d63d01472b5eaef8b67157d60f40e/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L895
>>
>> So this seems to be expected behavior. We would need to preserve the panes in
>> the Multimap buffer.
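For reference, the drop decision in the LateDataDroppingDoFnRunner linked above boils down to comparing the end of an element's window, plus the allowed lateness, against the input watermark. A minimal plain-Java model of that rule (a simplified sketch using the timestamps from the logs later in this thread, not Beam's actual code):

```java
import java.time.Duration;
import java.time.Instant;

public class LateDataCheck {
    // Simplified model of the late-data rule: an element is dropped when the
    // max timestamp of its window, plus the allowed lateness, is behind the
    // input watermark.
    static boolean isTooLate(Instant windowMaxTimestamp, Duration allowedLateness, Instant inputWatermark) {
        return windowMaxTimestamp.plus(allowedLateness).isBefore(inputWatermark);
    }

    public static void main(String[] args) {
        // Filename emitted for window [00:15..00:20): it carries the window's
        // max timestamp, but the watermark has already advanced to 00:24.
        Instant windowMax = Instant.parse("1970-01-01T00:00:19.999Z");
        Instant watermark = Instant.parse("1970-01-01T00:00:24.000Z");

        // With zero allowed lateness the filename is dropped...
        System.out.println(isTooLate(windowMax, Duration.ZERO, watermark));         // true
        // ...while one minute of allowed lateness keeps it.
        System.out.println(isTooLate(windowMax, Duration.ofMinutes(1), watermark)); // false
    }
}
```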
>>
>> -Max
>>
>> On 15.05.20 18:34, Reuven Lax wrote:
>> > Lateness should never be introduced inside a pipeline - generally, late
>> > data can only come from a source. If data was not dropped as late
>> > earlier in the pipeline, it should not be dropped after the file write.
>> > I suspect that this is a bug in how the Flink runner handles the
>> > Reshuffle transform, but I'm not sure what the exact bug is.
>> >
>> > Reuven
>> >
>> > On Fri, May 15, 2020 at 2:23 AM Jozef Vilcek <[email protected]> wrote:
>> >
>> > Hi Jose,
>> >
>> > thank you for putting in the effort to build an example which
>> > demonstrates your problem.
>> >
>> > You are using a streaming pipeline, and it seems that the watermark
>> > downstream has already advanced further, so when your file pane
>> > arrives, it is already late. Since you define that lateness is not
>> > tolerated, it is dropped.
>> > I myself never had a requirement to specify zero allowed lateness for
>> > streaming. It feels dangerous. Do you have a specific use case?
>> > Also, in many cases, after windowed files are written, I usually
>> > collect them into a global window and specify a different triggering
>> > policy for collecting them. Both cases are why I never came across
>> > this situation.
>> >
>> > I do not have an explanation for whether it is a bug or not. I would
>> > guess that the watermark can advance further, e.g. because elements
>> > can be processed in arbitrary order. Not saying this is the case.
>> > It needs someone with a better understanding of how watermark
>> > advancement is / should be handled within pipelines.
>> >
>> > P.S.: you can add `.withTimestampFn()` to your GenerateSequence to
>> > get more stable timing, which is also easier to reason about:
>> >
>> > Dropping element at 1970-01-01T00:00:19.999Z for key
>> > ...
>> > window:[1970-01-01T00:00:15.000Z..1970-01-01T00:00:20.000Z)
>> > since too far behind inputWatermark:1970-01-01T00:00:24.000Z;
>> > outputWatermark:1970-01-01T00:00:24.000Z
>> >
>> > instead of
>> >
>> > Dropping element at 2020-05-15T08:52:34.999Z for key ...
>> > window:[2020-05-15T08:52:30.000Z..2020-05-15T08:52:35.000Z) since
>> > too far behind inputWatermark:2020-05-15T08:52:39.318Z;
>> > outputWatermark:2020-05-15T08:52:39.318Z
>> >
>> > On Thu, May 14, 2020 at 10:47 AM Jose Manuel <[email protected]> wrote:
>> >
>> > Hi again,
>> >
>> > I have simplified the example to reproduce the data loss. The
>> > scenario is the following:
>> >
>> > - TextIO writes files.
>> > - getPerDestinationOutputFilenames emits file names.
>> > - File names are processed by an aggregator (combine, distinct,
>> >   groupByKey...) with a window **without allowed lateness**.
>> > - File names are discarded as late.
>> >
>> > Here you can see the data loss in the picture in
>> > https://github.com/kiuby88/windowing-textio/blob/master/README.md#showing-data-loss
>> >
>> > Please, follow the README to run the pipeline and find log traces
>> > that say data are dropped as late.
>> > Remember, you can run the pipeline with other window lateness
>> > values (check README.md).
>> >
>> > Kby.
>> >
>> > On Tue, May 12, 2020 at 17:16, Jose Manuel (<[email protected]>) wrote:
>> >
>> > Hi,
>> >
>> > I would like to clarify that while TextIO is writing, all data
>> > are in the files (shards). The loss happens when the file
>> > names emitted by getPerDestinationOutputFilenames are
>> > processed by a window.
>> >
>> > I have created a pipeline to reproduce the scenario in which
>> > some filenames are lost after
>> > getPerDestinationOutputFilenames. Please note I tried to
>> > simplify the code as much as possible, but the scenario is
>> > not easy to reproduce.
>> >
>> > Please check this project:
>> > https://github.com/kiuby88/windowing-textio
>> > Check the README to build and run it
>> > (https://github.com/kiuby88/windowing-textio#build-and-run).
>> > The project contains only a class with the pipeline,
>> > PipelineWithTextIo, a log4j2.xml file in the resources, and the pom.
>> >
>> > The pipeline in PipelineWithTextIo generates unbounded data
>> > using a sequence. It adds a little delay (10s) per data
>> > entry, it uses a distinct (just to apply the window), and
>> > then it writes data using TextIO.
>> > The window for the distinct is fixed (5 seconds) and it
>> > does not use lateness.
>> > Generated files can be found in
>> > windowing-textio/pipe_with_lateness_0s/files. To write files,
>> > the FileNamePolicy uses window + timing + shards (see
>> > https://github.com/kiuby88/windowing-textio/blob/master/src/main/java/org/kby/PipelineWithTextIo.java#L135).
>> > Files are emitted using getPerDestinationOutputFilenames()
>> > (see the code here:
>> > https://github.com/kiuby88/windowing-textio/blob/master/src/main/java/org/kby/PipelineWithTextIo.java#L71-L78).
>> >
>> > Then, the file names in the PCollection are extracted and
>> > logged. Please note the file names do not have pane
>> > information at that point.
>> >
>> > To apply a window, a distinct is used again. Here several
>> > file names are discarded as late and are not processed by
>> > this second distinct.
>> > Please, see
>> > https://github.com/kiuby88/windowing-textio/blob/master/src/main/java/org/kby/PipelineWithTextIo.java#L80-L83
>> >
>> > Debug is enabled for WindowTracing, so you can find in the
>> > terminal several messages like the following:
>> > DEBUG org.apache.beam.sdk.util.WindowTracing -
>> > LateDataFilter: Dropping element at 2020-05-12T14:05:14.999Z
>> > for
>> > key:path/pipe_with_lateness_0s/files/[2020-05-12T14:05:10.000Z..2020-05-12T14:05:15.000Z)-ON_TIME-0-of-1.txt;
>> > window:[2020-05-12T14:05:10.000Z..2020-05-12T14:05:15.000Z)
>> > since too far behind
>> > inputWatermark:2020-05-12T14:05:19.799Z;
>> > outputWatermark:2020-05-12T14:05:19.799Z
>> >
>> > What happens here? I think messages are generated per
>> > second and a window of 5 seconds groups them. Then a delay is
>> > added and finally the data are written to a file.
>> > The pipeline reads more data, increasing the watermark.
>> > Then, file names are emitted without pane information (see
>> > "Emitted File" in the logs). The window in the second distinct
>> > compares the file names' timestamps with the pipeline
>> > watermark and then discards the file names as late.
>> >
>> > Bonus
>> > -----
>> > You can add lateness to the pipeline. See
>> > https://github.com/kiuby88/windowing-textio/blob/master/README.md#run-with-lateness
>> >
>> > If a minute of lateness is added to the window, the file names
>> > are processed as late. As a result, the traces of
>> > LateDataFilter disappear.
>> >
>> > Moreover, in order to better illustrate that the file names are
>> > emitted as late for the second distinct, I added a second
>> > TextIO to write the file names to other files.
>> > The same FileNamePolicy as before was used (window + timing +
>> > shards). Then, you can find files that contain the original
>> > filenames in
>> > windowing-textio/pipe_with_lateness_60s/files-after-distinct. This
>> > is the interesting part, because you will find several files
>> > with LATE in their names.
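The numbers in the WindowTracing message above can be checked directly: by the time the second distinct sees the filename, the watermark is about 4.8 seconds past the end of the filename's window, so the allowed lateness must cover at least that gap. A small standalone plain-Java check of the arithmetic (not Beam code):

```java
import java.time.Duration;
import java.time.Instant;

public class DropGap {
    public static void main(String[] args) {
        // Values from the WindowTracing message above: the filename carries the
        // max timestamp of its 5-second window, but the write delay let the
        // watermark run past the window's end.
        Instant windowEnd = Instant.parse("2020-05-12T14:05:15.000Z");
        Instant inputWatermark = Instant.parse("2020-05-12T14:05:19.799Z");

        // Allowed lateness must cover at least this gap for the filename to
        // survive the second distinct.
        Duration gap = Duration.between(windowEnd, inputWatermark);
        System.out.println(gap.toMillis() + " ms"); // 4799 ms
    }
}
```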
>> >
>> > Please, let me know if you need more information or if the
>> > example is not enough to check the expected scenarios.
>> >
>> > Kby.
>> >
>> > On Sun, May 10, 2020 at 17:04, Reuven Lax (<[email protected]>) wrote:
>> >
>> > Pane info is supposed to be preserved across transforms.
>> > If the Flink runner does not, then I believe that is a bug.
>> >
>> > On Sat, May 9, 2020 at 11:22 PM Jozef Vilcek <[email protected]> wrote:
>> >
>> > I am using FileIO and I do observe the drop of pane
>> > info on the Flink runner too. It was
>> > mentioned in this thread:
>> > https://www.mail-archive.com/[email protected]/msg20186.html
>> >
>> > It is a result of a different Reshuffle expansion for
>> > optimisation reasons. However, I did not observe
>> > data loss in my case. Windowing and watermark info
>> > should be preserved. Pane info is not, which raises
>> > the question of how reliable pane info should be in terms
>> > of SDK and runner.
>> >
>> > If you do observe data loss, it would be great to
>> > share a test case which replicates the problem.
>> >
>> > On Sun, May 10, 2020 at 8:03 AM Reuven Lax <[email protected]> wrote:
>> >
>> > Ah, I think I see the problem.
>> >
>> > It appears that for some reason, the Flink
>> > runner loses windowing information when a
>> > Reshuffle is applied. I'm not entirely sure why,
>> > because windowing information should be
>> > maintained across a Reshuffle.
>> >
>> > Reuven
>> >
>> > On Sat, May 9, 2020 at 9:50 AM Jose Manuel <[email protected]> wrote:
>> >
>> > Hi,
>> >
>> > I have added some logs to the pipeline as
>> > follows (you can find the log function in
>> > the Appendix):
>> >
>> > //STREAM + processing time.
>> > pipeline.apply(KafkaIO.read())
>> >     .apply(...)
>> >     //mappers, a window and a combine
>> >     .apply(logBeforeWrite())
>> >     .apply("WriteFiles",
>> >         TextIO.<String>writeCustomType().to(policy).withShards(4).withWindowedWrites())
>> >     .getPerDestinationOutputFilenames()
>> >     .apply(logAfterWrite())
>> >     .apply("CombineFileNames", Combine.perKey(...))
>> >
>> > I have run the pipeline using DirectRunner
>> > (local), SparkRunner and FlinkRunner, the latter
>> > two using a cluster.
>> > Below you can see the timing and pane
>> > information before/after (you can see the traces
>> > in detail, with window and timestamp
>> > information, in the Appendix).
>> >
>> > DirectRunner:
>> > [Before Write] timing=ON_TIME,
>> > pane=PaneInfo{isFirst=true, isLast=true,
>> > timing=ON_TIME, index=0, onTimeIndex=0}
>> > [After Write] timing=EARLY,
>> > pane=PaneInfo{isFirst=true, timing=EARLY,
>> > index=0}
>> >
>> > FlinkRunner:
>> > [Before Write] timing=ON_TIME,
>> > pane=PaneInfo{isFirst=true, isLast=true,
>> > timing=ON_TIME, index=0, onTimeIndex=0}
>> > [After Write] timing=UNKNOWN,
>> > pane=PaneInfo.NO_FIRING
>> >
>> > SparkRunner:
>> > [Before Write] timing=ON_TIME,
>> > pane=PaneInfo{isFirst=true, isLast=true,
>> > timing=ON_TIME, index=0, onTimeIndex=0}
>> > [After Write] timing=UNKNOWN,
>> > pane=PaneInfo.NO_FIRING
>> >
>> > It seems DirectRunner propagates the
>> > windowing information as expected.
>> > I am not sure if TextIO really propagates it or
>> > just emits a window pane, because the
>> > timing before TextIO is ON_TIME and after
>> > TextIO is EARLY.
>> > In any case, using FlinkRunner and
>> > SparkRunner the timing and the pane are not set.
>> >
>> > I thought the problem was in
>> > GatherBundlesPerWindowFn, but now, after
>> > seeing that the DirectRunner filled in the
>> > windowing data... I am not sure.
>> > https://github.com/apache/beam/blob/6a4ef33607572569ea08b9e10654d1755cfba846/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L406
>> >
>> > Appendix
>> > -----------
>> > Here you can see the log function and the traces
>> > for the different runners in detail.
>> >
>> > private SingleOutput<String, String> logBefore() {
>> >     return ParDo.of(new DoFn<String, String>() {
>> >         @ProcessElement
>> >         public void processElement(ProcessContext context, BoundedWindow boundedWindow) {
>> >             log.info("[Before Write] Element=data window={}, timestamp={}, timing={}, index={}, isFirst={}, isLast={}, pane={}",
>> >                 boundedWindow,
>> >                 context.timestamp(),
>> >                 context.pane().getTiming(),
>> >                 context.pane().getIndex(),
>> >                 context.pane().isFirst(),
>> >                 context.pane().isLast(),
>> >                 context.pane());
>> >             context.output(context.element());
>> >         }
>> >     });
>> > }
>> >
>> > The logAfter function shows the same information.
>> >
>> > Traces in detail.
>> > DirectRunner (local):
>> > [Before Write] Element=data
>> > window=[2020-05-09T13:39:00.000Z..2020-05-09T13:40:00.000Z),
>> > timestamp=2020-05-09T13:39:59.999Z,
>> > timing=ON_TIME, index=0, isFirst=true, isLast=true,
>> > pane=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME,
>> > index=0, onTimeIndex=0}
>> > [After Write] Element=file
>> > window=[2020-05-09T13:39:00.000Z..2020-05-09T13:40:00.000Z),
>> > timestamp=2020-05-09T13:39:59.999Z,
>> > timing=EARLY, index=0, isFirst=true, isLast=false,
>> > pane=PaneInfo{isFirst=true, timing=EARLY, index=0}
>> >
>> > FlinkRunner (cluster):
>> > [Before Write] Element=data
>> > window=[2020-05-09T15:13:00.000Z..2020-05-09T15:14:00.000Z),
>> > timestamp=2020-05-09T15:13:59.999Z,
>> > timing=ON_TIME, index=0, isFirst=true, isLast=true,
>> > pane=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME,
>> > index=0, onTimeIndex=0}
>> > [After Write] Element=file
>> > window=[2020-05-09T15:13:00.000Z..2020-05-09T15:14:00.000Z),
>> > timestamp=2020-05-09T15:13:59.999Z,
>> > timing=UNKNOWN, index=0, isFirst=true, isLast=true,
>> > pane=PaneInfo.NO_FIRING
>> >
>> > SparkRunner (cluster):
>> > [Before Write] Element=data
>> > window=[2020-05-09T15:34:00.000Z..2020-05-09T15:35:00.000Z),
>> > timestamp=2020-05-09T15:34:59.999Z,
>> > timing=ON_TIME, index=0, isFirst=true, isLast=true,
>> > pane=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME,
>> > index=0, onTimeIndex=0}
>> > [After Write] Element=file
>> > window=[2020-05-09T15:34:00.000Z..2020-05-09T15:35:00.000Z),
>> > timestamp=2020-05-09T15:34:59.999Z,
>> > timing=UNKNOWN, index=0, isFirst=true, isLast=true,
>> > pane=PaneInfo.NO_FIRING
>> >
>> > On Fri, May 8, 2020 at 19:01, Reuven Lax (<[email protected]>) wrote:
>> >
>> > The window information should still be there.
>> > Beam propagates windows through
>> > PCollection, and I don't think
>> > WriteFiles does anything explicit to
>> > stop that.
>> >
>> > Can you try this with the direct runner
>> > to see what happens there?
>> > What is your windowing on this PCollection?
>> >
>> > Reuven
>> >
>> > On Fri, May 8, 2020 at 3:49 AM Jose Manuel <[email protected]> wrote:
>> >
>> > I got the same behavior using the Spark
>> > runner (with Spark 2.4.3): window
>> > information was missing.
>> >
>> > Just to clarify, the combiner after
>> > TextIO had different results. In the
>> > Flink runner the file names were
>> > dropped, and in Spark the
>> > combination process happened twice,
>> > duplicating data. I think it is
>> > because different runners manage
>> > data without the windowing
>> > information in different ways.
>> >
>> > On Fri, May 8, 2020 at 0:45, Luke Cwik (<[email protected]>) wrote:
>> >
>> > +dev <[email protected]>
>> >
>> > On Mon, May 4, 2020 at 3:56 AM Jose Manuel <[email protected]> wrote:
>> >
>> > Hi guys,
>> >
>> > I think I have found
>> > something interesting about
>> > windowing.
>> >
>> > I have a pipeline that gets
>> > data from Kafka and writes
>> > to HDFS by means of TextIO.
>> > Once written, the generated
>> > files are combined to apply
>> > some custom operations.
>> > However, the combine does
>> > not receive data. Following,
>> > you can find the
>> > highlights of my pipeline.
>> >
>> > //STREAM + processing time.
>> > pipeline.apply(KafkaIO.read())
>> >     .apply(...)
>> >     //mappers, a window and a combine
>> >     .apply("WriteFiles",
>> >         TextIO.<String>writeCustomType().to(policy).withShards(4).withWindowedWrites())
>> >     .getPerDestinationOutputFilenames()
>> >     .apply("CombineFileNames", Combine.perKey(...))
>> >
>> > Running the pipeline with
>> > Flink, I have found a log
>> > trace that says data are
>> > discarded as late in the
>> > CombineFileNames combine.
>> > Then, I added
>> > AllowedLateness to the pipeline
>> > window strategy as a workaround.
>> > It works for now, but this
>> > opens several questions for me.
>> >
>> > I think the problem is that
>> > getPerDestinationOutputFilenames
>> > generates files, but
>> > it does not maintain the
>> > windowing information. Then,
>> > CombineFileNames compares the
>> > file names with the
>> > watermark of the pipeline
>> > and discards them as late.
>> >
>> > Is there any issue with
>> > getPerDestinationOutputFilenames?
>> > Maybe I am doing something
>> > wrong, and using
>> > getPerDestinationOutputFilenames
>> > + combine does not make sense.
>> > What do you think?
>> >
>> > Please note I am using Beam
>> > 2.17.0 with Flink 1.9.1.
>> >
>> > Many thanks,
>> > Jose
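The AllowedLateness workaround described in this original message changes the outcome from "dropped" to a LATE firing, which is also why the second write in the later experiment produces files with LATE in their names. A simplified plain-Java model of that distinction (a sketch of the behavior, not Beam's actual PaneInfo implementation), using the timestamps from the WindowTracing logs earlier in the thread:

```java
import java.time.Duration;
import java.time.Instant;

public class AllowedLatenessEffect {
    enum Outcome { ON_TIME, LATE, DROPPED }

    // Simplified model: once the watermark has passed the end of the element's
    // window, a firing within the allowed lateness is LATE; beyond it, the
    // element is dropped.
    static Outcome outcome(Instant windowEnd, Instant watermark, Duration allowedLateness) {
        if (!watermark.isAfter(windowEnd)) {
            return Outcome.ON_TIME;
        }
        if (windowEnd.plus(allowedLateness).isBefore(watermark)) {
            return Outcome.DROPPED;
        }
        return Outcome.LATE;
    }

    public static void main(String[] args) {
        // The filename's window ended at :15.000, but the watermark is already
        // at :19.799 when the filename reaches CombineFileNames.
        Instant windowEnd = Instant.parse("2020-05-12T14:05:15.000Z");
        Instant watermark = Instant.parse("2020-05-12T14:05:19.799Z");

        System.out.println(outcome(windowEnd, watermark, Duration.ZERO));         // DROPPED
        System.out.println(outcome(windowEnd, watermark, Duration.ofMinutes(1))); // LATE
    }
}
```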
