This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 222dc8f7c [Spark] Minor Refactor Spark Commands (#2352)
222dc8f7c is described below

commit 222dc8f7c938c7995cde77b62d3507d24a4e060b
Author: Yann Byron <[email protected]>
AuthorDate: Wed Nov 22 09:51:23 2023 +0800

    [Spark] Minor Refactor Spark Commands (#2352)
---
 .../java/org/apache/paimon/spark/SparkTable.java   |  55 ----------
 .../commands/DeleteFromPaimonTableCommand.scala    |  88 ++++++++++++++++
 .../paimon/spark/commands/PaimonCommand.scala      |  23 +++-
 .../PaimonDynamicPartitionOverwriteCommand.scala   |   9 +-
 .../commands/PaimonTruncateTableCommand.scala      |  52 +++++++++
 .../spark/commands}/UpdatePaimonTableCommand.scala |  12 +--
 .../extensions/PaimonSparkSessionExtensions.scala  |   7 +-
 .../main/scala/org/apache/spark/sql/Utils.scala    |  13 +++
 .../sql/catalyst/analysis/PaimonAnalysis.scala     |  18 +++-
 .../analysis/PaimonDeleteTable.scala}              |  26 ++---
 .../sql/catalyst/analysis/PaimonMergeInto.scala    |  70 +++---------
 .../analysis/PaimonMergeIntoResolver.scala         |   2 +-
 .../{AnalysisHelper.scala => PaimonRelation.scala} |  16 +--
 .../sql/catalyst/analysis/PaimonUpdateTable.scala  |  43 ++++++++
 .../analysis/RewriteRowLevelCommands.scala         | 117 ---------------------
 .../sql/catalyst/analysis/RowLevelHelper.scala     |  95 +++++++++++++++++
 .../sql/{ => catalyst/analysis}/RowLevelOp.scala   |   9 +-
 .../logical/DeleteFromPaimonTableCommand.scala     |  57 ----------
 .../paimon/spark/sql/DeleteFromTableTest.scala     |   2 +-
 .../apache/paimon/spark/sql/UpdateTableTest.scala  |   6 +-
 20 files changed, 396 insertions(+), 324 deletions(-)

diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTable.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTable.java
index 77090f6ab..6b21fad0a 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTable.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTable.java
@@ -19,18 +19,12 @@
 package org.apache.paimon.spark;
 
 import org.apache.paimon.CoreOptions;
-import org.apache.paimon.operation.FileStoreCommit;
 import org.apache.paimon.options.Options;
-import org.apache.paimon.predicate.OnlyPartitionKeyEqualVisitor;
 import org.apache.paimon.predicate.Predicate;
-import org.apache.paimon.predicate.PredicateBuilder;
-import org.apache.paimon.table.AbstractFileStoreTable;
 import org.apache.paimon.table.DataTable;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
-import org.apache.paimon.table.sink.BatchWriteBuilder;
 
-import org.apache.spark.sql.connector.catalog.SupportsDelete;
 import org.apache.spark.sql.connector.catalog.SupportsRead;
 import org.apache.spark.sql.connector.catalog.SupportsWrite;
 import org.apache.spark.sql.connector.catalog.TableCapability;
@@ -40,28 +34,22 @@ import org.apache.spark.sql.connector.expressions.Transform;
 import org.apache.spark.sql.connector.read.ScanBuilder;
 import org.apache.spark.sql.connector.write.LogicalWriteInfo;
 import org.apache.spark.sql.connector.write.WriteBuilder;
-import org.apache.spark.sql.sources.AlwaysTrue;
-import org.apache.spark.sql.sources.Filter;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 
 import javax.annotation.Nullable;
 
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.UUID;
 
 /** A spark {@link org.apache.spark.sql.connector.catalog.Table} for paimon. */
 public class SparkTable
         implements org.apache.spark.sql.connector.catalog.Table,
                 SupportsRead,
                 SupportsWrite,
-                SupportsDelete,
                 PaimonPartitionManagement {
 
     private final Table table;
@@ -119,49 +107,6 @@ public class SparkTable
         }
     }
 
