This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 70959c989 [spark] Enhance spark DELETE (#2183)
70959c989 is described below

commit 70959c989963eecba50669f6a2fac64ab1033631
Author: Zouxxyy <[email protected]>
AuthorDate: Thu Nov 2 13:59:06 2023 +0800

    [spark] Enhance spark DELETE (#2183)
---
 docs/content/how-to/writing-tables.md              |  12 +-
 .../java/org/apache/paimon/spark/SparkRow.java     |  15 +-
 .../java/org/apache/paimon/spark/SparkTable.java   |  49 +++++-
 .../paimon/spark/commands/PaimonCommand.scala      |   2 -
 .../spark/commands/WriteIntoPaimonTable.scala      |  20 ++-
 .../extensions/PaimonSparkSessionExtensions.scala  |   6 +-
 .../paimon/spark/schema/SparkSystemColumns.scala   |  36 ++++
 .../apache/paimon/spark/util/SparkRowUtils.scala   |  44 +++++
 .../optimizer/RewriteRowLeverCommands.scala        |  82 +++++++++
 .../logical/DeleteFromPaimonTableCommand.scala     |  54 ++++++
 .../org/apache/paimon/spark/SparkWriteITCase.java  |   9 +-
 .../paimon/spark/SparkWriteWithKyroITCase.java     |   4 +
 .../paimon/spark/sql/DeleteFromTableTest.scala     | 183 +++++++++++++++++++++
 13 files changed, 493 insertions(+), 23 deletions(-)

diff --git a/docs/content/how-to/writing-tables.md 
b/docs/content/how-to/writing-tables.md
index 78bb35f9d..14379eb96 100644
--- a/docs/content/how-to/writing-tables.md
+++ b/docs/content/how-to/writing-tables.md
@@ -430,8 +430,18 @@ DELETE FROM MyTable WHERE currency = 'UNKNOWN';
 {{< /tab >}}
 
 {{< tab "Spark" >}}
+{{< hint info >}}
+Important table property settings:
+1. Only primary key tables support this feature.
+2. If the table has primary keys, [MergeEngine]({{< ref 
"concepts/primary-key-table#merge-engines" >}}) needs to be [deduplicate]({{< 
ref "concepts/primary-key-table#deduplicate" >}}) to support this feature.
+   {{< /hint >}}
 
-Spark `DELETE` currently supports only a single point execution, for deleting 
small amounts of data.
+To enable `DELETE`, the following configs are required:
+
+```text
+--conf 
spark.sql.catalog.spark_catalog=org.apache.paimon.spark.SparkGenericCatalog
+--conf 
spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions
+```
 
 ```sql
 DELETE FROM MyTable WHERE currency = 'UNKNOWN';
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkRow.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkRow.java
index e4928974b..b46a10fb0 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkRow.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkRow.java
@@ -54,10 +54,16 @@ public class SparkRow implements InternalRow, Serializable {
 
     private final RowType type;
     private final Row row;
+    private final RowKind rowKind;
 
     public SparkRow(RowType type, Row row) {
+        this(type, row, RowKind.INSERT);
+    }
+
+    public SparkRow(RowType type, Row row, RowKind rowkind) {
         this.type = type;
         this.row = row;
+        this.rowKind = rowkind;
     }
 
     @Override
@@ -67,16 +73,13 @@ public class SparkRow implements InternalRow, Serializable {
 
     @Override
     public RowKind getRowKind() {
-        return RowKind.INSERT;
+        return rowKind;
     }
 
     @Override
     public void setRowKind(RowKind rowKind) {
-        if (rowKind == RowKind.INSERT) {
-            return;
-        }
-
-        throw new UnsupportedOperationException("Can not set row kind for this 
row except INSERT.");
+        throw new UnsupportedOperationException(
+                "Spark row does not support modifying rowkind field");
     }
 
     @Override
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTable.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTable.java
index 674b4fbc2..77090f6ab 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTable.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTable.java
@@ -19,12 +19,16 @@
 package org.apache.paimon.spark;
 
 import org.apache.paimon.CoreOptions;
+import org.apache.paimon.operation.FileStoreCommit;
 import org.apache.paimon.options.Options;
+import org.apache.paimon.predicate.OnlyPartitionKeyEqualVisitor;
 import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.table.AbstractFileStoreTable;
 import org.apache.paimon.table.DataTable;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
-import org.apache.paimon.table.TableUtils;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
 
 import org.apache.spark.sql.connector.catalog.SupportsDelete;
 import org.apache.spark.sql.connector.catalog.SupportsRead;
@@ -36,10 +40,13 @@ import org.apache.spark.sql.connector.expressions.Transform;
 import org.apache.spark.sql.connector.read.ScanBuilder;
 import org.apache.spark.sql.connector.write.LogicalWriteInfo;
 import org.apache.spark.sql.connector.write.WriteBuilder;
+import org.apache.spark.sql.sources.AlwaysTrue;
 import org.apache.spark.sql.sources.Filter;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 
+import javax.annotation.Nullable;
+
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -47,6 +54,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.UUID;
 
 /** A spark {@link org.apache.spark.sql.connector.catalog.Table} for paimon. */
 public class SparkTable
@@ -57,6 +65,7 @@ public class SparkTable
                 PaimonPartitionManagement {
 
     private final Table table;
+    @Nullable protected Predicate deletePredicate;
 
     public SparkTable(Table table) {
         this.table = table;
@@ -110,19 +119,47 @@ public class SparkTable
         }
     }
 
-    @Override
-    public void deleteWhere(Filter[] filters) {
+    public boolean canDeleteWhere(Filter[] filters) {
         SparkFilterConverter converter = new 
SparkFilterConverter(table.rowType());
         List<Predicate> predicates = new ArrayList<>();
         for (Filter filter : filters) {
-            if ("AlwaysTrue()".equals(filter.toString())) {
+            if (filter.equals(new AlwaysTrue())) {
                 continue;
             }
-
             predicates.add(converter.convert(filter));
         }
+        deletePredicate = predicates.isEmpty() ? null : 
PredicateBuilder.and(predicates);
+        return deletePredicate == null || deleteIsDropPartition();
+    }
+
+    @Override
+    public void deleteWhere(Filter[] filters) {
+        FileStoreCommit commit =
+                ((AbstractFileStoreTable) 
table).store().newCommit(UUID.randomUUID().toString());
+        long identifier = BatchWriteBuilder.COMMIT_IDENTIFIER;
+        if (deletePredicate == null) {
+            commit.purgeTable(identifier);
+        } else if (deleteIsDropPartition()) {
+            
commit.dropPartitions(Collections.singletonList(deletePartitions()), 
identifier);
+        } else {
+            // can't reach here
+            throw new UnsupportedOperationException();
+        }
+    }
 
-        TableUtils.deleteWhere(table, predicates);
+    private boolean deleteIsDropPartition() {
+        return deletePredicate != null
+                && deletePredicate.visit(new 
OnlyPartitionKeyEqualVisitor(table.partitionKeys()));
+    }
+
+    private Map<String, String> deletePartitions() {
+        if (deletePredicate == null) {
+            return null;
+        }
+        OnlyPartitionKeyEqualVisitor visitor =
+                new OnlyPartitionKeyEqualVisitor(table.partitionKeys());
+        deletePredicate.visit(visitor);
+        return visitor.partitions();
     }
 
     @Override
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
index 6e88ec6c8..786afbba9 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
@@ -29,8 +29,6 @@ import java.io.IOException
 /** Helper trait for all paimon commands. */
 trait PaimonCommand extends WithFileStoreTable {
 
-  val BUCKET_COL = "_bucket_"
-
   lazy val bucketMode: BucketMode = table match {
     case fileStoreTable: FileStoreTable =>
       fileStoreTable.bucketMode
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
index 35167c6af..24af34062 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
@@ -23,7 +23,9 @@ import org.apache.paimon.index.PartitionIndex
 import org.apache.paimon.options.Options
 import org.apache.paimon.spark.{DynamicOverWrite, InsertInto, Overwrite, 
SaveMode, SparkConnectorOptions, SparkRow}
 import org.apache.paimon.spark.SparkUtils.createIOManager
-import org.apache.paimon.spark.util.EncoderUtils
+import org.apache.paimon.spark.schema.SparkSystemColumns
+import org.apache.paimon.spark.schema.SparkSystemColumns.{BUCKET_COL, 
ROW_KIND_COL}
+import org.apache.paimon.spark.util.{EncoderUtils, SparkRowUtils}
 import org.apache.paimon.table.{BucketMode, FileStoreTable}
 import org.apache.paimon.table.sink.{BatchWriteBuilder, 
CommitMessageSerializer, DynamicBucketRow, RowPartitionKeyExtractor}
 import org.apache.paimon.types.RowType
@@ -32,7 +34,6 @@ import org.apache.spark.TaskContext
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
-import org.apache.spark.sql.catalyst.encoders.RowEncoder.encoderFor
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.command.RunnableCommand
 import org.apache.spark.sql.functions._
@@ -59,13 +60,14 @@ case class WriteIntoPaimonTable(
 
   private lazy val mergeSchema = 
options.get(SparkConnectorOptions.MERGE_SCHEMA)
 
-  /** \1. 2. */
   override def run(sparkSession: SparkSession): Seq[Row] = {
     import sparkSession.implicits._
 
+    val dataSchema = SparkSystemColumns.filterSparkSystemColumns(data.schema)
+
     if (mergeSchema) {
       val allowExplicitCast = options.get(SparkConnectorOptions.EXPLICIT_CAST)
-      mergeAndCommitSchema(data.schema, allowExplicitCast)
+      mergeAndCommitSchema(dataSchema, allowExplicitCast)
     }
 
     val (dynamicPartitionOverwriteMode, overwritePartition) = parseSaveMode()
@@ -76,9 +78,11 @@ case class WriteIntoPaimonTable(
     val primaryKeyCols = tableSchema.trimmedPrimaryKeys().asScala.map(col)
     val partitionCols = tableSchema.partitionKeys().asScala.map(col)
 
-    val dataEncoder = EncoderUtils.encode(data.schema).resolveAndBind()
+    val dataEncoder = EncoderUtils.encode(dataSchema).resolveAndBind()
     val originFromRow = dataEncoder.createDeserializer()
 
+    val rowkindColIdx = SparkRowUtils.getFieldIndex(data.schema, ROW_KIND_COL)
+
     // append _bucket_ column as placeholder
     val withBucketCol = data.withColumn(BUCKET_COL, lit(-1))
     val bucketColIdx = withBucketCol.schema.size - 1
@@ -132,7 +136,11 @@ case class WriteIntoPaimonTable(
               row =>
                 val bucket = row.getInt(bucketColIdx)
                 val bucketColDropped = originFromRow(toRow(row))
-                write.write(new DynamicBucketRow(new SparkRow(rowType, 
bucketColDropped), bucket))
+                val sparkRow = new SparkRow(
+                  rowType,
+                  bucketColDropped,
+                  SparkRowUtils.getRowKind(row, rowkindColIdx))
+                write.write(new DynamicBucketRow(sparkRow, bucket))
             }
             val serializer = new CommitMessageSerializer
             write.prepareCommit().asScala.map(serializer.serialize).toIterator
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala
index dae604073..3c9f7a76b 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala
@@ -19,6 +19,7 @@ package org.apache.paimon.spark.extensions
 
 import org.apache.spark.sql.SparkSessionExtensions
 import org.apache.spark.sql.catalyst.analysis.{CoerceArguments, 
PaimonAnalysis, ResolveProcedures}
+import org.apache.spark.sql.catalyst.optimizer.RewriteRowLeverCommands
 import 
org.apache.spark.sql.catalyst.parser.extensions.PaimonSparkSqlExtensionsParser
 import org.apache.spark.sql.catalyst.plans.logical.PaimonTableValuedFunctions
 import 
org.apache.spark.sql.execution.datasources.v2.ExtendedDataSourceV2Strategy
@@ -31,10 +32,13 @@ class PaimonSparkSessionExtensions extends 
(SparkSessionExtensions => Unit) {
     extensions.injectParser { case (_, parser) => new 
PaimonSparkSqlExtensionsParser(parser) }
 
     // analyzer extensions
-    // resolution rule extensions
     extensions.injectResolutionRule(sparkSession => new 
PaimonAnalysis(sparkSession))
     extensions.injectResolutionRule(spark => ResolveProcedures(spark))
     extensions.injectResolutionRule(_ => CoerceArguments)
+
+    // optimizer extensions
+    extensions.injectOptimizerRule(_ => RewriteRowLeverCommands)
+
     // table function extensions
     PaimonTableValuedFunctions.supportedFnNames.foreach {
       fnName =>
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/schema/SparkSystemColumns.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/schema/SparkSystemColumns.scala
new file mode 100644
index 000000000..bae79127c
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/schema/SparkSystemColumns.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.paimon.spark.schema
+
+import org.apache.spark.sql.types.StructType
+
+/** System columns for paimon spark. */
+object SparkSystemColumns {
+
+  // for assigning bucket when writing
+  val BUCKET_COL = "_bucket_"
+
+  // for row level operation
+  val ROW_KIND_COL = "_row_kind_"
+
+  val SPARK_SYSTEM_COLUMNS_NAME: Seq[String] = Seq(BUCKET_COL, ROW_KIND_COL)
+
+  def filterSparkSystemColumns(schema: StructType): StructType = {
+    StructType(schema.fields.filterNot(field => 
SPARK_SYSTEM_COLUMNS_NAME.contains(field.name)))
+  }
+}
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/SparkRowUtils.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/SparkRowUtils.scala
new file mode 100644
index 000000000..f9ee33c43
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/SparkRowUtils.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.paimon.spark.util
+
+import org.apache.paimon.types.RowKind
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.StructType
+
+object SparkRowUtils {
+
+  def getRowKind(row: Row, rowkindColIdx: Int): RowKind = {
+    if (rowkindColIdx != -1) {
+      RowKind.fromByteValue(row.getByte(rowkindColIdx))
+    } else {
+      RowKind.INSERT
+    }
+  }
+
+  def getFieldIndex(schema: StructType, colName: String): Int = {
+    try {
+      schema.fieldIndex(colName)
+    } catch {
+      case _: IllegalArgumentException =>
+        -1
+    }
+  }
+
+}
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteRowLeverCommands.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteRowLeverCommands.scala
new file mode 100644
index 000000000..807b851e8
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteRowLeverCommands.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.paimon.CoreOptions.{MERGE_ENGINE, MergeEngine}
+import org.apache.paimon.options.Options
+import org.apache.paimon.spark.SparkTable
+import org.apache.paimon.table.Table
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.expressions.{Expression, PredicateHelper, 
SubqueryExpression}
+import 
org.apache.spark.sql.catalyst.plans.logical.{DeleteFromPaimonTableCommand, 
DeleteFromTable, LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.connector.catalog.SupportsDelete
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+
+object RewriteRowLeverCommands extends Rule[LogicalPlan] with PredicateHelper {
+  override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
+    case d @ DeleteFromTable(r: DataSourceV2Relation, condition) =>
+      validateDeletable(r.table.asInstanceOf[SparkTable].getTable)
+      if (canDeleteWhere(r, condition)) {
+        d
+      } else {
+        DeleteFromPaimonTableCommand(r, condition)
+      }
+  }
+
+  private def validateDeletable(table: Table): Boolean = {
+    val options = Options.fromMap(table.options)
+    if (table.primaryKeys().isEmpty) {
+      throw new UnsupportedOperationException(
+        String.format(
+          "table '%s' can not support delete, because there is no primary 
key.",
+          table.getClass.getName))
+    }
+    if (!options.get(MERGE_ENGINE).equals(MergeEngine.DEDUPLICATE)) {
+      throw new UnsupportedOperationException(
+        String.format(
+          "merge engine '%s' can not support delete, currently only %s can 
support delete.",
+          options.get(MERGE_ENGINE),
+          MergeEngine.DEDUPLICATE))
+    }
+    true
+  }
+
+  private def canDeleteWhere(relation: DataSourceV2Relation, condition: 
Expression): Boolean = {
+    relation.table match {
+      case t: SupportsDelete if !SubqueryExpression.hasSubquery(condition) =>
+        // fail if any filter cannot be converted.
+        // correctness depends on removing all matching data.
+        val filters = DataSourceStrategy
+          .normalizeExprs(Seq(condition), relation.output)
+          .flatMap(splitConjunctivePredicates(_).map {
+            f =>
+              DataSourceStrategy
+                .translateFilter(f, supportNestedPredicatePushdown = true)
+                .getOrElse(throw new AnalysisException(s"Exec update failed:" +
+                  s" cannot translate expression to source filter: $f"))
+          })
+          .toArray
+        t.canDeleteWhere(filters)
+      case _ => false
+    }
+  }
+
+}
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/DeleteFromPaimonTableCommand.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/DeleteFromPaimonTableCommand.scala
new file mode 100644
index 000000000..f380533a8
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/DeleteFromPaimonTableCommand.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.plans.logical
+
+import org.apache.paimon.options.Options
+import org.apache.paimon.spark.{InsertInto, SparkTable}
+import org.apache.paimon.spark.commands.WriteIntoPaimonTable
+import org.apache.paimon.spark.schema.SparkSystemColumns.ROW_KIND_COL
+import org.apache.paimon.table.FileStoreTable
+import org.apache.paimon.types.RowKind
+
+import org.apache.spark.sql.{Dataset, Row, SparkSession}
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.execution.command.LeafRunnableCommand
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.functions.lit
+
+case class DeleteFromPaimonTableCommand(relation: DataSourceV2Relation, 
condition: Expression)
+  extends LeafRunnableCommand {
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    val filteredPlan = if (condition != null) {
+      Filter(condition, relation)
+    } else {
+      relation
+    }
+    val df = Dataset
+      .ofRows(sparkSession, filteredPlan)
+      .withColumn(ROW_KIND_COL, lit(RowKind.DELETE.toByteValue))
+
+    WriteIntoPaimonTable(
+      
relation.table.asInstanceOf[SparkTable].getTable.asInstanceOf[FileStoreTable],
+      InsertInto,
+      df,
+      Options.fromMap(relation.options)).run(sparkSession)
+
+    Seq.empty[Row]
+  }
+}
diff --git 
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java
 
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java
index 5de73ba66..0e930c82f 100644
--- 
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java
+++ 
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.spark;
 
 import org.apache.paimon.fs.Path;
+import org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions;
 
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
@@ -43,7 +44,13 @@ public class SparkWriteITCase {
     @BeforeAll
     public void startMetastoreAndSpark(@TempDir java.nio.file.Path tempDir) {
         Path warehousePath = new Path("file:///" + tempDir.toString());
-        spark = SparkSession.builder().master("local[2]").getOrCreate();
+        spark =
+                SparkSession.builder()
+                        .master("local[2]")
+                        .config(
+                                "spark.sql.extensions",
+                                PaimonSparkSessionExtensions.class.getName())
+                        .getOrCreate();
         spark.conf().set("spark.sql.catalog.paimon", 
SparkCatalog.class.getName());
         spark.conf().set("spark.sql.catalog.paimon.warehouse", 
warehousePath.toString());
         spark.sql("CREATE DATABASE paimon.db");
diff --git 
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkWriteWithKyroITCase.java
 
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkWriteWithKyroITCase.java
index c008657e5..ae3f92841 100644
--- 
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkWriteWithKyroITCase.java
+++ 
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkWriteWithKyroITCase.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.spark;
 
 import org.apache.paimon.fs.Path;
+import org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions;
 
 import org.apache.spark.sql.SparkSession;
 import org.junit.jupiter.api.BeforeAll;
@@ -34,6 +35,9 @@ public class SparkWriteWithKyroITCase extends 
SparkWriteITCase {
         spark =
                 SparkSession.builder()
                         .config("spark.serializer", 
"org.apache.spark.serializer.KryoSerializer")
+                        .config(
+                                "spark.sql.extensions",
+                                PaimonSparkSessionExtensions.class.getName())
                         .master("local[2]")
                         .getOrCreate();
         spark.conf().set("spark.sql.catalog.paimon", 
SparkCatalog.class.getName());
diff --git 
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTest.scala
 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTest.scala
new file mode 100644
index 000000000..2008d7775
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTest.scala
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.paimon.spark.sql
+
+import org.apache.paimon.CoreOptions
+import org.apache.paimon.spark.PaimonSparkTestBase
+
+import org.assertj.core.api.Assertions.{assertThat, assertThatThrownBy}
+
+class DeleteFromTableTest extends PaimonSparkTestBase {
+
+  test(s"test delete from append only table") {
+    spark.sql(s"""
+                 |CREATE TABLE T (id INT, name STRING, dt STRING)
+                 |""".stripMargin)
+
+    spark.sql("INSERT INTO T VALUES (1, 'a', '11'), (2, 'b', '22')")
+
+    assertThatThrownBy(() => spark.sql("DELETE FROM T WHERE name = 'a'"))
+      .isInstanceOf(classOf[UnsupportedOperationException]);
+  }
+
+  CoreOptions.MergeEngine.values().foreach {
+    mergeEngine =>
+      {
+        test(s"test delete with merge engine $mergeEngine") {
+          val supportUpdateEngines = Seq("deduplicate")
+          val options = if ("first-row".equals(mergeEngine.toString)) {
+            s"'primary-key' = 'id', 'merge-engine' = '$mergeEngine', 
'changelog-producer' = 'lookup'"
+          } else {
+            s"'primary-key' = 'id', 'merge-engine' = '$mergeEngine'"
+          }
+          spark.sql(s"""
+                       |CREATE TABLE T (id INT, name STRING, dt STRING)
+                       |TBLPROPERTIES ($options)
+                       |""".stripMargin)
+
+          spark.sql("INSERT INTO T VALUES (1, 'a', '11'), (2, 'b', '22')")
+
+          if (supportUpdateEngines.contains(mergeEngine.toString)) {
+            spark.sql("DELETE FROM T WHERE name = 'a'")
+          } else
+            assertThatThrownBy(() => spark.sql("DELETE FROM T WHERE name = 
'a'"))
+              .isInstanceOf(classOf[UnsupportedOperationException])
+        }
+      }
+  }
+
+  test(s"test delete with primary key") {
+    spark.sql(s"""
+                 |CREATE TABLE T (id INT, name STRING, dt STRING)
+                 |TBLPROPERTIES ('primary-key' = 'id', 'merge-engine' = 
'deduplicate')
+                 |""".stripMargin)
+
+    spark.sql("INSERT INTO T VALUES (1, 'a', '11'), (2, 'b', '22'), (3, 'c', 
'33')")
+
+    spark.sql("DELETE FROM T WHERE id = 1")
+
+    val rows1 = spark.sql("SELECT * FROM T").collectAsList()
+    assertThat(rows1.toString).isEqualTo("[[2,b,22], [3,c,33]]")
+
+    spark.sql("DELETE FROM T WHERE id < 3")
+
+    val rows2 = spark.sql("SELECT * FROM T").collectAsList()
+    assertThat(rows2.toString).isEqualTo("[[3,c,33]]")
+  }
+
+  test(s"test delete with non-primary key") {
+    spark.sql(s"""
+                 |CREATE TABLE T (id INT, name STRING, dt STRING)
+                 |TBLPROPERTIES ('primary-key' = 'id', 'merge-engine' = 
'deduplicate')
+                 |""".stripMargin)
+
+    spark.sql("INSERT INTO T VALUES (1, 'a', '11'), (2, 'b', '22'), (3, 'c', 
'33'), (4, 'a', '44')")
+
+    spark.sql("DELETE FROM T WHERE name = 'a'")
+
+    val rows1 = spark.sql("SELECT * FROM T").collectAsList()
+    assertThat(rows1.toString).isEqualTo("[[2,b,22], [3,c,33]]")
+
+    spark.sql("DELETE FROM T WHERE name < 'c'")
+
+    val rows2 = spark.sql("SELECT * FROM T").collectAsList()
+    assertThat(rows2.toString).isEqualTo("[[3,c,33]]")
+  }
+
+  test(s"test delete with no where") {
+    spark.sql(s"""
+                 |CREATE TABLE T (id INT, name STRING, dt STRING)
+                 |TBLPROPERTIES ('primary-key' = 'id', 'merge-engine' = 
'deduplicate')
+                 |""".stripMargin)
+
+    spark.sql("INSERT INTO T VALUES (1, 'a', '11'), (2, 'b', '22'), (3, 'c', 
'33')")
+
+    spark.sql("DELETE FROM T")
+
+    val rows = spark.sql("SELECT * FROM T").collectAsList()
+    assertThat(rows.toString).isEqualTo("[]")
+  }
+
+  test(s"test delete with in condition") {
+    spark.sql(s"""
+                 |CREATE TABLE T (id INT, name STRING, dt STRING)
+                 |TBLPROPERTIES ('primary-key' = 'id', 'merge-engine' = 
'deduplicate')
+                 |""".stripMargin)
+
+    spark.sql("INSERT INTO T VALUES (1, 'a', '11'), (2, 'b', '22'), (3, 'c', 
'33')")
+
+    spark.sql("DELETE FROM T WHERE id IN (1, 2)")
+
+    val rows = spark.sql("SELECT * FROM T").collectAsList()
+    assertThat(rows.toString).isEqualTo("[[3,c,33]]")
+  }
+
+  test(s"test delete with in subquery") {
+    spark.sql(s"""
+                 |CREATE TABLE T (id INT, name STRING, dt STRING)
+                 |TBLPROPERTIES ('primary-key' = 'id', 'merge-engine' = 
'deduplicate')
+                 |""".stripMargin)
+
+    spark.sql("INSERT INTO T VALUES (1, 'a', '11'), (2, 'b', '22'), (3, 'c', 
'33')")
+
+    import testImplicits._
+    val df = Seq(1, 2).toDF("id")
+    df.createOrReplaceTempView("deleted_ids")
+    spark.sql("DELETE FROM T WHERE id IN (SELECT * FROM deleted_ids)")
+
+    val rows = spark.sql("SELECT * FROM T").collectAsList()
+    assertThat(rows.toString).isEqualTo("[[3,c,33]]")
+  }
+
+  test(s"test delete is drop partition") {
+    spark.sql(s"""
+                 |CREATE TABLE T (id INT, name STRING, dt STRING, hh STRING)
+                 |TBLPROPERTIES ('primary-key' = 'id, dt, hh', 'merge-engine' 
= 'deduplicate')
+                 |PARTITIONED BY (dt, hh)
+                 |""".stripMargin)
+
+    spark.sql(
+      "INSERT INTO T VALUES " +
+        "(1, 'a', '2023-10-01', '12')," +
+        "(2, 'b', '2023-10-01', '12')," +
+        "(3, 'c', '2023-10-02', '12')," +
+        "(4, 'd', '2023-10-02', '13')," +
+        "(5, 'e', '2023-10-02', '14')," +
+        "(6, 'f', '2023-10-02', '15')")
+
+    // delete isn't drop partition
+    spark.sql("DELETE FROM T WHERE name = 'a' and hh = '12'")
+    val rows1 = spark.sql("SELECT * FROM T ORDER BY id").collectAsList()
+    assertThat(rows1.toString).isEqualTo(
+      "[[2,b,2023-10-01,12], [3,c,2023-10-02,12], [4,d,2023-10-02,13], 
[5,e,2023-10-02,14], [6,f,2023-10-02,15]]")
+
+    // delete is drop partition
+    spark.sql("DELETE FROM T WHERE hh = '12'")
+    val rows2 = spark.sql("SELECT * FROM T ORDER BY id").collectAsList()
+    assertThat(rows2.toString).isEqualTo(
+      "[[4,d,2023-10-02,13], [5,e,2023-10-02,14], [6,f,2023-10-02,15]]")
+
+    spark.sql("DELETE FROM T WHERE dt = '2023-10-02' and hh = '13'")
+    val rows3 = spark.sql("SELECT * FROM T ORDER BY id").collectAsList()
+    assertThat(rows3.toString).isEqualTo("[[5,e,2023-10-02,14], 
[6,f,2023-10-02,15]]")
+
+    spark.sql("DELETE FROM T WHERE dt = '2023-10-02'")
+    val rows4 = spark.sql("SELECT * FROM T ORDER BY id").collectAsList()
+    assertThat(rows4.toString).isEqualTo("[]")
+  }
+}


Reply via email to