This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 7d1be37 [SPARK-36576][SS] Improve range split calculation for Kafka Source minPartitions option
7d1be37 is described below
commit 7d1be3710dddd446c23606c3871e28d211ad9776
Author: Andrew Olson <[email protected]>
AuthorDate: Sun Aug 29 16:38:29 2021 +0900
[SPARK-36576][SS] Improve range split calculation for Kafka Source minPartitions option
### What changes were proposed in this pull request?
This change modifies the `KafkaOffsetRangeCalculator`'s range calculation logic to exclude small (i.e. un-split) partitions from the overall proportional distribution math. This divides the large partitions more reasonably when they are accompanied by many small partitions, and provides optimal behavior for cases where a `minPartitions` value is deliberately computed based on the volume of data being read.
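In rough terms the calculation becomes a two-pass process: first identify the ranges that would not be split anyway, then redo the proportional math using only the remaining (large) ranges and the remaining `minPartitions` budget. Below is a minimal standalone sketch of that idea; the object and method names are illustrative, not the actual `KafkaOffsetRangeCalculator` code.

```scala
// Illustrative sketch of the two-pass split calculation; `partCount` mirrors the
// calculator's rounding rule, everything else is simplified for clarity.
object RangeSplitSketch {
  private def partCount(size: Long, totalSize: Long, minParts: Int): Int =
    math.max(math.round(size.toDouble / totalSize * minParts), 1).toInt

  /** Given per-partition range sizes, return how many Spark tasks each range gets. */
  def partsPerRange(sizes: Seq[Long], minPartitions: Int): Seq[Int] = {
    val totalSize = sizes.sum
    // Pass 1: ranges that would not be split are excluded from the proportional math.
    val unsplit = sizes.filter(s => partCount(s, totalSize, minPartitions) == 1)
    val splitTotalSize = totalSize - unsplit.sum
    val splitMinParts = math.max(minPartitions - unsplit.size, 1)
    // Pass 2: divide only the remaining (large) ranges proportionally.
    sizes.map { s =>
      if (partCount(s, totalSize, minPartitions) == 1) 1
      else partCount(s, splitTotalSize, splitMinParts)
    }
  }
}
```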
### Why are the changes needed?
While the
[documentation](https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html)
does contain a clear disclaimer,
> Please note that this configuration is like a hint: the number of Spark
tasks will be **approximately** `minPartitions`. It can be less or more
depending on rounding errors or Kafka partitions that didn't receive any new
data.
there are cases where the calculated Kafka partition range splits can
differ greatly from expectations. For evenly distributed data and most
`minPartitions` values this would not be a major or commonly encountered
concern. However, when the distribution of data across partitions is very
heavily skewed, surprising range split calculations can result.
For example, given the following input data:
- 1 partition containing 10,000 messages
- 1,000 partitions each containing 1 message
Spark processing code loading from this collection of 1,001 partitions may
decide that it would like each task to read no more than 1,000 messages.
Consequently, it could specify a `minPartitions` value of 1,010, expecting the
single large partition to be split into 10 equal chunks and each of the 1,000
small partitions to get its own task. That is far from what actually
occurs: the `KafkaOffsetRangeCalculator` algorithm ends up splitting the large
partition into 918 chunks of [...]
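Working the arithmetic for this hypothetical input (one range of 10,000 records plus 1,000 ranges of 1 record, `minPartitions` = 1,010) shows where 918 comes from and what the two-pass math produces instead. The snippet reuses the `RangeSplitSketch` sketch defined above, so it is illustrative only.

```scala
// Hypothetical skewed input from the example above.
val sizes = 10000L +: Seq.fill(1000)(1L)   // totalSize = 11,000

// Old single-pass math for the large range:
//   round(10000.0 / 11000 * 1010) = round(918.18...) = 918 splits
// New two-pass math: the 1,000 one-record ranges count as unsplit, so
//   splitTotalSize = 11,000 - 1,000 = 10,000
//   splitMinParts  = 1,010 - 1,000  = 10
//   round(10000.0 / 10000 * 10)     = 10 splits for the large range
val parts = RangeSplitSketch.partsPerRange(sizes, minPartitions = 1010)
assert(parts.head == 10 && parts.tail.forall(_ == 1))
```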
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Existing unit tests and added new unit tests
Closes #33827 from noslowerdna/SPARK-36576.
Authored-by: Andrew Olson <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
---
.../sql/kafka010/KafkaOffsetRangeCalculator.scala | 31 ++++++++++--
.../kafka010/KafkaOffsetRangeCalculatorSuite.scala | 58 ++++++++++++++++++++--
.../sql/kafka010/KafkaOffsetReaderSuite.scala | 4 +-
3 files changed, 84 insertions(+), 9 deletions(-)
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala
index 1e9a62e..4c0620a 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala
@@ -33,12 +33,13 @@ private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Option[Int
* Calculate the offset ranges that we are going to process this batch. If `minPartitions`
* is not set or is set less than or equal the number of `topicPartitions` that we're going to
* consume, then we fall back to a 1-1 mapping of Spark tasks to Kafka partitions. If
- * `numPartitions` is set higher than the number of our `topicPartitions`, then we will split up
+ * `minPartitions` is set higher than the number of our `topicPartitions`, then we will split up
* the read tasks of the skewed partitions to multiple Spark tasks.
- * The number of Spark tasks will be *approximately* `numPartitions`. It can be less or more
+ * The number of Spark tasks will be *approximately* `minPartitions`. It can be less or more
* depending on rounding errors or Kafka partitions that didn't receive any new data.
*
- * Empty ranges (`KafkaOffsetRange.size <= 0`) will be dropped.
+ * Empty (`KafkaOffsetRange.size == 0`) or invalid (`KafkaOffsetRange.size < 0`) ranges will be
+ * dropped.
*/
def getRanges(
ranges: Seq[KafkaOffsetRange],
@@ -56,11 +57,29 @@ private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Option[Int
// Splits offset ranges with relatively large amount of data to smaller ones.
val totalSize = offsetRanges.map(_.size).sum
+
+ // First distinguish between any small (i.e. unsplit) ranges and large (i.e. split) ranges,
+ // in order to exclude the contents of unsplit ranges from the proportional math applied to
+ // split ranges
+ val unsplitRanges = offsetRanges.filter { range =>
+ getPartCount(range.size, totalSize, minPartitions.get) == 1
+ }
+
+ val unsplitRangeTotalSize = unsplitRanges.map(_.size).sum
+ val splitRangeTotalSize = totalSize - unsplitRangeTotalSize
+ val unsplitRangeTopicPartitions = unsplitRanges.map(_.topicPartition).toSet
+ val splitRangeMinPartitions = math.max(minPartitions.get - unsplitRanges.size, 1)
+
+ // Now we can apply the main calculation logic
offsetRanges.flatMap { range =>
val tp = range.topicPartition
val size = range.size
// number of partitions to divvy up this topic partition to
- val parts = math.max(math.round(size.toDouble / totalSize * minPartitions.get), 1).toInt
+ val parts = if (unsplitRangeTopicPartitions.contains(tp)) {
+ 1
+ } else {
+ getPartCount(size, splitRangeTotalSize, splitRangeMinPartitions)
+ }
var remaining = size
var startOffset = range.fromOffset
(0 until parts).map { part =>
@@ -76,6 +95,10 @@ private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Option[Int
}
}
+ private def getPartCount(size: Long, totalSize: Long, minParts: Int): Int = {
+ math.max(math.round(size.toDouble / totalSize * minParts), 1).toInt
+ }
+
private def getLocation(tp: TopicPartition, executorLocations: Seq[String]): Option[String] = {
def floorMod(a: Long, b: Int): Int = ((a % b).toInt + b) % b
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculatorSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculatorSuite.scala
index 751b877..4ef019c 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculatorSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculatorSuite.scala
@@ -106,7 +106,7 @@ class KafkaOffsetRangeCalculatorSuite extends SparkFunSuite {
KafkaOffsetRange(tp1, 4, 5, None))) // location pref not set when minPartition is set
}
- testWithMinPartitions("N skewed TopicPartitions to M offset ranges", 3) { calc =>
+ testWithMinPartitions("N skewed TopicPartitions to M offset ranges", 4) { calc =>
assert(
calc.getRanges(
Seq(
@@ -134,7 +134,7 @@ class KafkaOffsetRangeCalculatorSuite extends SparkFunSuite {
testWithMinPartitions(
"SPARK-30656: N very skewed TopicPartitions to M offset ranges",
- 3) { calc =>
+ 4) { calc =>
assert(
calc.getRanges(
Seq(
@@ -170,7 +170,7 @@ class KafkaOffsetRangeCalculatorSuite extends SparkFunSuite {
KafkaOffsetRange(tp1, 7, 11, None)))
}
- testWithMinPartitions("empty ranges ignored", 3) { calc =>
+ testWithMinPartitions("empty ranges ignored", 4) { calc =>
assert(
calc.getRanges(
Seq(
@@ -201,6 +201,58 @@ class KafkaOffsetRangeCalculatorSuite extends SparkFunSuite {
KafkaOffsetRange(tp3, 0, 1, None)))
}
+ testWithMinPartitions(
+ "SPARK-36576: 0 small unsplit ranges and 3 large split ranges", 9) { calc
=>
+ assert(
+ calc.getRanges(
+ Seq(
+ KafkaOffsetRange(tp1, 0, 10000),
+ KafkaOffsetRange(tp2, 0, 15000),
+ KafkaOffsetRange(tp3, 0, 20000))) ===
+ Seq(
+ KafkaOffsetRange(tp1, 0, 5000, None),
+ KafkaOffsetRange(tp1, 5000, 10000, None),
+ KafkaOffsetRange(tp2, 0, 5000, None),
+ KafkaOffsetRange(tp2, 5000, 10000, None),
+ KafkaOffsetRange(tp2, 10000, 15000, None),
+ KafkaOffsetRange(tp3, 0, 5000, None),
+ KafkaOffsetRange(tp3, 5000, 10000, None),
+ KafkaOffsetRange(tp3, 10000, 15000, None),
+ KafkaOffsetRange(tp3, 15000, 20000, None)))
+ }
+
+ testWithMinPartitions("SPARK-36576: 1 small unsplit range and 2 large split
ranges", 6) { calc =>
+ assert(
+ calc.getRanges(
+ Seq(
+ KafkaOffsetRange(tp1, 0, 500),
+ KafkaOffsetRange(tp2, 0, 12000),
+ KafkaOffsetRange(tp3, 0, 15001))) ===
+ Seq(
+ KafkaOffsetRange(tp1, 0, 500, None),
+ KafkaOffsetRange(tp2, 0, 6000, None),
+ KafkaOffsetRange(tp2, 6000, 12000, None),
+ KafkaOffsetRange(tp3, 0, 5000, None),
+ KafkaOffsetRange(tp3, 5000, 10000, None),
+ KafkaOffsetRange(tp3, 10000, 15001, None)))
+ }
+
+ testWithMinPartitions("SPARK-36576: 2 small unsplit ranges and 1 large split
range", 6) { calc =>
+ assert(
+ calc.getRanges(
+ Seq(
+ KafkaOffsetRange(tp1, 0, 1),
+ KafkaOffsetRange(tp2, 0, 1),
+ KafkaOffsetRange(tp3, 0, 10000))) ===
+ Seq(
+ KafkaOffsetRange(tp1, 0, 1, None),
+ KafkaOffsetRange(tp2, 0, 1, None),
+ KafkaOffsetRange(tp3, 0, 2500, None),
+ KafkaOffsetRange(tp3, 2500, 5000, None),
+ KafkaOffsetRange(tp3, 5000, 7500, None),
+ KafkaOffsetRange(tp3, 7500, 10000, None)))
+ }
+
private val tp1 = new TopicPartition("t1", 1)
private val tp2 = new TopicPartition("t2", 1)
private val tp3 = new TopicPartition("t3", 1)
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderSuite.scala
index d1e49b0..332db54 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderSuite.scala
@@ -143,7 +143,7 @@ class KafkaOffsetReaderSuite extends QueryTest with SharedSparkSession with Kafk
testUtils.sendMessages(topic, (0 until 4).map(_.toString).toArray, Some(1))
val tp1 = new TopicPartition(topic, 0)
val tp2 = new TopicPartition(topic, 1)
- val reader = createKafkaReader(topic, minPartitions = Some(3))
+ val reader = createKafkaReader(topic, minPartitions = Some(4))
val startingOffsets = SpecificOffsetRangeLimit(Map(tp1 -> EARLIEST, tp2 -> EARLIEST))
val endingOffsets = SpecificOffsetRangeLimit(Map(tp1 -> LATEST, tp2 -> 3))
@@ -163,7 +163,7 @@ class KafkaOffsetReaderSuite extends QueryTest with SharedSparkSession with Kafk
testUtils.sendMessages(topic, (0 until 4).map(_.toString).toArray, Some(1))
val tp1 = new TopicPartition(topic, 0)
val tp2 = new TopicPartition(topic, 1)
- val reader = createKafkaReader(topic, minPartitions = Some(3))
+ val reader = createKafkaReader(topic, minPartitions = Some(4))
val fromPartitionOffsets = Map(tp1 -> 0L, tp2 -> 0L)
val untilPartitionOffsets = Map(tp1 -> 100L, tp2 -> 3L)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]