This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 13e5564f66 [spark] Optimize drop multi partitions in batch (#7380)
13e5564f66 is described below
commit 13e5564f66cc1c0719fb8ff3eb31d29e02d7c0f9
Author: WenjunMin <[email protected]>
AuthorDate: Tue Mar 10 08:18:53 2026 +0800
[spark] Optimize drop multi partitions in batch (#7380)
Support dropping multiple partial & full partitions. Remove the partition
expansion and avoid dropping the partitions one by one. This could be
time-consuming when the matched partition count is large.
---
.../paimon/spark/PaimonPartitionManagement.scala | 47 +++++++++----
.../spark/execution/PaimonDropPartitionsExec.scala | 31 ++++-----
.../spark/sql/PaimonPartitionManagementTest.scala | 76 ++++++++++++++++++++++
3 files changed, 126 insertions(+), 28 deletions(-)
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala
index 312a340397..511e728dbc 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala
@@ -23,6 +23,7 @@ import org.apache.paimon.table.{FileStoreTable, Table}
import org.apache.paimon.types.RowType
import org.apache.paimon.utils.{InternalRowPartitionComputer, TypeUtils}
+import org.apache.spark.internal.Logging
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.util.CharVarcharUtils
@@ -33,7 +34,7 @@ import java.util.{Map => JMap, Objects}
import scala.collection.JavaConverters._
-trait PaimonPartitionManagement extends SupportsAtomicPartitionManagement {
+trait PaimonPartitionManagement extends SupportsAtomicPartitionManagement with
Logging {
val table: Table
@@ -44,19 +45,42 @@ trait PaimonPartitionManagement extends
SupportsAtomicPartitionManagement {
private def toPaimonPartitions(rows: Array[InternalRow]):
Array[java.util.Map[String, String]] = {
table match {
case fileStoreTable: FileStoreTable =>
- val rowConverter = CatalystTypeConverters
-
.createToScalaConverter(CharVarcharUtils.replaceCharVarcharWithString(partitionSchema))
- val rowDataPartitionComputer = new InternalRowPartitionComputer(
- fileStoreTable.coreOptions().partitionDefaultName(),
- partitionRowType,
- table.partitionKeys().asScala.toArray,
- CoreOptions.fromMap(table.options()).legacyPartitionName
- )
+ val partitionKeys = table.partitionKeys().asScala.toSeq
+ val partitionDefaultName =
fileStoreTable.coreOptions().partitionDefaultName()
+ val legacyPartitionName =
CoreOptions.fromMap(table.options()).legacyPartitionName
rows.map {
r =>
- rowDataPartitionComputer
- .generatePartValues(new SparkRow(partitionRowType,
rowConverter(r).asInstanceOf[Row]))
+ val partitionFieldCount = r.numFields
+ require(
+ partitionFieldCount <= partitionKeys.length,
+ s"Partition values length $partitionFieldCount exceeds partition
keys " +
+ s"${partitionKeys.mkString("[", ", ", "]")}."
+ )
+ val partitionNames = partitionKeys.take(partitionFieldCount)
+ val currentPartitionRowType =
+ if (partitionFieldCount == partitionRowType.getFieldCount) {
+ partitionRowType
+ } else {
+ TypeUtils.project(table.rowType, partitionNames.asJava)
+ }
+ val currentPartitionSchema =
+ if (partitionFieldCount == partitionSchema.length) {
+ partitionSchema
+ } else {
+ SparkTypeUtils.fromPaimonRowType(currentPartitionRowType)
+ }
+ val rowConverter = CatalystTypeConverters.createToScalaConverter(
+
CharVarcharUtils.replaceCharVarcharWithString(currentPartitionSchema))
+ val rowDataPartitionComputer = new InternalRowPartitionComputer(
+ partitionDefaultName,
+ currentPartitionRowType,
+ partitionNames.toArray,
+ legacyPartitionName
+ )
+
+ rowDataPartitionComputer.generatePartValues(
+ new SparkRow(currentPartitionRowType,
rowConverter(r).asInstanceOf[Row]))
}
case _ =>
throw new UnsupportedOperationException("Only FileStoreTable supports
partitions.")
@@ -67,6 +91,7 @@ trait PaimonPartitionManagement extends
SupportsAtomicPartitionManagement {
table match {
case fileStoreTable: FileStoreTable =>
val partitions = toPaimonPartitions(rows).toSeq.asJava
+ logInfo("Try to drop partitions: " + partitions.asScala.mkString(","))
val partitionModification =
fileStoreTable.catalogEnvironment().partitionModification()
if (partitionModification != null) {
try {
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonDropPartitionsExec.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonDropPartitionsExec.scala
index 7d06bc49a4..f257b68e33 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonDropPartitionsExec.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonDropPartitionsExec.scala
@@ -35,37 +35,34 @@ case class PaimonDropPartitionsExec(
refreshCache: () => Unit)
extends LeafV2CommandExec
with Logging {
+
override protected def run(): Seq[InternalRow] = {
val partitionSchema = table.asPartitionable.partitionSchema()
+
val (partialPartSpecs, fullPartSpecs) =
partSpecs.partition(_.ident.numFields != partitionSchema.length)
- val (existsPartIdents, notExistsPartIdents) =
- fullPartSpecs.map(_.ident).partition(table.partitionExists)
- if (notExistsPartIdents.nonEmpty && !ignoreIfNotExists) {
+ val (existsFullPartSpecs, notExistsPartSpecs) =
+ fullPartSpecs.partition(spec => table.partitionExists(spec.ident))
+ if (notExistsPartSpecs.nonEmpty && !ignoreIfNotExists) {
throw new NoSuchPartitionsException(
table.name(),
- notExistsPartIdents,
+ notExistsPartSpecs.map(_.ident),
table.asPartitionable.partitionSchema())
}
- val allExistsPartIdents = existsPartIdents ++
partialPartSpecs.flatMap(expendPartialSpec)
- logDebug("Try to drop partitions: " + allExistsPartIdents.mkString(","))
- val isTableAltered = if (allExistsPartIdents.nonEmpty) {
- allExistsPartIdents
- .map(
- partIdent => {
- if (purge) table.purgePartition(partIdent) else
table.dropPartition(partIdent)
- })
- .reduce(_ || _)
+ val partSpecsToDrop = existsFullPartSpecs ++ partialPartSpecs
+ val isTableAltered = if (partSpecsToDrop.nonEmpty) {
+ val partIdentsToDrop = partSpecsToDrop.map(_.ident).toArray
+ if (purge) {
+ table.purgePartitions(partIdentsToDrop)
+ } else {
+ table.dropPartitions(partIdentsToDrop)
+ }
} else false
if (isTableAltered) refreshCache()
Seq.empty
}
- private def expendPartialSpec(partialSpec: ResolvedPartitionSpec):
Seq[InternalRow] = {
- table.listPartitionIdentifiers(partialSpec.names.toArray,
partialSpec.ident).toSeq
- }
-
override def output: Seq[Attribute] = Seq.empty
}
diff --git
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPartitionManagementTest.scala
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPartitionManagementTest.scala
index 3acb318084..eebb3a8013 100644
---
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPartitionManagementTest.scala
+++
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPartitionManagementTest.scala
@@ -234,4 +234,80 @@ class PaimonPartitionManagementTest extends
PaimonSparkTestBase {
"SELECT 1 AS a, CAST('2024-06-01 10:30:01' AS TIMESTAMP) AS dt UNION
ALL SELECT 2, CAST('2024-06-02 15:45:30' AS TIMESTAMP) ORDER BY dt")
)
}
+
+ test("Paimon Partition Management: batch drop partitions") {
+ withTable("T_Batch") {
+ spark.sql(s"""
+ |CREATE TABLE T_Batch (a INT, dt INT, hh STRING, mm STRING)
+ |using paimon
+ |TBLPROPERTIES ('file.format' = 'avro')
+ |PARTITIONED BY (dt, hh, mm)
+ |""".stripMargin)
+
+ spark.sql(
+ "INSERT INTO T_Batch VALUES " +
+ "(1, 20240101, '00', '00'), " +
+ "(2, 20240101, '00', '01'), " +
+ "(3, 20240101, '01', '00'), " +
+ "(4, 20240102, '00', '00'), " +
+ "(5, 20240102, '00', '01')")
+ checkAnswer(
+ spark.sql("SHOW PARTITIONS T_Batch"),
+ Row("dt=20240101/hh=00/mm=00") ::
+ Row("dt=20240101/hh=00/mm=01") ::
+ Row("dt=20240101/hh=01/mm=00") ::
+ Row("dt=20240102/hh=00/mm=00") ::
+ Row("dt=20240102/hh=00/mm=01") :: Nil
+ )
+
+ // First, drop all sub-partitions for dt=20240101 by specifying only the
first-level partition column dt
+ spark.sql("ALTER TABLE T_Batch DROP PARTITION (dt=20240101)")
+ checkAnswer(
+ spark.sql("SHOW PARTITIONS T_Batch"),
+ Row("dt=20240102/hh=00/mm=00") ::
+ Row("dt=20240102/hh=00/mm=01") :: Nil
+ )
+
+ // Then, drop all sub-partitions under dt=20240102 and hh='00' by
specifying the first two partition columns dt and hh
+ spark.sql("ALTER TABLE T_Batch DROP PARTITION (dt=20240102, hh='00')")
+ checkAnswer(
+ spark.sql("SHOW PARTITIONS T_Batch"),
+ Nil
+ )
+ }
+ }
+
+ test("Paimon Partition Management: batch drop partitions with mixed full and
partial specs") {
+ withTable("T_Mixed_Batch") {
+ spark.sql(s"""
+ |CREATE TABLE T_Mixed_Batch (a INT, dt INT, hh STRING, mm
STRING)
+ |using paimon
+ |TBLPROPERTIES ('file.format' = 'avro')
+ |PARTITIONED BY (dt, hh, mm)
+ |""".stripMargin)
+
+ spark.sql(
+ "INSERT INTO T_Mixed_Batch VALUES " +
+ "(1, 20240101, '00', '00'), " +
+ "(2, 20240101, '00', '01'), " +
+ "(3, 20240101, '01', '00'), " +
+ "(4, 20240102, '00', '00'), " +
+ "(5, 20240102, '00', '01')")
+
+ spark.sql(
+ "ALTER TABLE T_Mixed_Batch DROP PARTITION (dt=20240101, hh='00',
mm='00'), " +
+ "PARTITION (dt=20240102)")
+
+ checkAnswer(
+ spark.sql("SELECT COUNT(*) FROM `T_Mixed_Batch$snapshots` WHERE
commit_kind = 'OVERWRITE'"),
+ Row(1L) :: Nil
+ )
+
+ checkAnswer(
+ spark.sql("SHOW PARTITIONS T_Mixed_Batch"),
+ Row("dt=20240101/hh=00/mm=01") ::
+ Row("dt=20240101/hh=01/mm=00") :: Nil
+ )
+ }
+ }
}