I am using FileIO and I also observe the drop of pane info on the Flink
runner. It was mentioned in this thread:
https://www.mail-archive.com/dev@beam.apache.org/msg20186.html

It is the result of a different reshuffle expansion, done for optimisation
reasons. However, I did not observe data loss in my case. Windowing and
watermark information should be preserved; pane info is not, which raises
the question of how reliable pane info is supposed to be across SDKs and
runners.
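For reference when reading the traces quoted below: when a runner does not propagate the pane, downstream transforms observe Beam's default pane, PaneInfo.NO_FIRING, whose timing is UNKNOWN. The tiny stand-in class below (illustrative only; PaneModel is a hypothetical name, not Beam's actual PaneInfo) captures the two states the logs contrast:

```java
// A minimal stand-in for Beam's PaneInfo, only to illustrate what the
// quoted traces show: a dropped pane surfaces downstream as the default
// pane (NO_FIRING), whose timing is UNKNOWN.
class PaneModel {
    enum Timing { EARLY, ON_TIME, LATE, UNKNOWN }

    final boolean isFirst;
    final boolean isLast;
    final Timing timing;
    final long index;

    PaneModel(boolean isFirst, boolean isLast, Timing timing, long index) {
        this.isFirst = isFirst;
        this.isLast = isLast;
        this.timing = timing;
        this.index = index;
    }

    // Mirrors the logged default: timing=UNKNOWN, index=0,
    // isFirst=true, isLast=true.
    static final PaneModel NO_FIRING =
            new PaneModel(true, true, Timing.UNKNOWN, 0);

    public static void main(String[] args) {
        PaneModel beforeWrite = new PaneModel(true, true, Timing.ON_TIME, 0);
        System.out.println("before write: timing=" + beforeWrite.timing);
        System.out.println("after write:  timing=" + NO_FIRING.timing);
    }
}
```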

If you do observe data loss, it would be great to share a test case that
replicates the problem.
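As context for why elements can be dropped downstream (see the original report at the bottom of this thread): a runner discards an element as late once the watermark passes the end of the element's window plus the allowed lateness. A rough stdlib-only model of that check (LatenessCheck and isDroppablyLate are illustrative names, not the runners' actual code):

```java
import java.time.Duration;
import java.time.Instant;

// Models the late-data check: an element is dropped when the watermark
// has passed the end of its window plus the allowed lateness.
class LatenessCheck {

    static boolean isDroppablyLate(Instant windowEnd,
                                   Duration allowedLateness,
                                   Instant watermark) {
        return watermark.isAfter(windowEnd.plus(allowedLateness));
    }

    public static void main(String[] args) {
        Instant windowEnd = Instant.parse("2020-05-09T13:40:00.000Z");
        Instant watermark = Instant.parse("2020-05-09T13:45:00.000Z");

        // With zero allowed lateness the element is dropped as late...
        System.out.println(
                isDroppablyLate(windowEnd, Duration.ZERO, watermark)); // true
        // ...but with 10 minutes of allowed lateness it is kept, which is
        // consistent with allowedLateness working around the problem below.
        System.out.println(
                isDroppablyLate(windowEnd, Duration.ofMinutes(10), watermark)); // false
    }
}
```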

On Sun, May 10, 2020 at 8:03 AM Reuven Lax <re...@google.com> wrote:

> Ah, I think I see the problem.
>
> It appears that for some reason, the Flink runner loses windowing
> information when a Reshuffle is applied. I'm not entirely sure why, because
> windowing information should be maintained across a Reshuffle.
>
> Reuven
>
> On Sat, May 9, 2020 at 9:50 AM Jose Manuel <kiuby88....@gmail.com> wrote:
>
>>
>> Hi,
>>
>> I have added some logs to the pipeline as follows (you can find the log
>> function in the Appendix):
>>
>> //STREAM + processing time.
>> pipeline.apply(KafkaIO.read())
>>        .apply(...) //mappers, a window and a combine
>>        .apply(logBeforeWrite())
>>        .apply("WriteFiles", TextIO.<String>writeCustomType()
>>                .to(policy).withShards(4).withWindowedWrites())
>>        .getPerDestinationOutputFilenames()
>>        .apply(logAfterWrite())
>>        .apply("CombineFileNames", Combine.perKey(...))
>>
>> I have run the pipeline with DirectRunner (locally), and with SparkRunner
>> and FlinkRunner on a cluster.
>> Below you can see the timing and pane information before/after the write
>> (detailed traces with window and timestamp information are in the
>> Appendix).
>>
>> DirectRunner:
>> [Before Write] timing=ON_TIME, pane=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}
>> [After  Write] timing=EARLY,   pane=PaneInfo{isFirst=true, timing=EARLY, index=0}
>>
>> FlinkRunner:
>> [Before Write] timing=ON_TIME, pane=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}
>> [After  Write] timing=UNKNOWN, pane=PaneInfo.NO_FIRING
>>
>> SparkRunner:
>> [Before Write] timing=ON_TIME, pane=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}
>> [After  Write] timing=UNKNOWN, pane=PaneInfo.NO_FIRING
>>
>> It seems DirectRunner propagates the windowing information as expected.
>> I am not sure whether TextIO really propagates the pane or just emits a
>> new one, because the timing before TextIO is ON_TIME and after it is
>> EARLY. In any case, with FlinkRunner and SparkRunner the timing and the
>> pane are not set.
>>
>> I thought the problem was in GatherBundlesPerWindowFn, but now, after
>> seeing that the DirectRunner fills in the windowing data, I am not sure:
>>
>> https://github.com/apache/beam/blob/6a4ef33607572569ea08b9e10654d1755cfba846/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L406
>>
>>
>> Appendix
>> -----------
>> Here you can see the log function and traces for different runners in
>> detail.
>>
>> private SingleOutput<String, String> logBeforeWrite() {
>>     return ParDo.of(new DoFn<String, String>() {
>>         @ProcessElement
>>         public void processElement(ProcessContext context, BoundedWindow boundedWindow) {
>>             log.info("[Before Write] Element=data window={}, timestamp={}, timing={}, index={}, isFirst={}, isLast={}, pane={}",
>>                     boundedWindow,
>>                     context.timestamp(),
>>                     context.pane().getTiming(),
>>                     context.pane().getIndex(),
>>                     context.pane().isFirst(),
>>                     context.pane().isLast(),
>>                     context.pane());
>>             context.output(context.element());
>>         }
>>     });
>> }
>>
>> The logAfterWrite function logs the same information.
>>
>> Traces in detail:
>>
>> DirectRunner (local):
>> [Before Write] Element=data window=[2020-05-09T13:39:00.000Z..2020-05-09T13:40:00.000Z), timestamp=2020-05-09T13:39:59.999Z, timing=ON_TIME, index=0, isFirst=true, isLast=true, pane=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}
>> [After  Write] Element=file window=[2020-05-09T13:39:00.000Z..2020-05-09T13:40:00.000Z), timestamp=2020-05-09T13:39:59.999Z, timing=EARLY, index=0, isFirst=true, isLast=false, pane=PaneInfo{isFirst=true, timing=EARLY, index=0}
>>
>> FlinkRunner (cluster):
>> [Before Write] Element=data window=[2020-05-09T15:13:00.000Z..2020-05-09T15:14:00.000Z), timestamp=2020-05-09T15:13:59.999Z, timing=ON_TIME, index=0, isFirst=true, isLast=true, pane=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}
>> [After  Write] Element=file window=[2020-05-09T15:13:00.000Z..2020-05-09T15:14:00.000Z), timestamp=2020-05-09T15:13:59.999Z, timing=UNKNOWN, index=0, isFirst=true, isLast=true, pane=PaneInfo.NO_FIRING
>>
>> SparkRunner (cluster):
>> [Before Write] Element=data window=[2020-05-09T15:34:00.000Z..2020-05-09T15:35:00.000Z), timestamp=2020-05-09T15:34:59.999Z, timing=ON_TIME, index=0, isFirst=true, isLast=true, pane=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}
>> [After  Write] Element=file window=[2020-05-09T15:34:00.000Z..2020-05-09T15:35:00.000Z), timestamp=2020-05-09T15:34:59.999Z, timing=UNKNOWN, index=0, isFirst=true, isLast=true, pane=PaneInfo.NO_FIRING
>>
>>
>> On Fri, May 8, 2020 at 19:01, Reuven Lax (<re...@google.com>) wrote:
>>
>>> The window information should still be there. Beam propagates windows
>>> through PCollections, and I don't think WriteFiles does anything
>>> explicit to stop that.
>>>
>>> Can you try this with the direct runner to see what happens there?
>>> What is your windowing on this PCollection?
>>>
>>> Reuven
>>>
>>> On Fri, May 8, 2020 at 3:49 AM Jose Manuel <kiuby88....@gmail.com>
>>> wrote:
>>>
>>>> I got the same behavior using the Spark runner (with Spark 2.4.3):
>>>> the window information was missing.
>>>>
>>>> Just to clarify, the combiner after TextIO produced different results.
>>>> With the Flink runner the file names were dropped, and with Spark the
>>>> combination happened twice, duplicating data. I think this is because
>>>> different runners handle data without windowing information in
>>>> different ways.
>>>>
>>>>
>>>>
>>>> On Fri, May 8, 2020 at 0:45, Luke Cwik (<lc...@google.com>) wrote:
>>>>
>>>>> +dev <dev@beam.apache.org>
>>>>>
>>>>> On Mon, May 4, 2020 at 3:56 AM Jose Manuel <kiuby88....@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi guys,
>>>>>>
>>>>>> I think I have found something interesting about windowing.
>>>>>>
>>>>>> I have a pipeline that reads data from Kafka and writes it to HDFS
>>>>>> by means of TextIO. Once written, the generated files are combined
>>>>>> to apply some custom operations. However, the combine does not
>>>>>> receive any data. Below is the relevant part of my pipeline:
>>>>>>
>>>>>> //STREAM + processing time.
>>>>>> pipeline.apply(KafkaIO.read())
>>>>>>         .apply(...) //mappers, a window and a combine
>>>>>>         .apply("WriteFiles", TextIO.<String>writeCustomType()
>>>>>>                 .to(policy).withShards(4).withWindowedWrites())
>>>>>>         .getPerDestinationOutputFilenames()
>>>>>>         .apply("CombineFileNames", Combine.perKey(...))
>>>>>>
>>>>>> Running the pipeline with Flink, I found a log trace saying that
>>>>>> data is discarded as late in the CombineFileNames combine. As a
>>>>>> workaround, I added allowedLateness to the pipeline's window
>>>>>> strategy. It works for now, but it raises several questions.
>>>>>>
>>>>>> I think the problem is that getPerDestinationOutputFilenames
>>>>>> generates the files but does not maintain the windowing information.
>>>>>> CombineFileNames then compares the file names against the pipeline's
>>>>>> watermark and discards them as late.
>>>>>>
>>>>>> Is there an issue with getPerDestinationOutputFilenames? Or maybe I
>>>>>> am doing something wrong and using getPerDestinationOutputFilenames
>>>>>> + combine does not make sense. What do you think?
>>>>>>
>>>>>> Please note that I am using Beam 2.17.0 with Flink 1.9.1.
>>>>>>
>>>>>> Many thanks,
>>>>>> Jose
>>>>>>
>>>>>>
