This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new f2327257b [spark] Support UPDATE for append table (#3129)
f2327257b is described below

commit f2327257bb1410c7900ea01130052c7773e86614
Author: Yann Byron <[email protected]>
AuthorDate: Mon Apr 1 20:41:59 2024 +0800

    [spark] Support UPDATE for append table (#3129)
---
 .../apache/paimon/utils/FileStorePathFactory.java  |  12 +-
 .../paimon/spark/procedure/CompactProcedure.java   |   8 +-
 ...hFileStoreTable.scala => PaimonSplitScan.scala} |  17 ++-
 .../catalyst/analysis/PaimonDeleteTable.scala      |   2 +-
 .../catalyst/analysis/PaimonMergeIntoBase.scala    |   2 +-
 .../catalyst/analysis/PaimonUpdateTable.scala      |  38 +++--
 .../spark/catalyst/analysis/RowLevelHelper.scala   |  23 +--
 .../spark/catalyst/analysis/RowLevelOp.scala       |  36 ++++-
 .../analysis/expressions/ExpressionHelper.scala    |  53 ++++---
 .../analysis/expressions/ExpressionUtils.scala}    |  11 +-
 .../commands/DeleteFromPaimonTableCommand.scala    |   1 -
 .../paimon/spark/commands/PaimonCommand.scala      |   4 +-
 .../paimon/spark/commands/PaimonSparkWriter.scala  |  17 ++-
 .../paimon/spark/commands/SparkDataFileMeta.scala  |  64 ++++++++
 .../spark/commands/UpdatePaimonTableCommand.scala  | 168 ++++++++++++++++++---
 .../paimon/spark/commands/WithFileStoreTable.scala |   4 +
 .../spark/commands/WriteIntoPaimonTable.scala      |  17 +--
 .../paimon/spark/sql/MergeIntoTableTestBase.scala  |   2 +-
 .../apache/paimon/spark/sql/UpdateTableTest.scala  |  79 +++++++++-
 19 files changed, 437 insertions(+), 121 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
index 903a814e1..0f3ad7fec 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
@@ -101,8 +101,16 @@ public class FileStorePathFactory {
     }
 
     public Path bucketPath(BinaryRow partition, int bucket) {
-        return new Path(
-                root + "/" + getPartitionString(partition) + "/" + 
BUCKET_PATH_PREFIX + bucket);
+        return new Path(root + "/" + relativePartitionAndBucketPath(partition, 
bucket));
+    }
+
+    public Path relativePartitionAndBucketPath(BinaryRow partition, int 
bucket) {
+        String partitionPath = getPartitionString(partition);
+        if (partitionPath.isEmpty()) {
+            return new Path(BUCKET_PATH_PREFIX + bucket);
+        } else {
+            return new Path(getPartitionString(partition) + "/" + 
BUCKET_PATH_PREFIX + bucket);
+        }
     }
 
     /** IMPORTANT: This method is NOT THREAD SAFE. */
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
index 111e9f75c..639633644 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
@@ -28,7 +28,7 @@ import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.spark.DynamicOverWrite$;
 import org.apache.paimon.spark.SparkUtils;
-import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper;
+import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionUtils;
 import org.apache.paimon.spark.commands.WriteIntoPaimonTable;
 import org.apache.paimon.spark.sort.TableSorter;
 import org.apache.paimon.table.BucketMode;
