[
https://issues.apache.org/jira/browse/BEAM-5865?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16720299#comment-16720299
]
Jozef Vilcek commented on BEAM-5865:
------------------------------------
Hello, I want to restate my problem here as it was raised in the discussion on the
mailing list linked in the jira description ( Unbalanced FileIO writes on Flink ).
I have a job reading event-level data from a Kafka topic and building some
aggregates out of it, which works fine. However, I also need to store all
event-level data to HDFS for various reasons (reprocessing, audit, ...). It
makes sense to me to do both things within the same single streaming job.
However, writes via FileIO end up being very badly distributed among Flink
workers, creating significant hotspots. As a result, when the job has data
spikes or delays to process, it is quite slow to catch up. I noticed a lot of
backpressure from some parts of the processing graph.
Now, we know that this is because of the GBK required by FileIO to produce the
Iterable[Values] that FileIO.Sink writes and flushes to a single windowed file.
Flink uses a KeyedStream for this operation, which relies on a hash partitioner
and is known to suffer in cases where num_keys << num_parallelism.
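To make the num_keys << num_parallelism effect concrete, here is a toy model of hash partitioning (an illustration only; Flink's real assignment goes through murmur-hashed key groups and max-parallelism arithmetic, but the pigeonhole effect is the same): each shard key hashes to exactly one subtask, so with few keys only a few subtasks ever receive write work.

```scala
import scala.util.hashing.MurmurHash3

// Toy model of hash partitioning (an assumption for illustration,
// not Flink's exact key-group assignment).
object HashSkew {
  // Assign a key to one of `parallelism` subtasks by hashing it.
  def subtaskFor(key: Int, parallelism: Int): Int =
    math.floorMod(MurmurHash3.stringHash(key.toString), parallelism)

  // Count how many subtasks actually receive work for `numKeys` shard keys.
  def busySubtasks(numKeys: Int, parallelism: Int): Int =
    (0 until numKeys).map(subtaskFor(_, parallelism)).distinct.size
}
```

With, say, 4 shard keys and parallelism 100, at most 4 subtasks do all the file writing; and even when numKeys equals parallelism, hash collisions leave a fraction of subtasks idle while others get doubled-up load.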
What I am essentially doing is:
{noformat}
pipeline
  .apply(readKafka)
  .apply(MapElements.via(
    new SimpleFunction[KafkaRecord[Array[Byte], Array[Byte]], String]() {
      override def apply(input: KafkaRecord[Array[Byte], Array[Byte]]): String = {
        new String(input.getKV.getValue, "UTF-8")
      }
    }))
  .apply(Window.into[String](FixedWindows.of(Duration.standardHours(1)))
    .triggering(AfterWatermark.pastEndOfWindow()
      .withEarlyFirings(AfterPane.elementCountAtLeast(maxOnTimeCount))
      .withLateFirings(AfterFirst.of(Lists.newArrayList[Trigger](
        Repeatedly.forever(AfterPane.elementCountAtLeast(maxLateCount)),
        Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()
          .plusDelayOf(Duration.standardHours(1)))))))
    .discardingFiredPanes()
    .withAllowedLateness(Duration.standardDays(7)))
  .apply(FileIO.write()
    .via(TextIO.sink())
    .withNaming(new SafeFileNaming(outputPath, ".txt"))
    .withTempDirectory(tempLocation)
    .withNumShards(parallelism))
{noformat}
I would like to know what options I have to make this work well.
As you can see, I am not even interested in any specific key; I just want to
keep the number of files manageable and segment data into files based on event
time.
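One mitigation I can think of while auto-sharding is missing (a sketch under a simplified hash model, not Flink's exact key-group math, and not verified on a cluster): over-provision withNumShards relative to parallelism, so that the hashed shard keys cover more subtasks, at the cost of more files per window. The coverage gain can be estimated with the same kind of toy model:

```scala
import scala.util.hashing.MurmurHash3

// Sketch of shard-key coverage under a simplified hash-partitioning model
// (an assumption for illustration, not Flink's actual assignment).
object ShardCoverage {
  // Assign a shard key to one of `parallelism` subtasks by hashing it.
  def subtaskFor(key: Int, parallelism: Int): Int =
    math.floorMod(MurmurHash3.stringHash(key.toString), parallelism)

  // Fraction of subtasks that receive at least one shard.
  def coverage(numShards: Int, parallelism: Int): Double =
    (0 until numShards).map(subtaskFor(_, parallelism))
      .distinct.size.toDouble / parallelism
}
```

For example, comparing coverage(parallelism, parallelism) with coverage(4 * parallelism, parallelism) shows how extra shard keys fill in the subtasks that collisions would otherwise leave idle; whether the extra files are acceptable depends on the downstream consumers.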
> Auto sharding of streaming sinks in FlinkRunner
> -----------------------------------------------
>
> Key: BEAM-5865
> URL: https://issues.apache.org/jira/browse/BEAM-5865
> Project: Beam
> Issue Type: Improvement
> Components: runner-flink
> Reporter: Maximilian Michels
> Priority: Major
>
> The Flink Runner should do auto-sharding of streaming sinks, similar to
> BEAM-1438. That way, the user doesn't have to set shards manually which
> introduces additional shuffling and might cause skew in the distribution of
> data.
> As per discussion:
> https://lists.apache.org/thread.html/7b92145dd9ae68da1866f1047445479f51d31f103d6407316bb4114c@%3Cuser.beam.apache.org%3E
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)