[ https://issues.apache.org/jira/browse/HUDI-6019?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Kong Wei updated HUDI-6019:
---------------------------
Description:
For the Kafka source, the default parallelism when pulling data from Kafka is
the number of Kafka partitions, and the only way to increase the parallelism
(to speed up) is to add more Kafka partitions.
This causes problems in two cases:
# Pulling a large amount of data from Kafka (e.g. maxEvents=100000000) when the
number of Kafka partitions is too small: the pull takes too much time and can
even cause executor OOM.
# Heavy data skew between Kafka partitions: the pull is blocked by the slowest
partition.
To solve these cases, I want to add a parameter
{{*hoodie.deltastreamer.source.kafka.per.partition.maxEvents*}} to control the
maximum number of events pulled from one Kafka partition in a single split. The
default, Long.MAX_VALUE, means the feature is turned off.
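The proposed switch can be sketched as follows, assuming the properties arrive as a plain string map. The helper names and the positivity check are hypothetical illustrations, not Hudi's API; only the property key and the Long.MAX_VALUE default come from the description above.

```python
# Hypothetical helpers illustrating the proposed config semantics; not Hudi code.
JAVA_LONG_MAX_VALUE = 2**63 - 1  # Java's Long.MAX_VALUE, the proposed default

PER_PARTITION_MAX_EVENTS_KEY = "hoodie.deltastreamer.source.kafka.per.partition.maxEvents"


def per_partition_max_events(props: dict[str, str]) -> int:
    """Return the configured cap, falling back to Long.MAX_VALUE when unset."""
    value = int(props.get(PER_PARTITION_MAX_EVENTS_KEY, JAVA_LONG_MAX_VALUE))
    # Assumption: a non-positive cap is treated as a configuration error.
    if value <= 0:
        raise ValueError(f"{PER_PARTITION_MAX_EVENTS_KEY} must be positive, got {value}")
    return value


def splitting_enabled(props: dict[str, str]) -> bool:
    """The feature stays off unless the cap is set below Long.MAX_VALUE."""
    return per_partition_max_events(props) < JAVA_LONG_MAX_VALUE
```

Leaving the key unset keeps today's behaviour (one split per Kafka partition); setting it to 1000000 turns splitting on.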
For example, given hoodie.deltastreamer.kafka.source.maxEvents=10000000 and 2
Kafka partitions:
in the best case, 5000000 events are pulled from each Kafka partition, which may
take minutes to finish;
in a worse case, 9000000 events are pulled from one partition and 1000000 events
from the other, which takes longer to finish because of the data skew.
In this example, if we set
{{hoodie.deltastreamer.source.kafka.per.partition.maxEvents=1000000}}, the Kafka
source is split into at least 10 parts, and each executor pulls at most 1000000
events from Kafka, taking advantage of parallelism.
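The splitting described above can be sketched as below. This is a minimal illustration, not the Hudi implementation: OffsetRange here is a hypothetical stand-in loosely modeled on the from/until offsets of Spark's Kafka OffsetRange, and split_by_count is an assumed helper name. It cuts each partition's [from, until) offset range into consecutive chunks of at most maxEvents events, so the skewed case yields 9 + 1 = 10 chunks.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class OffsetRange:
    """Hypothetical stand-in for a Kafka offset range: [from_offset, until_offset)."""
    partition: int
    from_offset: int
    until_offset: int

    def count(self) -> int:
        return self.until_offset - self.from_offset


def split_by_count(ranges: list[OffsetRange], max_events: int) -> list[OffsetRange]:
    """Cut every range into consecutive chunks holding at most max_events events.

    With max_events = Long.MAX_VALUE (the proposed default) each non-empty range
    comes back unchanged. Python ints do not overflow; a Java port would need to
    guard start + max_events. Empty ranges produce no chunks.
    """
    if max_events <= 0:
        raise ValueError("max_events must be positive")
    result: list[OffsetRange] = []
    for r in ranges:
        start = r.from_offset
        while start < r.until_offset:
            end = min(start + max_events, r.until_offset)
            result.append(OffsetRange(r.partition, start, end))
            start = end
    return result


# The skewed example: 9000000 events in partition 0, 1000000 in partition 1.
skewed = [OffsetRange(0, 0, 9_000_000), OffsetRange(1, 0, 1_000_000)]
parts = split_by_count(skewed, 1_000_000)
print(len(parts), max(p.count() for p in parts))
```

Because each chunk can be read by its own Spark task, the 9000000-event partition is spread over nine tasks instead of one. Ranges whose size is not a multiple of the cap round up, which is why the description says "at least" 10 parts (e.g. 9500000 + 500000 events give 10 + 1 = 11 chunks).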
This feature has 3 benefits:
# It avoids a single executor pulling a large amount of data and taking too long
({*}avoid data skew{*})
# It avoids a single executor pulling a large amount of data and using too much
memory, or even running out of memory ({*}avoid OOM{*})
# Each executor pulls a small amount of data, which makes full use of the
available cores to improve concurrency and so reduces the time of the pulling
procedure ({*}increase parallelism{*})
was:
For the Kafka source, the default parallelism when pulling data from Kafka is
the number of Kafka partitions, and the only way to increase the parallelism
(to speed up) is to add more Kafka partitions.
This causes problems in two cases:
# Pulling a large amount of data from Kafka (e.g. maxEvents=100000000) when the
number of Kafka partitions is too small: the pull takes too much time and can
even cause executor OOM.
# Heavy data skew between Kafka partitions: the pull is blocked by the slowest
partition.
To solve these cases, I want to add a parameter
{{*hoodie.deltastreamer.kafka.per.partition.maxEvents*}} to control the maximum
number of events pulled from one Kafka partition in a single split. The default,
Long.MAX_VALUE, means the feature is turned off.
For example, given hoodie.deltastreamer.kafka.source.maxEvents=10000000 and 2
Kafka partitions:
in the best case, 5000000 events are pulled from each Kafka partition, which may
take minutes to finish;
in a worse case, 9000000 events are pulled from one partition and 1000000 events
from the other, which takes longer to finish because of the data skew.
In this example, if we set
{{hoodie.deltastreamer.kafka.per.partition.maxEvents=1000000}}, the Kafka source
is split into at least 10 parts, and each executor pulls at most 1000000 events
from Kafka, taking advantage of parallelism.
This feature has 3 benefits:
# It avoids a single executor pulling a large amount of data and taking too long
({*}avoid data skew{*})
# It avoids a single executor pulling a large amount of data and using too much
memory, or even running out of memory ({*}avoid OOM{*})
# Each executor pulls a small amount of data, which makes full use of the
available cores to improve concurrency and so reduces the time of the pulling
procedure ({*}increase parallelism{*})
> Kafka source support split by count
> -----------------------------------
>
> Key: HUDI-6019
> URL: https://issues.apache.org/jira/browse/HUDI-6019
> Project: Apache Hudi
> Issue Type: New Feature
> Components: deltastreamer, hudi-utilities
> Reporter: Kong Wei
> Priority: Major
> Labels: pull-request-available
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)