This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new db3a6939f [spark] Delete supports all merge engines (#3294)
db3a6939f is described below

commit db3a6939fe155beb7d575f1d84d414dafd2c6f15
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon May 6 13:43:42 2024 +0800

    [spark] Delete supports all merge engines (#3294)
---
 .../paimon/spark/procedure/CompactProcedure.java   |  8 ++---
 .../spark/catalyst/analysis/RowLevelOp.scala       |  6 +++-
 .../commands/DeleteFromPaimonTableCommand.scala    | 22 +++++++------
 .../paimon/spark/commands/PaimonCommand.scala      | 10 ++++++
 .../paimon/spark/commands/PaimonSparkWriter.scala  |  8 +++++
 .../paimon/spark/sql/DeleteFromTableTest.scala     | 36 +++++++++++++---------
 6 files changed, 62 insertions(+), 28 deletions(-)

diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
index 51fa6bd6e..b4cbcca06 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
@@ -41,7 +41,7 @@ import org.apache.paimon.table.sink.CommitMessageSerializer;
 import org.apache.paimon.table.sink.CompactionTaskSerializer;
 import org.apache.paimon.table.sink.TableCommitImpl;
 import org.apache.paimon.table.source.DataSplit;
-import org.apache.paimon.table.source.InnerTableScan;
+import org.apache.paimon.table.source.snapshot.SnapshotReader;
 import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.ParameterUtils;
 import org.apache.paimon.utils.SerializationUtils;
@@ -221,13 +221,13 @@ public class CompactProcedure extends BaseProcedure {
 
     private void compactAwareBucketTable(
             FileStoreTable table, @Nullable Predicate filter, JavaSparkContext 
javaSparkContext) {
-        InnerTableScan scan = table.newScan();
+        SnapshotReader snapshotReader = table.newSnapshotReader();
         if (filter != null) {
-            scan.withFilter(filter);
+            snapshotReader.withFilter(filter);
         }
 
         List<Pair<byte[], Integer>> partitionBuckets =
-                scan.plan().splits().stream()
+                snapshotReader.read().splits().stream()
                         .map(split -> (DataSplit) split)
                         .map(dataSplit -> Pair.of(dataSplit.partition(), 
dataSplit.bucket()))
                         .distinct()
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/RowLevelOp.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/RowLevelOp.scala
index 2f5892a02..41881b7b7 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/RowLevelOp.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/RowLevelOp.scala
@@ -46,7 +46,11 @@ sealed trait RowLevelOp {
 
 case object Delete extends RowLevelOp {
 
-  override val supportedMergeEngine: Seq[MergeEngine] = 
Seq(MergeEngine.DEDUPLICATE)
+  override val supportedMergeEngine: Seq[MergeEngine] = Seq(
+    MergeEngine.DEDUPLICATE,
+    MergeEngine.PARTIAL_UPDATE,
+    MergeEngine.AGGREGATE,
+    MergeEngine.FIRST_ROW)
 
   override val supportAppendOnlyTable: Boolean = true
 
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
index cbd365f88..7ce78e79c 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
@@ -19,9 +19,7 @@
 package org.apache.paimon.spark.commands
 
 import org.apache.paimon.CoreOptions
-import org.apache.paimon.options.Options
-import org.apache.paimon.partition.PartitionPredicate
-import org.apache.paimon.predicate.OnlyPartitionKeyEqualVisitor
+import org.apache.paimon.CoreOptions.MergeEngine
 import org.apache.paimon.spark.{InsertInto, SparkTable}
 import org.apache.paimon.spark.PaimonSplitScan
 import org.apache.paimon.spark.catalyst.Compatibility
@@ -99,10 +97,10 @@ case class DeleteFromPaimonTableCommand(
           writer.commit(Seq.empty)
         }
       } else {
-        val commitMessages = if (withPrimaryKeys) {
-          performDeleteForPkTable(sparkSession)
+        val commitMessages = if (usePrimaryKeyDelete()) {
+          performPrimaryKeyDelete(sparkSession)
         } else {
-          performDeleteForNonPkTable(sparkSession)
+          performDeleteCopyOnWrite(sparkSession)
         }
         writer.commit(commitMessages)
       }
@@ -111,13 +109,17 @@ case class DeleteFromPaimonTableCommand(
     Seq.empty[Row]
   }
 
-  def performDeleteForPkTable(sparkSession: SparkSession): Seq[CommitMessage] 
= {
+  def usePrimaryKeyDelete(): Boolean = {
+    withPrimaryKeys && table.coreOptions().mergeEngine() == 
MergeEngine.DEDUPLICATE
+  }
+
+  def performPrimaryKeyDelete(sparkSession: SparkSession): Seq[CommitMessage] 
= {
     val df = createDataset(sparkSession, Filter(condition, relation))
       .withColumn(ROW_KIND_COL, lit(RowKind.DELETE.toByteValue))
     writer.write(df)
   }
 
-  def performDeleteForNonPkTable(sparkSession: SparkSession): 
Seq[CommitMessage] = {
+  def performDeleteCopyOnWrite(sparkSession: SparkSession): Seq[CommitMessage] 
= {
     // Step1: the candidate data splits which are filtered by Paimon Predicate.
     val candidateDataSplits = findCandidateDataSplits(condition, 
relation.output)
     val fileNameToMeta = candidateFileMap(candidateDataSplits)
@@ -142,7 +144,9 @@ case class DeleteFromPaimonTableCommand(
         PaimonSplitScan(table, touchedDataSplits),
         relation.output))
     val data = createDataset(sparkSession, toRewriteScanRelation)
-    val addCommitMessage = writer.write(data)
+
+    // only write new files, should have no compaction
+    val addCommitMessage = writer.writeOnly().write(data)
 
     // Step5: convert the deleted files that need to be wrote to commit 
message.
     val deletedCommitMessage = buildDeletedCommitMessage(touchedFiles)
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
index da91ab1fb..863310e55 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
@@ -27,6 +27,7 @@ import 
org.apache.paimon.spark.commands.SparkDataFileMeta.convertToSparkDataFile
 import org.apache.paimon.table.sink.{CommitMessage, CommitMessageImpl}
 import org.apache.paimon.table.source.DataSplit
 import org.apache.paimon.types.RowType
+import org.apache.paimon.utils.Preconditions
 
 import org.apache.spark.sql.PaimonUtils.createDataset
 import org.apache.spark.sql.SparkSession
@@ -95,6 +96,7 @@ trait PaimonCommand extends WithFileStoreTable with 
ExpressionHelper {
   protected def findCandidateDataSplits(
       condition: Expression,
       output: Seq[Attribute]): Seq[DataSplit] = {
+    // low level snapshot reader, it can not be affected by 'scan.mode'
     val snapshotReader = table.newSnapshotReader()
     if (condition == TrueLiteral) {
       val filter =
@@ -112,6 +114,14 @@ trait PaimonCommand extends WithFileStoreTable with 
ExpressionHelper {
       sparkSession: SparkSession): Array[String] = {
     import sparkSession.implicits._
 
+    // only raw convertible can generate input_file_name()
+    for (split <- candidateDataSplits) {
+      if (!split.rawConvertible()) {
+        throw new IllegalArgumentException(
+          "Only compacted table can generate touched files, please use 
'COMPACT' procedure first.");
+      }
+    }
+
     val scan = PaimonSplitScan(table, candidateDataSplits.toArray)
     val filteredRelation =
       FilterLogicalNode(
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
index ab42a4317..db7c4c0c0 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
@@ -18,6 +18,8 @@
 
 package org.apache.paimon.spark.commands
 
+import org.apache.paimon.CoreOptions
+import org.apache.paimon.CoreOptions.WRITE_ONLY
 import org.apache.paimon.index.BucketAssigner
 import org.apache.paimon.spark.SparkRow
 import org.apache.paimon.spark.SparkUtils.createIOManager
@@ -32,6 +34,8 @@ import org.apache.spark.sql.{DataFrame, Dataset, Row, 
SparkSession}
 import org.apache.spark.sql.functions._
 
 import java.io.IOException
+import java.util.Collections
+import java.util.Collections.singletonMap
 
 import scala.collection.JavaConverters._
 
@@ -54,6 +58,10 @@ case class PaimonSparkWriter(table: FileStoreTable) {
 
   val writeBuilder: BatchWriteBuilder = table.newBatchWriteBuilder()
 
+  def writeOnly(): PaimonSparkWriter = {
+    PaimonSparkWriter(table.copy(singletonMap(WRITE_ONLY.key(), "true")))
+  }
+
   def write(data: Dataset[_]): Seq[CommitMessage] = {
     val sparkSession = data.sparkSession
     import sparkSession.implicits._
diff --git 
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTest.scala
 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTest.scala
index 0117c5f95..8c879e213 100644
--- 
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTest.scala
+++ 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTest.scala
@@ -19,9 +19,11 @@
 package org.apache.paimon.spark.sql
 
 import org.apache.paimon.CoreOptions
+import org.apache.paimon.CoreOptions.MergeEngine
 import org.apache.paimon.spark.PaimonSparkTestBase
 import org.apache.paimon.spark.catalyst.analysis.Delete
 
+import org.apache.spark.sql.Row
 import org.assertj.core.api.Assertions.{assertThat, assertThatThrownBy}
 
 abstract class DeleteFromTableTestBase extends PaimonSparkTestBase {
@@ -176,23 +178,30 @@ abstract class DeleteFromTableTestBase extends 
PaimonSparkTestBase {
     mergeEngine =>
       {
         test(s"test delete with merge engine $mergeEngine") {
-          val options = if ("first-row".equals(mergeEngine.toString)) {
-            s"'primary-key' = 'id', 'merge-engine' = '$mergeEngine', 
'changelog-producer' = 'lookup'"
-          } else {
-            s"'primary-key' = 'id', 'merge-engine' = '$mergeEngine'"
-          }
+          val otherOptions =
+            if ("first-row".equals(mergeEngine.toString)) 
"'changelog-producer' = 'lookup'," else ""
           spark.sql(s"""
-                       |CREATE TABLE T (id INT, name STRING, dt STRING)
-                       |TBLPROPERTIES ($options)
+                       |CREATE TABLE T (id INT, name STRING, age INT)
+                       |TBLPROPERTIES (
+                       |  $otherOptions
+                       |  'primary-key' = 'id',
+                       |  'merge-engine' = '$mergeEngine',
+                       |  'write-only' = 'true')
                        |""".stripMargin)
 
-          spark.sql("INSERT INTO T VALUES (1, 'a', '11'), (2, 'b', '22')")
+          spark.sql("INSERT INTO T VALUES (1, 'a', NULL)")
+          spark.sql("INSERT INTO T VALUES (2, 'b', NULL)")
+          spark.sql("INSERT INTO T VALUES (1, NULL, 16)")
+
+          if (mergeEngine != MergeEngine.DEDUPLICATE) {
+            assertThatThrownBy(() => spark.sql("DELETE FROM T WHERE id = 1"))
+              .hasMessageContaining("please use 'COMPACT' procedure first")
+            spark.sql("CALL sys.compact(table => 'T')")
+          }
 
-          if (Delete.supportedMergeEngine.contains(mergeEngine)) {
-            spark.sql("DELETE FROM T WHERE name = 'a'")
-          } else
-            assertThatThrownBy(() => spark.sql("DELETE FROM T WHERE name = 
'a'"))
-              .isInstanceOf(classOf[UnsupportedOperationException])
+          spark.sql("DELETE FROM T WHERE id = 1")
+          assertThat(spark.sql("SELECT * FROM T").collectAsList().toString)
+            .isEqualTo("[[2,b,null]]")
         }
       }
   }
@@ -345,7 +354,6 @@ abstract class DeleteFromTableTestBase extends 
PaimonSparkTestBase {
     spark.sql("DELETE FROM T WHERE hh = '12'")
     assertThat(spark.sql("SELECT * FROM `T$audit_log` WHERE 
rowkind='-D'").collectAsList().size())
       .isEqualTo(3)
-
   }
 }
 

Reply via email to