This is strange - AFAICT the windowing information should not be dropped.

Is it possible that the Flink runner isn't handling the WriteFilesResult
object (a POutput value that embeds a PCollection) properly? Is there any
way to test this with another runner?
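[Editor's note: a minimal, hypothetical sketch of how one might try the same pipeline on the DirectRunner to check whether the late-data drop is Flink-specific. The class name and the elided pipeline body are placeholders; the runner-selection calls are standard Beam API.]

```java
import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class RunnerCheck {
  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    // Swap the FlinkRunner for the DirectRunner; the pipeline itself is unchanged.
    options.setRunner(DirectRunner.class);

    Pipeline pipeline = Pipeline.create(options);
    // ... build the same KafkaIO -> window/combine -> TextIO ->
    //     getPerDestinationOutputFilenames -> CombineFileNames pipeline here ...
    pipeline.run().waitUntilFinish();
  }
}
```

If the DirectRunner delivers the filenames to CombineFileNames while Flink drops them as late, that would point at the runner's handling of WriteFilesResult rather than the SDK.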

Reuven

On Thu, May 7, 2020 at 3:45 PM Luke Cwik <[email protected]> wrote:

> +dev <[email protected]>
>
> On Mon, May 4, 2020 at 3:56 AM Jose Manuel <[email protected]> wrote:
>
>> Hi guys,
>>
>> I think I have found something interesting about windowing.
>>
>> I have a pipeline that reads data from Kafka and writes it to HDFS via
>> TextIO. Once written, the generated file names are combined to apply
>> some custom operations. However, the combine does not receive any data.
>> Below is the relevant part of my pipeline.
>>
>> //STREAM + processing time.
>> pipeline.apply(KafkaIO.read())
>>         .apply(...) //mappers, a window and a combine
>>
>>         .apply("WriteFiles", TextIO.<String>writeCustomType()
>>                 .to(policy).withNumShards(4).withWindowedWrites())
>>         .getPerDestinationOutputFilenames()
>>         .apply("CombineFileNames", Combine.perKey(...))
>>
>> Running the pipeline on Flink, I found a log trace saying the data is
>> discarded as late in the CombineFileNames combine. As a workaround, I
>> added allowed lateness to the pipeline's window strategy.
>> It works for now, but this raises several questions for me.
>>
>> I think the problem is that getPerDestinationOutputFilenames generates
>> the file names but does not preserve the windowing information.
>> CombineFileNames then compares the file names against the watermark of
>> the pipeline and discards them as late.
>>
>> Is there any issue with getPerDestinationOutputFilenames? Maybe I am
>> doing something wrong and using getPerDestinationOutputFilenames +
>> combine does not make sense.
>> What do you think?
>>
>> Please, note I am using Beam 2.17.0 with Flink 1.9.1.
>>
>> Many thanks,
>> Jose
>>
>>
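[Editor's note: for reference, the allowed-lateness workaround described in the quoted message can be sketched roughly as follows. The window size, lateness duration, destination policy, and combine function are placeholders, not the poster's actual values.]

```java
// Hedged sketch, assuming a fixed processing-time-style window; the key point
// is withAllowedLateness(), which keeps the filename elements from being
// dropped as late downstream of getPerDestinationOutputFilenames().
pipeline.apply(KafkaIO.read(...))
    .apply(...) // mappers and combine, as in the original pipeline
    .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
        .withAllowedLateness(Duration.standardMinutes(10)) // the workaround
        .discardingFiredPanes())
    .apply("WriteFiles", TextIO.<String>writeCustomType()
        .to(policy).withNumShards(4).withWindowedWrites())
    .getPerDestinationOutputFilenames()
    .apply("CombineFileNames", Combine.perKey(...));
```

Note that this only masks the symptom: if the filenames really do lose their windowing information, elements late by more than the configured lateness would still be dropped.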