This is an automated email from the ASF dual-hosted git repository.

zouxxyy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 3acf89b2d9 [spark] Refactor metadata only delete (#6852)
3acf89b2d9 is described below

commit 3acf89b2d94f2719c7f7892dd8747d038c5c660e
Author: Zouxxyy <[email protected]>
AuthorDate: Mon Dec 22 10:43:43 2025 +0800

    [spark] Refactor metadata only delete (#6852)
---
 .../paimon/spark/PaimonPartitionManagement.scala   | 15 ++++
 .../apache/paimon/spark/PaimonSparkTableBase.scala |  9 ++-
 .../spark/catalyst/analysis/PaimonAnalysis.scala   |  5 +-
 .../catalyst/analysis/PaimonDeleteTable.scala      |  7 +-
 .../analysis/PaimonIncompatiblePHRRules.scala      | 54 --------------
 .../analysis/expressions/ExpressionHelper.scala    | 11 ---
 ...ptimizeMetadataOnlyDeleteFromPaimonTable.scala} | 84 +++++++++++++++++-----
 .../plans/logical/PaimonTableValuedFunctions.scala |  2 +-
 .../logical/TruncatePaimonTableWithFilter.scala    | 39 ++++++++++
 .../commands/DeleteFromPaimonTableCommand.scala    | 63 ++--------------
 .../commands/PaimonTruncateTableCommand.scala      | 52 --------------
 .../paimon/spark/execution/PaimonStrategy.scala    | 12 +++-
 .../TruncatePaimonTableWithFilterExec.scala        | 75 +++++++++++++++++++
 .../extensions/PaimonSparkSessionExtensions.scala  |  7 +-
 .../spark/sql/PaimonOptimizationTestBase.scala     | 45 +++++++++---
 15 files changed, 261 insertions(+), 219 deletions(-)

diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala
index 36ab850d29..27fa93458b 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala
@@ -89,6 +89,21 @@ trait PaimonPartitionManagement extends SupportsAtomicPartitionManagement {
     }
   }
 
+  override def truncatePartitions(idents: Array[InternalRow]): Boolean = {
+    val partitions = toPaimonPartitions(idents).toSeq.asJava
+    val commit = table.newBatchWriteBuilder().newCommit()
+    try {
+      commit.truncatePartitions(partitions)
+    } finally {
+      commit.close()
+    }
+    true
+  }
+
+  override def truncatePartition(ident: InternalRow): Boolean = {
+    truncatePartitions(Array(ident))
+  }
+
   override def replacePartitionMetadata(
       ident: InternalRow,
       properties: JMap[String, String]): Unit = {
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSparkTableBase.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSparkTableBase.scala
index 867de1109c..c0d161b224 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSparkTableBase.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSparkTableBase.scala
@@ -31,7 +31,7 @@ import org.apache.paimon.table.BucketMode.{BUCKET_UNAWARE, HASH_FIXED, POSTPONE_
 
 import org.apache.spark.sql.connector.catalog._
 import org.apache.spark.sql.connector.read.ScanBuilder
-import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
+import org.apache.spark.sql.connector.write.{LogicalWriteInfo, SupportsTruncate, WriteBuilder}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 import java.util.{Collections, EnumSet => JEnumSet, HashMap => JHashMap, Map => JMap, Set => JSet}
@@ -42,6 +42,7 @@ abstract class PaimonSparkTableBase(val table: Table)
   extends BaseTable
   with SupportsRead
   with SupportsWrite
+  with TruncatableTable
   with SupportsMetadataColumns {
 
   lazy val coreOptions = new CoreOptions(table.options())
@@ -152,4 +153,10 @@ abstract class PaimonSparkTableBase(val table: Table)
         throw new RuntimeException("Only FileStoreTable can be written.")
     }
   }
+
+  def truncateTable: Boolean = {
+    val commit = table.newBatchWriteBuilder().newCommit()
+    commit.truncateTable()
+    true
+  }
 }
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonAnalysis.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonAnalysis.scala
index c6e10fabf1..1c248302c3 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonAnalysis.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonAnalysis.scala
@@ -22,7 +22,7 @@ import org.apache.paimon.spark.{SparkConnectorOptions, SparkTable}
 import org.apache.paimon.spark.catalyst.Compatibility
 import org.apache.paimon.spark.catalyst.analysis.PaimonRelation.isPaimonTable
 import org.apache.paimon.spark.catalyst.plans.logical.PaimonDropPartitions
-import org.apache.paimon.spark.commands.{PaimonAnalyzeTableColumnCommand, PaimonDynamicPartitionOverwriteCommand, PaimonShowColumnsCommand, PaimonTruncateTableCommand}
+import org.apache.paimon.spark.commands.{PaimonAnalyzeTableColumnCommand, PaimonDynamicPartitionOverwriteCommand, PaimonShowColumnsCommand}
 import org.apache.paimon.spark.util.OptionUtils
 import org.apache.paimon.table.FileStoreTable
 
@@ -330,9 +330,6 @@ case class PaimonPostHocResolutionRules(session: SparkSession) extends Rule[Logi
 
   override def apply(plan: LogicalPlan): LogicalPlan = {
     plan match {
-      case t @ TruncateTable(PaimonRelation(table)) if t.resolved =>
-        PaimonTruncateTableCommand(table, Map.empty)
-
       case a @ AnalyzeTable(
             ResolvedTable(catalog, identifier, table: SparkTable, _),
             partitionSpec,
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonDeleteTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonDeleteTable.scala
index 46b4cc05b4..6808e64c45 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonDeleteTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonDeleteTable.scala
@@ -19,11 +19,11 @@
 package org.apache.paimon.spark.catalyst.analysis
 
 import org.apache.paimon.spark.SparkTable
+import org.apache.paimon.spark.catalyst.optimizer.OptimizeMetadataOnlyDeleteFromPaimonTable
 import org.apache.paimon.spark.commands.DeleteFromPaimonTableCommand
 import org.apache.paimon.table.FileStoreTable
 
 import org.apache.spark.sql.catalyst.expressions.Expression
-import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
 import org.apache.spark.sql.catalyst.plans.logical.{DeleteFromTable, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.Rule
 
@@ -39,8 +39,9 @@ object PaimonDeleteTable extends Rule[LogicalPlan] with RowLevelHelper {
     table.coreOptions.deletionVectorsEnabled() ||
     table.coreOptions.rowTrackingEnabled() ||
     table.coreOptions.dataEvolutionEnabled() ||
-    // todo: Optimize v2 delete when conditions are all partition filters
-    condition == null || condition == TrueLiteral
+    OptimizeMetadataOnlyDeleteFromPaimonTable.isMetadataOnlyDelete(
+      baseTable.asInstanceOf[FileStoreTable],
+      condition)
   }
 
   override val operation: RowLevelOp = Delete
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonIncompatiblePHRRules.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonIncompatiblePHRRules.scala
deleted file mode 100644
index bf6eb35757..0000000000
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonIncompatiblePHRRules.scala
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.spark.catalyst.analysis
-
-import org.apache.paimon.spark.commands.PaimonTruncateTableCommand
-
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.analysis.ResolvedPartitionSpec
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, TruncatePartition}
-import org.apache.spark.sql.catalyst.rules.Rule
-
-/** These post-hoc resolution rules are incompatible between different versions of spark. */
-case class PaimonIncompatiblePHRRules(session: SparkSession) extends Rule[LogicalPlan] {
-
-  override def apply(plan: LogicalPlan): LogicalPlan = {
-    plan match {
-      case t @ TruncatePartition(PaimonRelation(table), ResolvedPartitionSpec(names, ident, _))
-          if t.resolved =>
-        assert(names.length == ident.numFields, "Names and values of partition don't match")
-        val resolver = session.sessionState.conf.resolver
-        val schema = table.schema
-        val partitionSpec = names.zipWithIndex.map {
-          case (name, index) =>
-            val field = schema.find(f => resolver(f.name, name)).getOrElse {
-              throw new RuntimeException(s"$name is not a valid partition column in $schema.")
-            }
-
-            val partVal: String =
-              if (ident.isNullAt(index)) null else ident.get(index, field.dataType).toString
-            (name -> partVal)
-        }.toMap
-        PaimonTruncateTableCommand(table, partitionSpec)
-
-      case _ => plan
-    }
-  }
-
-}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala
index 2bed4af873..82dcb594a2 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala
@@ -209,17 +209,6 @@ trait ExpressionHelperBase extends PredicateHelper {
     }
   }
 
-  def splitPruePartitionAndOtherPredicates(
-      condition: Expression,
-      partitionColumns: Seq[String],
-      resolver: Resolver): (Seq[Expression], Seq[Expression]) = {
-    splitConjunctivePredicates(condition)
-      .partition {
-        isPredicatePartitionColumnsOnly(_, partitionColumns, resolver) && !SubqueryExpression
-          .hasSubquery(condition)
-      }
-  }
-
   def isPredicatePartitionColumnsOnly(
       condition: Expression,
       partitionColumns: Seq[String],
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/EvalSubqueriesForDeleteTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/OptimizeMetadataOnlyDeleteFromPaimonTable.scala
similarity index 58%
rename from paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/EvalSubqueriesForDeleteTable.scala
rename to paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/OptimizeMetadataOnlyDeleteFromPaimonTable.scala
index 66f8a10f37..7f28182599 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/EvalSubqueriesForDeleteTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/OptimizeMetadataOnlyDeleteFromPaimonTable.scala
@@ -18,52 +18,98 @@
 
 package org.apache.paimon.spark.catalyst.optimizer
 
+import org.apache.paimon.partition.PartitionPredicate
 import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper
+import org.apache.paimon.spark.catalyst.plans.logical.TruncatePaimonTableWithFilter
 import org.apache.paimon.spark.commands.DeleteFromPaimonTableCommand
+import org.apache.paimon.table.FileStoreTable
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{execution, PaimonSparkSession, SparkSession}
 import org.apache.spark.sql.catalyst.analysis.Resolver
 import org.apache.spark.sql.catalyst.expressions.{Expression, In, InSubquery, Literal, ScalarSubquery, SubqueryExpression}
+import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.{ExecSubqueryExpression, QueryExecution}
+import org.apache.spark.sql.execution.ExecSubqueryExpression
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.paimon.shims.SparkShimLoader
 import org.apache.spark.sql.types.BooleanType
 
 import scala.collection.JavaConverters._
 
 /**
- * For those delete conditions with subqueries that only contain partition columns, we can eval them
- * in advance. So that when running [[DeleteFromPaimonTableCommand]], we can directly call
- * dropPartitions to achieve fast deletion.
+ * Similar to spark `OptimizeMetadataOnlyDeleteFromTable`. The reasons why Paimon Table does not
+ * inherit `SupportsDeleteV2` are as follows:
  *
- * Note: this rule must be placed before [[MergePaimonScalarSubqueries]], because
+ * <p>1. It needs to support both V1 delete and V2 delete simultaneously.
+ *
+ * <p>2. This rule can optimize partition filters that contain subqueries.
+ *
+ * <p>Note: this rule must be placed before [[MergePaimonScalarSubqueries]], because
  * [[MergePaimonScalarSubqueries]] will merge subqueries.
  */
-object EvalSubqueriesForDeleteTable extends Rule[LogicalPlan] with ExpressionHelper with Logging {
+object OptimizeMetadataOnlyDeleteFromPaimonTable
+  extends Rule[LogicalPlan]
+  with ExpressionHelper
+  with Logging {
 
   lazy val spark: SparkSession = PaimonSparkSession.active
   lazy val resolver: Resolver = spark.sessionState.conf.resolver
 
   override def apply(plan: LogicalPlan): LogicalPlan = {
-    plan.transformDown {
-      case d @ DeleteFromPaimonTableCommand(_, table, condition)
-          if SubqueryExpression.hasSubquery(condition) &&
-            isPredicatePartitionColumnsOnly(
-              condition,
-              table.partitionKeys().asScala.toSeq,
-              resolver) =>
-        try {
-          d.copy(condition = evalSubquery(condition))
-        } catch {
-          case e: Throwable =>
-            logInfo(s"Applying EvalSubqueriesForDeleteTable rule failed for: ${e.getMessage}")
-            d
+    plan.transform {
+      case d @ DeleteFromPaimonTableCommand(r: DataSourceV2Relation, table, condition) =>
+        if (isTruncateTable(condition)) {
+          TruncatePaimonTableWithFilter(table, None)
+        } else if (isTruncatePartition(table, condition)) {
+          tryConvertToPartitionPredicate(r, table, condition) match {
+            case Some(p) => TruncatePaimonTableWithFilter(table, Some(p))
+            case _ => d
+          }
+        } else {
+          d
         }
     }
   }
 
+  def isMetadataOnlyDelete(table: FileStoreTable, condition: Expression): Boolean = {
+    isTruncateTable(condition) || isTruncatePartition(table, condition)
+  }
+
+  private def isTruncateTable(condition: Expression): Boolean = {
+    condition == null || condition == TrueLiteral
+  }
+
+  private def isTruncatePartition(table: FileStoreTable, condition: Expression): Boolean = {
+    val partitionKeys = table.partitionKeys().asScala.toSeq
+
+    partitionKeys.nonEmpty &&
+    !table.coreOptions().deleteForceProduceChangelog() &&
+    isPredicatePartitionColumnsOnly(condition, partitionKeys, resolver)
+  }
+
+  private def tryConvertToPartitionPredicate(
+      relation: DataSourceV2Relation,
+      table: FileStoreTable,
+      condition: Expression): Option[PartitionPredicate] = {
+    try {
+      val partitionRowType = table.schema().logicalPartitionType()
+      // For those delete conditions with subqueries that only contain partition columns, we can eval them in advance.
+      val finalCondiction = if (SubqueryExpression.hasSubquery(condition)) {
+        evalSubquery(condition)
+      } else {
+        condition
+      }
+      convertConditionToPaimonPredicate(finalCondiction, relation.output, partitionRowType) match {
+        case Some(p) => Some(PartitionPredicate.fromPredicate(partitionRowType, p))
+        case None => None
+      }
+    } catch {
+      case _: Throwable => None
+    }
+  }
+
   private def evalSubquery(condition: Expression): Expression = {
     condition.transformDown {
       case InSubquery(values, listQuery) =>
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/PaimonTableValuedFunctions.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/PaimonTableValuedFunctions.scala
index 7e72abc4c9..e4f5e7856c 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/PaimonTableValuedFunctions.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/PaimonTableValuedFunctions.scala
@@ -21,7 +21,7 @@ package org.apache.paimon.spark.catalyst.plans.logical
 import org.apache.paimon.CoreOptions
 import org.apache.paimon.spark.SparkTable
 import org.apache.paimon.spark.catalyst.plans.logical.PaimonTableValuedFunctions._
-import org.apache.paimon.table.{DataTable, FileStoreTable}
+import org.apache.paimon.table.DataTable
 import org.apache.paimon.table.source.snapshot.TimeTravelUtil.InconsistentTagBucketException
 
 import org.apache.spark.sql.PaimonUtils.createDataset
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/TruncatePaimonTableWithFilter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/TruncatePaimonTableWithFilter.scala
new file mode 100644
index 0000000000..323802750e
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/TruncatePaimonTableWithFilter.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.catalyst.plans.logical
+
+import org.apache.paimon.partition.PartitionPredicate
+import org.apache.paimon.spark.leafnode.PaimonLeafCommand
+import org.apache.paimon.table.Table
+
+/**
+ * Truncate paimon table with partition predicate.
+ *
+ * @param partitionPredicate
+ *   when it is None, the whole table is truncated
+ */
+case class TruncatePaimonTableWithFilter(
+    table: Table,
+    partitionPredicate: Option[PartitionPredicate])
+  extends PaimonLeafCommand {
+
+  override def simpleString(maxFields: Int): String = {
+    s"Truncate table $table with filter"
+  }
+}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
index 461aca90ef..ed40b5ff21 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
@@ -24,18 +24,14 @@ import org.apache.paimon.table.FileStoreTable
 import org.apache.paimon.table.PrimaryKeyTableUtils.validatePKUpsertDeletable
 import org.apache.paimon.table.sink.CommitMessage
 import org.apache.paimon.types.RowKind
-import org.apache.paimon.utils.InternalRowPartitionComputer
 
 import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.PaimonUtils.createDataset
-import org.apache.spark.sql.catalyst.expressions.{And, Expression, Not}
-import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
+import org.apache.spark.sql.catalyst.expressions.{Expression, Not}
 import org.apache.spark.sql.catalyst.plans.logical.{Filter, SupportsSubquery}
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.functions.lit
 
-import scala.collection.JavaConverters._
-
 case class DeleteFromPaimonTableCommand(
     relation: DataSourceV2Relation,
     override val table: FileStoreTable,
@@ -45,61 +41,12 @@ case class DeleteFromPaimonTableCommand(
   with SupportsSubquery {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
-
-    val commit = table.newBatchWriteBuilder().newCommit()
-    if (condition == null || condition == TrueLiteral) {
-      commit.truncateTable()
+    val commitMessages = if (usePKUpsertDelete()) {
+      performPrimaryKeyDelete(sparkSession)
     } else {
-      val (partitionCondition, otherCondition) = splitPruePartitionAndOtherPredicates(
-        condition,
-        table.partitionKeys().asScala.toSeq,
-        sparkSession.sessionState.conf.resolver)
-
-      val partitionPredicate = if (partitionCondition.isEmpty) {
-        None
-      } else {
-        try {
-          convertConditionToPaimonPredicate(
-            partitionCondition.reduce(And),
-            relation.output,
-            table.schema.logicalPartitionType())
-        } catch {
-          case _: Throwable =>
-            None
-        }
-      }
-
-      if (
-        otherCondition.isEmpty && partitionPredicate.nonEmpty && !table
-          .coreOptions()
-          .deleteForceProduceChangelog()
-      ) {
-        val matchedPartitions =
-          table.newSnapshotReader().withPartitionFilter(partitionPredicate.get).partitions().asScala
-        val rowDataPartitionComputer = new InternalRowPartitionComputer(
-          table.coreOptions().partitionDefaultName(),
-          table.schema().logicalPartitionType(),
-          table.partitionKeys.asScala.toArray,
-          table.coreOptions().legacyPartitionName()
-        )
-        val dropPartitions = matchedPartitions.map {
-          partition => rowDataPartitionComputer.generatePartValues(partition).asScala.asJava
-        }
-        if (dropPartitions.nonEmpty) {
-          commit.truncatePartitions(dropPartitions.asJava)
-        } else {
-          writer.commit(Seq.empty)
-        }
-      } else {
-        val commitMessages = if (usePKUpsertDelete()) {
-          performPrimaryKeyDelete(sparkSession)
-        } else {
-          performNonPrimaryKeyDelete(sparkSession)
-        }
-        writer.commit(commitMessages)
-      }
+      performNonPrimaryKeyDelete(sparkSession)
     }
-
+    writer.commit(commitMessages)
     Seq.empty[Row]
   }
 
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonTruncateTableCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonTruncateTableCommand.scala
deleted file mode 100644
index f55c5011e2..0000000000
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonTruncateTableCommand.scala
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.spark.commands
-
-import org.apache.paimon.spark.SparkTable
-import org.apache.paimon.spark.leafnode.PaimonLeafRunnableCommand
-import org.apache.paimon.table.FileStoreTable
-import org.apache.paimon.table.sink.BatchWriteBuilder
-
-import org.apache.spark.sql.{Row, SparkSession}
-import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
-
-import java.util.{Collections, UUID}
-
-import scala.collection.JavaConverters._
-
-case class PaimonTruncateTableCommand(v2Table: SparkTable, partitionSpec: TablePartitionSpec)
-  extends PaimonLeafRunnableCommand
-  with WithFileStoreTable {
-
-  override def table: FileStoreTable = v2Table.getTable.asInstanceOf[FileStoreTable]
-
-  override def run(sparkSession: SparkSession): Seq[Row] = {
-    val commit = table.newBatchWriteBuilder().newCommit()
-
-    if (partitionSpec.isEmpty) {
-      commit.truncateTable()
-    } else {
-      commit.truncatePartitions(
-        Collections.singletonList(partitionSpec.asJava)
-      )
-    }
-
-    Seq.empty[Row]
-  }
-}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala
index bc0627d89f..3be8b5a74e 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala
@@ -18,11 +18,12 @@
 
 package org.apache.paimon.spark.execution
 
+import org.apache.paimon.partition.PartitionPredicate
 import org.apache.paimon.spark.{SparkCatalog, SparkGenericCatalog, SparkTable, SparkUtils}
 import org.apache.paimon.spark.catalog.{SparkBaseCatalog, SupportView}
 import org.apache.paimon.spark.catalyst.analysis.ResolvedPaimonView
-import org.apache.paimon.spark.catalyst.plans.logical.{CreateOrReplaceTagCommand, CreatePaimonView, DeleteTagCommand, DropPaimonView, PaimonCallCommand, RenameTagCommand, ResolvedIdentifier, ShowPaimonViews, ShowTagsCommand}
-import org.apache.paimon.spark.catalyst.plans.logical.PaimonDropPartitions
+import org.apache.paimon.spark.catalyst.plans.logical.{CreateOrReplaceTagCommand, CreatePaimonView, DeleteTagCommand, DropPaimonView, PaimonCallCommand, PaimonDropPartitions, RenameTagCommand, ResolvedIdentifier, ShowPaimonViews, ShowTagsCommand, TruncatePaimonTableWithFilter}
+import org.apache.paimon.table.Table
 
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
@@ -125,7 +126,7 @@ case class PaimonStrategy(spark: SparkSession)
         case _ => Nil
       }
 
-    case d @ PaimonDropPartitions(
+    case PaimonDropPartitions(
           r @ ResolvedTable(_, _, table: SparkTable, _),
           parts,
           ifExists,
@@ -137,6 +138,11 @@ case class PaimonStrategy(spark: SparkSession)
         purge,
         recacheTable(r)) :: Nil
 
+    case TruncatePaimonTableWithFilter(
+          table: Table,
+          partitionPredicate: Option[PartitionPredicate]) =>
+      TruncatePaimonTableWithFilterExec(table, partitionPredicate) :: Nil
+
     case _ => Nil
   }
 
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/TruncatePaimonTableWithFilterExec.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/TruncatePaimonTableWithFilterExec.scala
new file mode 100644
index 0000000000..41c2187ce3
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/TruncatePaimonTableWithFilterExec.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.execution
+
+import org.apache.paimon.partition.PartitionPredicate
+import org.apache.paimon.spark.leafnode.PaimonLeafV2CommandExec
+import org.apache.paimon.table.{FileStoreTable, Table}
+import org.apache.paimon.utils.InternalRowPartitionComputer
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+
+import java.util.{Collections => JCollections}
+
+import scala.collection.JavaConverters._
+
+case class TruncatePaimonTableWithFilterExec(
+    table: Table,
+    partitionPredicate: Option[PartitionPredicate])
+  extends PaimonLeafV2CommandExec {
+
+  override def run(): Seq[InternalRow] = {
+    val commit = table.newBatchWriteBuilder().newCommit()
+
+    partitionPredicate match {
+      case Some(p) =>
+        table match {
+          case fileStoreTable: FileStoreTable =>
+            val matchedPartitions =
+              fileStoreTable.newSnapshotReader().withPartitionFilter(p).partitions().asScala
+            if (matchedPartitions.nonEmpty) {
+              val partitionComputer = new InternalRowPartitionComputer(
+                fileStoreTable.coreOptions().partitionDefaultName(),
+                fileStoreTable.schema().logicalPartitionType(),
+                fileStoreTable.partitionKeys.asScala.toArray,
+                fileStoreTable.coreOptions().legacyPartitionName()
+              )
+              val dropPartitions =
+                matchedPartitions.map(partitionComputer.generatePartValues(_).asScala.asJava)
+              commit.truncatePartitions(dropPartitions.asJava)
+            } else {
+              commit.commit(JCollections.emptyList())
+            }
+          case _ =>
+            throw new UnsupportedOperationException("Unsupported truncate table")
+        }
+      case _ =>
+        commit.truncateTable()
+    }
+    Nil
+  }
+
+  override def output: Seq[Attribute] = Nil
+
+  override def simpleString(maxFields: Int): String = {
+    s"TruncatePaimonTableWithFilterExec: ${table.fullName()}" +
+      partitionPredicate.map(p => s", PartitionPredicate: [$p]").getOrElse("")
+  }
+}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala
index b6e29b8a77..950b5797c7 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala
@@ -18,8 +18,8 @@
 
 package org.apache.paimon.spark.extensions
 
-import org.apache.paimon.spark.catalyst.analysis.{PaimonAnalysis, PaimonDeleteTable, PaimonFunctionResolver, PaimonIncompatiblePHRRules, PaimonIncompatibleResolutionRules, PaimonMergeInto, PaimonPostHocResolutionRules, PaimonProcedureResolver, PaimonUpdateTable, PaimonViewResolver, ReplacePaimonFunctions, RewriteUpsertTable}
-import org.apache.paimon.spark.catalyst.optimizer.{EvalSubqueriesForDeleteTable, MergePaimonScalarSubqueries}
+import org.apache.paimon.spark.catalyst.analysis.{PaimonAnalysis, PaimonDeleteTable, PaimonFunctionResolver, PaimonIncompatibleResolutionRules, PaimonMergeInto, PaimonPostHocResolutionRules, PaimonProcedureResolver, PaimonUpdateTable, PaimonViewResolver, ReplacePaimonFunctions, RewriteUpsertTable}
+import org.apache.paimon.spark.catalyst.optimizer.{MergePaimonScalarSubqueries, OptimizeMetadataOnlyDeleteFromPaimonTable}
 import org.apache.paimon.spark.catalyst.plans.logical.PaimonTableValuedFunctions
 import org.apache.paimon.spark.commands.BucketExpression
 import org.apache.paimon.spark.execution.{OldCompatibleStrategy, PaimonStrategy}
@@ -46,7 +46,6 @@ class PaimonSparkSessionExtensions extends (SparkSessionExtensions => Unit) {
 
     extensions.injectPostHocResolutionRule(spark => ReplacePaimonFunctions(spark))
     extensions.injectPostHocResolutionRule(spark => PaimonPostHocResolutionRules(spark))
-    extensions.injectPostHocResolutionRule(spark => PaimonIncompatiblePHRRules(spark))
 
     extensions.injectPostHocResolutionRule(_ => PaimonUpdateTable)
     extensions.injectPostHocResolutionRule(_ => PaimonDeleteTable)
@@ -65,7 +64,7 @@ class PaimonSparkSessionExtensions extends (SparkSessionExtensions => Unit) {
     }
 
     // optimization rules
-    extensions.injectOptimizerRule(_ => EvalSubqueriesForDeleteTable)
+    extensions.injectOptimizerRule(_ => OptimizeMetadataOnlyDeleteFromPaimonTable)
     extensions.injectOptimizerRule(_ => MergePaimonScalarSubqueries)
 
     // planner extensions
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTestBase.scala
index d417b5f405..3eafcc1700 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTestBase.scala
@@ -22,11 +22,13 @@ import org.apache.paimon.Snapshot.CommitKind
 import org.apache.paimon.spark.PaimonSparkTestBase
 import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper
 import org.apache.paimon.spark.catalyst.optimizer.MergePaimonScalarSubqueries
+import org.apache.paimon.spark.execution.TruncatePaimonTableWithFilterExec
 
-import org.apache.spark.sql.{PaimonUtils, Row}
+import org.apache.spark.sql.{DataFrame, PaimonUtils, Row}
 import org.apache.spark.sql.catalyst.expressions.{Attribute, CreateNamedStruct, Literal, NamedExpression}
 import org.apache.spark.sql.catalyst.plans.logical.{CTERelationDef, LogicalPlan, OneRowRelation, WithCTE}
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
+import org.apache.spark.sql.execution.CommandResultExec
 import org.apache.spark.sql.functions._
 import org.junit.jupiter.api.Assertions
 
@@ -112,6 +114,23 @@ abstract class PaimonOptimizationTestBase extends PaimonSparkTestBase with Expre
     }
   }
 
+  test(s"Paimon Optimization: optimize metadata only delete") {
+    for (useV2Write <- Seq("true", "false")) {
+      withSparkSQLConf("spark.paimon.write.use-v2-write" -> useV2Write) {
+        withTable("t") {
+          sql(s"""
+                 |CREATE TABLE t (id INT, name STRING, pt INT)
+                 |PARTITIONED BY (pt)
+                 |""".stripMargin)
+          sql("INSERT INTO t VALUES (1, 'a', 1), (2, 'b', 2)")
+          val df = sql("DELETE FROM t WHERE pt = 1")
+          checkTruncatePaimonTable(df)
+          checkAnswer(sql("SELECT * FROM t ORDER BY id"), Seq(Row(2, "b", 2)))
+        }
+      }
+    }
+  }
+
   test(s"Paimon Optimization: eval subqueries for delete table with ScalarSubquery") {
     withPk.foreach(
       hasPk => {
@@ -132,14 +151,16 @@ abstract class PaimonOptimizationTestBase extends PaimonSparkTestBase with Expre
           spark.sql(s"CREATE TABLE t2 (id INT, n INT)")
           spark.sql("INSERT INTO t2 VALUES (1, 1), (2, 2), (3, 3), (4, 4)")
 
-          spark.sql(s"""DELETE FROM t1 WHERE
-                       |pt >= (SELECT min(id) FROM t2 WHERE n BETWEEN 2 AND 3)
-                       |AND
-                       |pt <= (SELECT max(id) FROM t2 WHERE n BETWEEN 2 AND 3)""".stripMargin)
+          val df =
+            spark.sql(s"""DELETE FROM t1 WHERE
+                         |pt >= (SELECT min(id) FROM t2 WHERE n BETWEEN 2 AND 3)
+                         |AND
+                         |pt <= (SELECT max(id) FROM t2 WHERE n BETWEEN 2 AND 3)""".stripMargin)
           // For partition-only predicates, drop partition is called internally.
           Assertions.assertEquals(
            CommitKind.OVERWRITE,
            loadTable("t1").store().snapshotManager().latestSnapshot().commitKind())
+          checkTruncatePaimonTable(df)
 
           checkAnswer(
             spark.sql("SELECT * FROM t1 ORDER BY id"),
@@ -176,14 +197,16 @@ abstract class PaimonOptimizationTestBase extends PaimonSparkTestBase with Expre
           spark.sql(s"CREATE TABLE t2 (id INT, n INT)")
           spark.sql("INSERT INTO t2 VALUES (1, 1), (2, 2), (3, 3), (4, 4)")
 
-          spark.sql(s"""DELETE FROM t1 WHERE
-                       |pt in (SELECT id FROM t2 WHERE n BETWEEN 2 AND 3)
-                       |OR
-                       |pt in (SELECT max(id) FROM t2 WHERE n BETWEEN 2 AND 3)""".stripMargin)
+          val df =
+            spark.sql(s"""DELETE FROM t1 WHERE
+                         |pt in (SELECT id FROM t2 WHERE n BETWEEN 2 AND 3)
+                         |OR
+                         |pt in (SELECT max(id) FROM t2 WHERE n BETWEEN 2 AND 3)""".stripMargin)
           // For partition-only predicates, drop partition is called internally.
           Assertions.assertEquals(
            CommitKind.OVERWRITE,
            loadTable("t1").store().snapshotManager().latestSnapshot().commitKind())
+          checkTruncatePaimonTable(df)
 
           checkAnswer(
             spark.sql("SELECT * FROM t1 ORDER BY id"),
@@ -206,4 +229,8 @@ abstract class PaimonOptimizationTestBase extends PaimonSparkTestBase with Expre
 
   def extractorExpression(cteIndex: Int, output: Seq[Attribute], fieldIndex: Int): NamedExpression
 
+  def checkTruncatePaimonTable(df: DataFrame): Unit = {
+    val plan = df.queryExecution.executedPlan.asInstanceOf[CommandResultExec].commandPhysicalPlan
+    assert(plan.isInstanceOf[TruncatePaimonTableWithFilterExec])
+  }
 }
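
For readers skimming the archive, a minimal usage sketch (not part of the commit itself) of the DELETE statements that the new OptimizeMetadataOnlyDeleteFromPaimonTable rule plans as TruncatePaimonTableWithFilterExec, mirroring the test added in PaimonOptimizationTestBase above. The table name "t" and partition column "pt" are illustrative only, and "spark" is assumed to be a SparkSession with the Paimon catalog and extensions enabled:

    // Partitioned Paimon table, as in the new test above.
    spark.sql("CREATE TABLE t (id INT, name STRING, pt INT) PARTITIONED BY (pt)")
    spark.sql("INSERT INTO t VALUES (1, 'a', 1), (2, 'b', 2)")

    // No condition (or a TRUE condition): rewritten to TruncatePaimonTableWithFilter
    // with partitionPredicate = None, which calls commit.truncateTable().
    spark.sql("DELETE FROM t")

    // Partition-only predicate (subqueries over partition columns are evaluated
    // in advance): rewritten to TruncatePaimonTableWithFilter with a
    // PartitionPredicate, which calls commit.truncatePartitions() on the
    // matching partitions instead of rewriting data files.
    spark.sql("DELETE FROM t WHERE pt = 1")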

