This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 3b1f9cba21 [spark] Support split-granularity bin packing for data
evolution tables (#8072)
3b1f9cba21 is described below
commit 3b1f9cba212476060467bbf83f094aba632c4cc7
Author: Weitai Li <[email protected]>
AuthorDate: Tue Jun 2 23:16:13 2026 +0800
[spark] Support split-granularity bin packing for data evolution tables
(#8072)
Support split-granularity bin packing when the `data-evolution.enabled`
option is set to true.
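Data evolution splits must remain intact, so their files cannot be
reshuffled across splits. Add split-granularity bin packing, which never
moves files between splits but still groups whole splits by target size.
Oversized splits (larger than the target size) are kept as-is, each in its
own input partition.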
---
.../paimon/spark/read/BinPackingSplits.scala | 43 ++++-
.../apache/paimon/spark/BinPackingSplitsTest.scala | 188 ++++++++++++++-------
2 files changed, 165 insertions(+), 66 deletions(-)
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/read/BinPackingSplits.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/read/BinPackingSplits.scala
index 24de25bce7..f2d06b45ea 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/read/BinPackingSplits.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/read/BinPackingSplits.scala
@@ -78,14 +78,18 @@ case class BinPackingSplits(coreOptions: CoreOptions,
readRowSizeRatio: Double =
def pack(splits: Array[Split]): Seq[PaimonInputPartition] = {
val (toReshuffle, reserved) = splits.partition {
case _: FallbackSplit => false
- case split: DataSplit => split.rawConvertible()
+ case split: DataSplit => split.rawConvertible() ||
coreOptions.dataEvolutionEnabled()
// Currently, format table reader only supports reading one file.
case _: FormatDataSplit => false
case _ => false
}
if (toReshuffle.nonEmpty) {
val startTS = System.currentTimeMillis()
- val reshuffled = packDataSplit(toReshuffle.collect { case ds: DataSplit
=> ds })
+ val reshuffled = if (coreOptions.dataEvolutionEnabled()) {
+ packDataEvolutionSplit(toReshuffle.collect { case ds: DataSplit => ds
})
+ } else {
+ packDataSplit(toReshuffle.collect { case ds: DataSplit => ds })
+ }
val all = reserved.map(PaimonInputPartition.apply) ++ reshuffled
val duration = System.currentTimeMillis() - startTS
logInfo(
@@ -156,6 +160,41 @@ case class BinPackingSplits(coreOptions: CoreOptions,
readRowSizeRatio: Double =
partitions.toArray
}
+ private def packDataEvolutionSplit(splits: Array[DataSplit]):
Array[PaimonInputPartition] = {
+ val maxSplitBytes = computeMaxSplitBytes(splits)
+
+ var currentSize = 0L
+ val currentSplits = new ArrayBuffer[DataSplit]
+ val partitions = new ArrayBuffer[PaimonInputPartition]
+
+ def closeInputPartition(): Unit = {
+ if (currentSplits.nonEmpty) {
+ partitions += PaimonInputPartition(currentSplits.toArray)
+ currentSplits.clear()
+ currentSize = 0L
+ }
+ }
+
+ splits.foreach {
+ split =>
+ val ddFiles = dataFileAndDeletionFiles(split)
+ val size = ddFiles.map {
+ case (dataFile, deletionFile) =>
+ (dataFile.fileSize() * readRowSizeRatio).toLong + openCostInBytes
+ Option(deletionFile)
+ .map(_.length())
+ .getOrElse(0L)
+ }.sum
+ if (currentSplits.nonEmpty && currentSize + size > maxSplitBytes) {
+ closeInputPartition()
+ }
+ currentSplits += split
+ currentSize += size
+ }
+
+ closeInputPartition()
+ partitions.toArray
+ }
+
private def copyDataSplit(
split: DataSplit,
dataFiles: Seq[DataFileMeta],
diff --git
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/BinPackingSplitsTest.scala
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/BinPackingSplitsTest.scala
index 2e2d311c85..9173681953 100644
---
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/BinPackingSplitsTest.scala
+++
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/BinPackingSplitsTest.scala
@@ -23,7 +23,7 @@ import org.apache.paimon.data.BinaryRow
import org.apache.paimon.io.DataFileMeta
import org.apache.paimon.manifest.FileSource
import org.apache.paimon.spark.read.BinPackingSplits
-import org.apache.paimon.table.source.{DataSplit, Split}
+import org.apache.paimon.table.source.{DataSplit, DeletionFile, Split}
import org.junit.jupiter.api.Assertions
@@ -35,43 +35,19 @@ import scala.collection.mutable
class BinPackingSplitsTest extends PaimonSparkTestBase {
test("Paimon: reshuffle splits") {
- withSparkSQLConf(("spark.sql.leafNodeDefaultParallelism", "20")) {
+ withSparkSQLConf("spark.sql.files.minPartitionNum" -> "20") {
val splitNum = 5
val fileNum = 100
val files = scala.collection.mutable.ListBuffer.empty[DataFileMeta]
- 0.until(fileNum).foreach {
- i =>
- val path = s"f$i.parquet"
- files += DataFileMeta.forAppend(
- path,
- 750000,
- 30000,
- null,
- 0,
- 29999,
- 1,
- new java.util.ArrayList[String](),
- null,
- FileSource.APPEND,
- null,
- null,
- null,
- null)
- }
+ 0.until(fileNum).foreach(i => files += newDataFile(s"f$i.parquet",
750000, 30000, 29999))
val dataSplits = mutable.ArrayBuffer.empty[Split]
0.until(splitNum).foreach {
i =>
- dataSplits += DataSplit
- .builder()
- .withSnapshot(1)
- .withBucket(0)
- .withPartition(BinaryRow.EMPTY_ROW)
- .withDataFiles(files.zipWithIndex.filter(_._2 % splitNum ==
i).map(_._1).toList.asJava)
- .rawConvertible(true)
- .withBucketPath("no use")
- .build()
+ dataSplits += newDataSplitFromFiles(
+ files.zipWithIndex.filter(_._2 % splitNum == i).map(_._1).toSeq,
+ rawConvertible = true)
}
val binPacking = BinPackingSplits(CoreOptions.fromMap(new JHashMap()))
@@ -81,41 +57,69 @@ class BinPackingSplitsTest extends PaimonSparkTestBase {
}
test("Paimon: reshuffle one split") {
- val files = List(
- DataFileMeta.forAppend(
- "f1.parquet",
- 750000,
- 30000,
- null,
- 0,
- 29999,
- 1,
- new java.util.ArrayList[String](),
- null,
- FileSource.APPEND,
- null,
- null,
- null,
- null)
- ).asJava
-
- val dataSplits: Array[Split] = Array(
- DataSplit
- .builder()
- .withSnapshot(1)
- .withBucket(0)
- .withPartition(BinaryRow.EMPTY_ROW)
- .withDataFiles(files)
- .rawConvertible(true)
- .withBucketPath("no use")
- .build()
- )
+ val split = newDataSplitFromFiles(
+ Seq(newDataFile("f1.parquet", 750000, 30000, 29999)),
+ rawConvertible = true)
+
+ val dataSplits: Array[Split] = Array(split)
val binPacking = BinPackingSplits(CoreOptions.fromMap(new JHashMap()))
val reshuffled = binPacking.pack(dataSplits)
Assertions.assertEquals(1, reshuffled.length)
}
+ test("Paimon: pack data evolution splits by split granularity") {
+ withSparkSQLConf("spark.sql.files.minPartitionNum" -> "1") {
+ val split1 = newDataSplit("split1", Seq(40L, 40L), deletionFileLength =
Some(5L))
+ val split2 = newDataSplit("split2", Seq(40L, 40L), deletionFileLength =
Some(5L))
+ val split3 = newDataSplit("split3", Seq(40L, 40L), deletionFileLength =
Some(5L))
+
+ val binPacking = BinPackingSplits(
+ CoreOptions.fromMap(
+ Map(
+ "data-evolution.enabled" -> "true",
+ "deletion-vectors.enabled" -> "true",
+ "source.split.open-file-cost" -> "5 B",
+ "source.split.target-size" -> "150 B").asJava),
+ readRowSizeRatio = 0.5
+ )
+ val reshuffled = binPacking.pack(Array[Split](split1, split2, split3))
+
+ // Each split size is 2 * (40 * 0.5 + 5 open cost + 5 deletion file
length) = 60.
+ // Therefore two whole splits fit into the 150 B target while three do
not.
+ Assertions.assertEquals(2, reshuffled.length)
+ Assertions.assertEquals(2, reshuffled.head.splits.length)
+ Assertions.assertSame(split1, reshuffled.head.splits.head)
+ Assertions.assertSame(split2, reshuffled.head.splits(1))
+ Assertions.assertEquals(1, reshuffled(1).splits.length)
+ Assertions.assertSame(split3, reshuffled(1).splits.head)
+ reshuffled.flatMap(_.splits).foreach {
+ split => Assertions.assertEquals(2,
split.asInstanceOf[DataSplit].dataFiles().size())
+ }
+ }
+ }
+
+ test("Paimon: data evolution split packing keeps oversized split whole") {
+ withSparkSQLConf("spark.sql.files.minPartitionNum" -> "1") {
+ val split = newDataSplit("oversized", Seq(40L, 40L))
+
+ val binPacking = BinPackingSplits(
+ CoreOptions.fromMap(
+ Map(
+ "data-evolution.enabled" -> "true",
+ "source.split.open-file-cost" -> "0 B",
+ "source.split.target-size" -> "50 B").asJava))
+ val reshuffled = binPacking.pack(Array[Split](split))
+
+ Assertions.assertEquals(1, reshuffled.length)
+ Assertions.assertEquals(1, reshuffled.head.splits.length)
+ Assertions.assertSame(split, reshuffled.head.splits.head)
+ Assertions.assertEquals(
+ 2,
+ reshuffled.head.splits.head.asInstanceOf[DataSplit].dataFiles().size())
+ }
+ }
+
test("Paimon: set open-file-cost to 0") {
withTable("t") {
sql("CREATE TABLE t (a INT, b STRING)")
@@ -126,21 +130,21 @@ class BinPackingSplitsTest extends PaimonSparkTestBase {
def paimonScan() = getPaimonScan("SELECT * FROM t")
// default openCostInBytes is 4m, so we will get 400 / 128 = 4 partitions
- withSparkSQLConf("spark.sql.leafNodeDefaultParallelism" -> "1") {
+ withSparkSQLConf("spark.sql.files.minPartitionNum" -> "1") {
assert(paimonScan().inputPartitions.length == 4)
}
withSparkSQLConf(
- "spark.sql.files.openCostInBytes" -> "0",
- "spark.sql.leafNodeDefaultParallelism" -> "1") {
+ "spark.sql.files.minPartitionNum" -> "1",
+ "spark.sql.files.openCostInBytes" -> "0") {
assert(paimonScan().inputPartitions.length == 1)
}
// Paimon's conf takes precedence over Spark's
withSparkSQLConf(
+ "spark.sql.files.minPartitionNum" -> "1",
"spark.sql.files.openCostInBytes" -> "4194304",
- "spark.paimon.source.split.open-file-cost" -> "0",
- "spark.sql.leafNodeDefaultParallelism" -> "1") {
+ "spark.paimon.source.split.open-file-cost" -> "0") {
assert(paimonScan().inputPartitions.length == 1)
}
}
@@ -176,4 +180,60 @@ class BinPackingSplitsTest extends PaimonSparkTestBase {
}
}
}
+
+ private def newDataSplit(
+ prefix: String,
+ fileSizes: Seq[Long],
+ rawConvertible: Boolean = false,
+ deletionFileLength: Option[Long] = None): DataSplit = {
+ val files = fileSizes.zipWithIndex.map {
+ case (fileSize, index) => newDataFile(s"$prefix-$index.parquet",
fileSize)
+ }
+ newDataSplitFromFiles(files, rawConvertible, deletionFileLength, prefix)
+ }
+
+ private def newDataSplitFromFiles(
+ files: Seq[DataFileMeta],
+ rawConvertible: Boolean,
+ deletionFileLength: Option[Long] = None,
+ deletionFilePrefix: String = "delete"): DataSplit = {
+ val builder = DataSplit
+ .builder()
+ .withSnapshot(1)
+ .withBucket(0)
+ .withPartition(BinaryRow.EMPTY_ROW)
+ .withDataFiles(files.asJava)
+ .rawConvertible(rawConvertible)
+ .withBucketPath("no use")
+ deletionFileLength.foreach {
+ length =>
+ builder.withDataDeletionFiles(
+ files.indices
+ .map(index => new DeletionFile(s"$deletionFilePrefix-$index.dv",
0, length, null))
+ .asJava)
+ }
+ builder.build()
+ }
+
+ private def newDataFile(
+ fileName: String,
+ fileSize: Long,
+ rowCount: Long = 1,
+ maxSequenceNumber: Long = 0): DataFileMeta = {
+ DataFileMeta.forAppend(
+ fileName,
+ fileSize,
+ rowCount,
+ null,
+ 0,
+ maxSequenceNumber,
+ 1,
+ new java.util.ArrayList[String](),
+ null,
+ FileSource.APPEND,
+ null,
+ null,
+ null,
+ null)
+ }
}