-    public boolean canDeleteWhere(Filter[] filters) {
-        SparkFilterConverter converter = new 
SparkFilterConverter(table.rowType());
-        List<Predicate> predicates = new ArrayList<>();
-        for (Filter filter : filters) {
-            if (filter.equals(new AlwaysTrue())) {
-                continue;
-            }
-            predicates.add(converter.convert(filter));
-        }
-        deletePredicate = predicates.isEmpty() ? null : 
PredicateBuilder.and(predicates);
-        return deletePredicate == null || deleteIsDropPartition();
-    }
-
-    @Override
-    public void deleteWhere(Filter[] filters) {
-        FileStoreCommit commit =
-                ((AbstractFileStoreTable) 
table).store().newCommit(UUID.randomUUID().toString());
-        long identifier = BatchWriteBuilder.COMMIT_IDENTIFIER;
-        if (deletePredicate == null) {
-            commit.purgeTable(identifier);
-        } else if (deleteIsDropPartition()) {
-            
commit.dropPartitions(Collections.singletonList(deletePartitions()), 
identifier);
-        } else {
-            // can't reach here
-            throw new UnsupportedOperationException();
-        }
-    }
-
-    private boolean deleteIsDropPartition() {
-        return deletePredicate != null
-                && deletePredicate.visit(new 
OnlyPartitionKeyEqualVisitor(table.partitionKeys()));
-    }
-
-    private Map<String, String> deletePartitions() {
-        if (deletePredicate == null) {
-            return null;
-        }
-        OnlyPartitionKeyEqualVisitor visitor =
-                new OnlyPartitionKeyEqualVisitor(table.partitionKeys());
-        deletePredicate.visit(visitor);
-        return visitor.partitions();
-    }
-
     @Override
     public Map<String, String> properties() {
         if (table instanceof DataTable) {
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
new file mode 100644
index 000000000..fa1a46333
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.paimon.spark.commands
+
+import org.apache.paimon.options.Options
+import org.apache.paimon.predicate.OnlyPartitionKeyEqualVisitor
+import org.apache.paimon.spark.{InsertInto, SparkTable}
+import org.apache.paimon.spark.schema.SparkSystemColumns.ROW_KIND_COL
+import org.apache.paimon.table.FileStoreTable
+import org.apache.paimon.table.sink.BatchWriteBuilder
+import org.apache.paimon.types.RowKind
+
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.Utils.createDataset
+import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
+import org.apache.spark.sql.catalyst.plans.logical.{DeleteFromTable, Filter, 
LogicalPlan, SupportsSubquery}
+import org.apache.spark.sql.execution.command.LeafRunnableCommand
+import org.apache.spark.sql.functions.lit
+
+import java.util.{Collections, UUID}
+
+import scala.util.control.NonFatal
+
+case class DeleteFromPaimonTableCommand(v2Table: SparkTable, delete: 
DeleteFromTable)
+  extends LeafRunnableCommand
+  with PaimonCommand {
+
+  override def table: FileStoreTable = 
v2Table.getTable.asInstanceOf[FileStoreTable]
+
+  private val relation = delete.table
+  private val condition = delete.condition
+
+  private lazy val (deletePredicate, forceDeleteByRows) =
+    if (condition == null || condition == TrueLiteral) {
+      (None, false)
+    } else {
+      try {
+        (Some(convertConditionToPaimonPredicate(condition, relation.output)), 
false)
+      } catch {
+        case NonFatal(_) =>
+          (None, true)
+      }
+    }
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    val commit = table.store.newCommit(UUID.randomUUID.toString)
+
+    if (forceDeleteByRows) {
+      deleteRowsByCondition(sparkSession)
+    } else if (deletePredicate.isEmpty) {
+      commit.purgeTable(BatchWriteBuilder.COMMIT_IDENTIFIER)
+    } else {
+      val visitor = new OnlyPartitionKeyEqualVisitor(table.partitionKeys)
+      if (deletePredicate.get.visit(visitor)) {
+        val dropPartitions = visitor.partitions()
+        commit.dropPartitions(
+          Collections.singletonList(dropPartitions),
+          BatchWriteBuilder.COMMIT_IDENTIFIER)
+      } else {
+        deleteRowsByCondition(sparkSession)
+      }
+    }
+
+    Seq.empty[Row]
+  }
+
+  private def deleteRowsByCondition(sparkSession: SparkSession): Unit = {
+    val df = createDataset(sparkSession, Filter(condition, relation))
+      .withColumn(ROW_KIND_COL, lit(RowKind.DELETE.toByteValue))
+
+    WriteIntoPaimonTable(table, InsertInto, df, new 
Options()).run(sparkSession)
+  }
+}
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
index 786afbba9..6b07856c0 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
@@ -17,17 +17,22 @@
  */
 package org.apache.paimon.spark.commands
 
+import org.apache.paimon.predicate.{Predicate, PredicateBuilder}
 import org.apache.paimon.spark.SparkFilterConverter
 import org.apache.paimon.table.{BucketMode, FileStoreTable}
 import org.apache.paimon.table.sink.{CommitMessage, CommitMessageSerializer}
 import org.apache.paimon.types.RowType
 
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.Utils.{normalizeExprs, translateFilter}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, 
PredicateHelper}
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
 import org.apache.spark.sql.sources.{AlwaysTrue, And, EqualNullSafe, Filter}
 
 import java.io.IOException
 
 /** Helper trait for all paimon commands. */
