This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 68355b564 [spark] drop partition when all the conditions is partition-related (#3263)
68355b564 is described below
commit 68355b56451cdcba2c5da41d15a568d1248513af
Author: Yann Byron <[email protected]>
AuthorDate: Fri Apr 26 14:11:04 2024 +0800
[spark] drop partition when all the conditions is partition-related (#3263)
---
.../commands/DeleteFromPaimonTableCommand.scala | 36 ++++++++++------------
1 file changed, 17 insertions(+), 19 deletions(-)
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
index 0ac0b14bb..c955acbdf 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
@@ -18,7 +18,7 @@
package org.apache.paimon.spark.commands
-import org.apache.paimon.predicate.OnlyPartitionKeyEqualVisitor
+import org.apache.paimon.CoreOptions
import org.apache.paimon.spark.PaimonSplitScan
import org.apache.paimon.spark.catalyst.Compatibility
import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper
@@ -27,6 +27,7 @@ import org.apache.paimon.spark.schema.SparkSystemColumns.ROW_KIND_COL
import org.apache.paimon.table.FileStoreTable
import org.apache.paimon.table.sink.{BatchWriteBuilder, CommitMessage}
import org.apache.paimon.types.RowKind
+import org.apache.paimon.utils.RowDataPartitionComputer
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.PaimonUtils.createDataset
@@ -36,7 +37,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{Filter, SupportsSubquery}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.functions.lit
-import java.util.{Collections, UUID}
+import java.util.UUID
import scala.collection.JavaConverters._
@@ -62,38 +63,35 @@ case class DeleteFromPaimonTableCommand(
table.partitionKeys().asScala,
sparkSession.sessionState.conf.resolver)
- // TODO: provide another partition visitor to support more partition
predicate.
- val visitor = new OnlyPartitionKeyEqualVisitor(table.partitionKeys)
val partitionPredicate = if (partitionCondition.isEmpty) {
None
} else {
convertConditionToPaimonPredicate(
partitionCondition.reduce(And),
relation.output,
- rowType,
+ table.schema.logicalPartitionType(),
ignoreFailure = true)
}
- // We do not have to scan table if the following three requirements are
met:
- // 1) no other predicate;
- // 2) partition condition can convert to paimon predicate;
- // 3) partition predicate can be visit by OnlyPartitionKeyEqualVisitor.
- val forceDeleteByRows =
- otherCondition.nonEmpty || partitionPredicate.isEmpty ||
!partitionPredicate.get.visit(
- visitor)
-
- if (forceDeleteByRows) {
+ if (otherCondition.isEmpty && partitionPredicate.nonEmpty) {
+ val allPartitions = table.newReadBuilder.newScan.listPartitions.asScala
+ val matchedPartitions =
allPartitions.filter(partitionPredicate.get.test)
+ val rowDataPartitionComputer = new RowDataPartitionComputer(
+ CoreOptions.PARTITION_DEFAULT_NAME.defaultValue,
+ table.schema().logicalPartitionType(),
+ table.partitionKeys.asScala.toArray
+ )
+ val dropPartitions = matchedPartitions.map {
+ partition =>
rowDataPartitionComputer.generatePartValues(partition).asScala.asJava
+ }
+ commit.dropPartitions(dropPartitions.asJava,
BatchWriteBuilder.COMMIT_IDENTIFIER)
+ } else {
val commitMessages = if (withPrimaryKeys) {
performDeleteForPkTable(sparkSession)
} else {
performDeleteForNonPkTable(sparkSession)
}
writer.commit(commitMessages)
- } else {
- val dropPartitions = visitor.partitions()
- commit.dropPartitions(
- Collections.singletonList(dropPartitions),
- BatchWriteBuilder.COMMIT_IDENTIFIER)
}
}