Hi,

Many thanks for your responses.

I agree with you, Reuven: the source should be what determines whether data
is late or not.

Maximilian, I agree with you. As I mentioned in previous emails, I saw the
same behavior with Spark, and I guessed the problem was here:
https://github.com/apache/beam/blob/6a4ef33607572569ea08b9e10654d1755cfba846/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L406

I tried the solution you described locally, keeping the pane info so it
could be emitted as part of the finish bundle, but I had some problems with
Gradle.
I would be happy to know where the problem is.
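
For reference, this is roughly the shape of what I tried (only a sketch,
modeled on WriteFiles' GatherBundlesPerWindowFn; the class and variable
names are illustrative). It also shows where I believe the pane gets lost:
FinishBundleContext.output() only takes a timestamp and a window, so the
pane observed in processElement cannot be re-attached:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;

class GatherPerWindowFn<T> extends DoFn<T, T> {

  // Per-bundle buffer of elements, grouped by the window they arrived in.
  private transient Map<BoundedWindow, List<T>> buffer;

  @StartBundle
  public void startBundle() {
    buffer = new HashMap<>();
  }

  @ProcessElement
  public void processElement(@Element T element, BoundedWindow window, PaneInfo pane) {
    // The pane is still visible here (e.g. timing=ON_TIME)...
    buffer.computeIfAbsent(window, w -> new ArrayList<>()).add(element);
  }

  @FinishBundle
  public void finishBundle(FinishBundleContext c) {
    for (Map.Entry<BoundedWindow, List<T>> entry : buffer.entrySet()) {
      BoundedWindow window = entry.getKey();
      for (T element : entry.getValue()) {
        // ...but this output overload has no PaneInfo parameter, so
        // downstream consumers see PaneInfo.NO_FIRING.
        c.output(element, window.maxTimestamp(), window);
      }
    }
  }
}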

Thanks,
Jose

On Fri, May 15, 2020 at 6:34 PM Reuven Lax (<[email protected]>) wrote:

> Lateness should never be introduced inside a pipeline - generally late
> data can only come from a source.  If data was not dropped as late earlier
> in the pipeline, it should not be dropped after the file write. I suspect
> that this is a bug in how the Flink runner handles the Reshuffle transform,
> but I'm not sure what the exact bug is.
>
> Reuven
>
> On Fri, May 15, 2020 at 2:23 AM Jozef Vilcek <[email protected]>
> wrote:
>
>> Hi Jose,
>>
>> thank you for putting the effort to get example which demonstrate your
>> problem.
>>
>> You are using a streaming pipeline, and it seems that the downstream
>> watermark has already advanced further, so when your file pane arrives it
>> is already late. Since you define that no lateness is tolerated, it is
>> dropped.
>> I myself never had a requirement to specify zero allowed lateness for
>> streaming; it feels dangerous. Do you have a specific use case? Also, in
>> many cases, after windowed files are written, I usually collect them into a
>> global window and specify a different triggering policy for collecting
>> them (see the sketch below). Both are reasons why I never came across this
>> situation.
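>>
>> To be concrete, the re-collection I mean looks roughly like this (just a
>> sketch; `filenames` stands for the PCollection of emitted file names, and
>> the trigger and delay are arbitrary examples, not a recommendation):
>>
>> filenames.apply(Window.<String>into(new GlobalWindows())
>>     .triggering(Repeatedly.forever(
>>         AfterProcessingTime.pastFirstElementInPane()
>>             .plusDelayOf(Duration.standardSeconds(30))))
>>     .withAllowedLateness(Duration.ZERO)
>>     .discardingFiredPanes());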
>>
>> I do not have an explanation for whether it is a bug or not. I would guess
>> that the watermark can advance further, e.g. because elements can be
>> processed in arbitrary order, but I am not saying this is the case.
>> It needs someone with a better understanding of how watermark advancement
>> is / should be handled within pipelines.
>>
>>
>> P.S.: you can add `.withTimestampFn()` to your GenerateSequence to get
>> more stable timing, which is also easier to reason about:
>>
>> Dropping element at 1970-01-01T00:00:19.999Z for key
>> ... window:[1970-01-01T00:00:15.000Z..1970-01-01T00:00:20.000Z) since too
>> far behind inputWatermark:1970-01-01T00:00:24.000Z;
>> outputWatermark:1970-01-01T00:00:24
>> .000Z
>>
>>            instead of
>>
>> Dropping element at 2020-05-15T08:52:34.999Z for key ...
>> window:[2020-05-15T08:52:30.000Z..2020-05-15T08:52:35.000Z) since too far
>> behind inputWatermark:2020-05-15T08:52:39.318Z;
>> outputWatermark:2020-05-15T08:52:39.318Z
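>>
>> For completeness, that would look something like this (a sketch; the
>> timestamp function is just an example):
>>
>> GenerateSequence.from(0)
>>     .withRate(1, Duration.standardSeconds(1))
>>     .withTimestampFn(n -> new Instant(n * 1000L)) // element n -> epoch + n seconds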
>>
>> On Thu, May 14, 2020 at 10:47 AM Jose Manuel <[email protected]>
>> wrote:
>>
>>> Hi again,
>>>
>>> I have simplified the example to reproduce the data loss. The scenario is
>>> the following:
>>>
>>> - TextIO writes files.
>>> - getPerDestinationOutputFilenames emits the file names.
>>> - File names are processed by an aggregator (combine, distinct,
>>> groupByKey...) with a window **without allowed lateness**.
>>> - File names are discarded as late.
>>>
>>> Here you can see the data loss in the picture at
>>> https://github.com/kiuby88/windowing-textio/blob/master/README.md#showing-data-loss
>>>
>>> Please follow the README to run the pipeline and find the log traces that
>>> say data is dropped as late.
>>> Remember, you can run the pipeline with different allowed-lateness values
>>> for the window (check README.md).
>>>
>>> Kby.
>>>
>>> On Tue, May 12, 2020 at 5:16 PM Jose Manuel (<[email protected]>)
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> I would like to clarify that while TextIO is writing, all the data is in
>>>> the files (shards). The loss happens when the file names emitted by
>>>> getPerDestinationOutputFilenames are processed by a window.
>>>>
>>>> I have created a pipeline to reproduce the scenario in which some
>>>> file names are lost after getPerDestinationOutputFilenames. Please note
>>>> that I tried to simplify the code as much as possible, but the scenario
>>>> is not easy to reproduce.
>>>>
>>>> Please check this project: https://github.com/kiuby88/windowing-textio
>>>> See the README to build and run it (
>>>> https://github.com/kiuby88/windowing-textio#build-and-run)
>>>> The project contains only one class with the pipeline, PipelineWithTextIo,
>>>> a log4j2.xml file in the resources, and the pom.
>>>>
>>>> The pipeline in PipelineWithTextIo generates unbounded data using a
>>>> sequence. It adds a small delay (10s) per data entry, uses a distinct
>>>> (just to apply the window), and then writes the data using TextIO.
>>>> The window for the distinct is fixed (5 seconds) and does not use
>>>> allowed lateness.
>>>> Generated files can be found in
>>>> windowing-textio/pipe_with_lateness_0s/files. To write files, the
>>>> FileNamePolicy uses window + timing + shards (see
>>>> https://github.com/kiuby88/windowing-textio/blob/master/src/main/java/org/kby/PipelineWithTextIo.java#L135
>>>> ).
>>>> Files are emitted using getPerDestinationOutputFilenames()
>>>> (see the code here:
>>>> https://github.com/kiuby88/windowing-textio/blob/master/src/main/java/org/kby/PipelineWithTextIo.java#L71-L78
>>>> ).
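>>>>
>>>> In outline, the pipeline looks roughly like this (a condensed sketch of
>>>> the linked class; the delay DoFn, the policy and the temp directory names
>>>> are placeholders, not the real identifiers in the repo):
>>>>
>>>> pipeline
>>>>     .apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(1)))
>>>>     .apply(MapElements.into(TypeDescriptors.strings()).via(n -> Long.toString(n)))
>>>>     .apply("Delay", ParDo.of(new DelayTenSecondsFn()))
>>>>     .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(5)))) // no allowed lateness
>>>>     .apply(Distinct.<String>create())
>>>>     .apply("WriteFiles", TextIO.<String>writeCustomType()
>>>>         .withFormatFunction(s -> s)
>>>>         .to(fileNamePolicy) // window + timing + shards
>>>>         .withTempDirectory(tempDirectory)
>>>>         .withNumShards(1)
>>>>         .withWindowedWrites())
>>>>     .getPerDestinationOutputFilenames();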
>>>>
>>>> Then the file names in the PCollection are extracted and logged. Please
>>>> note that the file names do not have pane information at that point.
>>>>
>>>> To apply a window, a distinct is used again. Here several file names are
>>>> discarded as late and are not processed by this second distinct.
>>>> Please see
>>>>
>>>> https://github.com/kiuby88/windowing-textio/blob/master/src/main/java/org/kby/PipelineWithTextIo.java#L80-L83
>>>>
>>>> Debug logging is enabled for WindowTracing, so you can find several
>>>> messages like the following in the terminal:
>>>> DEBUG org.apache.beam.sdk.util.WindowTracing - LateDataFilter: Dropping
>>>> element at 2020-05-12T14:05:14.999Z for
>>>> key:path/pipe_with_lateness_0s/files/[2020-05-12T14:05:10.000Z..2020-05-12T14:05:15.000Z)-ON_TIME-0-of-1.txt;
>>>> window:[2020-05-12T14:05:10.000Z..2020-05-12T14:05:15.000Z) since too far
>>>> behind inputWatermark:2020-05-12T14:05:19.799Z;
>>>> outputWatermark:2020-05-12T14:05:19.799Z
>>>>
>>>> What happens here? I think messages are generated every second and a
>>>> 5-second window groups them. Then a delay is added, and finally the data
>>>> is written to a file.
>>>> Meanwhile, the pipeline reads more data, advancing the watermark.
>>>> The file names are then emitted without pane information (see "Emitted
>>>> File" in the logs). The window in the second distinct compares the file
>>>> names' timestamps with the pipeline watermark and discards the file
>>>> names as late.
>>>>
>>>>
>>>> Bonus
>>>> -----
>>>> You can add allowed lateness to the pipeline. See
>>>> https://github.com/kiuby88/windowing-textio/blob/master/README.md#run-with-lateness
>>>>
>>>> If a minute of allowed lateness is added to the window, the file names
>>>> are processed as late, and as a result the LateDataFilter traces
>>>> disappear (see the sketch below).
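>>>>
>>>> In code terms, the change is roughly (a sketch):
>>>>
>>>> Window.<String>into(FixedWindows.of(Duration.standardSeconds(5)))
>>>>     .withAllowedLateness(Duration.standardMinutes(1))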
>>>>
>>>> Moreover, to better illustrate that the file names arrive late at the
>>>> second distinct, I added a second TextIO to write the file names into
>>>> other files, using the same FileNamePolicy as before (window + timing +
>>>> shards).
>>>> Then you can find files that contain the original file names in
>>>> windowing-textio/pipe_with_lateness_60s/files-after-distinct. This is the
>>>> interesting part, because you will find several files with LATE in their
>>>> names.
>>>>
>>>> Please let me know if you need more information or if the example is not
>>>> enough to demonstrate the expected scenarios.
>>>>
>>>> Kby.
>>>>
>>>>
>>>> On Sun, May 10, 2020 at 5:04 PM Reuven Lax (<[email protected]>)
>>>> wrote:
>>>>
>>>>> Pane info is supposed to be preserved across transforms. If the Flink
>>>>> runner does not preserve it, then I believe that is a bug.
>>>>>
>>>>> On Sat, May 9, 2020 at 11:22 PM Jozef Vilcek <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> I am using FileIO and I do observe the drop of pane info on the Flink
>>>>>> runner too. It was mentioned in this thread:
>>>>>> https://www.mail-archive.com/[email protected]/msg20186.html
>>>>>>
>>>>>> It is a result of a different reshuffle expansion, done for
>>>>>> optimisation reasons. However, I did not observe data loss in my case.
>>>>>> Windowing and watermark info should be preserved. Pane info is not,
>>>>>> which raises the question of how reliable pane info should be in terms
>>>>>> of the SDK and runners.
>>>>>>
>>>>>> If you do observe a data loss, it would be great to share a test case
>>>>>> which replicates the problem.
>>>>>>
>>>>>> On Sun, May 10, 2020 at 8:03 AM Reuven Lax <[email protected]> wrote:
>>>>>>
>>>>>>> Ah, I think I see the problem.
>>>>>>>
>>>>>>> It appears that for some reason, the Flink runner loses windowing
>>>>>>> information when a Reshuffle is applied. I'm not entirely sure why, 
>>>>>>> because
>>>>>>> windowing information should be maintained across a Reshuffle.
>>>>>>>
>>>>>>> Reuven
>>>>>>>
>>>>>>> On Sat, May 9, 2020 at 9:50 AM Jose Manuel <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I have added some logs to the pipeline as follows (you can find
>>>>>>>> the log function in the Appendix):
>>>>>>>>
>>>>>>>> //STREAM + processing time.
>>>>>>>> pipeline.apply(KafkaIO.read())
>>>>>>>>        .apply(...) //mappers, a window and a combine
>>>>>>>>        .apply(logBeforeWrite())
>>>>>>>>
>>>>>>>>        .apply("WriteFiles",
>>>>>>>> TextIO.<String>writeCustomType().to(policy).withNumShards(4).withWindowedWrites())
>>>>>>>>        .getPerDestinationOutputFilenames()
>>>>>>>>
>>>>>>>>        .apply(logAfterWrite())
>>>>>>>>        .apply("CombineFileNames", Combine.perKey(...))
>>>>>>>>
>>>>>>>> I have run the pipeline using the DirectRunner (local), and the
>>>>>>>> SparkRunner and FlinkRunner on a cluster.
>>>>>>>> Below you can see the timing and pane information before/after (you
>>>>>>>> can see traces in detail with window and timestamp information in the
>>>>>>>> Appendix).
>>>>>>>>
>>>>>>>> DirectRunner:
>>>>>>>> [Before Write] timing=ON_TIME,  pane=PaneInfo{isFirst=true,
>>>>>>>> isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}
>>>>>>>> [After    Write] timing=EARLY,       pane=PaneInfo{isFirst=true,
>>>>>>>> timing=EARLY, index=0}
>>>>>>>>
>>>>>>>> FlinkRunner:
>>>>>>>> [Before Write] timing=ON_TIME,    pane=PaneInfo{isFirst=true,
>>>>>>>> isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}
>>>>>>>> [After    Write] timing=UNKNOWN, pane=PaneInfo.NO_FIRING
>>>>>>>>
>>>>>>>> SparkRunner:
>>>>>>>> [Before Write] timing=ON_TIME,    pane=PaneInfo{isFirst=true,
>>>>>>>> isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}
>>>>>>>> [After    Write] timing=UNKNOWN, pane=PaneInfo.NO_FIRING
>>>>>>>>
>>>>>>>> It seems the DirectRunner propagates the windowing information as
>>>>>>>> expected.
>>>>>>>> I am not sure whether TextIO really propagates the pane or just emits
>>>>>>>> a new window pane, because the timing before TextIO is ON_TIME and
>>>>>>>> after TextIO it is EARLY.
>>>>>>>> In any case, using the FlinkRunner and SparkRunner, the timing and the
>>>>>>>> pane are not set.
>>>>>>>>
>>>>>>>> I thought the problem was in GatherBundlesPerWindowFn, but now, after
>>>>>>>> seeing that the DirectRunner fills in the windowing data... I am not
>>>>>>>> sure.
>>>>>>>>
>>>>>>>> https://github.com/apache/beam/blob/6a4ef33607572569ea08b9e10654d1755cfba846/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L406
>>>>>>>>
>>>>>>>>
>>>>>>>> Appendix
>>>>>>>> -----------
>>>>>>>> Here you can see the log function and traces for different runners
>>>>>>>> in detail.
>>>>>>>>
>>>>>>>> private SingleOutput<String, String> logBeforeWrite() {
>>>>>>>>     return ParDo.of(new DoFn<String, String>() {
>>>>>>>>         @ProcessElement
>>>>>>>>         public void processElement(ProcessContext context,
>>>>>>>>                                    BoundedWindow boundedWindow) {
>>>>>>>>             // Log window, timestamp and full pane info per element.
>>>>>>>>             log.info("[Before Write] Element=data window={}, timestamp={}, timing={}, index={}, isFirst={}, isLast={}, pane={}",
>>>>>>>>                     boundedWindow,
>>>>>>>>                     context.timestamp(),
>>>>>>>>                     context.pane().getTiming(),
>>>>>>>>                     context.pane().getIndex(),
>>>>>>>>                     context.pane().isFirst(),
>>>>>>>>                     context.pane().isLast(),
>>>>>>>>                     context.pane());
>>>>>>>>             context.output(context.element());
>>>>>>>>         }
>>>>>>>>     });
>>>>>>>> }
>>>>>>>>
>>>>>>>> The logAfterWrite function logs the same information.
>>>>>>>>
>>>>>>>> Traces in detail:
>>>>>>>>
>>>>>>>> DirectRunner (local):
>>>>>>>> [Before Write] Element=data
>>>>>>>> window=[2020-05-09T13:39:00.000Z..2020-05-09T13:40:00.000Z),
>>>>>>>> timestamp=2020-05-09T13:39:59.999Z, timing=ON_TIME, index =0, isFirst
>>>>>>>> =true, isLast=true  pane=PaneInfo{isFirst=true, isLast=true,
>>>>>>>> timing=ON_TIME, index=0, onTimeIndex=0}
>>>>>>>> [After  Write] Element=file
>>>>>>>> window=[2020-05-09T13:39:00.000Z..2020-05-09T13:40:00.000Z),
>>>>>>>> timestamp=2020-05-09T13:39:59.999Z, timing=EARLY,   index =0, isFirst
>>>>>>>> =true, isLast=false pane=PaneInfo{isFirst=true, timing=EARLY, index=0}
>>>>>>>>
>>>>>>>>
>>>>>>>> FlinkRunner (cluster):
>>>>>>>> [Before Write] Element=data
>>>>>>>> window=[2020-05-09T15:13:00.000Z..2020-05-09T15:14:00.000Z),
>>>>>>>> timestamp=2020-05-09T15:13:59.999Z, timing=ON_TIME, index =0, isFirst
>>>>>>>> =true, isLast=true  pane=PaneInfo{isFirst=true, isLast=true,
>>>>>>>> timing=ON_TIME, index=0, onTimeIndex=0}
>>>>>>>> [After  Write] Element=file
>>>>>>>> window=[2020-05-09T15:13:00.000Z..2020-05-09T15:14:00.000Z),
>>>>>>>> timestamp=2020-05-09T15:13:59.999Z, timing=UNKNOWN, index =0, isFirst
>>>>>>>> =true, isLast=true  pane=PaneInfo.NO_FIRING
>>>>>>>>
>>>>>>>> SparkRunner (cluster):
>>>>>>>> [Before Write] Element=data
>>>>>>>> window=[2020-05-09T15:34:00.000Z..2020-05-09T15:35:00.000Z),
>>>>>>>> timestamp=2020-05-09T15:34:59.999Z, timing=ON_TIME, index =0, isFirst
>>>>>>>> =true, isLast=true  pane=PaneInfo{isFirst=true, isLast=true,
>>>>>>>> timing=ON_TIME, index=0, onTimeIndex=0}
>>>>>>>> [After  Write] Element=file
>>>>>>>> window=[2020-05-09T15:34:00.000Z..2020-05-09T15:35:00.000Z),
>>>>>>>> timestamp=2020-05-09T15:34:59.999Z, timing=UNKNOWN, index =0, isFirst
>>>>>>>> =true, isLast=true  pane=PaneInfo.NO_FIRING
>>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, May 8, 2020 at 7:01 PM Reuven Lax (<[email protected]>)
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> The window information should still be there.  Beam propagates
>>>>>>>>> windows through PCollection, and I don't think WriteFiles does 
>>>>>>>>> anything
>>>>>>>>> explicit to stop that.
>>>>>>>>>
>>>>>>>>> Can you try this with the direct runner to see what happens there?
>>>>>>>>> What is your windowing on this PCollection?
>>>>>>>>>
>>>>>>>>> Reuven
>>>>>>>>>
>>>>>>>>> On Fri, May 8, 2020 at 3:49 AM Jose Manuel <[email protected]>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> I got the same behavior using the Spark runner (with Spark 2.4.3):
>>>>>>>>>> window information was missing.
>>>>>>>>>>
>>>>>>>>>> Just to clarify, the combiner after TextIO had different results.
>>>>>>>>>> In the Flink runner the file names were dropped, and in Spark the
>>>>>>>>>> combination process happened twice, duplicating data. I think this
>>>>>>>>>> is because different runners handle data without windowing
>>>>>>>>>> information in different ways.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>> On Fri, May 8, 2020 at 12:45 AM Luke Cwik (<[email protected]>)
>>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> +dev <[email protected]>
>>>>>>>>>>>
>>>>>>>>>>> On Mon, May 4, 2020 at 3:56 AM Jose Manuel <
>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi guys,
>>>>>>>>>>>>
>>>>>>>>>>>> I think I have found something interesting about windowing.
>>>>>>>>>>>>
>>>>>>>>>>>> I have a pipeline that gets data from Kafka and writes to HDFS
>>>>>>>>>>>> by means of TextIO.
>>>>>>>>>>>> Once written, the generated files are combined to apply some
>>>>>>>>>>>> custom operations.
>>>>>>>>>>>> However, the combine does not receive any data. Below you can
>>>>>>>>>>>> find the
>>>>>>>>>>>> highlights of my pipeline.
>>>>>>>>>>>>
>>>>>>>>>>>> //STREAM + processing time.
>>>>>>>>>>>> pipeline.apply(KafkaIO.read())
>>>>>>>>>>>>         .apply(...) //mappers, a window and a combine
>>>>>>>>>>>>
>>>>>>>>>>>>         .apply("WriteFiles",
>>>>>>>>>>>> TextIO.<String>writeCustomType().to(policy).withNumShards(4).withWindowedWrites())
>>>>>>>>>>>>         .getPerDestinationOutputFilenames()
>>>>>>>>>>>>         .apply("CombineFileNames", Combine.perKey(...))
>>>>>>>>>>>>
>>>>>>>>>>>> Running the pipeline with Flink, I found a log trace that says
>>>>>>>>>>>> data is
>>>>>>>>>>>> discarded as late in the CombineFileNames combine. As a
>>>>>>>>>>>> workaround, I added
>>>>>>>>>>>> allowed lateness to the pipeline's window strategy.
>>>>>>>>>>>> It works for now, but this opens several questions for me.
>>>>>>>>>>>>
>>>>>>>>>>>> I think the problem is that getPerDestinationOutputFilenames
>>>>>>>>>>>> emits the file names, but
>>>>>>>>>>>> does not maintain the windowing information. Then,
>>>>>>>>>>>> CombineFileNames compares the
>>>>>>>>>>>> file names with the watermark of the pipeline and discards them
>>>>>>>>>>>> as late.
>>>>>>>>>>>>
>>>>>>>>>>>> Is there any issue with getPerDestinationOutputFilenames?
>>>>>>>>>>>> Maybe I am doing something wrong
>>>>>>>>>>>> and using getPerDestinationOutputFilenames + combine does not
>>>>>>>>>>>> make sense.
>>>>>>>>>>>> What do you think?
>>>>>>>>>>>>
>>>>>>>>>>>> Please, note I am using Beam 2.17.0 with Flink 1.9.1.
>>>>>>>>>>>>
>>>>>>>>>>>> Many thanks,
>>>>>>>>>>>> Jose
>>>>>>>>>>>>
>>>>>>>>>>>>
