This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 68355b564 [spark] drop partition when all the conditions are 
partition-related (#3263)
68355b564 is described below

commit 68355b56451cdcba2c5da41d15a568d1248513af
Author: Yann Byron <[email protected]>
AuthorDate: Fri Apr 26 14:11:04 2024 +0800

    [spark] drop partition when all the conditions are partition-related (#3263)
---
 .../commands/DeleteFromPaimonTableCommand.scala    | 36 ++++++++++------------
 1 file changed, 17 insertions(+), 19 deletions(-)

diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
index 0ac0b14bb..c955acbdf 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
@@ -18,7 +18,7 @@
 
 package org.apache.paimon.spark.commands
 
-import org.apache.paimon.predicate.OnlyPartitionKeyEqualVisitor
+import org.apache.paimon.CoreOptions
 import org.apache.paimon.spark.PaimonSplitScan
 import org.apache.paimon.spark.catalyst.Compatibility
 import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper
@@ -27,6 +27,7 @@ import 
org.apache.paimon.spark.schema.SparkSystemColumns.ROW_KIND_COL
 import org.apache.paimon.table.FileStoreTable
 import org.apache.paimon.table.sink.{BatchWriteBuilder, CommitMessage}
 import org.apache.paimon.types.RowKind
+import org.apache.paimon.utils.RowDataPartitionComputer
 
 import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.PaimonUtils.createDataset
@@ -36,7 +37,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{Filter, 
SupportsSubquery}
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.functions.lit
 
-import java.util.{Collections, UUID}
+import java.util.UUID
 
 import scala.collection.JavaConverters._
 
@@ -62,38 +63,35 @@ case class DeleteFromPaimonTableCommand(
         table.partitionKeys().asScala,
         sparkSession.sessionState.conf.resolver)
 
-      // TODO: provide another partition visitor to support more partition 
predicate.
-      val visitor = new OnlyPartitionKeyEqualVisitor(table.partitionKeys)
       val partitionPredicate = if (partitionCondition.isEmpty) {
         None
       } else {
         convertConditionToPaimonPredicate(
           partitionCondition.reduce(And),
           relation.output,
-          rowType,
+          table.schema.logicalPartitionType(),
           ignoreFailure = true)
       }
 
-      // We do not have to scan table if the following three requirements are 
met:
-      // 1) no other predicate;
-      // 2) partition condition can convert to paimon predicate;
-      // 3) partition predicate can be visit by OnlyPartitionKeyEqualVisitor.
-      val forceDeleteByRows =
-        otherCondition.nonEmpty || partitionPredicate.isEmpty || 
!partitionPredicate.get.visit(
-          visitor)
-
-      if (forceDeleteByRows) {
+      if (otherCondition.isEmpty && partitionPredicate.nonEmpty) {
+        val allPartitions = table.newReadBuilder.newScan.listPartitions.asScala
+        val matchedPartitions = 
allPartitions.filter(partitionPredicate.get.test)
+        val rowDataPartitionComputer = new RowDataPartitionComputer(
+          CoreOptions.PARTITION_DEFAULT_NAME.defaultValue,
+          table.schema().logicalPartitionType(),
+          table.partitionKeys.asScala.toArray
+        )
+        val dropPartitions = matchedPartitions.map {
+          partition => 
rowDataPartitionComputer.generatePartValues(partition).asScala.asJava
+        }
+        commit.dropPartitions(dropPartitions.asJava, 
BatchWriteBuilder.COMMIT_IDENTIFIER)
+      } else {
         val commitMessages = if (withPrimaryKeys) {
           performDeleteForPkTable(sparkSession)
         } else {
           performDeleteForNonPkTable(sparkSession)
         }
         writer.commit(commitMessages)
-      } else {
-        val dropPartitions = visitor.partitions()
-        commit.dropPartitions(
-          Collections.singletonList(dropPartitions),
-          BatchWriteBuilder.COMMIT_IDENTIFIER)
       }
     }
 

Reply via email to