Hi Enrico,

Thanks for opening the discussion!

One thing to note that may help s that the hadoop S3 FS tries to
imitate a filesystem on top of S3:

- before writing a key it checks if the "parent directory" exists by
checking for a key with the prefix up to the last "/"
- it creates empty marker files to mark the existence of such a parent
directory - all these "existence" requests are S3 HEAD requests which
are expensive.
As a result the hadoop S3 FS has very high "create file" latency and
it hits request rate limits very quickly.

This may not play well with your directory structure. The fact that it
works well when messages for a specific dir come in bursts may be
explained
by some form of internal caching done by hadoop but I am not sure.

In any case, it may be also helpful to post your findings to the
hadoop community as well to see if they have any answers.

Finally, two recommendations:
 1) try using the presto-S3 instead of hadoop, as presto seems to be
more efficient on that regard, and please report if you notice any
changes
 2) try to move to the StreamingFileSink as the BucketingSink is
already deprecated (although presto-s3 is not supported by the
StreamingFileSink)

Cheers,
Kostas

On Tue, Oct 15, 2019 at 3:47 PM Enrico Agnoli <enrico.agn...@workday.com> wrote:
>
> Starting here the discussion after an initial discussion with Ververica and 
> AWS teams during FlinkForward.
> I'm investigating the performances of a Flink job that transports data from 
> Kafka to an S3 Sink.
> We are using a BucketingSink to write parquet files. The bucketing logic 
> divides the messages having a folder per type of data, tenant (customer), 
> date-time, extraction Id, etc etc. This results in each file is stored in a 
> folder structure composed by 9-10 layers 
> (s3_bucket:/1/2/3/4/5/6/7/8/9/myFile...)
>
> If the data is distributed as bursts of messages for tenant-type we see good 
> performances in writing, but when the data is more a white noise distribution 
> on thousands of tenants, dozens of data types and multiple extraction IDs, we 
> have an incredible loss of performances. (in the order of 300x times)
>
> Attaching a debugger, it seems the issue is connected to the number of 
> handlers open at the same time on S3 to write data. More specifically
> https://jira2.workday.com/secure/attachment/2947228/2947228_image-2019-06-23-22-46-43-980.png
>
> Researching in the hadoop libraries used to write to S3 I have found some 
> possible improvements setting:
>       <name>fs.s3a.connection.maximum</name>
>       <name>fs.s3a.threads.max</name>
>       <name>fs.s3a.threads.core</name>
>       <name>fs.s3a.max.total.tasks</name>
> But none of these made a big difference in throughput.
>
> I hope to bring ahead the discussion and see if we can find a clear issue in 
> the logic or possible work-around.
>
> Note: The tests have been done on Flink 1.8 with the Hadoop FileSystem 
> (BucketingSink)
>

Reply via email to