[
https://issues.apache.org/jira/browse/HUDI-6019?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Vinoth Chandar reassigned HUDI-6019:
------------------------------------
Assignee: Vinoth Chandar
> Kafka source support split by count
> -----------------------------------
>
> Key: HUDI-6019
> URL: https://issues.apache.org/jira/browse/HUDI-6019
> Project: Apache Hudi
> Issue Type: New Feature
> Components: deltastreamer, hudi-utilities
> Reporter: Kong Wei
> Assignee: Vinoth Chandar
> Priority: Major
> Labels: pull-request-available
>
> For the Kafka source, the default parallelism when pulling data is the
> number of Kafka partitions, so the only way to increase the parallelism
> (and speed up the pull) is to add more Kafka partitions.
> This is a problem in two cases:
> # When pulling a large amount of data from Kafka (e.g. maxEvents=100000000)
> with too few Kafka partitions, the pull takes too long and, even worse,
> can cause executor OOM.
> # When there is heavy data skew between Kafka partitions, the pull is
> blocked by the slowest partition.
> To solve these cases, I want to add a parameter
> {{*hoodie.deltastreamer.source.kafka.per.partition.maxEvents*}} to control
> the maxEvents for one Kafka partition. The default, Long.MAX_VALUE, means
> this feature is turned off.
>
> For example, given hoodie.deltastreamer.kafka.source.maxEvents=10000000 and 2
> Kafka partitions:
> the best case is pulling 5000000 events from each Kafka partition, which may
> take minutes to finish;
> a worse case is pulling 9000000 events from one partition and 1000000
> events from the other, which takes longer to finish due to
> data skew.
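
As a rough illustration of the skew in this example: assuming pull time grows with the number of events a task reads, and that each Kafka partition is read by exactly one task, the batch finishes only when the largest task does. The helper name `skew_factor` is made up for this sketch and is not part of Hudi.

```python
def skew_factor(events_per_task):
    """Ratio of the largest task to the mean task size (1.0 means perfectly balanced)."""
    mean = sum(events_per_task) / len(events_per_task)
    return max(events_per_task) / mean


# Event counts per Kafka partition, taken from the example above.
best_case = [5_000_000, 5_000_000]
worse_case = [9_000_000, 1_000_000]

# The largest task bounds the duration of the whole pull (assumption stated above).
print(max(best_case), skew_factor(best_case))
print(max(worse_case), skew_factor(worse_case))
```

Under these assumptions the worse case leaves one task with 9000000 events, nearly twice the per-task load of the balanced case, while the other task sits idle for most of the pull.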
>
> In this example, if we set
> {{hoodie.deltastreamer.source.kafka.per.partition.maxEvents=1000000}}, the
> Kafka source is split into at least 10 parts, and each executor pulls at
> most 1000000 events from Kafka, which takes advantage of parallelism.
>
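
The split described above can be sketched as follows. This is a minimal illustration, not the actual Hudi implementation: `OffsetRange`, `split_by_count`, and `JAVA_LONG_MAX` are hypothetical names, offsets are assumed to start at 0, and each resulting range is assumed to be read by one task.

```python
from typing import List, NamedTuple

JAVA_LONG_MAX = 2**63 - 1  # stands in for Java's Long.MAX_VALUE (feature off)


class OffsetRange(NamedTuple):
    """Hypothetical offset range for one Kafka partition."""
    partition: int
    from_offset: int   # inclusive
    until_offset: int  # exclusive


def split_by_count(ranges: List[OffsetRange],
                   per_partition_max_events: int = JAVA_LONG_MAX) -> List[OffsetRange]:
    """Cut each range into chunks of at most per_partition_max_events events."""
    if per_partition_max_events <= 0:
        raise ValueError("per_partition_max_events must be positive")
    parts = []
    for r in ranges:
        start = r.from_offset
        # Empty ranges (from_offset == until_offset) produce no parts.
        while start < r.until_offset:
            end = min(start + per_partition_max_events, r.until_offset)
            parts.append(OffsetRange(r.partition, start, end))
            start = end
    return parts


# Skewed case from the example: 9000000 events in one partition, 1000000 in the other.
skewed = [OffsetRange(0, 0, 9_000_000), OffsetRange(1, 0, 1_000_000)]
parts = split_by_count(skewed, per_partition_max_events=1_000_000)
print(len(parts))
print(max(p.until_offset - p.from_offset for p in parts))
```

With the default value every range fits in a single chunk, so the input comes back unchanged, which matches the "feature off" behaviour described for Long.MAX_VALUE.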
> 3 benefits of this feature:
> # Avoids a single executor pulling a large amount of data and taking too long
> ({*}avoid data skew{*})
> # Avoids a single executor pulling a large amount of data and using too much
> memory or even hitting OOM ({*}avoid OOM{*})
> # Each executor pulls a small amount of data, so the job can make full use of
> the available cores to improve concurrency and reduce the time of the
> pull ({*}increase parallelism{*})
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)