This is strange - AFAICT the windowing information should not be dropped. Is it possible that the Flink runner isn't handling the WriteFilesResult object (a POutput value that embeds a PCollection) properly? Is there any way to test this with another runner?
Reuven

On Thu, May 7, 2020 at 3:45 PM Luke Cwik <[email protected]> wrote:
> +dev <[email protected]>
>
> On Mon, May 4, 2020 at 3:56 AM Jose Manuel <[email protected]> wrote:
>>
>> Hi guys,
>>
>> I think I have found something interesting about windowing.
>>
>> I have a pipeline that reads data from Kafka and writes to HDFS by
>> means of TextIO. Once written, the generated files are combined to
>> apply some custom operations. However, the combine does not receive
>> any data. Here is the highlight of my pipeline:
>>
>> //STREAM + processing time.
>> pipeline.apply(KafkaIO.read())
>>     .apply(...) // mappers, a window and a combine
>>     .apply("WriteFiles", TextIO.<String>writeCustomType()
>>         .to(policy).withShards(4).withWindowedWrites())
>>     .getPerDestinationOutputFilenames()
>>     .apply("CombineFileNames", Combine.perKey(...))
>>
>> Running the pipeline with Flink, I found a log trace saying that data
>> are discarded as late in the CombineFileNames combine. As a
>> workaround, I then added AllowedLateness to the pipeline's window
>> strategy. It works for now, but this raises several questions for me.
>>
>> I think the problem is that getPerDestinationOutputFilenames generates
>> the file names but does not maintain the windowing information. Then,
>> CombineFileNames compares the file names against the watermark of the
>> pipeline and discards them as late.
>>
>> Is there any issue with getPerDestinationOutputFilenames? Maybe I am
>> doing something wrong, and using getPerDestinationOutputFilenames +
>> combine does not make sense. What do you think?
>>
>> Please note I am using Beam 2.17.0 with Flink 1.9.1.
>>
>> Many thanks,
>> Jose
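For readers following the "discarded as late" symptom above, here is a minimal sketch of the rule a runner applies when dropping late data. This is a simplified model in plain Java, not Beam's actual internals; the class and method names (`LatenessSketch`, `isDroppablyLate`) are hypothetical. The idea: each element belongs to a window, and once the watermark passes the window's end plus the configured allowed lateness, the element is discarded. If the filenames emitted by `getPerDestinationOutputFilenames` carry timestamps that end up behind the downstream watermark, a zero allowed lateness makes them droppable - which is why raising AllowedLateness works as a workaround.

```java
// Simplified model (hypothetical, not Beam internals) of droppable lateness:
// an element is dropped once watermark > windowEnd + allowedLateness.
public class LatenessSketch {

    /** True when the watermark has advanced past windowEnd + allowedLateness. */
    static boolean isDroppablyLate(long windowEndMillis,
                                   long allowedLatenessMillis,
                                   long watermarkMillis) {
        return watermarkMillis > windowEndMillis + allowedLatenessMillis;
    }

    public static void main(String[] args) {
        // Window ended at t=1000, watermark is at t=1500.
        // With zero allowed lateness the element is dropped:
        System.out.println(isDroppablyLate(1000, 0, 1500));   // true
        // With 600 ms allowed lateness it is still accepted:
        System.out.println(isDroppablyLate(1000, 600, 1500)); // false
    }
}
```

Under this model, raising allowed lateness does not fix the missing window metadata; it only widens the interval during which behind-the-watermark elements are still accepted.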
