Ah, I think I see the problem. It appears that for some reason the Flink runner loses the windowing information when a Reshuffle is applied (WriteFiles applies a Reshuffle internally while gathering the written files, which is why this shows up after getPerDestinationOutputFilenames). I'm not entirely sure why, because windowing information should be maintained across a Reshuffle.
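One way to check this outside of WriteFiles is a small pipeline that logs the pane before and after a bare Reshuffle. The sketch below is just that, not your pipeline: GenerateSequence stands in for the Kafka source and a global Sum stands in for your combine (a pane is only populated once an aggregation has fired). If the runner is at fault, the "after reshuffle" log should show PaneInfo.NO_FIRING on Flink, matching what you see after WriteFiles.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;

public class ReshufflePaneCheck {

  // Logs the timestamp and PaneInfo of every element that passes through.
  static class LogPane extends DoFn<Long, Long> {
    private final String tag;

    LogPane(String tag) {
      this.tag = tag;
    }

    @ProcessElement
    public void processElement(ProcessContext c) {
      System.out.printf("[%s] timestamp=%s pane=%s%n", tag, c.timestamp(), c.pane());
      c.output(c.element());
    }
  }

  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    p.apply(GenerateSequence.from(0).withRate(10, Duration.standardSeconds(1)))
        .apply(Window.<Long>into(FixedWindows.of(Duration.standardMinutes(1))))
        // The combine is what fires a pane; before any GroupByKey/Combine the
        // pane is always NO_FIRING, so logging earlier would not tell us much.
        .apply(Sum.longsGlobally().withoutDefaults())
        .apply("LogBeforeReshuffle", ParDo.of(new LogPane("before reshuffle")))
        // A bare Reshuffle, to see whether the pane survives it on this runner.
        .apply(Reshuffle.<Long>viaRandomKey())
        .apply("LogAfterReshuffle", ParDo.of(new LogPane("after reshuffle")));

    p.run().waitUntilFinish();
  }
}

If that reproduces it, we can rule WriteFiles itself out.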
Reuven

On Sat, May 9, 2020 at 9:50 AM Jose Manuel <[email protected]> wrote:

>
> Hi,
>
> I have added some logs to the pipeline as follows (you can find the log
> function in the Appendix):
>
> //STREAM + processing time.
> pipeline.apply(KafkaIO.read())
>     .apply(...) //mappers, a window and a combine
>     .apply(logBeforeWrite())
>     .apply("WriteFiles", TextIO.<String>writeCustomType()
>         .to(policy).withShards(4).withWindowedWrites())
>     .getPerDestinationOutputFilenames()
>     .apply(logAfterWrite())
>     .apply("CombineFileNames", Combine.perKey(...))
>
> I have run the pipeline with the DirectRunner (local) and with the
> FlinkRunner and SparkRunner, the latter two on a cluster.
> Below you can see the timing and pane information before and after the
> write (the full traces with window and timestamp information are in the
> Appendix).
>
> DirectRunner:
> [Before Write] timing=ON_TIME, pane=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}
> [After Write]  timing=EARLY, pane=PaneInfo{isFirst=true, timing=EARLY, index=0}
>
> FlinkRunner:
> [Before Write] timing=ON_TIME, pane=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}
> [After Write]  timing=UNKNOWN, pane=PaneInfo.NO_FIRING
>
> SparkRunner:
> [Before Write] timing=ON_TIME, pane=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}
> [After Write]  timing=UNKNOWN, pane=PaneInfo.NO_FIRING
>
> It seems the DirectRunner propagates the windowing information as
> expected. I am not sure whether TextIO really propagates the pane or just
> emits a new one, because the timing before TextIO is ON_TIME and after
> TextIO it is EARLY. In any case, with the FlinkRunner and the SparkRunner
> the timing and the pane are not set at all.
>
> I thought the problem was in GatherBundlesPerWindowFn, but now, after
> seeing that the DirectRunner fills in the windowing data... I am not sure.
>
> https://github.com/apache/beam/blob/6a4ef33607572569ea08b9e10654d1755cfba846/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L406
>
>
> Appendix
> -----------
> Here you can see the log function and the traces for the different
> runners in detail.
>
> private SingleOutput<String, String> logBeforeWrite() {
>     return ParDo.of(new DoFn<String, String>() {
>         @ProcessElement
>         public void processElement(ProcessContext context, BoundedWindow boundedWindow) {
>             log.info("[Before Write] Element=data window={}, timestamp={}, timing={}, index={}, isFirst={}, isLast={}, pane={}",
>                     boundedWindow,
>                     context.timestamp(),
>                     context.pane().getTiming(),
>                     context.pane().getIndex(),
>                     context.pane().isFirst(),
>                     context.pane().isLast(),
>                     context.pane());
>             context.output(context.element());
>         }
>     });
> }
>
> The logAfterWrite function logs the same information with an
> "[After Write]" prefix.
>
> Traces in detail:
>
> DirectRunner (local):
> [Before Write] Element=data window=[2020-05-09T13:39:00.000Z..2020-05-09T13:40:00.000Z),
>     timestamp=2020-05-09T13:39:59.999Z, timing=ON_TIME, index=0, isFirst=true, isLast=true,
>     pane=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}
> [After Write] Element=file window=[2020-05-09T13:39:00.000Z..2020-05-09T13:40:00.000Z),
>     timestamp=2020-05-09T13:39:59.999Z, timing=EARLY, index=0, isFirst=true, isLast=false,
>     pane=PaneInfo{isFirst=true, timing=EARLY, index=0}
>
> FlinkRunner (cluster):
> [Before Write] Element=data window=[2020-05-09T15:13:00.000Z..2020-05-09T15:14:00.000Z),
>     timestamp=2020-05-09T15:13:59.999Z, timing=ON_TIME, index=0, isFirst=true, isLast=true,
>     pane=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}
> [After Write] Element=file window=[2020-05-09T15:13:00.000Z..2020-05-09T15:14:00.000Z),
>     timestamp=2020-05-09T15:13:59.999Z, timing=UNKNOWN, index=0, isFirst=true, isLast=true,
>     pane=PaneInfo.NO_FIRING
>
> SparkRunner (cluster):
> [Before Write] Element=data window=[2020-05-09T15:34:00.000Z..2020-05-09T15:35:00.000Z),
>     timestamp=2020-05-09T15:34:59.999Z, timing=ON_TIME, index=0, isFirst=true, isLast=true,
>     pane=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}
> [After Write] Element=file window=[2020-05-09T15:34:00.000Z..2020-05-09T15:35:00.000Z),
>     timestamp=2020-05-09T15:34:59.999Z, timing=UNKNOWN, index=0, isFirst=true, isLast=true,
>     pane=PaneInfo.NO_FIRING
>
>
> On Fri, May 8, 2020 at 7:01 PM Reuven Lax (<[email protected]>) wrote:
>
>> The window information should still be there. Beam propagates windows
>> through PCollections, and I don't think WriteFiles does anything explicit
>> to stop that.
>>
>> Can you try this with the direct runner to see what happens there?
>> What is your windowing on this PCollection?
>>
>> Reuven
>>
>> On Fri, May 8, 2020 at 3:49 AM Jose Manuel <[email protected]> wrote:
>>
>>> I got the same behavior using the Spark runner (with Spark 2.4.3): the
>>> window information was missing.
>>>
>>> Just to clarify, the combiner after TextIO gave different results. With
>>> the Flink runner the file names were dropped, and with Spark the
>>> combination happened twice, duplicating data. I think this is because
>>> different runners handle data without windowing information in
>>> different ways.
>>>
>>>
>>>
>>> On Fri, May 8, 2020 at 12:45 AM Luke Cwik (<[email protected]>) wrote:
>>>
>>>> +dev <[email protected]>
>>>>
>>>> On Mon, May 4, 2020 at 3:56 AM Jose Manuel <[email protected]>
>>>> wrote:
>>>>
>>>>> Hi guys,
>>>>>
>>>>> I think I have found something interesting about windowing.
>>>>>
>>>>> I have a pipeline that gets data from Kafka and writes to HDFS by
>>>>> means of TextIO. Once written, the generated file names are combined
>>>>> to apply some custom operations. However, the combine does not
>>>>> receive any data. Below is the relevant part of the pipeline.
>>>>>
>>>>> //STREAM + processing time.
>>>>> pipeline.apply(KafkaIO.read())
>>>>>     .apply(...) //mappers, a window and a combine
>>>>>     .apply("WriteFiles", TextIO.<String>writeCustomType()
>>>>>         .to(policy).withShards(4).withWindowedWrites())
>>>>>     .getPerDestinationOutputFilenames()
>>>>>     .apply("CombineFileNames", Combine.perKey(...))
>>>>>
>>>>> Running the pipeline with Flink, I found a log trace saying that data
>>>>> are discarded as late in the CombineFileNames combine.
>>>>> Then, as a workaround, I added allowedLateness to the pipeline's
>>>>> window strategy. It works for now, but it raises several questions
>>>>> for me.
>>>>>
>>>>> I think the problem is that getPerDestinationOutputFilenames generates
>>>>> the files but does not maintain the windowing information.
>>>>> CombineFileNames then compares the file names against the watermark
>>>>> of the pipeline and discards them as late.
>>>>>
>>>>> Is there any issue with getPerDestinationOutputFilenames? Maybe I am
>>>>> doing something wrong and using getPerDestinationOutputFilenames +
>>>>> combine does not make sense. What do you think?
>>>>>
>>>>> Please note that I am using Beam 2.17.0 with Flink 1.9.1.
>>>>>
>>>>> Many thanks,
>>>>> Jose
>>>>>
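For reference, the allowedLateness workaround described in the original message would look roughly like the fragment below. The one-minute fixed window, the watermark trigger and the five-minute lateness are placeholders, since the actual windowing of the pipeline is not shown in this thread; the only point of the sketch is where withAllowedLateness goes.

//STREAM + processing time, with allowedLateness added to the window strategy.
pipeline.apply(KafkaIO.read())
    .apply(...) //mappers
    .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))) //placeholder window
        .triggering(AfterWatermark.pastEndOfWindow())
        .withAllowedLateness(Duration.standardMinutes(5)) //placeholder lateness
        .discardingFiredPanes())
    .apply(...) //the combine
    .apply("WriteFiles", TextIO.<String>writeCustomType()
        .to(policy).withShards(4).withWindowedWrites())
    .getPerDestinationOutputFilenames()
    .apply("CombineFileNames", Combine.perKey(...))

Note that this only buys the file names some slack before CombineFileNames drops them as late; it does not restore the missing pane information itself.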