-trait PaimonCommand extends WithFileStoreTable {
+trait PaimonCommand extends WithFileStoreTable with PredicateHelper {
 
   lazy val bucketMode: BucketMode = table match {
     case fileStoreTable: FileStoreTable =>
@@ -47,6 +52,22 @@ trait PaimonCommand extends WithFileStoreTable {
     }
   }
 
+  protected def convertConditionToPaimonPredicate(
+      condition: Expression,
+      output: Seq[Attribute]): Predicate = {
+    val converter = new SparkFilterConverter(table.rowType)
+    val filters = normalizeExprs(Seq(condition), output)
+      .flatMap(splitConjunctivePredicates(_).map {
+        f =>
+          translateFilter(f, supportNestedPredicatePushdown = true).getOrElse(
+            throw new RuntimeException("Exec update failed:" +
+              s" cannot translate expression to source filter: $f"))
+      })
+      .toArray
+    val predicates = filters.map(converter.convert)
+    PredicateBuilder.and(predicates: _*)
+  }
+
   /**
    * For the 'INSERT OVERWRITE' semantics of SQL, Spark DataSourceV2 will call 
the `truncate`
    * methods where the `AlwaysTrue` Filter is used.
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/PaimonDynamicPartitionOverwriteCommand.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonDynamicPartitionOverwriteCommand.scala
similarity index 90%
rename from 
paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/PaimonDynamicPartitionOverwriteCommand.scala
rename to 
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonDynamicPartitionOverwriteCommand.scala
index 44c322ac5..b69ef6e25 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/PaimonDynamicPartitionOverwriteCommand.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonDynamicPartitionOverwriteCommand.scala
@@ -15,15 +15,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.spark.sql.catalyst.plans.logical
+package org.apache.paimon.spark.commands
 
 import org.apache.paimon.options.Options
 import org.apache.paimon.spark.DynamicOverWrite
-import org.apache.paimon.spark.commands.WriteIntoPaimonTable
 import org.apache.paimon.table.FileStoreTable
 
-import org.apache.spark.sql.{Dataset, Row, SparkSession}
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.Utils.createDataset
 import org.apache.spark.sql.catalyst.analysis.NamedRelation
+import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan, 
V2WriteCommand}
 import org.apache.spark.sql.execution.command.RunnableCommand
 
 import scala.collection.convert.ImplicitConversions._
@@ -62,7 +63,7 @@ case class PaimonDynamicPartitionOverwriteCommand(
     WriteIntoPaimonTable(
       fileStoreTable,
       DynamicOverWrite,
-      Dataset.ofRows(sparkSession, query),
+      createDataset(sparkSession, query),
       Options.fromMap(fileStoreTable.options() ++ writeOptions)
     ).run(sparkSession)
   }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonTruncateTableCommand.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonTruncateTableCommand.scala
new file mode 100644
index 000000000..132d2da65
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonTruncateTableCommand.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.paimon.spark.commands
+
+import org.apache.paimon.spark.SparkTable
+import org.apache.paimon.table.FileStoreTable
+import org.apache.paimon.table.sink.BatchWriteBuilder
+
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
+import org.apache.spark.sql.execution.command.LeafRunnableCommand
+
+import java.util.{Collections, UUID}
+
+import scala.collection.JavaConverters._
+
+case class PaimonTruncateTableCommand(v2Table: SparkTable, partitionSpec: 
TablePartitionSpec)
+  extends LeafRunnableCommand
+  with PaimonCommand {
+
+  override def table: FileStoreTable = 
v2Table.getTable.asInstanceOf[FileStoreTable]
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    val commit = table.store.newCommit(UUID.randomUUID.toString)
+
+    if (partitionSpec.isEmpty) {
+      commit.purgeTable(BatchWriteBuilder.COMMIT_IDENTIFIER)
+    } else {
+      commit.dropPartitions(
+        Collections.singletonList(partitionSpec.asJava),
+        BatchWriteBuilder.COMMIT_IDENTIFIER
+      )
+    }
+
+    Seq.empty[Row]
+  }
+}
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/UpdatePaimonTableCommand.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
similarity index 87%
rename from 
paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/UpdatePaimonTableCommand.scala
rename to 
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
index c9ec48a16..a44dd1a36 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/UpdatePaimonTableCommand.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
@@ -15,19 +15,20 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.spark.sql.catalyst.plans.logical
+package org.apache.paimon.spark.commands
 
 import org.apache.paimon.options.Options
 import org.apache.paimon.spark.{InsertInto, SparkTable}
-import org.apache.paimon.spark.commands.WriteIntoPaimonTable
 import org.apache.paimon.spark.schema.SparkSystemColumns.ROW_KIND_COL
 import org.apache.paimon.table.FileStoreTable
 import org.apache.paimon.types.RowKind
 
-import org.apache.spark.sql.{Dataset, Row, SparkSession}
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.Utils.createDataset
 import org.apache.spark.sql.catalyst.analysis.{AssignmentAlignmentHelper, 
EliminateSubqueryAliases}
-import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, If}
+import org.apache.spark.sql.catalyst.expressions.Alias
 import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, Project, 
UpdateTable}
 import org.apache.spark.sql.execution.command.LeafRunnableCommand
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.functions.lit
@@ -47,8 +48,7 @@ case class UpdatePaimonTableCommand(u: UpdateTable)
 
     val updatedPlan = Project(updatedExprs, 
Filter(u.condition.getOrElse(TrueLiteral), relation))
 
-    val df = Dataset
-      .ofRows(sparkSession, updatedPlan)
+    val df = createDataset(sparkSession, updatedPlan)
       .withColumn(ROW_KIND_COL, lit(RowKind.UPDATE_AFTER.toByteValue))
 
     WriteIntoPaimonTable(
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala
index 93278ddb2..bf84b8955 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala
@@ -18,7 +18,7 @@
 package org.apache.paimon.spark.extensions
 
 import org.apache.spark.sql.SparkSessionExtensions
-import org.apache.spark.sql.catalyst.analysis.{CoerceArguments, 
PaimonAnalysis, PaimonMergeInto, ResolveProcedures, RewriteRowLevelCommands}
+import org.apache.spark.sql.catalyst.analysis.{CoerceArguments, 
PaimonAnalysis, PaimonDeleteTable, PaimonMergeInto, 
PaimonPostHocResolutionRules, PaimonUpdateTable, ResolveProcedures}
 import 
org.apache.spark.sql.catalyst.parser.extensions.PaimonSparkSqlExtensionsParser
 import org.apache.spark.sql.catalyst.plans.logical.PaimonTableValuedFunctions
 import 
org.apache.spark.sql.execution.datasources.v2.ExtendedDataSourceV2Strategy
@@ -35,7 +35,10 @@ class PaimonSparkSessionExtensions extends 
(SparkSessionExtensions => Unit) {
     extensions.injectResolutionRule(spark => ResolveProcedures(spark))
     extensions.injectResolutionRule(_ => CoerceArguments)
 
-    extensions.injectPostHocResolutionRule(_ => RewriteRowLevelCommands)
+    extensions.injectPostHocResolutionRule(spark => 
PaimonPostHocResolutionRules(spark))
+
+    extensions.injectPostHocResolutionRule(_ => PaimonUpdateTable)
+    extensions.injectPostHocResolutionRule(_ => PaimonDeleteTable)
     extensions.injectPostHocResolutionRule(spark => PaimonMergeInto(spark))
 
     // table function extensions
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/Utils.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/Utils.scala
index e371a18ed..feb8815f9 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/Utils.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/Utils.scala
@@ -17,7 +17,10 @@
  */
 package org.apache.spark.sql
 
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.sources.Filter
 
 /**
  * Some classes or methods defined in the spark project are marked as private 
under
@@ -45,4 +48,14 @@ object Utils {
   def createDataset(sparkSession: SparkSession, logicalPlan: LogicalPlan): 
Dataset[Row] = {
     Dataset.ofRows(sparkSession, logicalPlan)
   }
+
+  def normalizeExprs(exprs: Seq[Expression], attributes: Seq[Attribute]): 
Seq[Expression] = {
+    DataSourceStrategy.normalizeExprs(exprs, attributes)
+  }
+
+  def translateFilter(
+      predicate: Expression,
+      supportNestedPredicatePushdown: Boolean): Option[Filter] = {
+    DataSourceStrategy.translateFilter(predicate, 
supportNestedPredicatePushdown)
+  }
 }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/PaimonAnalysis.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/PaimonAnalysis.scala
index 6498fb5ae..caca14c73 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/PaimonAnalysis.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/PaimonAnalysis.scala
@@ -18,14 +18,16 @@
 package org.apache.spark.sql.catalyst.analysis
 
 import org.apache.paimon.spark.SparkTable
+import 
org.apache.paimon.spark.commands.{PaimonDynamicPartitionOverwriteCommand, 
PaimonTruncateTableCommand}
 import org.apache.paimon.table.FileStoreTable
 
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, 
MergeIntoTable, OverwritePartitionsDynamic, 
PaimonDynamicPartitionOverwriteCommand, PaimonTableValuedFunctions, 
PaimonTableValueFunction}
+import org.apache.spark.sql.catalyst.analysis.PaimonRelation.isPaimonTable
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, 
MergeIntoTable, OverwritePartitionsDynamic, PaimonTableValuedFunctions, 
PaimonTableValueFunction, TruncateTable}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 
-class PaimonAnalysis(session: SparkSession) extends Rule[LogicalPlan] with 
AnalysisHelper {
+class PaimonAnalysis(session: SparkSession) extends Rule[LogicalPlan] {
 
   override def apply(plan: LogicalPlan): LogicalPlan = 
plan.resolveOperatorsDown {
 
@@ -41,6 +43,18 @@ class PaimonAnalysis(session: SparkSession) extends 
Rule[LogicalPlan] with Analy
 
 }
 
+case class PaimonPostHocResolutionRules(session: SparkSession) extends 
Rule[LogicalPlan] {
+
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+    plan match {
+      case t @ TruncateTable(PaimonRelation(table)) if t.resolved =>
+        PaimonTruncateTableCommand(table, Map.empty)
+
+      case _ => plan
+    }
+  }
+}
+
 object PaimonDynamicPartitionOverwrite {
   def unapply(o: OverwritePartitionsDynamic): Option[(DataSourceV2Relation, 
FileStoreTable)] = {
     if (o.query.resolved) {
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/RowLevelOp.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/PaimonDeleteTable.scala
similarity index 56%
copy from 
paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/RowLevelOp.scala
copy to 
paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/PaimonDeleteTable.scala
index 620ab814a..7b2175ec7 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/RowLevelOp.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/PaimonDeleteTable.scala
@@ -15,23 +15,23 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.spark.sql
+package org.apache.spark.sql.catalyst.analysis
 
-import org.apache.paimon.CoreOptions.MergeEngine
+import org.apache.paimon.spark.commands.DeleteFromPaimonTableCommand
 
-sealed trait RowLevelOp {
-  val supportedMergeEngine: Seq[MergeEngine]
-}
+import org.apache.spark.sql.catalyst.plans.logical.{DeleteFromTable, 
LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.Rule
 
-case object Delete extends RowLevelOp {
-  override def toString: String = "delete"
+object PaimonDeleteTable extends Rule[LogicalPlan] with RowLevelHelper {
 
-  override val supportedMergeEngine: Seq[MergeEngine] = 
Seq(MergeEngine.DEDUPLICATE)
-}
+  override val operation: RowLevelOp = Delete
 
-case object Update extends RowLevelOp {
-  override def toString: String = "update"
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+    plan.resolveOperators {
+      case d @ DeleteFromTable(PaimonRelation(table), condition) if d.resolved 
=>
+        checkPaimonTable(table)
 
-  override val supportedMergeEngine: Seq[MergeEngine] =
-    Seq(MergeEngine.DEDUPLICATE, MergeEngine.PARTIAL_UPDATE)
+        DeleteFromPaimonTableCommand(table, d)
+    }
+  }
 }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/PaimonMergeInto.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/PaimonMergeInto.scala
index 7fa050398..4908fa7ec 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/PaimonMergeInto.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/PaimonMergeInto.scala
@@ -20,11 +20,11 @@ package org.apache.spark.sql.catalyst.analysis
 import org.apache.paimon.CoreOptions
 import org.apache.paimon.spark.SparkTable
 import org.apache.paimon.spark.commands.MergeIntoPaimonTable
-import org.apache.paimon.table.{FileStoreTable, PrimaryKeyFileStoreTable}
 
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.analysis.PaimonRelation
 import org.apache.spark.sql.catalyst.analysis.expressions.ExpressionHelper
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, 
Expression, SubqueryExpression}
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
AttributeSet, Expression, SubqueryExpression}
 import org.apache.spark.sql.catalyst.plans.logical.{Assignment, DeleteAction, 
InsertAction, InsertStarAction, LogicalPlan, MergeAction, MergeIntoTable, 
UpdateAction, UpdateStarAction}
 import org.apache.spark.sql.catalyst.rules.Rule
 
@@ -33,26 +33,22 @@ import scala.collection.mutable
 /** A post-hoc resolution rule for MergeInto. */
 case class PaimonMergeInto(spark: SparkSession)
   extends Rule[LogicalPlan]
-  with AnalysisHelper
+  with RowLevelHelper
   with ExpressionHelper {
 
+  override val operation: RowLevelOp = MergeInto
+
   private val resolver: Resolver = spark.sessionState.conf.resolver
 
   def apply(plan: LogicalPlan): LogicalPlan = {
-    plan match {
-      case merge: MergeIntoTable =>
-        val relation = getPaimonTableRelation(merge.targetTable)
+    plan.resolveOperators {
+      case merge: MergeIntoTable
+          if merge.resolved && PaimonRelation.isPaimonTable(merge.targetTable) 
=>
+        val relation = PaimonRelation.getPaimonRelation(merge.targetTable)
         val v2Table = relation.table.asInstanceOf[SparkTable]
         val targetOutput = relation.output
 
-        v2Table.getTable match {
-          case _: PrimaryKeyFileStoreTable =>
-          case _: FileStoreTable =>
-            throw new RuntimeException("Only support to merge into table with 
primary keys.")
-          case _ =>
-            throw new RuntimeException("Can't merge into a non-file store 
table.")
-        }
-
+        checkPaimonTable(v2Table)
         checkCondition(merge.mergeCondition)
         merge.matchedActions.flatMap(_.condition).foreach(checkCondition)
         merge.notMatchedActions.flatMap(_.condition).foreach(checkCondition)
@@ -60,8 +56,7 @@ case class PaimonMergeInto(spark: SparkSession)
         val updateActions = merge.matchedActions.collect { case a: 
UpdateAction => a }
         val primaryKeys = 
v2Table.properties().get(CoreOptions.PRIMARY_KEY.key).split(",")
         checkUpdateActionValidity(
-          merge.targetTable,
-          merge.sourceTable,
+          AttributeSet(targetOutput),
           merge.mergeCondition,
           updateActions,
           primaryKeys)
@@ -78,8 +73,6 @@ case class PaimonMergeInto(spark: SparkSession)
           merge.mergeCondition,
           alignedMatchedActions,
           alignedNotMatchedActions)
-
-      case _ => plan
     }
   }
 
@@ -171,54 +164,21 @@ case class PaimonMergeInto(spark: SparkSession)
 
   /** This check will avoid to update the primary key columns */
   private def checkUpdateActionValidity(
-      target: LogicalPlan,
-      source: LogicalPlan,
+      targetOutput: AttributeSet,
       mergeCondition: Expression,
       actions: Seq[UpdateAction],
       primaryKeys: Seq[String]): Unit = {
-    val targetOutput = target.outputSet
-    val sourceOutput = source.outputSet
-
-    // Check whether this attribute is same to primary key and is from target 
table.
-    def isTargetPrimaryKey(attr: AttributeReference, primaryKey: String): 
Boolean = {
-      resolver(primaryKey, attr.name) && targetOutput.contains(attr)
-    }
-
-    // Check whether there is an `EqualTo` expression related to primary key 
between source and target.
-    def existsPrimaryKeyEqualToExpression(
-        expressions: Seq[Expression],
-        primaryKey: String): Boolean = {
-      expressions.exists {
-        case EqualTo(left: AttributeReference, right: AttributeReference) =>
-          if (isTargetPrimaryKey(left, primaryKey)) {
-            sourceOutput.contains(right)
-          } else if (isTargetPrimaryKey(right, primaryKey)) {
-            targetOutput.contains(left)
-          } else {
-            false
-          }
-        case _ => false
-      }
-    }
-
     // Check whether there are enough `EqualTo` expressions related to all the 
primary keys.
     lazy val isMergeConditionValid = {
       val mergeExpressions = splitConjunctivePredicates(mergeCondition)
       primaryKeys.forall {
-        primaryKey => existsPrimaryKeyEqualToExpression(mergeExpressions, 
primaryKey)
+        primaryKey => isUpdateExpressionToPrimaryKey(targetOutput, 
mergeExpressions, primaryKey)
       }
     }
 
-    // Check whether there are on `EqualTo` expression related to any primary 
key.
-    // Then, we do not update primary key columns.
+    // Check whether there is an update expression related to any primary key.
     def isUpdateActionValid(action: UpdateAction): Boolean = {
-      val found = primaryKeys.find {
-        primaryKey =>
-          existsPrimaryKeyEqualToExpression(
-            action.assignments.map(a => EqualTo(a.key, a.value)),
-            primaryKey)
-      }
-      found.isEmpty
+      validUpdateAssignment(targetOutput, primaryKeys, action.assignments)
     }
 
     val valid = isMergeConditionValid || actions.forall(isUpdateActionValid)
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/PaimonMergeIntoResolver.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/PaimonMergeIntoResolver.scala
index 5331a9c40..366146c7a 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/PaimonMergeIntoResolver.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/PaimonMergeIntoResolver.scala
@@ -22,7 +22,7 @@ import 
org.apache.spark.sql.catalyst.analysis.expressions.ExpressionHelper
 import org.apache.spark.sql.catalyst.plans.logical.{Assignment, DeleteAction, 
InsertAction, InsertStarAction, LogicalPlan, MergeAction, MergeIntoTable, 
UpdateAction, UpdateStarAction}
 
 /** Resolve all the expressions for MergeInto. */
-object PaimonMergeIntoResolver extends AnalysisHelper with ExpressionHelper {
+object PaimonMergeIntoResolver extends ExpressionHelper {
 
   def apply(merge: MergeIntoTable, spark: SparkSession): LogicalPlan = {
     val target = merge.targetTable
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/AnalysisHelper.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/PaimonRelation.scala
similarity index 78%
rename from 
paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/AnalysisHelper.scala
rename to 
paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/PaimonRelation.scala
index 2ac7c3734..040a1aac5 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/AnalysisHelper.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/PaimonRelation.scala
@@ -26,14 +26,18 @@ import 
org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import scala.util.control.NonFatal
 
 /** An analysis helper */
-trait AnalysisHelper extends Logging {
+object PaimonRelation extends Logging {
+
+  def unapply(plan: LogicalPlan): Option[SparkTable] =
+    EliminateSubqueryAliases(plan) match {
+      case DataSourceV2Relation(table: SparkTable, _, _, _, _) => Some(table)
+      case ResolvedTable(_, _, table: SparkTable, _) => Some(table)
+      case _ => None
+    }
 
   def isPaimonTable(plan: LogicalPlan): Boolean = {
     try {
-      EliminateSubqueryAliases(plan) match {
-        case DataSourceV2Relation(_: SparkTable, _, _, _, _) => true
-        case _ => false
-      }
+      PaimonRelation.unapply(plan).nonEmpty
     } catch {
       case NonFatal(e) =>
         logWarning("Can't check if this plan is a paimon table", e)
@@ -41,7 +45,7 @@ trait AnalysisHelper extends Logging {
     }
   }
 
-  def getPaimonTableRelation(plan: LogicalPlan): DataSourceV2Relation = {
+  def getPaimonRelation(plan: LogicalPlan): DataSourceV2Relation = {
     EliminateSubqueryAliases(plan) match {
       case d @ DataSourceV2Relation(_: SparkTable, _, _, _, _) => d
       case _ => throw new RuntimeException(s"It's not a paimon table, $plan")
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/PaimonUpdateTable.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/PaimonUpdateTable.scala
new file mode 100644
index 000000000..df4bcf59a
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/PaimonUpdateTable.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.paimon.CoreOptions
+import org.apache.paimon.spark.commands.UpdatePaimonTableCommand
+
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UpdateTable}
+import org.apache.spark.sql.catalyst.rules.Rule
+
+object PaimonUpdateTable extends Rule[LogicalPlan] with RowLevelHelper {
+
+  override val operation: RowLevelOp = Update
+
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+    plan.resolveOperators {
+      case u @ UpdateTable(PaimonRelation(table), assignments, _) if 
u.resolved =>
+        checkPaimonTable(table)
+
+        val primaryKeys = 
table.properties().get(CoreOptions.PRIMARY_KEY.key).split(",")
+        if (!validUpdateAssignment(u.table.outputSet, primaryKeys, 
assignments)) {
+          throw new RuntimeException("Can't update the primary key column.")
+        }
+
+        UpdatePaimonTableCommand(u)
+    }
+  }
+}
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteRowLevelCommands.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteRowLevelCommands.scala
deleted file mode 100644
index b5cf04c49..000000000
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteRowLevelCommands.scala
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.catalyst.analysis
-
-import org.apache.paimon.CoreOptions.MERGE_ENGINE
-import org.apache.paimon.options.Options
-import org.apache.paimon.spark.SparkTable
-import org.apache.paimon.table.Table
-
-import org.apache.spark.sql.{AnalysisException, Delete, RowLevelOp, Update}
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, 
PredicateHelper, SubqueryExpression}
-import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.connector.catalog.SupportsDelete
-import org.apache.spark.sql.execution.datasources.DataSourceStrategy
-import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
-
-import java.util
-
-object RewriteRowLevelCommands extends Rule[LogicalPlan] with PredicateHelper {
-
-  override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
-    case d @ DeleteFromTable(ResolvesToPaimonTable(table), condition) =>
-      validateRowLevelOp(Delete, table.getTable, Option.empty)
-      if (canDeleteWhere(d, table, condition)) {
-        d
-      } else {
-        DeleteFromPaimonTableCommand(d)
-      }
-
-    case u @ UpdateTable(ResolvesToPaimonTable(table), assignments, _) =>
-      validateRowLevelOp(Update, table.getTable, Option.apply(assignments))
-      UpdatePaimonTableCommand(u)
-  }
-
-  private object ResolvesToPaimonTable {
-    def unapply(plan: LogicalPlan): Option[SparkTable] =
-      EliminateSubqueryAliases(plan) match {
-        case DataSourceV2Relation(table: SparkTable, _, _, _, _) => Some(table)
-        case _ => None
-      }
-  }
-
-  private def validateRowLevelOp(
-      op: RowLevelOp,
-      table: Table,
-      assignments: Option[Seq[Assignment]]): Unit = {
-    val options = Options.fromMap(table.options)
-    val primaryKeys = table.primaryKeys()
-    if (primaryKeys.isEmpty) {
-      throw new UnsupportedOperationException(
-        s"table ${table.getClass.getName} can not support $op, because there 
is no primary key.")
-    }
-
-    if (op.equals(Update) && isPrimaryKeyUpdate(primaryKeys, assignments.get)) 
{
-      throw new UnsupportedOperationException(s"$op to primary keys is not 
supported.")
-    }
-
-    val mergeEngine = options.get(MERGE_ENGINE)
-    if (!op.supportedMergeEngine.contains(mergeEngine)) {
-      throw new UnsupportedOperationException(
-        s"merge engine $mergeEngine can not support $op, currently only 
${op.supportedMergeEngine
-            .mkString(", ")} can support $op.")
-    }
-  }
-
-  private def canDeleteWhere(
-      d: DeleteFromTable,
-      table: SparkTable,
-      condition: Expression): Boolean = {
-    table match {
-      case t: SupportsDelete if !SubqueryExpression.hasSubquery(condition) =>
-        // fail if any filter cannot be converted.
-        // correctness depends on removing all matching data.
-        val filters = DataSourceStrategy
-          .normalizeExprs(Seq(condition), d.output)
-          .flatMap(splitConjunctivePredicates(_).map {
-            f =>
-              DataSourceStrategy
-                .translateFilter(f, supportNestedPredicatePushdown = true)
-                .getOrElse(throw new AnalysisException(s"Exec update failed:" +
-                  s" cannot translate expression to source filter: $f"))
-          })
-          .toArray
-        t.canDeleteWhere(filters)
-      case _ => false
-    }
-  }
-
-  private def isPrimaryKeyUpdate(
-      primaryKeys: util.List[String],
-      assignments: Seq[Assignment]): Boolean = {
-    assignments.exists(
-      a => {
-        a.key match {
-          case attr: Attribute => primaryKeys.contains(attr.name)
-          case _ => false
-        }
-      })
-  }
-
-}
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/RowLevelHelper.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/RowLevelHelper.scala
new file mode 100644
index 000000000..2f5b6574c
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/RowLevelHelper.scala
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.paimon.CoreOptions.MERGE_ENGINE
+import org.apache.paimon.options.Options
+import org.apache.paimon.spark.SparkTable
+import org.apache.paimon.table.{FileStoreTable, PrimaryKeyFileStoreTable, 
Table}
+
+import org.apache.spark.sql.catalyst.SQLConfHelper
+import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference, AttributeSet, BinaryExpression, EqualTo, Expression, 
SubqueryExpression}
+import org.apache.spark.sql.catalyst.plans.logical.Assignment
+
+import scala.collection.JavaConverters._
+
+trait RowLevelHelper extends SQLConfHelper {
+
+  val operation: RowLevelOp
+
+  protected def checkPaimonTable(table: SparkTable): Unit = {
+    val paimonTable = table.getTable match {
+      case pkTable: PrimaryKeyFileStoreTable => pkTable
+      case _: FileStoreTable =>
+        throw new UnsupportedOperationException(
+          s"Only support to $operation table with primary keys.")
+      case _ =>
+        throw new UnsupportedOperationException(s"Can't $operation a non-file 
store table.")
+    }
+
+    val options = Options.fromMap(paimonTable.options)
+    val mergeEngine = options.get(MERGE_ENGINE)
+    if (!operation.supportedMergeEngine.contains(mergeEngine)) {
+      throw new UnsupportedOperationException(
+        s"merge engine $mergeEngine can not support $operation, currently only 
${operation.supportedMergeEngine
+            .mkString(", ")} can support $operation.")
+    }
+  }
+
+  protected def checkSubquery(condition: Expression): Unit = {
+    if (SubqueryExpression.hasSubquery(condition)) {
+      throw new RuntimeException(
+        s"Subqueries are not supported in $operation operation (condition = 
$condition).")
+    }
+  }
+
+  protected def validUpdateAssignment(
+      output: AttributeSet,
+      primaryKeys: Seq[String],
+      assignments: Seq[Assignment]): Boolean = {
+    !primaryKeys.exists {
+      primaryKey => isUpdateExpressionToPrimaryKey(output, assignments, 
primaryKey)
+    }
+  }
+
+  // Check whether there is an update expression related to the given primary key.
+  protected def isUpdateExpressionToPrimaryKey(
+      output: AttributeSet,
+      expressions: Seq[Expression],
+      primaryKey: String): Boolean = {
+    val resolver = conf.resolver
+
+    // Check whether this attribute is same to primary key and is from target 
table.
+    def isTargetPrimaryKey(attr: AttributeReference): Boolean = {
+      resolver(primaryKey, attr.name) && output.contains(attr)
+    }
+
+    expressions
+      .filterNot {
+        case BinaryExpression(left, right) => left == right
+        case Assignment(key, value) => key == value
+        case _ => false
+      }
+      .exists {
+        case EqualTo(left: AttributeReference, _) =>
+          isTargetPrimaryKey(left)
+        case Assignment(key: AttributeReference, _) =>
+          isTargetPrimaryKey(key)
+        case _ => false
+      }
+  }
+}
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/RowLevelOp.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/RowLevelOp.scala
similarity index 83%
rename from 
paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/RowLevelOp.scala
rename to 
paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/RowLevelOp.scala
index 620ab814a..0c5179b34 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/RowLevelOp.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/RowLevelOp.scala
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.spark.sql
+package org.apache.spark.sql.catalyst.analysis
 
 import org.apache.paimon.CoreOptions.MergeEngine
 
@@ -35,3 +35,10 @@ case object Update extends RowLevelOp {
   override val supportedMergeEngine: Seq[MergeEngine] =
     Seq(MergeEngine.DEDUPLICATE, MergeEngine.PARTIAL_UPDATE)
 }
+
+case object MergeInto extends RowLevelOp {
+  override def toString: String = "merge into"
+
+  override val supportedMergeEngine: Seq[MergeEngine] =
+    Seq(MergeEngine.DEDUPLICATE, MergeEngine.PARTIAL_UPDATE)
+}
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/DeleteFromPaimonTableCommand.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/DeleteFromPaimonTableCommand.scala
deleted file mode 100644
index 92833cf1a..000000000
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/DeleteFromPaimonTableCommand.scala
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.catalyst.plans.logical
-
-import org.apache.paimon.options.Options
-import org.apache.paimon.spark.{InsertInto, SparkTable}
-import org.apache.paimon.spark.commands.WriteIntoPaimonTable
-import org.apache.paimon.spark.schema.SparkSystemColumns.ROW_KIND_COL
-import org.apache.paimon.table.FileStoreTable
-import org.apache.paimon.types.RowKind
-
-import org.apache.spark.sql.{Dataset, Row, SparkSession}
-import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
-import org.apache.spark.sql.execution.command.LeafRunnableCommand
-import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
-import org.apache.spark.sql.functions.lit
-
-case class DeleteFromPaimonTableCommand(d: DeleteFromTable) extends 
LeafRunnableCommand {
-
-  override def run(sparkSession: SparkSession): Seq[Row] = {
-    val relation = 
EliminateSubqueryAliases(d.table).asInstanceOf[DataSourceV2Relation]
-    val condition = d.condition
-
-    val filteredPlan = if (condition != null) {
-      Filter(condition, relation)
-    } else {
-      relation
-    }
-
-    val df = Dataset
-      .ofRows(sparkSession, filteredPlan)
-      .withColumn(ROW_KIND_COL, lit(RowKind.DELETE.toByteValue))
-
-    WriteIntoPaimonTable(
-      
relation.table.asInstanceOf[SparkTable].getTable.asInstanceOf[FileStoreTable],
-      InsertInto,
-      df,
-      Options.fromMap(relation.options)).run(sparkSession)
-
-    Seq.empty[Row]
-  }
-}
diff --git 
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTest.scala
 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTest.scala
index 7623e08ab..e121027b2 100644
--- 
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTest.scala
+++ 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTest.scala
@@ -20,7 +20,7 @@ package org.apache.paimon.spark.sql
 import org.apache.paimon.CoreOptions
 import org.apache.paimon.spark.PaimonSparkTestBase
 
-import org.apache.spark.sql.Delete
+import org.apache.spark.sql.catalyst.analysis.Delete
 import org.assertj.core.api.Assertions.{assertThat, assertThatThrownBy}
 
 class DeleteFromTableTest extends PaimonSparkTestBase {
diff --git 
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTest.scala
 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTest.scala
index 52f83d757..603262d58 100644
--- 
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTest.scala
+++ 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTest.scala
@@ -20,7 +20,7 @@ package org.apache.paimon.spark.sql
 import org.apache.paimon.CoreOptions
 import org.apache.paimon.spark.PaimonSparkTestBase
 
-import org.apache.spark.sql.Update
+import org.apache.spark.sql.catalyst.analysis.Update
 import org.assertj.core.api.Assertions.{assertThat, assertThatThrownBy}
 
 class UpdateTableTest extends PaimonSparkTestBase {
@@ -33,7 +33,7 @@ class UpdateTableTest extends PaimonSparkTestBase {
     spark.sql("INSERT INTO T VALUES (1, 'a', '11'), (2, 'b', '22')")
 
     assertThatThrownBy(() => spark.sql("UPDATE T SET name = 'a_new' WHERE id = 
1"))
-      .hasMessageContaining("can not support update, because there is no 
primary key")
+      .hasMessageContaining("Only support to update table with primary keys.")
   }
 
   CoreOptions.MergeEngine.values().foreach {
@@ -72,7 +72,7 @@ class UpdateTableTest extends PaimonSparkTestBase {
     spark.sql("INSERT INTO T VALUES (1, 'a', '11'), (2, 'b', '22'), (3, 'c', 
'33')")
 
     assertThatThrownBy(() => spark.sql("UPDATE T SET id = 11 WHERE name = 
'a'"))
-      .hasMessageContaining("update to primary keys is not supported")
+      .hasMessageContaining("Can't update the primary key column.")
   }
 
   test(s"test update with no where") {

Reply via email to