Hi Reuven,

I can try to explain what I think is happening.

- There is a source which reads data entries and advances the
watermark.
- Then, the data entries are grouped and stored in files.
- The window information of these data entries is used to emit the
filenames: each filename carries the window and timestamp of its data
entries, but its PaneInfo is empty.
- When a second window is applied to the filenames, if the allowed lateness
is zero or lower than the time spent in the previous reading/writing, the
filenames are discarded as late.

I guess, the key is in
https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java#L168

My assumption is that the global watermark (or source watermark, I am not
sure about the name) is used to evaluate the filenames, which belong to an
already emitted window.

Thanks
Jose


On Mon, May 18, 2020 at 18:37, Reuven Lax (<[email protected]>) wrote:

> This is still confusing to me - why would the messages be dropped as late
> in this case?
>
> On Mon, May 18, 2020 at 6:14 AM Maximilian Michels <[email protected]> wrote:
>
>> All runners which use the Beam reference implementation drop the
>> PaneInfo for WriteFilesResult#getPerDestinationOutputFilenames(). That's
>> why we can observe this behavior not only in Flink but also Spark.
>>
>> The WriteFilesResult is returned here:
>>
>> https://github.com/apache/beam/blob/d773f8ca7a4d63d01472b5eaef8b67157d60f40e/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L363
>>
>> GatherBundlesPerWindow will discard the pane information because all
>> buffered elements are emitted in the FinishBundle method which always
>> has a NO_FIRING (unknown) pane info:
>>
>> https://github.com/apache/beam/blob/d773f8ca7a4d63d01472b5eaef8b67157d60f40e/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L895
>>
>> So this seems expected behavior. We would need to preserve the panes in
>> the Multimap buffer.
>>
>> -Max
>>
>> On 15.05.20 18:34, Reuven Lax wrote:
>> > Lateness should never be introduced inside a pipeline - generally late
>> > data can only come from a source.  If data was not dropped as late
>> > earlier in the pipeline, it should not be dropped after the file write.
>> > I suspect that this is a bug in how the Flink runner handles the
>> > Reshuffle transform, but I'm not sure what the exact bug is.
>> >
>> > Reuven
>> >
>> > On Fri, May 15, 2020 at 2:23 AM Jozef Vilcek <[email protected]
>> > <mailto:[email protected]>> wrote:
>> >
>> >     Hi Jose,
>> >
>> >     thank you for putting in the effort to build an example which
>> >     demonstrates your problem.
>> >
>> >     You are using a streaming pipeline and it seems that the watermark
>> >     downstream has already advanced further, so when your File pane
>> >     arrives, it is already late. Since you define that lateness is not
>> >     tolerated, it is dropped.
>> >     I myself never had a requirement to specify zero allowed lateness for
>> >     streaming. It feels dangerous. Do you have a specific use case?
>> >     Also, in many cases, after windowed files are written, I usually
>> >     collect them into the global window and specify a different triggering
>> >     policy for collecting them. Both are reasons why I never came across
>> >     this situation.
>> >
>> >     I do not have an explanation if it is a bug or not. I would guess
>> >     that watermark can advance further, e.g. because elements can be
>> >     processed in arbitrary order. Not saying this is the case.
>> >     It needs someone with better understanding of how watermark advance
>> >     is / should be handled within pipelines.
>> >
>> >
>> >     P.S.: you can add `.withTimestampFn()` to your generate sequence, to
>> >     get more stable timing, which is also easier to reason about:
>> >
>> >     Dropping element at 1970-01-01T00:00:19.999Z for key
>> >     ... window:[1970-01-01T00:00:15.000Z..1970-01-01T00:00:20.000Z)
>> >     since too far behind inputWatermark:1970-01-01T00:00:24.000Z;
>> >     outputWatermark:1970-01-01T00:00:24.000Z
>> >
>> >                instead of
>> >
>> >     Dropping element at 2020-05-15T08:52:34.999Z for key ...
>> >     window:[2020-05-15T08:52:30.000Z..2020-05-15T08:52:35.000Z) since
>> >     too far behind inputWatermark:2020-05-15T08:52:39.318Z;
>> >     outputWatermark:2020-05-15T08:52:39.318Z
>> >
>> >
>> >
>> >
>> >     In my
>> >
>> >
>> >
>> >     On Thu, May 14, 2020 at 10:47 AM Jose Manuel <[email protected]
>> >     <mailto:[email protected]>> wrote:
>> >
>> >         Hi again,
>> >
>> >         I have simplified the example to reproduce the data loss. The
>> >         scenario is the following:
>> >
>> >         - TextIO writes files.
>> >         - getPerDestinationOutputFilenames emits file names.
>> >         - File names are processed by an aggregator (combine, distinct,
>> >         groupByKey...) with a window **without allowed lateness**.
>> >         - File names are discarded as late.
>> >
>> >         Here you can see the data loss in the picture in
>> >         https://github.com/kiuby88/windowing-textio/blob/master/README.md#showing-data-loss
>> >
>> >         Please, follow the README to run the pipeline and find the log
>> >         traces that say data are dropped as late.
>> >         Remember, you can run the pipeline with other allowed lateness
>> >         values for the window (check README.md).
>> >
>> >         Kby.
>> >
>> >         On Tue, May 12, 2020 at 17:16, Jose Manuel
>> >         (<[email protected] <mailto:[email protected]>>) wrote:
>> >
>> >             Hi,
>> >
>> >             I would like to clarify that TextIO does write all the data
>> >             to the files (shards). The loss happens when the file
>> >             names emitted by getPerDestinationOutputFilenames are
>> >             processed by a window.
>> >
>> >             I have created a pipeline to reproduce the scenario in which
>> >             some filenames are lost after
>> >             getPerDestinationOutputFilenames. Please note that I tried
>> >             to simplify the code as much as possible, but the scenario
>> >             is not easy to reproduce.
>> >
>> >             Please check this project
>> >             https://github.com/kiuby88/windowing-textio
>> >             Check readme to build and run
>> >             (https://github.com/kiuby88/windowing-textio#build-and-run)
>> >             The project contains only one class with the pipeline,
>> >             PipelineWithTextIo, a log4j2.xml file in the resources, and
>> >             the pom.
>> >
>> >             The pipeline in PipelineWithTextIo generates unbounded data
>> >             using a sequence. It adds a little delay (10s) per data
>> >             entry, it uses a distinct (just to apply the window), and
>> >             then it writes data using TextIO.
>> >             The window for the distinct is fixed (5 seconds) and it
>> >             does not use allowed lateness.
>> >             Generated files can be found in
>> >             windowing-textio/pipe_with_lateness_0s/files. To write files
>> >             the FileNamePolicy uses window + timing + shards, see
>> >             https://github.com/kiuby88/windowing-textio/blob/master/src/main/java/org/kby/PipelineWithTextIo.java#L135
>> >             Files are emitted using getPerDestinationOutputFilenames(),
>> >             see the code here:
>> >             https://github.com/kiuby88/windowing-textio/blob/master/src/main/java/org/kby/PipelineWithTextIo.java#L71-L78
>> >
>> >             Then, file names in the PCollection are extracted and
>> >             logged. Please note that file names do not have pane
>> >             information at that point.
>> >
>> >             To apply a window a distinct is used again. Here several
>> >             files are discarded as late and they are not processed by
>> >             this second distinct. Please, see
>> >             https://github.com/kiuby88/windowing-textio/blob/master/src/main/java/org/kby/PipelineWithTextIo.java#L80-L83
>> >
>> >             Debug is enabled for WindowTracing, so you can find in the
>> >             terminal several messages like the following:
>> >             DEBUG org.apache.beam.sdk.util.WindowTracing -
>> >             LateDataFilter: Dropping element at 2020-05-12T14:05:14.999Z
>> >             for
>> >             key:path/pipe_with_lateness_0s/files/[2020-05-12T14:05:10.000Z..2020-05-12T14:05:15.000Z)-ON_TIME-0-of-1.txt;
>> >             window:[2020-05-12T14:05:10.000Z..2020-05-12T14:05:15.000Z)
>> >             since too far behind
>> >             inputWatermark:2020-05-12T14:05:19.799Z;
>> >             outputWatermark:2020-05-12T14:05:19.799Z
>> >
>> >             What happens here? I think that messages are generated per
>> >             second and a window of 5 seconds groups them. Then a delay is
>> >             added and finally data are written to a file.
>> >             The pipeline reads more data, advancing the watermark.
>> >             Then, file names are emitted without pane information (see
>> >             "Emitted File" in logs). The window in the second distinct
>> >             compares the file names' timestamps with the pipeline
>> >             watermark and then discards the file names as late.
>> >
>> >
>> >             Bonus
>> >             -----
>> >             You can add allowed lateness to the pipeline. See
>> >             https://github.com/kiuby88/windowing-textio/blob/master/README.md#run-with-lateness
>> >
>> >             If a minute of allowed lateness is added to the window, the
>> >             file names are processed as late. As a result, the traces of
>> >             LateDataFilter disappear.
>> >
>> >             Moreover, in order to better illustrate that file names are
>> >             emitted as late for the second distinct, I added a second
>> >             TextIO to write the file names to other files.
>> >             The same FileNamePolicy as before was used (window + timing +
>> >             shards). Then, you can find files that contain the original
>> >             filenames in
>> >             windowing-textio/pipe_with_lateness_60s/files-after-distinct.
>> >             This is the interesting part, because you will find several
>> >             files with LATE in their names.
>> >
>> >             Please, let me know if you need more information or if the
>> >             example is not enough to check the expected scenarios.
>> >
>> >             Kby.
>> >
>> >             On Sun, May 10, 2020 at 17:04, Reuven Lax
>> >             (<[email protected] <mailto:[email protected]>>) wrote:
>> >
>> >                 Pane info is supposed to be preserved across transforms.
>> >                 If the Flink runner does not, then I believe that is a
>> >                 bug.
>> >
>> >                 On Sat, May 9, 2020 at 11:22 PM Jozef Vilcek
>> >                 <[email protected] <mailto:[email protected]>>
>> >                 wrote:
>> >
>> >                     I am using FileIO and I do observe the drop of pane
>> >                     info on the Flink runner too. It was
>> >                     mentioned in this thread:
>> >                     https://www.mail-archive.com/[email protected]/msg20186.html
>> >
>> >                     It is a result of a different reshuffle expansion for
>> >                     optimisation reasons. However, I did not observe
>> >                     data loss in my case. Windowing and watermark info
>> >                     should be preserved. Pane info is not, which raises
>> >                     the question of how reliable pane info should be in
>> >                     terms of SDK and runner.
>> >
>> >                     If you do observe a data loss, it would be great to
>> >                     share a test case which replicates the problem.
>> >
>> >                     On Sun, May 10, 2020 at 8:03 AM Reuven Lax
>> >                     <[email protected] <mailto:[email protected]>> wrote:
>> >
>> >                         Ah, I think I see the problem.
>> >
>> >                         It appears that for some reason, the Flink
>> >                         runner loses windowing information when a
>> >                         Reshuffle is applied. I'm not entirely sure why,
>> >                         because windowing information should be
>> >                         maintained across a Reshuffle.
>> >
>> >                         Reuven
>> >
>> >                         On Sat, May 9, 2020 at 9:50 AM Jose Manuel
>> >                         <[email protected]
>> >                         <mailto:[email protected]>> wrote:
>> >
>> >
>> >                             Hi,
>> >
>> >                             I have added some logs to the pipeline as
>> >                             follows (you can find the log function in
>> >                             the Appendix):
>> >
>> >                             //STREAM + processing time.
>> >                             pipeline.apply(KafkaIO.read())
>> >                                    .apply(...) //mappers, a window and a combine
>> >                                    .apply(logBeforeWrite())
>> >                                    .apply("WriteFiles",
>> >                                        TextIO.<String>writeCustomType().to(policy).withShards(4).withWindowedWrites())
>> >                                    .getPerDestinationOutputFilenames()
>> >                                    .apply(logAfterWrite())
>> >                                    .apply("CombineFileNames", Combine.perKey(...))
>> >
>> >                             I have run the pipeline using DirectRunner
>> >                             (local), SparkRunner and FlinkRunner, the
>> >                             latter two on a cluster.
>> >                             Below you can see the timing and pane
>> >                             information before/after the write (you can
>> >                             see the traces in detail, with window and
>> >                             timestamp information, in the Appendix).
>> >
>> >                             DirectRunner:
>> >                             [Before Write] timing=ON_TIME,
>> >                             pane=PaneInfo{isFirst=true, isLast=true,
>> >                             timing=ON_TIME, index=0, onTimeIndex=0}
>> >                             [After    Write] timing=EARLY,
>> >                              pane=PaneInfo{isFirst=true, timing=EARLY,
>> >                             index=0}
>> >
>> >                             FlinkRunner:
>> >                             [Before Write] timing=ON_TIME,
>> >                             pane=PaneInfo{isFirst=true, isLast=true,
>> >                             timing=ON_TIME, index=0, onTimeIndex=0}
>> >                             [After    Write] timing=UNKNOWN,
>> >                             pane=PaneInfo.NO_FIRING
>> >
>> >                             SparkRunner:
>> >                             [Before Write] timing=ON_TIME,
>> >                             pane=PaneInfo{isFirst=true, isLast=true,
>> >                             timing=ON_TIME, index=0, onTimeIndex=0}
>> >                             [After    Write] timing=UNKNOWN,
>> >                             pane=PaneInfo.NO_FIRING
>> >
>> >                             It seems DirectRunner propagates the
>> >                             windowing information as expected.
>> >                             I am not sure if TextIO really propagates or
>> >                             it just emits a window pane, because the
>> >                             timing before TextIO is ON_TIME and after
>> >                             TextIO is EARLY.
>> >                             In any case, using FlinkRunner and
>> >                             SparkRunner the timing and the pane are not
>> >                             set.
>> >
>> >                             I thought the problem was in
>> >                             GatherBundlesPerWindowFn, but now, after
>> >                             seeing that the DirectRunner filled
>> >                             windowing data... I am not sure.
>> >
>> >                             https://github.com/apache/beam/blob/6a4ef33607572569ea08b9e10654d1755cfba846/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L406
>> >
>> >
>> >                             Appendix
>> >                             -----------
>> >                             Here you can see the log function and traces
>> >                             for different runners in detail.
>> >
>> >                             private SingleOutput<String, String> logBefore() {
>> >                                 return ParDo.of(new DoFn<String, String>() {
>> >                                     @ProcessElement
>> >                                     public void processElement(ProcessContext context,
>> >                                             BoundedWindow boundedWindow) {
>> >                                         String value = context.element();
>> >                                         // Log window, timestamp and pane details of the element.
>> >                                         log.info("[Before Write] Element=data window={}, timestamp={}, "
>> >                                                 + "timing={}, index ={}, isFirst ={}, isLast={}, pane={}",
>> >                                                 boundedWindow,
>> >                                                 context.timestamp(),
>> >                                                 context.pane().getTiming(),
>> >                                                 context.pane().getIndex(),
>> >                                                 context.pane().isFirst(),
>> >                                                 context.pane().isLast(),
>> >                                                 context.pane());
>> >                                         // Pass the element through unchanged.
>> >                                         context.output(value);
>> >                                     }
>> >                                 });
>> >                             }
>> >
>> >                             The logAfter function shows the same
>> >                             information.
>> >
>> >                             Traces in detail.
>> >
>> >                             DirectRunner (local):
>> >                             [Before Write] Element=data
>> >                             window=[2020-05-09T13:39:00.000Z..2020-05-09T13:40:00.000Z),
>> >                             timestamp=2020-05-09T13:39:59.999Z,
>> >                             timing=ON_TIME, index =0, isFirst =true,
>> >                             isLast=true  pane=PaneInfo{isFirst=true,
>> >                             isLast=true, timing=ON_TIME, index=0,
>> >                             onTimeIndex=0}
>> >                             [After  Write] Element=file
>> >                             window=[2020-05-09T13:39:00.000Z..2020-05-09T13:40:00.000Z),
>> >                             timestamp=2020-05-09T13:39:59.999Z,
>> >                             timing=EARLY,   index =0, isFirst =true,
>> >                             isLast=false pane=PaneInfo{isFirst=true,
>> >                             timing=EARLY, index=0}
>> >
>> >
>> >                             FlinkRunner (cluster):
>> >                             [Before Write] Element=data
>> >                             window=[2020-05-09T15:13:00.000Z..2020-05-09T15:14:00.000Z),
>> >                             timestamp=2020-05-09T15:13:59.999Z,
>> >                             timing=ON_TIME, index =0, isFirst =true,
>> >                             isLast=true  pane=PaneInfo{isFirst=true,
>> >                             isLast=true, timing=ON_TIME, index=0,
>> >                             onTimeIndex=0}
>> >                             [After  Write] Element=file
>> >                             window=[2020-05-09T15:13:00.000Z..2020-05-09T15:14:00.000Z),
>> >                             timestamp=2020-05-09T15:13:59.999Z,
>> >                             timing=UNKNOWN, index =0, isFirst =true,
>> >                             isLast=true  pane=PaneInfo.NO_FIRING
>> >
>> >                             SparkRunner (cluster):
>> >                             [Before Write] Element=data
>> >                             window=[2020-05-09T15:34:00.000Z..2020-05-09T15:35:00.000Z),
>> >                             timestamp=2020-05-09T15:34:59.999Z,
>> >                             timing=ON_TIME, index =0, isFirst =true,
>> >                             isLast=true  pane=PaneInfo{isFirst=true,
>> >                             isLast=true, timing=ON_TIME, index=0,
>> >                             onTimeIndex=0}
>> >                             [After  Write] Element=file
>> >                             window=[2020-05-09T15:34:00.000Z..2020-05-09T15:35:00.000Z),
>> >                             timestamp=2020-05-09T15:34:59.999Z,
>> >                             timing=UNKNOWN, index =0, isFirst =true,
>> >                             isLast=true  pane=PaneInfo.NO_FIRING
>> >
>> >
>> >                             On Fri, May 8, 2020 at 19:01, Reuven Lax
>> >                             (<[email protected]
>> >                             <mailto:[email protected]>>) wrote:
>> >
>> >                                 The window information should still be
>> >                                 there.  Beam propagates windows through
>> >                                 PCollection, and I don't think
>> >                                 WriteFiles does anything explicit to
>> >                                 stop that.
>> >
>> >                                 Can you try this with the direct runner
>> >                                 to see what happens there?
>> >                                 What is your windowing on this
>> >                                 PCollection?
>> >
>> >                                 Reuven
>> >
>> >                                 On Fri, May 8, 2020 at 3:49 AM Jose
>> >                                 Manuel <[email protected]
>> >                                 <mailto:[email protected]>> wrote:
>> >
>> >                                     I got the same behavior using the
>> >                                     Spark runner (with Spark 2.4.3): the
>> >                                     window information was missing.
>> >
>> >                                     Just to clarify, the combiner after
>> >                                     TextIO had different results. In the
>> >                                     Flink runner the file names were
>> >                                     dropped, and in Spark the
>> >                                     combination process happened twice,
>> >                                     duplicating data.  I think it is
>> >                                     because different runners handle
>> >                                     data without the windowing
>> >                                     information in different ways.
>> >
>> >
>> >
>> >                                     On Fri, May 8, 2020 at 0:45,
>> >                                     Luke Cwik (<[email protected]
>> >                                     <mailto:[email protected]>>) wrote:
>> >
>> >                                         +dev <mailto:[email protected]>
>> >
>> >                                         On Mon, May 4, 2020 at 3:56 AM
>> >                                         Jose Manuel
>> >                                         <[email protected]
>> >                                         <mailto:[email protected]>>
>> >                                         wrote:
>> >
>> >                                             Hi guys,
>> >
>> >                                             I think I have found
>> >                                             something interesting about
>> >                                             windowing.
>> >
>> >                                             I have a pipeline that gets
>> >                                             data from Kafka and writes
>> >                                             to HDFS by means of TextIO.
>> >                                             Once written, the generated
>> >                                             files are combined to apply
>> >                                             some custom operations.
>> >                                             However, the combine does
>> >                                             not receive data. Below
>> >                                             you can find an outline
>> >                                             of my pipeline.
>> >
>> >                                             //STREAM + processing time.
>> >                                             pipeline.apply(KafkaIO.read())
>> >                                                     .apply(...) //mappers, a window and a combine
>> >                                                     .apply("WriteFiles",
>> >                                                         TextIO.<String>writeCustomType().to(policy).withShards(4).withWindowedWrites())
>> >                                                     .getPerDestinationOutputFilenames()
>> >                                                     .apply("CombineFileNames", Combine.perKey(...))
>> >
>> >                                             Running the pipeline with
>> >                                             Flink I have found a log
>> >                                             trace that says data are
>> >                                             discarded as late in the
>> >                                             combine CombineFileNames.
>> >                                             Then, I have added
>> >                                             AllowedLateness to the
>> >                                             pipeline window strategy, as
>> >                                             a workaround.
>> >                                             It works for now, but this
>> >                                             raises several questions for
>> >                                             me.
>> >
>> >                                             I think the problem is that
>> >                                             getPerDestinationOutputFilenames
>> >                                             generates files, but
>> >                                             it does not maintain the
>> >                                             windowing information. Then,
>> >                                             CombineFileNames compares
>> >                                             file names with the
>> >                                             watermark of the pipeline
>> >                                             and discards them as late.
>> >
>> >                                             Is there any issue with
>> >                                             getPerDestinationOutputFilenames?
>> >                                             Maybe I am doing something
>> >                                             wrong and using
>> >                                             getPerDestinationOutputFilenames
>> >                                             + combine does not make
>> >                                             sense.
>> >                                             What do you think?
>> >
>> >                                             Please, note I am using Beam
>> >                                             2.17.0 with Flink 1.9.1.
>> >
>> >                                             Many thanks,
>> >                                             Jose
>> >
>>
>
