[ https://issues.apache.org/jira/browse/BEAM-5865?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16720299#comment-16720299 ]

Jozef Vilcek commented on BEAM-5865:
------------------------------------

Hello, I want to restate my problem here, as it was raised in the mailing list 
discussion linked in the JIRA description ( Unbalanced FileIO writes on Flink ).

I have a job that reads event-level data from a Kafka topic and builds some 
aggregates out of it, and that part works fine. However, I also need to store 
all the event-level data to HDFS for various reasons (reprocessing, audit, ...). 
It makes sense to me to do both within the same streaming job. However, the 
writes via FileIO end up very badly distributed among the Flink workers, 
creating significant hotspots. As a result, when the job hits data spikes or 
falls behind, it is quite slow to catch up, and I noticed a lot of backpressure 
from some parts of the processing graph.

Now, we know this is because of the GroupByKey that FileIO requires in order to 
produce the Iterable[Values] that FileIO.Sink writes and flushes into a single 
windowed file. Flink uses a KeyedStream for this operation, which relies on a 
hash partitioner, and that is known to suffer in cases where 
num_keys << num_parallelism.
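
To see that mechanism in isolation, here is a minimal sketch that calls Flink's 
KeyGroupRangeAssignment directly (the shard keys are stand-in integers rather 
than Beam's actual ShardedKey values, and the parallelism figures are made up 
for the example):
{noformat}
import org.apache.flink.runtime.state.KeyGroupRangeAssignment

object ShardSkewSketch {
  def main(args: Array[String]): Unit = {
    val parallelism    = 100  // hypothetical number of Flink subtasks
    val maxParallelism = 128
    val numShards      = 5    // num_keys << num_parallelism

    // Flink routes each key to a subtask via murmur-hashed key groups,
    // so at most numShards of the 100 subtasks ever receive data, and
    // hash collisions can concentrate the load on even fewer.
    val subtasks = (0 until numShards).map { shard =>
      KeyGroupRangeAssignment.assignKeyToParallelOperator(
        Integer.valueOf(shard), maxParallelism, parallelism)
    }.toSet

    println(s"$numShards shard keys land on ${subtasks.size} of $parallelism subtasks: $subtasks")
  }
}
{noformat}
The remaining subtasks sit idle while the few that own a shard key do all the 
writing, which matches the hotspots I see.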

What I am essentially doing is:
{noformat}
pipeline
    .apply(readKafka)
    // Decode each Kafka record's value into a UTF-8 string.
    .apply(MapElements.via(
      new SimpleFunction[KafkaRecord[Array[Byte], Array[Byte]], String]() {
        override def apply(input: KafkaRecord[Array[Byte], Array[Byte]]): String =
          new String(input.getKV.getValue, "UTF-8")
      }))
    // Hourly event-time windows with early firings (to bound pane size),
    // late firings, and seven days of allowed lateness.
    .apply(Window.into[String](FixedWindows.of(Duration.standardHours(1)))
        .triggering(AfterWatermark.pastEndOfWindow()
            .withEarlyFirings(AfterPane.elementCountAtLeast(maxOnTimeCount))
            .withLateFirings(AfterFirst.of(Lists.newArrayList[Trigger](
              Repeatedly.forever(AfterPane.elementCountAtLeast(maxLateCount)),
              Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()
                  .plusDelayOf(Duration.standardHours(1)))))))
        .discardingFiredPanes()
        .withAllowedLateness(Duration.standardDays(7)))
    // Write each (window, pane) as text files, one per shard.
    .apply(FileIO.write[String]()
        .via(TextIO.sink())
        .withNaming(new SafeFileNaming(outputPath, ".txt"))
        .withTempDirectory(tempLocation)
        .withNumShards(parallelism))
{noformat}
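If I read WriteFiles correctly, the fixed shard count conceptually reduces to 
something like the sketch below (simplified, with hypothetical names like 
windowedEvents and numShards; the real implementation assigns ShardedKey values 
rather than plain integers):
{noformat}
import java.util.concurrent.ThreadLocalRandom
import org.apache.beam.sdk.transforms.{GroupByKey, SerializableFunction, WithKeys}
import org.apache.beam.sdk.values.TypeDescriptors

// windowedEvents: PCollection[String] coming out of the Window step above
val sharded = windowedEvents
  // every element gets a pseudo-random shard id in [0, numShards)
  .apply(WithKeys.of(new SerializableFunction[String, Integer] {
      override def apply(e: String): Integer =
        ThreadLocalRandom.current().nextInt(numShards)
    }).withKeyType(TypeDescriptors.integers()))
  // this GroupByKey is the keyed shuffle that Flink runs as a KeyedStream;
  // with only numShards distinct keys, at most numShards subtasks ever
  // receive data to write
  .apply(GroupByKey.create[Integer, String]())
// each resulting (shard, window, pane) group is then written as one file
{noformat}
Even with numShards set to the full parallelism, the hash assignment is a 
balls-into-bins draw, so some subtasks end up with several shard keys and 
others with none.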
I would like to know what options I have to make this work well.

As you can see, I am not interested in any specific key; I just want to keep 
the number of files manageable and segment the data into files based on event 
time.

 

> Auto sharding of streaming sinks in FlinkRunner
> -----------------------------------------------
>
>                 Key: BEAM-5865
>                 URL: https://issues.apache.org/jira/browse/BEAM-5865
>             Project: Beam
>          Issue Type: Improvement
>          Components: runner-flink
>            Reporter: Maximilian Michels
>            Priority: Major
>
> The Flink Runner should do auto-sharding of streaming sinks, similar to 
> BEAM-1438. That way, the user doesn't have to set shards manually, which 
> introduces additional shuffling and might cause skew in the distribution of 
> data.
> As per discussion: 
> https://lists.apache.org/thread.html/7b92145dd9ae68da1866f1047445479f51d31f103d6407316bb4114c@%3Cuser.beam.apache.org%3E


