[ https://issues.apache.org/jira/browse/SPARK-36576?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jungtaek Lim resolved SPARK-36576.
----------------------------------
Fix Version/s: 3.3.0
Resolution: Fixed
Issue resolved by pull request 33827
[https://github.com/apache/spark/pull/33827]
> Improve range split calculation for Kafka Source minPartitions option
> ---------------------------------------------------------------------
>
> Key: SPARK-36576
> URL: https://issues.apache.org/jira/browse/SPARK-36576
> Project: Spark
> Issue Type: Improvement
> Components: Structured Streaming
> Affects Versions: 3.1.2
> Reporter: Andrew Olson
> Assignee: Andrew Olson
> Priority: Minor
> Fix For: 3.3.0
>
>
> While the
> [documentation|https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html]
> does contain a clear disclaimer,
> {quote}Please note that this configuration is like a {{hint}}: the number of
> Spark tasks will be *approximately* {{minPartitions}}. It can be less or more
> depending on rounding errors or Kafka partitions that didn't receive any new
> data.
> {quote}
> there are cases where the calculated Kafka partition range splits can differ
> greatly from expectations. For evenly distributed data and most
> {{minPartitions}} values this would not be a major or commonly encountered
> concern. However, when the distribution of data across partitions is very
> heavily skewed, the resulting range split calculations can be surprising.
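> For reference, a minimal example of how this hint is set on a Kafka source,
> assuming an existing {{SparkSession}} named {{spark}} and illustrative broker
> and topic names:
> {code:scala}
> // Kafka source with the minPartitions hint; broker and topic names are
> // placeholders for illustration only.
> val df = spark.readStream
>   .format("kafka")
>   .option("kafka.bootstrap.servers", "broker1:9092")
>   .option("subscribe", "events")
>   .option("minPartitions", "1010")
>   .load()
> {code}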
> For example, given the following input data:
> * 1 partition containing 10,000 messages
> * 1,000 partitions each containing 1 message
> Spark processing code loading from this collection of 1,001 partitions may
> decide that it would like each task to read no more than 1,000 messages.
> Consequently, it could specify a {{minPartitions}} value of 1,010 - expecting
> the single large partition to be split into 10 equal chunks, along with the
> 1,000 small partitions each having their own task. That is far from what
> actually occurs. The {{KafkaOffsetRangeCalculator}} algorithm ends up
> splitting the large partition into 918 chunks of 10 or 11 messages (two
> orders of magnitude smaller than the desired maximum message count per task)
> and producing nearly double the number of Spark tasks hinted in the
> configuration.
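> To illustrate, here is a rough sketch of the proportional split math (an
> approximation of the current behavior for this example, not the actual
> {{KafkaOffsetRangeCalculator}} source):
> {code:scala}
> // 1 partition with 10,000 messages plus 1,000 partitions with 1 message each.
> val sizes = Seq(10000L) ++ Seq.fill(1000)(1L)
> val totalSize = sizes.sum.toDouble            // 11,000 messages
> val minPartitions = 1010
> // Each partition's split count is proportional to its share of the total.
> val parts = sizes.map { s =>
>   math.max(1, math.round(s / totalSize * minPartitions).toInt)
> }
> // parts.head == 918: the large partition is divided into 918 ranges of 10 or
> // 11 messages, and each of the 1,000 small partitions still gets its own
> // range, for 1,918 tasks in total.
> {code}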
> The proposal is to modify the {{KafkaOffsetRangeCalculator}}'s range
> calculation logic so that small (i.e. un-split) partitions are excluded from
> the overall proportional distribution math. This would divide the large
> partitions more reasonably when they are accompanied by many small partitions,
> and provide optimal behavior for cases where a {{minPartitions}} value is
> deliberately computed based on the volume of data being read.
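> A rough sketch of how such a calculation could behave for the example above
> (the structure and names are illustrative assumptions, not the actual change
> in the linked pull request):
> {code:scala}
> // Same input: 1 large partition (10,000 messages) and 1,000 small ones.
> val sizes = Seq(10000L) ++ Seq.fill(1000)(1L)
> val minPartitions = 1010
> val totalSize = sizes.sum.toDouble
> // Partitions whose proportional share would not exceed a single range are
> // left whole and excluded from the distribution math.
> val (small, large) = sizes.partition(s => s / totalSize * minPartitions <= 1)
> val budget = minPartitions - small.size       // 1010 - 1000 = 10
> val largeTotal = large.sum.toDouble           // 10,000
> val parts = large.map { s =>
>   math.max(1, math.round(s / largeTotal * budget).toInt)
> }
> // parts == Seq(10): the large partition is split into 10 ranges of 1,000
> // messages each, so the job runs roughly the hinted 1,010 tasks.
> {code}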
--
This message was sent by Atlassian Jira
(v8.3.4#803005)