This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 3b1f9cba21 [spark] Support split-granularity bin packing for data evolution tables (#8072)
3b1f9cba21 is described below

commit 3b1f9cba212476060467bbf83f094aba632c4cc7
Author: Weitai Li <[email protected]>
AuthorDate: Tue Jun 2 23:16:13 2026 +0800

    [spark] Support split-granularity bin packing for data evolution tables (#8072)
    
    Support split-granularity bin packing when `data-evolution.enabled` is
    set to `true`.
    
    Data evolution splits must remain intact. Add split-granularity bin
    packing to avoid reshuffling files across splits, while still grouping
    whole splits by target size. Oversized splits are kept as-is.
---
 .../paimon/spark/read/BinPackingSplits.scala       |  43 ++++-
 .../apache/paimon/spark/BinPackingSplitsTest.scala | 188 ++++++++++++++-------
 2 files changed, 165 insertions(+), 66 deletions(-)

diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/read/BinPackingSplits.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/read/BinPackingSplits.scala
index 24de25bce7..f2d06b45ea 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/read/BinPackingSplits.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/read/BinPackingSplits.scala
@@ -78,14 +78,18 @@ case class BinPackingSplits(coreOptions: CoreOptions, readRowSizeRatio: Double =
   def pack(splits: Array[Split]): Seq[PaimonInputPartition] = {
     val (toReshuffle, reserved) = splits.partition {
       case _: FallbackSplit => false
-      case split: DataSplit => split.rawConvertible()
+      case split: DataSplit => split.rawConvertible() || 
coreOptions.dataEvolutionEnabled()
       // Currently, format table reader only supports reading one file.
       case _: FormatDataSplit => false
       case _ => false
     }
     if (toReshuffle.nonEmpty) {
       val startTS = System.currentTimeMillis()
-      val reshuffled = packDataSplit(toReshuffle.collect { case ds: DataSplit 
=> ds })
+      val reshuffled = if (coreOptions.dataEvolutionEnabled()) {
+        packDataEvolutionSplit(toReshuffle.collect { case ds: DataSplit => ds 
})
+      } else {
+        packDataSplit(toReshuffle.collect { case ds: DataSplit => ds })
+      }
       val all = reserved.map(PaimonInputPartition.apply) ++ reshuffled
       val duration = System.currentTimeMillis() - startTS
       logInfo(
@@ -156,6 +160,41 @@ case class BinPackingSplits(coreOptions: CoreOptions, readRowSizeRatio: Double =
     partitions.toArray
   }
 
+  private def packDataEvolutionSplit(splits: Array[DataSplit]): 
Array[PaimonInputPartition] = {
+    val maxSplitBytes = computeMaxSplitBytes(splits)
+
+    var currentSize = 0L
+    val currentSplits = new ArrayBuffer[DataSplit]
+    val partitions = new ArrayBuffer[PaimonInputPartition]
+
+    def closeInputPartition(): Unit = {
+      if (currentSplits.nonEmpty) {
+        partitions += PaimonInputPartition(currentSplits.toArray)
+        currentSplits.clear()
+        currentSize = 0L
+      }
+    }
+
+    splits.foreach {
+      split =>
+        val ddFiles = dataFileAndDeletionFiles(split)
+        val size = ddFiles.map {
+          case (dataFile, deletionFile) =>
+            (dataFile.fileSize() * readRowSizeRatio).toLong + openCostInBytes 
+ Option(deletionFile)
+              .map(_.length())
+              .getOrElse(0L)
+        }.sum
+        if (currentSplits.nonEmpty && currentSize + size > maxSplitBytes) {
+          closeInputPartition()
+        }
+        currentSplits += split
+        currentSize += size
+    }
+
+    closeInputPartition()
+    partitions.toArray
+  }
+
   private def copyDataSplit(
       split: DataSplit,
       dataFiles: Seq[DataFileMeta],
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/BinPackingSplitsTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/BinPackingSplitsTest.scala
index 2e2d311c85..9173681953 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/BinPackingSplitsTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/BinPackingSplitsTest.scala
@@ -23,7 +23,7 @@ import org.apache.paimon.data.BinaryRow
 import org.apache.paimon.io.DataFileMeta
 import org.apache.paimon.manifest.FileSource
 import org.apache.paimon.spark.read.BinPackingSplits
-import org.apache.paimon.table.source.{DataSplit, Split}
+import org.apache.paimon.table.source.{DataSplit, DeletionFile, Split}
 
 import org.junit.jupiter.api.Assertions
 
@@ -35,43 +35,19 @@ import scala.collection.mutable
 class BinPackingSplitsTest extends PaimonSparkTestBase {
 
   test("Paimon: reshuffle splits") {
-    withSparkSQLConf(("spark.sql.leafNodeDefaultParallelism", "20")) {
+    withSparkSQLConf("spark.sql.files.minPartitionNum" -> "20") {
       val splitNum = 5
       val fileNum = 100
 
       val files = scala.collection.mutable.ListBuffer.empty[DataFileMeta]
-      0.until(fileNum).foreach {
-        i =>
-          val path = s"f$i.parquet"
-          files += DataFileMeta.forAppend(
-            path,
-            750000,
-            30000,
-            null,
-            0,
-            29999,
-            1,
-            new java.util.ArrayList[String](),
-            null,
-            FileSource.APPEND,
-            null,
-            null,
-            null,
-            null)
-      }
+      0.until(fileNum).foreach(i => files += newDataFile(s"f$i.parquet", 
750000, 30000, 29999))
 
       val dataSplits = mutable.ArrayBuffer.empty[Split]
       0.until(splitNum).foreach {
         i =>
-          dataSplits += DataSplit
-            .builder()
-            .withSnapshot(1)
-            .withBucket(0)
-            .withPartition(BinaryRow.EMPTY_ROW)
-            .withDataFiles(files.zipWithIndex.filter(_._2 % splitNum == 
i).map(_._1).toList.asJava)
-            .rawConvertible(true)
-            .withBucketPath("no use")
-            .build()
+          dataSplits += newDataSplitFromFiles(
+            files.zipWithIndex.filter(_._2 % splitNum == i).map(_._1).toSeq,
+            rawConvertible = true)
       }
 
       val binPacking = BinPackingSplits(CoreOptions.fromMap(new JHashMap()))
@@ -81,41 +57,69 @@ class BinPackingSplitsTest extends PaimonSparkTestBase {
   }
 
   test("Paimon: reshuffle one split") {
-    val files = List(
-      DataFileMeta.forAppend(
-        "f1.parquet",
-        750000,
-        30000,
-        null,
-        0,
-        29999,
-        1,
-        new java.util.ArrayList[String](),
-        null,
-        FileSource.APPEND,
-        null,
-        null,
-        null,
-        null)
-    ).asJava
-
-    val dataSplits: Array[Split] = Array(
-      DataSplit
-        .builder()
-        .withSnapshot(1)
-        .withBucket(0)
-        .withPartition(BinaryRow.EMPTY_ROW)
-        .withDataFiles(files)
-        .rawConvertible(true)
-        .withBucketPath("no use")
-        .build()
-    )
+    val split = newDataSplitFromFiles(
+      Seq(newDataFile("f1.parquet", 750000, 30000, 29999)),
+      rawConvertible = true)
+
+    val dataSplits: Array[Split] = Array(split)
 
     val binPacking = BinPackingSplits(CoreOptions.fromMap(new JHashMap()))
     val reshuffled = binPacking.pack(dataSplits)
     Assertions.assertEquals(1, reshuffled.length)
   }
 
+  test("Paimon: pack data evolution splits by split granularity") {
+    withSparkSQLConf("spark.sql.files.minPartitionNum" -> "1") {
+      val split1 = newDataSplit("split1", Seq(40L, 40L), deletionFileLength = 
Some(5L))
+      val split2 = newDataSplit("split2", Seq(40L, 40L), deletionFileLength = 
Some(5L))
+      val split3 = newDataSplit("split3", Seq(40L, 40L), deletionFileLength = 
Some(5L))
+
+      val binPacking = BinPackingSplits(
+        CoreOptions.fromMap(
+          Map(
+            "data-evolution.enabled" -> "true",
+            "deletion-vectors.enabled" -> "true",
+            "source.split.open-file-cost" -> "5 B",
+            "source.split.target-size" -> "150 B").asJava),
+        readRowSizeRatio = 0.5
+      )
+      val reshuffled = binPacking.pack(Array[Split](split1, split2, split3))
+
+      // Each split size is 2 * (40 * 0.5 + 5 open cost + 5 deletion file length) = 60.
+      // Therefore two whole splits fit into the 150 B target while three do not.
+      Assertions.assertEquals(2, reshuffled.length)
+      Assertions.assertEquals(2, reshuffled.head.splits.length)
+      Assertions.assertSame(split1, reshuffled.head.splits.head)
+      Assertions.assertSame(split2, reshuffled.head.splits(1))
+      Assertions.assertEquals(1, reshuffled(1).splits.length)
+      Assertions.assertSame(split3, reshuffled(1).splits.head)
+      reshuffled.flatMap(_.splits).foreach {
+        split => Assertions.assertEquals(2, 
split.asInstanceOf[DataSplit].dataFiles().size())
+      }
+    }
+  }
+
+  test("Paimon: data evolution split packing keeps oversized split whole") {
+    withSparkSQLConf("spark.sql.files.minPartitionNum" -> "1") {
+      val split = newDataSplit("oversized", Seq(40L, 40L))
+
+      val binPacking = BinPackingSplits(
+        CoreOptions.fromMap(
+          Map(
+            "data-evolution.enabled" -> "true",
+            "source.split.open-file-cost" -> "0 B",
+            "source.split.target-size" -> "50 B").asJava))
+      val reshuffled = binPacking.pack(Array[Split](split))
+
+      Assertions.assertEquals(1, reshuffled.length)
+      Assertions.assertEquals(1, reshuffled.head.splits.length)
+      Assertions.assertSame(split, reshuffled.head.splits.head)
+      Assertions.assertEquals(
+        2,
+        reshuffled.head.splits.head.asInstanceOf[DataSplit].dataFiles().size())
+    }
+  }
+
   test("Paimon: set open-file-cost to 0") {
     withTable("t") {
       sql("CREATE TABLE t (a INT, b STRING)")
@@ -126,21 +130,21 @@ class BinPackingSplitsTest extends PaimonSparkTestBase {
       def paimonScan() = getPaimonScan("SELECT * FROM t")
 
       // default openCostInBytes is 4m, so we will get 400 / 128 = 4 partitions
-      withSparkSQLConf("spark.sql.leafNodeDefaultParallelism" -> "1") {
+      withSparkSQLConf("spark.sql.files.minPartitionNum" -> "1") {
         assert(paimonScan().inputPartitions.length == 4)
       }
 
       withSparkSQLConf(
-        "spark.sql.files.openCostInBytes" -> "0",
-        "spark.sql.leafNodeDefaultParallelism" -> "1") {
+        "spark.sql.files.minPartitionNum" -> "1",
+        "spark.sql.files.openCostInBytes" -> "0") {
         assert(paimonScan().inputPartitions.length == 1)
       }
 
       // Paimon's conf takes precedence over Spark's
       withSparkSQLConf(
+        "spark.sql.files.minPartitionNum" -> "1",
         "spark.sql.files.openCostInBytes" -> "4194304",
-        "spark.paimon.source.split.open-file-cost" -> "0",
-        "spark.sql.leafNodeDefaultParallelism" -> "1") {
+        "spark.paimon.source.split.open-file-cost" -> "0") {
         assert(paimonScan().inputPartitions.length == 1)
       }
     }
@@ -176,4 +180,60 @@ class BinPackingSplitsTest extends PaimonSparkTestBase {
       }
     }
   }
+
+  private def newDataSplit(
+      prefix: String,
+      fileSizes: Seq[Long],
+      rawConvertible: Boolean = false,
+      deletionFileLength: Option[Long] = None): DataSplit = {
+    val files = fileSizes.zipWithIndex.map {
+      case (fileSize, index) => newDataFile(s"$prefix-$index.parquet", 
fileSize)
+    }
+    newDataSplitFromFiles(files, rawConvertible, deletionFileLength, prefix)
+  }
+
+  private def newDataSplitFromFiles(
+      files: Seq[DataFileMeta],
+      rawConvertible: Boolean,
+      deletionFileLength: Option[Long] = None,
+      deletionFilePrefix: String = "delete"): DataSplit = {
+    val builder = DataSplit
+      .builder()
+      .withSnapshot(1)
+      .withBucket(0)
+      .withPartition(BinaryRow.EMPTY_ROW)
+      .withDataFiles(files.asJava)
+      .rawConvertible(rawConvertible)
+      .withBucketPath("no use")
+    deletionFileLength.foreach {
+      length =>
+        builder.withDataDeletionFiles(
+          files.indices
+            .map(index => new DeletionFile(s"$deletionFilePrefix-$index.dv", 
0, length, null))
+            .asJava)
+    }
+    builder.build()
+  }
+
+  private def newDataFile(
+      fileName: String,
+      fileSize: Long,
+      rowCount: Long = 1,
+      maxSequenceNumber: Long = 0): DataFileMeta = {
+    DataFileMeta.forAppend(
+      fileName,
+      fileSize,
+      rowCount,
+      null,
+      0,
+      maxSequenceNumber,
+      1,
+      new java.util.ArrayList[String](),
+      null,
+      FileSource.APPEND,
+      null,
+      null,
+      null,
+      null)
+  }
 }

Reply via email to