This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 13e5564f66 [spark] Optimize drop multi partitions in batch (#7380)
13e5564f66 is described below

commit 13e5564f66cc1c0719fb8ff3eb31d29e02d7c0f9
Author: WenjunMin <[email protected]>
AuthorDate: Tue Mar 10 08:18:53 2026 +0800

    [spark] Optimize drop multi partitions in batch (#7380)
    
    Support to drop multi partial & full partitions. Removing the partition
    expansion and avoid droping the partition one by one. This could be time
    cost when the matched partition count is large.
---
 .../paimon/spark/PaimonPartitionManagement.scala   | 47 +++++++++----
 .../spark/execution/PaimonDropPartitionsExec.scala | 31 ++++-----
 .../spark/sql/PaimonPartitionManagementTest.scala  | 76 ++++++++++++++++++++++
 3 files changed, 126 insertions(+), 28 deletions(-)

diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala
index 312a340397..511e728dbc 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala
@@ -23,6 +23,7 @@ import org.apache.paimon.table.{FileStoreTable, Table}
 import org.apache.paimon.types.RowType
 import org.apache.paimon.utils.{InternalRowPartitionComputer, TypeUtils}
 
+import org.apache.spark.internal.Logging
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.util.CharVarcharUtils
@@ -33,7 +34,7 @@ import java.util.{Map => JMap, Objects}
 
 import scala.collection.JavaConverters._
 
-trait PaimonPartitionManagement extends SupportsAtomicPartitionManagement {
+trait PaimonPartitionManagement extends SupportsAtomicPartitionManagement with 
Logging {
 
   val table: Table
 
@@ -44,19 +45,42 @@ trait PaimonPartitionManagement extends 
SupportsAtomicPartitionManagement {
   private def toPaimonPartitions(rows: Array[InternalRow]): 
Array[java.util.Map[String, String]] = {
     table match {
       case fileStoreTable: FileStoreTable =>
-        val rowConverter = CatalystTypeConverters
-          
.createToScalaConverter(CharVarcharUtils.replaceCharVarcharWithString(partitionSchema))
-        val rowDataPartitionComputer = new InternalRowPartitionComputer(
-          fileStoreTable.coreOptions().partitionDefaultName(),
-          partitionRowType,
-          table.partitionKeys().asScala.toArray,
-          CoreOptions.fromMap(table.options()).legacyPartitionName
-        )
+        val partitionKeys = table.partitionKeys().asScala.toSeq
+        val partitionDefaultName = 
fileStoreTable.coreOptions().partitionDefaultName()
+        val legacyPartitionName = 
CoreOptions.fromMap(table.options()).legacyPartitionName
 
         rows.map {
           r =>
-            rowDataPartitionComputer
-              .generatePartValues(new SparkRow(partitionRowType, 
rowConverter(r).asInstanceOf[Row]))
+            val partitionFieldCount = r.numFields
+            require(
+              partitionFieldCount <= partitionKeys.length,
+              s"Partition values length $partitionFieldCount exceeds partition 
keys " +
+                s"${partitionKeys.mkString("[", ", ", "]")}."
+            )
+            val partitionNames = partitionKeys.take(partitionFieldCount)
+            val currentPartitionRowType =
+              if (partitionFieldCount == partitionRowType.getFieldCount) {
+                partitionRowType
+              } else {
+                TypeUtils.project(table.rowType, partitionNames.asJava)
+              }
+            val currentPartitionSchema =
+              if (partitionFieldCount == partitionSchema.length) {
+                partitionSchema
+              } else {
+                SparkTypeUtils.fromPaimonRowType(currentPartitionRowType)
+              }
+            val rowConverter = CatalystTypeConverters.createToScalaConverter(
+              
CharVarcharUtils.replaceCharVarcharWithString(currentPartitionSchema))
+            val rowDataPartitionComputer = new InternalRowPartitionComputer(
+              partitionDefaultName,
+              currentPartitionRowType,
+              partitionNames.toArray,
+              legacyPartitionName
+            )
+
+            rowDataPartitionComputer.generatePartValues(
+              new SparkRow(currentPartitionRowType, 
rowConverter(r).asInstanceOf[Row]))
         }
       case _ =>
         throw new UnsupportedOperationException("Only FileStoreTable supports 
partitions.")
@@ -67,6 +91,7 @@ trait PaimonPartitionManagement extends 
SupportsAtomicPartitionManagement {
     table match {
       case fileStoreTable: FileStoreTable =>
         val partitions = toPaimonPartitions(rows).toSeq.asJava
+        logInfo("Try to drop partitions: " + partitions.asScala.mkString(","))
         val partitionModification = 
fileStoreTable.catalogEnvironment().partitionModification()
         if (partitionModification != null) {
           try {
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonDropPartitionsExec.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonDropPartitionsExec.scala
index 7d06bc49a4..f257b68e33 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonDropPartitionsExec.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonDropPartitionsExec.scala
@@ -35,37 +35,34 @@ case class PaimonDropPartitionsExec(
     refreshCache: () => Unit)
   extends LeafV2CommandExec
   with Logging {
+
   override protected def run(): Seq[InternalRow] = {
     val partitionSchema = table.asPartitionable.partitionSchema()
+
     val (partialPartSpecs, fullPartSpecs) =
       partSpecs.partition(_.ident.numFields != partitionSchema.length)
 
-    val (existsPartIdents, notExistsPartIdents) =
-      fullPartSpecs.map(_.ident).partition(table.partitionExists)
-    if (notExistsPartIdents.nonEmpty && !ignoreIfNotExists) {
+    val (existsFullPartSpecs, notExistsPartSpecs) =
+      fullPartSpecs.partition(spec => table.partitionExists(spec.ident))
+    if (notExistsPartSpecs.nonEmpty && !ignoreIfNotExists) {
       throw new NoSuchPartitionsException(
         table.name(),
-        notExistsPartIdents,
+        notExistsPartSpecs.map(_.ident),
         table.asPartitionable.partitionSchema())
     }
-    val allExistsPartIdents = existsPartIdents ++ 
partialPartSpecs.flatMap(expendPartialSpec)
-    logDebug("Try to drop partitions: " + allExistsPartIdents.mkString(","))
-    val isTableAltered = if (allExistsPartIdents.nonEmpty) {
-      allExistsPartIdents
-        .map(
-          partIdent => {
-            if (purge) table.purgePartition(partIdent) else 
table.dropPartition(partIdent)
-          })
-        .reduce(_ || _)
+    val partSpecsToDrop = existsFullPartSpecs ++ partialPartSpecs
+    val isTableAltered = if (partSpecsToDrop.nonEmpty) {
+      val partIdentsToDrop = partSpecsToDrop.map(_.ident).toArray
+      if (purge) {
+        table.purgePartitions(partIdentsToDrop)
+      } else {
+        table.dropPartitions(partIdentsToDrop)
+      }
     } else false
 
     if (isTableAltered) refreshCache()
     Seq.empty
   }
 
-  private def expendPartialSpec(partialSpec: ResolvedPartitionSpec): 
Seq[InternalRow] = {
-    table.listPartitionIdentifiers(partialSpec.names.toArray, 
partialSpec.ident).toSeq
-  }
-
   override def output: Seq[Attribute] = Seq.empty
 }
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPartitionManagementTest.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPartitionManagementTest.scala
index 3acb318084..eebb3a8013 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPartitionManagementTest.scala
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPartitionManagementTest.scala
@@ -234,4 +234,80 @@ class PaimonPartitionManagementTest extends 
PaimonSparkTestBase {
         "SELECT 1 AS a, CAST('2024-06-01 10:30:01' AS TIMESTAMP) AS dt UNION 
ALL SELECT 2, CAST('2024-06-02 15:45:30' AS TIMESTAMP) ORDER BY dt")
     )
   }
+
+  test("Paimon Partition Management: batch drop partitions") {
+    withTable("T_Batch") {
+      spark.sql(s"""
+                   |CREATE TABLE T_Batch (a INT, dt INT, hh STRING, mm STRING)
+                   |using paimon
+                   |TBLPROPERTIES ('file.format' = 'avro')
+                   |PARTITIONED BY (dt, hh, mm)
+                   |""".stripMargin)
+
+      spark.sql(
+        "INSERT INTO T_Batch VALUES " +
+          "(1, 20240101, '00', '00'), " +
+          "(2, 20240101, '00', '01'), " +
+          "(3, 20240101, '01', '00'), " +
+          "(4, 20240102, '00', '00'), " +
+          "(5, 20240102, '00', '01')")
+      checkAnswer(
+        spark.sql("SHOW PARTITIONS T_Batch"),
+        Row("dt=20240101/hh=00/mm=00") ::
+          Row("dt=20240101/hh=00/mm=01") ::
+          Row("dt=20240101/hh=01/mm=00") ::
+          Row("dt=20240102/hh=00/mm=00") ::
+          Row("dt=20240102/hh=00/mm=01") :: Nil
+      )
+
+      // First, drop all sub-partitions for dt=20240101 by specifying only the 
first-level partition column dt
+      spark.sql("ALTER TABLE T_Batch DROP PARTITION (dt=20240101)")
+      checkAnswer(
+        spark.sql("SHOW PARTITIONS T_Batch"),
+        Row("dt=20240102/hh=00/mm=00") ::
+          Row("dt=20240102/hh=00/mm=01") :: Nil
+      )
+
+      // Then, drop all sub-partitions under dt=20240102 and hh='00' by 
specifying the first two partition columns dt and hh
+      spark.sql("ALTER TABLE T_Batch DROP PARTITION (dt=20240102, hh='00')")
+      checkAnswer(
+        spark.sql("SHOW PARTITIONS T_Batch"),
+        Nil
+      )
+    }
+  }
+
+  test("Paimon Partition Management: batch drop partitions with mixed full and 
partial specs") {
+    withTable("T_Mixed_Batch") {
+      spark.sql(s"""
+                   |CREATE TABLE T_Mixed_Batch (a INT, dt INT, hh STRING, mm 
STRING)
+                   |using paimon
+                   |TBLPROPERTIES ('file.format' = 'avro')
+                   |PARTITIONED BY (dt, hh, mm)
+                   |""".stripMargin)
+
+      spark.sql(
+        "INSERT INTO T_Mixed_Batch VALUES " +
+          "(1, 20240101, '00', '00'), " +
+          "(2, 20240101, '00', '01'), " +
+          "(3, 20240101, '01', '00'), " +
+          "(4, 20240102, '00', '00'), " +
+          "(5, 20240102, '00', '01')")
+
+      spark.sql(
+        "ALTER TABLE T_Mixed_Batch DROP PARTITION (dt=20240101, hh='00', 
mm='00'), " +
+          "PARTITION (dt=20240102)")
+
+      checkAnswer(
+        spark.sql("SELECT COUNT(*) FROM `T_Mixed_Batch$snapshots` WHERE 
commit_kind = 'OVERWRITE'"),
+        Row(1L) :: Nil
+      )
+
+      checkAnswer(
+        spark.sql("SHOW PARTITIONS T_Mixed_Batch"),
+        Row("dt=20240101/hh=00/mm=01") ::
+          Row("dt=20240101/hh=01/mm=00") :: Nil
+      )
+    }
+  }
 }

Reply via email to