@@ -143,9 +143,9 @@ public class CompactProcedure extends BaseProcedure {
                     LogicalPlan relation = createRelation(tableIdent);
                     Expression condition = null;
                     if (!StringUtils.isBlank(finalWhere)) {
-                        condition = ExpressionHelper.resolveFilter(spark(), 
relation, finalWhere);
+                        condition = ExpressionUtils.resolveFilter(spark(), 
relation, finalWhere);
                         checkArgument(
-                                ExpressionHelper.onlyHasPartitionPredicate(
+                                ExpressionUtils.isValidPredicate(
                                         spark(),
                                         condition,
                                         table.partitionKeys().toArray(new 
String[0])),
@@ -188,7 +188,7 @@ public class CompactProcedure extends BaseProcedure {
             Predicate filter =
                     condition == null
                             ? null
-                            : 
ExpressionHelper.convertConditionToPaimonPredicate(
+                            : 
ExpressionUtils.convertConditionToPaimonPredicate(
                                     condition, relation.output(), 
table.rowType());
             switch (bucketMode) {
                 case FIXED:
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WithFileStoreTable.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSplitScan.scala
similarity index 59%
copy from 
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WithFileStoreTable.scala
copy to 
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSplitScan.scala
index 55be49498..e86f4caf6 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WithFileStoreTable.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSplitScan.scala
@@ -16,12 +16,21 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.spark.commands
+package org.apache.paimon.spark
 
-import org.apache.paimon.table.FileStoreTable
+import org.apache.paimon.table.Table
+import org.apache.paimon.table.source.{DataSplit, Split}
 
-private[spark] trait WithFileStoreTable {
+import org.apache.spark.sql.connector.read.{Batch, Scan}
+import org.apache.spark.sql.types.StructType
 
-  def table: FileStoreTable
+/** For internal use only. */
+case class PaimonSplitScan(table: Table, dataSplits: Array[DataSplit]) extends 
Scan {
+
+  override def readSchema(): StructType = 
SparkTypeUtils.fromPaimonRowType(table.rowType())
+
+  override def toBatch: Batch = {
+    PaimonBatch(dataSplits.asInstanceOf[Array[Split]], table.newReadBuilder)
+  }
 
 }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonDeleteTable.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonDeleteTable.scala
index 8ef9de1cc..f2800d742 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonDeleteTable.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonDeleteTable.scala
@@ -30,7 +30,7 @@ object PaimonDeleteTable extends Rule[LogicalPlan] with 
RowLevelHelper {
   override def apply(plan: LogicalPlan): LogicalPlan = {
     plan.resolveOperators {
       case d @ DeleteFromTable(PaimonRelation(table), condition) if d.resolved 
=>
-        checkPaimonTable(table)
+        checkPaimonTable(table.getTable)
 
         DeleteFromPaimonTableCommand(table, d)
     }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoBase.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoBase.scala
index f95185699..c07b58399 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoBase.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoBase.scala
@@ -46,7 +46,7 @@ trait PaimonMergeIntoBase
         val v2Table = relation.table.asInstanceOf[SparkTable]
         val targetOutput = relation.output
 
-        checkPaimonTable(v2Table)
+        checkPaimonTable(v2Table.getTable)
         checkCondition(merge.mergeCondition)
         merge.matchedActions.flatMap(_.condition).foreach(checkCondition)
         merge.notMatchedActions.flatMap(_.condition).foreach(checkCondition)
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonUpdateTable.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonUpdateTable.scala
index 940cb0e14..e369c46e2 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonUpdateTable.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonUpdateTable.scala
@@ -18,27 +18,47 @@
 
 package org.apache.paimon.spark.catalyst.analysis
 
-import org.apache.paimon.CoreOptions
 import org.apache.paimon.spark.commands.UpdatePaimonTableCommand
+import org.apache.paimon.table.FileStoreTable
 
+import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UpdateTable}
 import org.apache.spark.sql.catalyst.rules.Rule
 
-object PaimonUpdateTable extends Rule[LogicalPlan] with RowLevelHelper {
+import scala.collection.JavaConverters._
+
+object PaimonUpdateTable
+  extends Rule[LogicalPlan]
+  with RowLevelHelper
+  with AssignmentAlignmentHelper {
 
   override val operation: RowLevelOp = Update
 
   override def apply(plan: LogicalPlan): LogicalPlan = {
     plan.resolveOperators {
-      case u @ UpdateTable(PaimonRelation(table), assignments, _) if 
u.resolved =>
-        checkPaimonTable(table)
+      case u @ UpdateTable(PaimonRelation(table), assignments, condition) if 
u.resolved =>
+        checkPaimonTable(table.getTable)
 
-        val primaryKeys = 
table.properties().get(CoreOptions.PRIMARY_KEY.key).split(",")
-        if (!validUpdateAssignment(u.table.outputSet, primaryKeys, 
assignments)) {
-          throw new RuntimeException("Can't update the primary key column.")
-        }
+        table.getTable match {
+          case paimonTable: FileStoreTable =>
+            val primaryKeys = paimonTable.primaryKeys().asScala
+            if (primaryKeys.isEmpty) {
+              condition.foreach(checkSubquery)
+            }
+            if (!validUpdateAssignment(u.table.outputSet, primaryKeys, 
assignments)) {
+              throw new RuntimeException("Can't update the primary key 
column.")
+            }
 
-        UpdatePaimonTableCommand(u)
+            val relation = PaimonRelation.getPaimonRelation(u.table)
+            UpdatePaimonTableCommand(
+              relation,
+              paimonTable,
+              condition.getOrElse(TrueLiteral),
+              assignments)
+
+          case _ =>
+            throw new RuntimeException("Update Operation is only supported for 
FileStoreTable.")
+        }
     }
   }
 }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/RowLevelHelper.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/RowLevelHelper.scala
index 659d84dab..9981e7d3c 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/RowLevelHelper.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/RowLevelHelper.scala
@@ -18,33 +18,18 @@
 
 package org.apache.paimon.spark.catalyst.analysis
 
-import org.apache.paimon.CoreOptions.MERGE_ENGINE
-import org.apache.paimon.options.Options
-import org.apache.paimon.spark.SparkTable
+import org.apache.paimon.table.Table
 
 import org.apache.spark.sql.catalyst.SQLConfHelper
-import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference, AttributeSet, BinaryExpression, EqualTo, Expression, 
SubqueryExpression}
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
AttributeSet, BinaryExpression, EqualTo, Expression, SubqueryExpression}
 import org.apache.spark.sql.catalyst.plans.logical.Assignment
 
 trait RowLevelHelper extends SQLConfHelper {
 
   val operation: RowLevelOp
 
-  protected def checkPaimonTable(table: SparkTable): Unit = {
-    val paimonTable = if (table.getTable.primaryKeys().size() > 0) {
-      table.getTable
-    } else {
-      throw new UnsupportedOperationException(
-        s"Only support to $operation table with primary keys.")
-    }
-
-    val options = Options.fromMap(paimonTable.options)
-    val mergeEngine = options.get(MERGE_ENGINE)
-    if (!operation.supportedMergeEngine.contains(mergeEngine)) {
-      throw new UnsupportedOperationException(
-        s"merge engine $mergeEngine can not support $operation, currently only 
${operation.supportedMergeEngine
-            .mkString(", ")} can support $operation.")
-    }
+  protected def checkPaimonTable(table: Table): Unit = {
+    operation.checkValidity(table)
   }
 
   protected def checkSubquery(condition: Expression): Unit = {
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/RowLevelOp.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/RowLevelOp.scala
index 3d4fe0889..f83cf91d8 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/RowLevelOp.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/RowLevelOp.scala
@@ -18,28 +18,54 @@
 
 package org.apache.paimon.spark.catalyst.analysis
 
-import org.apache.paimon.CoreOptions.MergeEngine
+import org.apache.paimon.CoreOptions.{MERGE_ENGINE, MergeEngine}
+import org.apache.paimon.options.Options
+import org.apache.paimon.table.Table
 
 sealed trait RowLevelOp {
-  val supportedMergeEngine: Seq[MergeEngine]
+
+  val name: String = this.getClass.getSimpleName.stripSuffix("$")
+
+  protected val supportedMergeEngine: Seq[MergeEngine]
+
+  protected val supportAppendOnlyTable: Boolean
+
+  def checkValidity(table: Table): Unit = {
+    if (!supportAppendOnlyTable && table.primaryKeys().isEmpty) {
+      throw new UnsupportedOperationException(s"Only support to $name table 
with primary keys.")
+    }
+
+    val mergeEngine = Options.fromMap(table.options).get(MERGE_ENGINE)
+    if (!supportedMergeEngine.contains(mergeEngine)) {
+      throw new UnsupportedOperationException(
+        s"merge engine $mergeEngine can not support $name, currently only 
${supportedMergeEngine
+            .mkString(", ")} can support $name.")
+    }
+  }
 }
 
 case object Delete extends RowLevelOp {
-  override def toString: String = "delete"
 
   override val supportedMergeEngine: Seq[MergeEngine] = 
Seq(MergeEngine.DEDUPLICATE)
+
+  override val supportAppendOnlyTable: Boolean = false
+
 }
 
 case object Update extends RowLevelOp {
-  override def toString: String = "update"
 
   override val supportedMergeEngine: Seq[MergeEngine] =
     Seq(MergeEngine.DEDUPLICATE, MergeEngine.PARTIAL_UPDATE)
+
+  override val supportAppendOnlyTable: Boolean = true
+
 }
 
 case object MergeInto extends RowLevelOp {
-  override def toString: String = "merge into"
 
   override val supportedMergeEngine: Seq[MergeEngine] =
     Seq(MergeEngine.DEDUPLICATE, MergeEngine.PARTIAL_UPDATE)
+
+  override val supportAppendOnlyTable: Boolean = false
+
 }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala
index 65f2a04bd..3e09557d5 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala
@@ -24,6 +24,7 @@ import org.apache.paimon.types.RowType
 
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.Utils.{normalizeExprs, translateFilter}
+import org.apache.spark.sql.catalyst.analysis.Resolver
 import org.apache.spark.sql.catalyst.expressions.{Alias, And, Attribute, Cast, 
Expression, GetStructField, Literal, PredicateHelper, SubqueryExpression}
 import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
 import org.apache.spark.sql.internal.SQLConf
@@ -100,35 +101,37 @@ trait ExpressionHelper extends PredicateHelper {
       throw new UnsupportedOperationException(
         s"Unsupported update expression: $other, only support update with 
PrimitiveType and StructType.")
   }
-}
-
-object ExpressionHelper {
-
-  case class FakeLogicalPlan(exprs: Seq[Expression], children: 
Seq[LogicalPlan])
-    extends LogicalPlan {
-    override def output: Seq[Attribute] = Nil
 
-    override protected def withNewChildrenInternal(
-        newChildren: IndexedSeq[LogicalPlan]): FakeLogicalPlan = copy(children 
= newChildren)
-  }
-
-  def resolveFilter(spark: SparkSession, plan: LogicalPlan, where: String): 
Expression = {
-    val unResolvedExpression = 
spark.sessionState.sqlParser.parseExpression(where)
+  def resolveFilter(spark: SparkSession, plan: LogicalPlan, conditionSql: 
String): Expression = {
+    val unResolvedExpression = 
spark.sessionState.sqlParser.parseExpression(conditionSql)
     val filter = Filter(unResolvedExpression, plan)
     spark.sessionState.analyzer.execute(filter) match {
       case filter: Filter => filter.condition
-      case _ => throw new RuntimeException(s"Could not resolve expression 
$where in plan: $plan")
+      case _ =>
+        throw new RuntimeException(s"Could not resolve expression 
$conditionSql in plan: $plan")
     }
   }
 
-  def onlyHasPartitionPredicate(
+  def isPredicatePartitionColumnsOnly(
+      condition: Expression,
+      partitionColumns: Seq[String],
+      resolver: Resolver
+  ): Boolean = {
+    condition.references.forall(r => partitionColumns.exists(resolver(r.name, 
_)))
+  }
+
+  /**
+   * A valid predicate should meet two requirements: 1) This predicate only 
contains partition
+   * columns. 2) This predicate doesn't contain subquery.
+   */
+  def isValidPredicate(
       spark: SparkSession,
       expr: Expression,
       partitionCols: Array[String]): Boolean = {
-    val resolvedNameEquals = spark.sessionState.analyzer.resolver
+    val resolver = spark.sessionState.analyzer.resolver
     splitConjunctivePredicates(expr).forall(
       e =>
-        e.references.forall(r => 
partitionCols.exists(resolvedNameEquals(r.name, _))) &&
+        isPredicatePartitionColumnsOnly(e, partitionCols, resolver) &&
           !SubqueryExpression.hasSubquery(expr))
   }
 
@@ -148,12 +151,16 @@ object ExpressionHelper {
     val predicates = filters.map(converter.convert)
     PredicateBuilder.and(predicates: _*)
   }
+}
 
-  def splitConjunctivePredicates(condition: Expression): Seq[Expression] = {
-    condition match {
-      case And(cond1, cond2) =>
-        splitConjunctivePredicates(cond1) ++ splitConjunctivePredicates(cond2)
-      case other => other :: Nil
-    }
+object ExpressionHelper {
+
+  case class FakeLogicalPlan(exprs: Seq[Expression], children: 
Seq[LogicalPlan])
+    extends LogicalPlan {
+    override def output: Seq[Attribute] = Nil
+
+    override protected def withNewChildrenInternal(
+        newChildren: IndexedSeq[LogicalPlan]): FakeLogicalPlan = copy(children 
= newChildren)
   }
+
 }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WithFileStoreTable.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionUtils.scala
similarity index 82%
copy from 
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WithFileStoreTable.scala
copy to 
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionUtils.scala
index 55be49498..b0c97c620 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WithFileStoreTable.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionUtils.scala
@@ -16,12 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.spark.commands
+package org.apache.paimon.spark.catalyst.analysis.expressions
 
-import org.apache.paimon.table.FileStoreTable
-
-private[spark] trait WithFileStoreTable {
-
-  def table: FileStoreTable
-
-}
+/** This wrapper is only used in java code, e.g. Procedure. */
+object ExpressionUtils extends ExpressionHelper
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
index 9f79664be..e4bf22d0f 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
@@ -21,7 +21,6 @@ package org.apache.paimon.spark.commands
 import org.apache.paimon.options.Options
 import org.apache.paimon.predicate.OnlyPartitionKeyEqualVisitor
 import org.apache.paimon.spark.{InsertInto, SparkTable}
-import 
org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper.convertConditionToPaimonPredicate
 import org.apache.paimon.spark.leafnode.PaimonLeafRunnableCommand
 import org.apache.paimon.spark.schema.SparkSystemColumns.ROW_KIND_COL
 import org.apache.paimon.table.FileStoreTable
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
index 02a0e0cc2..ba404704b 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
@@ -19,13 +19,13 @@
 package org.apache.paimon.spark.commands
 
 import org.apache.paimon.spark.SparkFilterConverter
+import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper
 import org.apache.paimon.types.RowType
 
-import org.apache.spark.sql.catalyst.expressions.PredicateHelper
 import org.apache.spark.sql.sources.{AlwaysTrue, And, EqualNullSafe, Filter}
 
 /** Helper trait for all paimon commands. */
-trait PaimonCommand extends WithFileStoreTable with PredicateHelper {
+trait PaimonCommand extends WithFileStoreTable with ExpressionHelper {
 
   /**
    * For the 'INSERT OVERWRITE' semantics of SQL, Spark DataSourceV2 will call 
the `truncate`
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
index edccb4989..da269d486 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
@@ -35,7 +35,7 @@ import java.io.IOException
 
 import scala.collection.JavaConverters._
 
-trait PaimonSparkWriter extends WithFileStoreTable {
+case class PaimonSparkWriter(table: FileStoreTable) {
 
   private lazy val tableSchema = table.schema
 
@@ -52,7 +52,9 @@ trait PaimonSparkWriter extends WithFileStoreTable {
 
   private lazy val serializer = new CommitMessageSerializer
 
-  def write(data: Dataset[_], writeBuilder: BatchWriteBuilder): 
Seq[CommitMessage] = {
+  val writeBuilder: BatchWriteBuilder = table.newBatchWriteBuilder()
+
+  def write(data: Dataset[_]): Seq[CommitMessage] = {
     val sparkSession = data.sparkSession
     import sparkSession.implicits._
 
@@ -101,6 +103,17 @@ trait PaimonSparkWriter extends WithFileStoreTable {
     commitMessages.toSeq
   }
 
+  def commit(commitMessages: Seq[CommitMessage]): Unit = {
+    val tableCommit = writeBuilder.newCommit()
+    try {
+      tableCommit.commit(commitMessages.toList.asJava)
+    } catch {
+      case e: Throwable => throw new RuntimeException(e);
+    } finally {
+      tableCommit.close()
+    }
+  }
+
   /** assign a valid bucket id for each of record. */
   private def assignBucketId(
       sparkSession: SparkSession,
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SparkDataFileMeta.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SparkDataFileMeta.scala
new file mode 100644
index 000000000..cb3266157
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SparkDataFileMeta.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.commands
+
+import org.apache.paimon.data.BinaryRow
+import org.apache.paimon.io.DataFileMeta
+import org.apache.paimon.table.source.DataSplit
+import org.apache.paimon.utils.FileStorePathFactory
+
+import scala.collection.JavaConverters._
+
+case class SparkDataFileMeta(
+    partition: BinaryRow,
+    bucket: Int,
+    totalBuckets: Int,
+    dataFileMeta: DataFileMeta) {
+
+  def relativePath(fileStorePathFactory: FileStorePathFactory): String = {
+    fileStorePathFactory
+      .relativePartitionAndBucketPath(partition, bucket)
+      .toUri
+      .toString + "/" + dataFileMeta.fileName()
+  }
+}
+
+object SparkDataFileMeta {
+  def convertToSparkDataFileMeta(
+      dataSplit: DataSplit,
+      totalBuckets: Int): Seq[SparkDataFileMeta] = {
+    dataSplit.dataFiles().asScala.map {
+      file => SparkDataFileMeta(dataSplit.partition, dataSplit.bucket, 
totalBuckets, file)
+    }
+  }
+
+  def convertToDataSplits(sparkDataFiles: Array[SparkDataFileMeta]): 
Array[DataSplit] = {
+    sparkDataFiles
+      .groupBy(file => (file.partition, file.bucket))
+      .map {
+        case ((partition, bucket), files) =>
+          new DataSplit.Builder()
+            .withPartition(partition)
+            .withBucket(bucket)
+            .withDataFiles(files.map(_.dataFileMeta).toList.asJava)
+            .build()
+      }
+      .toArray
+  }
+}
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
index 45ef870f2..3bc236a86 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
@@ -18,45 +18,169 @@
 
 package org.apache.paimon.spark.commands
 
-import org.apache.paimon.options.Options
-import org.apache.paimon.spark.{InsertInto, SparkTable}
-import org.apache.paimon.spark.catalyst.analysis.{AssignmentAlignmentHelper, 
PaimonRelation}
+import org.apache.paimon.index.IndexFileMeta
+import org.apache.paimon.io.{CompactIncrement, DataFileMeta, DataIncrement, 
IndexIncrement}
+import org.apache.paimon.spark.PaimonSplitScan
+import org.apache.paimon.spark.catalyst.analysis.AssignmentAlignmentHelper
+import 
org.apache.paimon.spark.commands.SparkDataFileMeta.convertToSparkDataFileMeta
 import org.apache.paimon.spark.leafnode.PaimonLeafRunnableCommand
 import org.apache.paimon.spark.schema.SparkSystemColumns.ROW_KIND_COL
 import org.apache.paimon.table.FileStoreTable
+import org.apache.paimon.table.sink.{CommitMessage, CommitMessageImpl}
+import org.apache.paimon.table.source.DataSplit
 import org.apache.paimon.types.RowKind
 
-import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.{Column, Row, SparkSession}
 import org.apache.spark.sql.Utils.createDataset
-import org.apache.spark.sql.catalyst.expressions.Alias
+import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, If}
 import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
-import org.apache.spark.sql.catalyst.plans.logical.{Filter, Project, 
UpdateTable}
-import org.apache.spark.sql.functions.lit
+import org.apache.spark.sql.catalyst.plans.logical.{Assignment, Filter, 
Project, SupportsSubquery}
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, 
DataSourceV2ScanRelation}
+import org.apache.spark.sql.functions.{input_file_name, lit}
 
-case class UpdatePaimonTableCommand(u: UpdateTable)
+import java.net.URI
+import java.util.Collections
+
+import scala.collection.JavaConverters._
+
+case class UpdatePaimonTableCommand(
+    relation: DataSourceV2Relation,
+    override val table: FileStoreTable,
+    condition: Expression,
+    assignments: Seq[Assignment])
   extends PaimonLeafRunnableCommand
-  with AssignmentAlignmentHelper {
+  with PaimonCommand
+  with AssignmentAlignmentHelper
+  with SupportsSubquery {
 
-  override def run(sparkSession: SparkSession): Seq[Row] = {
+  private lazy val writer = PaimonSparkWriter(table)
 
-    val relation = PaimonRelation.getPaimonRelation(u.table)
+  private lazy val updateExpressions = {
+    generateAlignedExpressions(relation.output, 
assignments).zip(relation.output).map {
+      case (expr, attr) => Alias(expr, attr.name)()
+    }
+  }
 
-    val updatedExprs: Seq[Alias] =
-      generateAlignedExpressions(relation.output, 
u.assignments).zip(relation.output).map {
-        case (expr, attr) => Alias(expr, attr.name)()
-      }
+  override def run(sparkSession: SparkSession): Seq[Row] = {
 
-    val updatedPlan = Project(updatedExprs, 
Filter(u.condition.getOrElse(TrueLiteral), relation))
+    val commitMessages = if (withPrimaryKeys) {
+      performUpdateForPkTable(sparkSession)
+    } else {
+      performUpdateForNonPkTable(sparkSession)
+    }
+    writer.commit(commitMessages)
 
+    Seq.empty[Row]
+  }
+
+  /** Update for table with primary keys */
+  private def performUpdateForPkTable(sparkSession: SparkSession): 
Seq[CommitMessage] = {
+    val updatedPlan = Project(updateExpressions, Filter(condition, relation))
     val df = createDataset(sparkSession, updatedPlan)
       .withColumn(ROW_KIND_COL, lit(RowKind.UPDATE_AFTER.toByteValue))
+    writer.write(df)
+  }
 
-    WriteIntoPaimonTable(
-      
relation.table.asInstanceOf[SparkTable].getTable.asInstanceOf[FileStoreTable],
-      InsertInto,
-      df,
-      Options.fromMap(relation.options)).run(sparkSession)
+  /** Update for table without primary keys */
+  private def performUpdateForNonPkTable(sparkSession: SparkSession): 
Seq[CommitMessage] = {
+    // Step1: the candidate data splits which are filtered by Paimon Predicate.
+    val candidateDataSplits = findCandidateDataSplits()
 
-    Seq.empty[Row]
+    val commitMessages = if (candidateDataSplits.isEmpty) {
+      // no data spilt need to be rewrote
+      logDebug("No file need to rerote. It's an empty Commit.")
+      Seq.empty[CommitMessage]
+    } else {
+      import sparkSession.implicits._
+
+      // Step2: extract out the exactly files, which must contain record to be 
updated.
+      val scan = PaimonSplitScan(table, candidateDataSplits.toArray)
+      val filteredRelation =
+        Filter(condition, DataSourceV2ScanRelation(relation, scan, 
relation.output))
+      val touchedFilePaths = createDataset(sparkSession, filteredRelation)
+        .select(input_file_name())
+        .distinct()
+        .as[String]
+        .collect()
+        .map(relativePath)
+
+      // Step3: build a new list of data splits which compose of those files.
+      // Those are expected to be the smallest range of data files that need 
to be rewritten.
+      val totalBuckets = table.coreOptions().bucket()
+      val candidateDataFiles = candidateDataSplits
+        .flatMap(dataSplit => convertToSparkDataFileMeta(dataSplit, 
totalBuckets))
+      val fileStorePathFactory = table.store().pathFactory()
+      val fileNameToMeta =
+        candidateDataFiles
+          .map(file => (file.relativePath(fileStorePathFactory), file))
+          .toMap
+      val touchedFiles: Array[SparkDataFileMeta] = touchedFilePaths.map {
+        file => fileNameToMeta.getOrElse(file, throw new 
RuntimeException(s"Missing file: $file"))
+      }
+      val touchedDataSplits = 
SparkDataFileMeta.convertToDataSplits(touchedFiles)
+
+      // Step4: build a dataframe that contains the unchanged and updated 
data, and write out them.
+      val columns = updateExpressions.zip(relation.output).map {
+        case (update, origin) =>
+          val updated = if (condition == TrueLiteral) {
+            update
+          } else {
+            If(condition, update, origin)
+          }
+          new Column(updated).as(origin.name, origin.metadata)
+      }
+      val toUpdateScanRelation = DataSourceV2ScanRelation(
+        relation,
+        PaimonSplitScan(table, touchedDataSplits),
+        relation.output)
+      val data = createDataset(sparkSession, 
toUpdateScanRelation).select(columns: _*)
+      val addCommitMessage = writer.write(data)
+
+      // Step5: convert the files that need to be wrote to commit message.
+      val deletedCommitMessage = touchedFiles
+        .groupBy(f => (f.partition, f.bucket))
+        .map {
+          case ((partition, bucket), files) =>
+            val bb = files.map(_.dataFileMeta).toList.asJava
+            val newFilesIncrement = new DataIncrement(
+              Collections.emptyList[DataFileMeta],
+              bb,
+              Collections.emptyList[DataFileMeta])
+            buildCommitMessage(
+              new CommitMessageImpl(partition, bucket, newFilesIncrement, 
null, null))
+        }
+        .toSeq
+
+      addCommitMessage ++ deletedCommitMessage
+    }
+    commitMessages
+  }
+
+  private def findCandidateDataSplits(): Seq[DataSplit] = {
+    val snapshotReader = table.newSnapshotReader()
+    if (condition == TrueLiteral) {
+      val filter = convertConditionToPaimonPredicate(condition, 
relation.output, rowType)
+      snapshotReader.withFilter(filter)
+    }
+
+    snapshotReader.read().splits().asScala.collect { case s: DataSplit => s }
+  }
+
+  /** Gets a relative path against the table path. */
+  private def relativePath(absolutePath: String): String = {
+    val location = table.location().toUri
+    location.relativize(new URI(absolutePath)).toString
+  }
+
+  private def buildCommitMessage(o: CommitMessageImpl): CommitMessage = {
+    new CommitMessageImpl(
+      o.partition,
+      o.bucket,
+      o.newFilesIncrement,
+      new CompactIncrement(
+        Collections.emptyList[DataFileMeta],
+        Collections.emptyList[DataFileMeta],
+        Collections.emptyList[DataFileMeta]),
+      new IndexIncrement(Collections.emptyList[IndexFileMeta]));
   }
 }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WithFileStoreTable.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WithFileStoreTable.scala
index 55be49498..1d447281e 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WithFileStoreTable.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WithFileStoreTable.scala
@@ -19,9 +19,13 @@
 package org.apache.paimon.spark.commands
 
 import org.apache.paimon.table.FileStoreTable
+import org.apache.paimon.types.RowType
 
 private[spark] trait WithFileStoreTable {
 
   def table: FileStoreTable
 
+  def withPrimaryKeys: Boolean = !table.primaryKeys().isEmpty
+
+  def rowType: RowType = table.rowType()
 }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
index b3077eb8a..905c9cdfb 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
@@ -39,7 +39,6 @@ case class WriteIntoPaimonTable(
     options: Options)
   extends RunnableCommand
   with PaimonCommand
-  with PaimonSparkWriter
   with SchemaHelper
   with Logging {
 
@@ -57,20 +56,12 @@ case class WriteIntoPaimonTable(
     updateTableWithOptions(
       Map(DYNAMIC_PARTITION_OVERWRITE.key -> 
dynamicPartitionOverwriteMode.toString))
 
-    val writeBuilder = table.newBatchWriteBuilder()
+    val writer = PaimonSparkWriter(table)
     if (overwritePartition != null) {
-      writeBuilder.withOverwrite(overwritePartition.asJava)
-    }
-
-    val commitMessages = write(data, writeBuilder)
-    val tableCommit = writeBuilder.newCommit()
-    try {
-      tableCommit.commit(commitMessages.toList.asJava)
-    } catch {
-      case e: Throwable => throw new RuntimeException(e);
-    } finally {
-      tableCommit.close()
+      writer.writeBuilder.withOverwrite(overwritePartition.asJava)
     }
+    val commitMessages = writer.write(data)
+    writer.commit(commitMessages)
 
     Seq.empty
   }
diff --git 
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala
 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala
index a853bcb3b..bf8e72f79 100644
--- 
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala
+++ 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala
@@ -618,7 +618,7 @@ abstract class MergeIntoTableTestBase extends 
PaimonSparkTestBase {
                      |THEN INSERT (a, b, c) values (a, b, c)
                      |""".stripMargin)
       }.getMessage
-      assert(error.contains("Only support to merge into table with primary 
keys."))
+      assert(error.contains("Only support to MergeInto table with primary 
keys."))
     }
   }
 }
diff --git 
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTest.scala
 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTest.scala
index 65261f3de..cc95e7a90 100644
--- 
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTest.scala
+++ 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTest.scala
@@ -26,15 +26,86 @@ import org.assertj.core.api.Assertions.{assertThat, 
assertThatThrownBy}
 
 class UpdateTableTest extends PaimonSparkTestBase {
 
-  test(s"test update append only table") {
+  import testImplicits._
+
+  test(s"Paimon Update: append-only table") {
     spark.sql(s"""
                  |CREATE TABLE T (id INT, name STRING, dt STRING)
                  |""".stripMargin)
 
-    spark.sql("INSERT INTO T VALUES (1, 'a', '11'), (2, 'b', '22')")
+    spark.sql("""
+                |INSERT INTO T
+                |VALUES (1, 'a', '2024'), (2, 'b', '2024'), (3, 'c', '2025'), 
(4, 'd', '2025')
+                |""".stripMargin)
+
+    spark.sql("UPDATE T SET name = 'a_new' WHERE id = 1")
+    checkAnswer(
+      spark.sql("SELECT * FROM T ORDER BY id"),
+      Seq((1, "a_new", "2024"), (2, "b", "2024"), (3, "c", "2025"), (4, "d", 
"2025")).toDF()
+    )
+
+    val snapshotManager = loadTable("T").snapshotManager()
+    var lastSnapshotId = snapshotManager.latestSnapshotId()
+    spark.sql("UPDATE T SET name = concat(name, '2') WHERE id % 2 == 0")
+    checkAnswer(
+      spark.sql("SELECT * FROM T ORDER BY id"),
+      Seq((1, "a_new", "2024"), (2, "b2", "2024"), (3, "c", "2025"), (4, "d2", 
"2025")).toDF()
+    )
+    assertThat(lastSnapshotId + 
1).isEqualTo(snapshotManager.latestSnapshotId())
+
+    lastSnapshotId = snapshotManager.latestSnapshotId()
+    spark.sql("UPDATE T SET name = 'empty_commit' WHERE id > 100")
+    // no data need to be updated, it's an empty commit.
+    checkAnswer(
+      spark.sql("SELECT * FROM T ORDER BY id"),
+      Seq((1, "a_new", "2024"), (2, "b2", "2024"), (3, "c", "2025"), (4, "d2", 
"2025")).toDF()
+    )
+    assertThat(lastSnapshotId).isEqualTo(snapshotManager.latestSnapshotId())
+  }
+
+  test(s"Paimon Update: append-only table with partition") {
+    spark.sql(s"""
+                 |CREATE TABLE T (id INT, name STRING, dt STRING) PARTITIONED 
BY (dt)
+                 |""".stripMargin)
+
+    spark.sql("""
+                |INSERT INTO T
+                |VALUES (1, 'a', '2024'), (2, 'b', '2024'), (3, 'c', '2025'), 
(4, 'd', '2025')
+                |""".stripMargin)
+
+    spark.sql("UPDATE T SET name = concat(name, '2') WHERE dt <= '2024'")
+    checkAnswer(
+      spark.sql("SELECT * FROM T ORDER BY id"),
+      Seq((1, "a2", "2024"), (2, "b2", "2024"), (3, "c", "2025"), (4, "d", 
"2025")).toDF()
+    )
+
+    spark.sql("UPDATE T SET name = concat(name, '3') WHERE dt = '2025' and id 
% 2 == 1")
+    checkAnswer(
+      spark.sql("SELECT * FROM T ORDER BY id"),
+      Seq((1, "a2", "2024"), (2, "b2", "2024"), (3, "c3", "2025"), (4, "d", 
"2025")).toDF()
+    )
+
+    spark.sql("UPDATE T SET name = concat(name, '4') WHERE id % 2 == 0")
+    checkAnswer(
+      spark.sql("SELECT * FROM T ORDER BY id"),
+      Seq((1, "a2", "2024"), (2, "b24", "2024"), (3, "c3", "2025"), (4, "d4", 
"2025")).toDF()
+    )
+  }
+
+  test("Paimon Update: append-only table, condition contains subquery") {
+    spark.sql(s"""
+                 |CREATE TABLE T (id INT, name STRING, dt STRING) PARTITIONED 
BY (dt)
+                 |""".stripMargin)
+
+    spark.sql("""
+                |INSERT INTO T
+                |VALUES (1, 'a', '2024'), (2, 'b', '2024'), (3, 'c', '2025'), 
(4, 'd', '2025')
+                |""".stripMargin)
 
-    assertThatThrownBy(() => spark.sql("UPDATE T SET name = 'a_new' WHERE id = 
1"))
-      .hasMessageContaining("Only support to update table with primary keys.")
+    Seq(1, 2).toDF("id").createOrReplaceTempView("updated_ids")
+    assertThatThrownBy(
+      () => spark.sql("UPDATE T set name = 'in_new' WHERE id IN (SELECT * FROM 
updated_ids)"))
+      .hasMessageContaining("Subqueries are not supported")
   }
 
   CoreOptions.MergeEngine.values().foreach {


Reply via email to