This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 222dc8f7c [Spark] Minor Refactor Spark Commands (#2352)
222dc8f7c is described below
commit 222dc8f7c938c7995cde77b62d3507d24a4e060b
Author: Yann Byron <[email protected]>
AuthorDate: Wed Nov 22 09:51:23 2023 +0800
[Spark] Minor Refactor Spark Commands (#2352)
---
.../java/org/apache/paimon/spark/SparkTable.java | 55 ----------
.../commands/DeleteFromPaimonTableCommand.scala | 88 ++++++++++++++++
.../paimon/spark/commands/PaimonCommand.scala | 23 +++-
.../PaimonDynamicPartitionOverwriteCommand.scala | 9 +-
.../commands/PaimonTruncateTableCommand.scala | 52 +++++++++
.../spark/commands}/UpdatePaimonTableCommand.scala | 12 +--
.../extensions/PaimonSparkSessionExtensions.scala | 7 +-
.../main/scala/org/apache/spark/sql/Utils.scala | 13 +++
.../sql/catalyst/analysis/PaimonAnalysis.scala | 18 +++-
.../analysis/PaimonDeleteTable.scala} | 26 ++---
.../sql/catalyst/analysis/PaimonMergeInto.scala | 70 +++---------
.../analysis/PaimonMergeIntoResolver.scala | 2 +-
.../{AnalysisHelper.scala => PaimonRelation.scala} | 16 +--
.../sql/catalyst/analysis/PaimonUpdateTable.scala | 43 ++++++++
.../analysis/RewriteRowLevelCommands.scala | 117 ---------------------
.../sql/catalyst/analysis/RowLevelHelper.scala | 95 +++++++++++++++++
.../sql/{ => catalyst/analysis}/RowLevelOp.scala | 9 +-
.../logical/DeleteFromPaimonTableCommand.scala | 57 ----------
.../paimon/spark/sql/DeleteFromTableTest.scala | 2 +-
.../apache/paimon/spark/sql/UpdateTableTest.scala | 6 +-
20 files changed, 396 insertions(+), 324 deletions(-)
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTable.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTable.java
index 77090f6ab..6b21fad0a 100644
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTable.java
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTable.java
@@ -19,18 +19,12 @@
package org.apache.paimon.spark;
import org.apache.paimon.CoreOptions;
-import org.apache.paimon.operation.FileStoreCommit;
import org.apache.paimon.options.Options;
-import org.apache.paimon.predicate.OnlyPartitionKeyEqualVisitor;
import org.apache.paimon.predicate.Predicate;
-import org.apache.paimon.predicate.PredicateBuilder;
-import org.apache.paimon.table.AbstractFileStoreTable;
import org.apache.paimon.table.DataTable;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
-import org.apache.paimon.table.sink.BatchWriteBuilder;
-import org.apache.spark.sql.connector.catalog.SupportsDelete;
import org.apache.spark.sql.connector.catalog.SupportsRead;
import org.apache.spark.sql.connector.catalog.SupportsWrite;
import org.apache.spark.sql.connector.catalog.TableCapability;
@@ -40,28 +34,22 @@ import org.apache.spark.sql.connector.expressions.Transform;
import org.apache.spark.sql.connector.read.ScanBuilder;
import org.apache.spark.sql.connector.write.LogicalWriteInfo;
import org.apache.spark.sql.connector.write.WriteBuilder;
-import org.apache.spark.sql.sources.AlwaysTrue;
-import org.apache.spark.sql.sources.Filter;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
import javax.annotation.Nullable;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.UUID;
/** A spark {@link org.apache.spark.sql.connector.catalog.Table} for paimon. */
public class SparkTable
implements org.apache.spark.sql.connector.catalog.Table,
SupportsRead,
SupportsWrite,
- SupportsDelete,
PaimonPartitionManagement {
private final Table table;
@@ -119,49 +107,6 @@ public class SparkTable
}
}
- public boolean canDeleteWhere(Filter[] filters) {
- SparkFilterConverter converter = new
SparkFilterConverter(table.rowType());
- List<Predicate> predicates = new ArrayList<>();
- for (Filter filter : filters) {
- if (filter.equals(new AlwaysTrue())) {
- continue;
- }
- predicates.add(converter.convert(filter));
- }
- deletePredicate = predicates.isEmpty() ? null :
PredicateBuilder.and(predicates);
- return deletePredicate == null || deleteIsDropPartition();
- }
-
- @Override
- public void deleteWhere(Filter[] filters) {
- FileStoreCommit commit =
- ((AbstractFileStoreTable)
table).store().newCommit(UUID.randomUUID().toString());
- long identifier = BatchWriteBuilder.COMMIT_IDENTIFIER;
- if (deletePredicate == null) {
- commit.purgeTable(identifier);
- } else if (deleteIsDropPartition()) {
-
commit.dropPartitions(Collections.singletonList(deletePartitions()),
identifier);
- } else {
- // can't reach here
- throw new UnsupportedOperationException();
- }
- }
-
- private boolean deleteIsDropPartition() {
- return deletePredicate != null
- && deletePredicate.visit(new
OnlyPartitionKeyEqualVisitor(table.partitionKeys()));
- }
-
- private Map<String, String> deletePartitions() {
- if (deletePredicate == null) {
- return null;
- }
- OnlyPartitionKeyEqualVisitor visitor =
- new OnlyPartitionKeyEqualVisitor(table.partitionKeys());
- deletePredicate.visit(visitor);
- return visitor.partitions();
- }
-
@Override
public Map<String, String> properties() {
if (table instanceof DataTable) {
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
new file mode 100644
index 000000000..fa1a46333
--- /dev/null
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.paimon.spark.commands
+
+import org.apache.paimon.options.Options
+import org.apache.paimon.predicate.OnlyPartitionKeyEqualVisitor
+import org.apache.paimon.spark.{InsertInto, SparkTable}
+import org.apache.paimon.spark.schema.SparkSystemColumns.ROW_KIND_COL
+import org.apache.paimon.table.FileStoreTable
+import org.apache.paimon.table.sink.BatchWriteBuilder
+import org.apache.paimon.types.RowKind
+
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.Utils.createDataset
+import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
+import org.apache.spark.sql.catalyst.plans.logical.{DeleteFromTable, Filter,
LogicalPlan, SupportsSubquery}
+import org.apache.spark.sql.execution.command.LeafRunnableCommand
+import org.apache.spark.sql.functions.lit
+
+import java.util.{Collections, UUID}
+
+import scala.util.control.NonFatal
+
+case class DeleteFromPaimonTableCommand(v2Table: SparkTable, delete:
DeleteFromTable)
+ extends LeafRunnableCommand
+ with PaimonCommand {
+
+ override def table: FileStoreTable =
v2Table.getTable.asInstanceOf[FileStoreTable]
+
+ private val relation = delete.table
+ private val condition = delete.condition
+
+ private lazy val (deletePredicate, forceDeleteByRows) =
+ if (condition == null || condition == TrueLiteral) {
+ (None, false)
+ } else {
+ try {
+ (Some(convertConditionToPaimonPredicate(condition, relation.output)),
false)
+ } catch {
+ case NonFatal(_) =>
+ (None, true)
+ }
+ }
+
+ override def run(sparkSession: SparkSession): Seq[Row] = {
+ val commit = table.store.newCommit(UUID.randomUUID.toString)
+
+ if (forceDeleteByRows) {
+ deleteRowsByCondition(sparkSession)
+ } else if (deletePredicate.isEmpty) {
+ commit.purgeTable(BatchWriteBuilder.COMMIT_IDENTIFIER)
+ } else {
+ val visitor = new OnlyPartitionKeyEqualVisitor(table.partitionKeys)
+ if (deletePredicate.get.visit(visitor)) {
+ val dropPartitions = visitor.partitions()
+ commit.dropPartitions(
+ Collections.singletonList(dropPartitions),
+ BatchWriteBuilder.COMMIT_IDENTIFIER)
+ } else {
+ deleteRowsByCondition(sparkSession)
+ }
+ }
+
+ Seq.empty[Row]
+ }
+
+ private def deleteRowsByCondition(sparkSession: SparkSession): Unit = {
+ val df = createDataset(sparkSession, Filter(condition, relation))
+ .withColumn(ROW_KIND_COL, lit(RowKind.DELETE.toByteValue))
+
+ WriteIntoPaimonTable(table, InsertInto, df, new
Options()).run(sparkSession)
+ }
+}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
index 786afbba9..6b07856c0 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
@@ -17,17 +17,22 @@
*/
package org.apache.paimon.spark.commands
+import org.apache.paimon.predicate.{Predicate, PredicateBuilder}
import org.apache.paimon.spark.SparkFilterConverter
import org.apache.paimon.table.{BucketMode, FileStoreTable}
import org.apache.paimon.table.sink.{CommitMessage, CommitMessageSerializer}
import org.apache.paimon.types.RowType
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.Utils.{normalizeExprs, translateFilter}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression,
PredicateHelper}
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
import org.apache.spark.sql.sources.{AlwaysTrue, And, EqualNullSafe, Filter}
import java.io.IOException
/** Helper trait for all paimon commands. */
-trait PaimonCommand extends WithFileStoreTable {
+trait PaimonCommand extends WithFileStoreTable with PredicateHelper {
lazy val bucketMode: BucketMode = table match {
case fileStoreTable: FileStoreTable =>
@@ -47,6 +52,22 @@ trait PaimonCommand extends WithFileStoreTable {
}
}
+ protected def convertConditionToPaimonPredicate(
+ condition: Expression,
+ output: Seq[Attribute]): Predicate = {
+ val converter = new SparkFilterConverter(table.rowType)
+ val filters = normalizeExprs(Seq(condition), output)
+ .flatMap(splitConjunctivePredicates(_).map {
+ f =>
+ translateFilter(f, supportNestedPredicatePushdown = true).getOrElse(
+ throw new RuntimeException("Exec update failed:" +
+ s" cannot translate expression to source filter: $f"))
+ })
+ .toArray
+ val predicates = filters.map(converter.convert)
+ PredicateBuilder.and(predicates: _*)
+ }
+
/**
* For the 'INSERT OVERWRITE' semantics of SQL, Spark DataSourceV2 will call
the `truncate`
* methods where the `AlwaysTrue` Filter is used.
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/PaimonDynamicPartitionOverwriteCommand.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonDynamicPartitionOverwriteCommand.scala
similarity index 90%
rename from
paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/PaimonDynamicPartitionOverwriteCommand.scala
rename to
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonDynamicPartitionOverwriteCommand.scala
index 44c322ac5..b69ef6e25 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/PaimonDynamicPartitionOverwriteCommand.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonDynamicPartitionOverwriteCommand.scala
@@ -15,15 +15,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.spark.sql.catalyst.plans.logical
+package org.apache.paimon.spark.commands
import org.apache.paimon.options.Options
import org.apache.paimon.spark.DynamicOverWrite
-import org.apache.paimon.spark.commands.WriteIntoPaimonTable
import org.apache.paimon.table.FileStoreTable
-import org.apache.spark.sql.{Dataset, Row, SparkSession}
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.Utils.createDataset
import org.apache.spark.sql.catalyst.analysis.NamedRelation
+import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan,
V2WriteCommand}
import org.apache.spark.sql.execution.command.RunnableCommand
import scala.collection.convert.ImplicitConversions._
@@ -62,7 +63,7 @@ case class PaimonDynamicPartitionOverwriteCommand(
WriteIntoPaimonTable(
fileStoreTable,
DynamicOverWrite,
- Dataset.ofRows(sparkSession, query),
+ createDataset(sparkSession, query),
Options.fromMap(fileStoreTable.options() ++ writeOptions)
).run(sparkSession)
}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonTruncateTableCommand.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonTruncateTableCommand.scala
new file mode 100644
index 000000000..132d2da65
--- /dev/null
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonTruncateTableCommand.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.paimon.spark.commands
+
+import org.apache.paimon.spark.SparkTable
+import org.apache.paimon.table.FileStoreTable
+import org.apache.paimon.table.sink.BatchWriteBuilder
+
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
+import org.apache.spark.sql.execution.command.LeafRunnableCommand
+
+import java.util.{Collections, UUID}
+
+import scala.collection.JavaConverters._
+
+case class PaimonTruncateTableCommand(v2Table: SparkTable, partitionSpec:
TablePartitionSpec)
+ extends LeafRunnableCommand
+ with PaimonCommand {
+
+ override def table: FileStoreTable =
v2Table.getTable.asInstanceOf[FileStoreTable]
+
+ override def run(sparkSession: SparkSession): Seq[Row] = {
+ val commit = table.store.newCommit(UUID.randomUUID.toString)
+
+ if (partitionSpec.isEmpty) {
+ commit.purgeTable(BatchWriteBuilder.COMMIT_IDENTIFIER)
+ } else {
+ commit.dropPartitions(
+ Collections.singletonList(partitionSpec.asJava),
+ BatchWriteBuilder.COMMIT_IDENTIFIER
+ )
+ }
+
+ Seq.empty[Row]
+ }
+}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/UpdatePaimonTableCommand.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
similarity index 87%
rename from
paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/UpdatePaimonTableCommand.scala
rename to
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
index c9ec48a16..a44dd1a36 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/UpdatePaimonTableCommand.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
@@ -15,19 +15,20 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.spark.sql.catalyst.plans.logical
+package org.apache.paimon.spark.commands
import org.apache.paimon.options.Options
import org.apache.paimon.spark.{InsertInto, SparkTable}
-import org.apache.paimon.spark.commands.WriteIntoPaimonTable
import org.apache.paimon.spark.schema.SparkSystemColumns.ROW_KIND_COL
import org.apache.paimon.table.FileStoreTable
import org.apache.paimon.types.RowKind
-import org.apache.spark.sql.{Dataset, Row, SparkSession}
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.Utils.createDataset
import org.apache.spark.sql.catalyst.analysis.{AssignmentAlignmentHelper,
EliminateSubqueryAliases}
-import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, If}
+import org.apache.spark.sql.catalyst.expressions.Alias
import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, Project,
UpdateTable}
import org.apache.spark.sql.execution.command.LeafRunnableCommand
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.functions.lit
@@ -47,8 +48,7 @@ case class UpdatePaimonTableCommand(u: UpdateTable)
val updatedPlan = Project(updatedExprs,
Filter(u.condition.getOrElse(TrueLiteral), relation))
- val df = Dataset
- .ofRows(sparkSession, updatedPlan)
+ val df = createDataset(sparkSession, updatedPlan)
.withColumn(ROW_KIND_COL, lit(RowKind.UPDATE_AFTER.toByteValue))
WriteIntoPaimonTable(
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala
index 93278ddb2..bf84b8955 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala
@@ -18,7 +18,7 @@
package org.apache.paimon.spark.extensions
import org.apache.spark.sql.SparkSessionExtensions
-import org.apache.spark.sql.catalyst.analysis.{CoerceArguments,
PaimonAnalysis, PaimonMergeInto, ResolveProcedures, RewriteRowLevelCommands}
+import org.apache.spark.sql.catalyst.analysis.{CoerceArguments,
PaimonAnalysis, PaimonDeleteTable, PaimonMergeInto,
PaimonPostHocResolutionRules, PaimonUpdateTable, ResolveProcedures}
import
org.apache.spark.sql.catalyst.parser.extensions.PaimonSparkSqlExtensionsParser
import org.apache.spark.sql.catalyst.plans.logical.PaimonTableValuedFunctions
import
org.apache.spark.sql.execution.datasources.v2.ExtendedDataSourceV2Strategy
@@ -35,7 +35,10 @@ class PaimonSparkSessionExtensions extends
(SparkSessionExtensions => Unit) {
extensions.injectResolutionRule(spark => ResolveProcedures(spark))
extensions.injectResolutionRule(_ => CoerceArguments)
- extensions.injectPostHocResolutionRule(_ => RewriteRowLevelCommands)
+ extensions.injectPostHocResolutionRule(spark =>
PaimonPostHocResolutionRules(spark))
+
+ extensions.injectPostHocResolutionRule(_ => PaimonUpdateTable)
+ extensions.injectPostHocResolutionRule(_ => PaimonDeleteTable)
extensions.injectPostHocResolutionRule(spark => PaimonMergeInto(spark))
// table function extensions
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/Utils.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/Utils.scala
index e371a18ed..feb8815f9 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/Utils.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/Utils.scala
@@ -17,7 +17,10 @@
*/
package org.apache.spark.sql
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.sources.Filter
/**
* Some classes or methods defined in the spark project are marked as private
under
@@ -45,4 +48,14 @@ object Utils {
def createDataset(sparkSession: SparkSession, logicalPlan: LogicalPlan):
Dataset[Row] = {
Dataset.ofRows(sparkSession, logicalPlan)
}
+
+ def normalizeExprs(exprs: Seq[Expression], attributes: Seq[Attribute]):
Seq[Expression] = {
+ DataSourceStrategy.normalizeExprs(exprs, attributes)
+ }
+
+ def translateFilter(
+ predicate: Expression,
+ supportNestedPredicatePushdown: Boolean): Option[Filter] = {
+ DataSourceStrategy.translateFilter(predicate,
supportNestedPredicatePushdown)
+ }
}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/PaimonAnalysis.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/PaimonAnalysis.scala
index 6498fb5ae..caca14c73 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/PaimonAnalysis.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/PaimonAnalysis.scala
@@ -18,14 +18,16 @@
package org.apache.spark.sql.catalyst.analysis
import org.apache.paimon.spark.SparkTable
+import
org.apache.paimon.spark.commands.{PaimonDynamicPartitionOverwriteCommand,
PaimonTruncateTableCommand}
import org.apache.paimon.table.FileStoreTable
import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan,
MergeIntoTable, OverwritePartitionsDynamic,
PaimonDynamicPartitionOverwriteCommand, PaimonTableValuedFunctions,
PaimonTableValueFunction}
+import org.apache.spark.sql.catalyst.analysis.PaimonRelation.isPaimonTable
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan,
MergeIntoTable, OverwritePartitionsDynamic, PaimonTableValuedFunctions,
PaimonTableValueFunction, TruncateTable}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
-class PaimonAnalysis(session: SparkSession) extends Rule[LogicalPlan] with
AnalysisHelper {
+class PaimonAnalysis(session: SparkSession) extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan =
plan.resolveOperatorsDown {
@@ -41,6 +43,18 @@ class PaimonAnalysis(session: SparkSession) extends
Rule[LogicalPlan] with Analy
}
+case class PaimonPostHocResolutionRules(session: SparkSession) extends
Rule[LogicalPlan] {
+
+ override def apply(plan: LogicalPlan): LogicalPlan = {
+ plan match {
+ case t @ TruncateTable(PaimonRelation(table)) if t.resolved =>
+ PaimonTruncateTableCommand(table, Map.empty)
+
+ case _ => plan
+ }
+ }
+}
+
object PaimonDynamicPartitionOverwrite {
def unapply(o: OverwritePartitionsDynamic): Option[(DataSourceV2Relation,
FileStoreTable)] = {
if (o.query.resolved) {
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/RowLevelOp.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/PaimonDeleteTable.scala
similarity index 56%
copy from
paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/RowLevelOp.scala
copy to
paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/PaimonDeleteTable.scala
index 620ab814a..7b2175ec7 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/RowLevelOp.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/PaimonDeleteTable.scala
@@ -15,23 +15,23 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.spark.sql
+package org.apache.spark.sql.catalyst.analysis
-import org.apache.paimon.CoreOptions.MergeEngine
+import org.apache.paimon.spark.commands.DeleteFromPaimonTableCommand
-sealed trait RowLevelOp {
- val supportedMergeEngine: Seq[MergeEngine]
-}
+import org.apache.spark.sql.catalyst.plans.logical.{DeleteFromTable,
LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.Rule
-case object Delete extends RowLevelOp {
- override def toString: String = "delete"
+object PaimonDeleteTable extends Rule[LogicalPlan] with RowLevelHelper {
- override val supportedMergeEngine: Seq[MergeEngine] =
Seq(MergeEngine.DEDUPLICATE)
-}
+ override val operation: RowLevelOp = Delete
-case object Update extends RowLevelOp {
- override def toString: String = "update"
+ override def apply(plan: LogicalPlan): LogicalPlan = {
+ plan.resolveOperators {
+ case d @ DeleteFromTable(PaimonRelation(table), condition) if d.resolved
=>
+ checkPaimonTable(table)
- override val supportedMergeEngine: Seq[MergeEngine] =
- Seq(MergeEngine.DEDUPLICATE, MergeEngine.PARTIAL_UPDATE)
+ DeleteFromPaimonTableCommand(table, d)
+ }
+ }
}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/PaimonMergeInto.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/PaimonMergeInto.scala
index 7fa050398..4908fa7ec 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/PaimonMergeInto.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/PaimonMergeInto.scala
@@ -20,11 +20,11 @@ package org.apache.spark.sql.catalyst.analysis
import org.apache.paimon.CoreOptions
import org.apache.paimon.spark.SparkTable
import org.apache.paimon.spark.commands.MergeIntoPaimonTable
-import org.apache.paimon.table.{FileStoreTable, PrimaryKeyFileStoreTable}
import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.analysis.PaimonRelation
import org.apache.spark.sql.catalyst.analysis.expressions.ExpressionHelper
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo,
Expression, SubqueryExpression}
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference,
AttributeSet, Expression, SubqueryExpression}
import org.apache.spark.sql.catalyst.plans.logical.{Assignment, DeleteAction,
InsertAction, InsertStarAction, LogicalPlan, MergeAction, MergeIntoTable,
UpdateAction, UpdateStarAction}
import org.apache.spark.sql.catalyst.rules.Rule
@@ -33,26 +33,22 @@ import scala.collection.mutable
/** A post-hoc resolution rule for MergeInto. */
case class PaimonMergeInto(spark: SparkSession)
extends Rule[LogicalPlan]
- with AnalysisHelper
+ with RowLevelHelper
with ExpressionHelper {
+ override val operation: RowLevelOp = MergeInto
+
private val resolver: Resolver = spark.sessionState.conf.resolver
def apply(plan: LogicalPlan): LogicalPlan = {
- plan match {
- case merge: MergeIntoTable =>
- val relation = getPaimonTableRelation(merge.targetTable)
+ plan.resolveOperators {
+ case merge: MergeIntoTable
+ if merge.resolved && PaimonRelation.isPaimonTable(merge.targetTable)
=>
+ val relation = PaimonRelation.getPaimonRelation(merge.targetTable)
val v2Table = relation.table.asInstanceOf[SparkTable]
val targetOutput = relation.output
- v2Table.getTable match {
- case _: PrimaryKeyFileStoreTable =>
- case _: FileStoreTable =>
- throw new RuntimeException("Only support to merge into table with
primary keys.")
- case _ =>
- throw new RuntimeException("Can't merge into a non-file store
table.")
- }
-
+ checkPaimonTable(v2Table)
checkCondition(merge.mergeCondition)
merge.matchedActions.flatMap(_.condition).foreach(checkCondition)
merge.notMatchedActions.flatMap(_.condition).foreach(checkCondition)
@@ -60,8 +56,7 @@ case class PaimonMergeInto(spark: SparkSession)
val updateActions = merge.matchedActions.collect { case a:
UpdateAction => a }
val primaryKeys =
v2Table.properties().get(CoreOptions.PRIMARY_KEY.key).split(",")
checkUpdateActionValidity(
- merge.targetTable,
- merge.sourceTable,
+ AttributeSet(targetOutput),
merge.mergeCondition,
updateActions,
primaryKeys)
@@ -78,8 +73,6 @@ case class PaimonMergeInto(spark: SparkSession)
merge.mergeCondition,
alignedMatchedActions,
alignedNotMatchedActions)
-
- case _ => plan
}
}
@@ -171,54 +164,21 @@ case class PaimonMergeInto(spark: SparkSession)
/** This check will avoid to update the primary key columns */
private def checkUpdateActionValidity(
- target: LogicalPlan,
- source: LogicalPlan,
+ targetOutput: AttributeSet,
mergeCondition: Expression,
actions: Seq[UpdateAction],
primaryKeys: Seq[String]): Unit = {
- val targetOutput = target.outputSet
- val sourceOutput = source.outputSet
-
- // Check whether this attribute is same to primary key and is from target
table.
- def isTargetPrimaryKey(attr: AttributeReference, primaryKey: String):
Boolean = {
- resolver(primaryKey, attr.name) && targetOutput.contains(attr)
- }
-
- // Check whether there is an `EqualTo` expression related to primary key
between source and target.
- def existsPrimaryKeyEqualToExpression(
- expressions: Seq[Expression],
- primaryKey: String): Boolean = {
- expressions.exists {
- case EqualTo(left: AttributeReference, right: AttributeReference) =>
- if (isTargetPrimaryKey(left, primaryKey)) {
- sourceOutput.contains(right)
- } else if (isTargetPrimaryKey(right, primaryKey)) {
- targetOutput.contains(left)
- } else {
- false
- }
- case _ => false
- }
- }
-
// Check whether there are enough `EqualTo` expressions related to all the
primary keys.
lazy val isMergeConditionValid = {
val mergeExpressions = splitConjunctivePredicates(mergeCondition)
primaryKeys.forall {
- primaryKey => existsPrimaryKeyEqualToExpression(mergeExpressions,
primaryKey)
+ primaryKey => isUpdateExpressionToPrimaryKey(targetOutput,
mergeExpressions, primaryKey)
}
}
- // Check whether there are on `EqualTo` expression related to any primary
key.
- // Then, we do not update primary key columns.
+ // Check whether there is an update expression related to any primary key.
def isUpdateActionValid(action: UpdateAction): Boolean = {
- val found = primaryKeys.find {
- primaryKey =>
- existsPrimaryKeyEqualToExpression(
- action.assignments.map(a => EqualTo(a.key, a.value)),
- primaryKey)
- }
- found.isEmpty
+ validUpdateAssignment(targetOutput, primaryKeys, action.assignments)
}
val valid = isMergeConditionValid || actions.forall(isUpdateActionValid)
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/PaimonMergeIntoResolver.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/PaimonMergeIntoResolver.scala
index 5331a9c40..366146c7a 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/PaimonMergeIntoResolver.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/PaimonMergeIntoResolver.scala
@@ -22,7 +22,7 @@ import
org.apache.spark.sql.catalyst.analysis.expressions.ExpressionHelper
import org.apache.spark.sql.catalyst.plans.logical.{Assignment, DeleteAction,
InsertAction, InsertStarAction, LogicalPlan, MergeAction, MergeIntoTable,
UpdateAction, UpdateStarAction}
/** Resolve all the expressions for MergeInto. */
-object PaimonMergeIntoResolver extends AnalysisHelper with ExpressionHelper {
+object PaimonMergeIntoResolver extends ExpressionHelper {
def apply(merge: MergeIntoTable, spark: SparkSession): LogicalPlan = {
val target = merge.targetTable
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/AnalysisHelper.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/PaimonRelation.scala
similarity index 78%
rename from
paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/AnalysisHelper.scala
rename to
paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/PaimonRelation.scala
index 2ac7c3734..040a1aac5 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/AnalysisHelper.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/PaimonRelation.scala
@@ -26,14 +26,18 @@ import
org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import scala.util.control.NonFatal
/** An analysis helper */
-trait AnalysisHelper extends Logging {
+object PaimonRelation extends Logging {
+
+ def unapply(plan: LogicalPlan): Option[SparkTable] =
+ EliminateSubqueryAliases(plan) match {
+ case DataSourceV2Relation(table: SparkTable, _, _, _, _) => Some(table)
+ case ResolvedTable(_, _, table: SparkTable, _) => Some(table)
+ case _ => None
+ }
def isPaimonTable(plan: LogicalPlan): Boolean = {
try {
- EliminateSubqueryAliases(plan) match {
- case DataSourceV2Relation(_: SparkTable, _, _, _, _) => true
- case _ => false
- }
+ PaimonRelation.unapply(plan).nonEmpty
} catch {
case NonFatal(e) =>
logWarning("Can't check if this plan is a paimon table", e)
@@ -41,7 +45,7 @@ trait AnalysisHelper extends Logging {
}
}
- def getPaimonTableRelation(plan: LogicalPlan): DataSourceV2Relation = {
+ def getPaimonRelation(plan: LogicalPlan): DataSourceV2Relation = {
EliminateSubqueryAliases(plan) match {
case d @ DataSourceV2Relation(_: SparkTable, _, _, _, _) => d
case _ => throw new RuntimeException(s"It's not a paimon table, $plan")
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/PaimonUpdateTable.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/PaimonUpdateTable.scala
new file mode 100644
index 000000000..df4bcf59a
--- /dev/null
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/PaimonUpdateTable.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.paimon.CoreOptions
+import org.apache.paimon.spark.commands.UpdatePaimonTableCommand
+
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UpdateTable}
+import org.apache.spark.sql.catalyst.rules.Rule
+
+object PaimonUpdateTable extends Rule[LogicalPlan] with RowLevelHelper {
+
+ override val operation: RowLevelOp = Update
+
+ override def apply(plan: LogicalPlan): LogicalPlan = {
+ plan.resolveOperators {
+ case u @ UpdateTable(PaimonRelation(table), assignments, _) if
u.resolved =>
+ checkPaimonTable(table)
+
+ val primaryKeys =
table.properties().get(CoreOptions.PRIMARY_KEY.key).split(",")
+ if (!validUpdateAssignment(u.table.outputSet, primaryKeys,
assignments)) {
+ throw new RuntimeException("Can't update the primary key column.")
+ }
+
+ UpdatePaimonTableCommand(u)
+ }
+ }
+}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteRowLevelCommands.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteRowLevelCommands.scala
deleted file mode 100644
index b5cf04c49..000000000
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteRowLevelCommands.scala
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.catalyst.analysis
-
-import org.apache.paimon.CoreOptions.MERGE_ENGINE
-import org.apache.paimon.options.Options
-import org.apache.paimon.spark.SparkTable
-import org.apache.paimon.table.Table
-
-import org.apache.spark.sql.{AnalysisException, Delete, RowLevelOp, Update}
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression,
PredicateHelper, SubqueryExpression}
-import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.connector.catalog.SupportsDelete
-import org.apache.spark.sql.execution.datasources.DataSourceStrategy
-import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
-
-import java.util
-
-object RewriteRowLevelCommands extends Rule[LogicalPlan] with PredicateHelper {
-
- override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
- case d @ DeleteFromTable(ResolvesToPaimonTable(table), condition) =>
- validateRowLevelOp(Delete, table.getTable, Option.empty)
- if (canDeleteWhere(d, table, condition)) {
- d
- } else {
- DeleteFromPaimonTableCommand(d)
- }
-
- case u @ UpdateTable(ResolvesToPaimonTable(table), assignments, _) =>
- validateRowLevelOp(Update, table.getTable, Option.apply(assignments))
- UpdatePaimonTableCommand(u)
- }
-
- private object ResolvesToPaimonTable {
- def unapply(plan: LogicalPlan): Option[SparkTable] =
- EliminateSubqueryAliases(plan) match {
- case DataSourceV2Relation(table: SparkTable, _, _, _, _) => Some(table)
- case _ => None
- }
- }
-
- private def validateRowLevelOp(
- op: RowLevelOp,
- table: Table,
- assignments: Option[Seq[Assignment]]): Unit = {
- val options = Options.fromMap(table.options)
- val primaryKeys = table.primaryKeys()
- if (primaryKeys.isEmpty) {
- throw new UnsupportedOperationException(
- s"table ${table.getClass.getName} can not support $op, because there
is no primary key.")
- }
-
- if (op.equals(Update) && isPrimaryKeyUpdate(primaryKeys, assignments.get))
{
- throw new UnsupportedOperationException(s"$op to primary keys is not
supported.")
- }
-
- val mergeEngine = options.get(MERGE_ENGINE)
- if (!op.supportedMergeEngine.contains(mergeEngine)) {
- throw new UnsupportedOperationException(
- s"merge engine $mergeEngine can not support $op, currently only
${op.supportedMergeEngine
- .mkString(", ")} can support $op.")
- }
- }
-
- private def canDeleteWhere(
- d: DeleteFromTable,
- table: SparkTable,
- condition: Expression): Boolean = {
- table match {
- case t: SupportsDelete if !SubqueryExpression.hasSubquery(condition) =>
- // fail if any filter cannot be converted.
- // correctness depends on removing all matching data.
- val filters = DataSourceStrategy
- .normalizeExprs(Seq(condition), d.output)
- .flatMap(splitConjunctivePredicates(_).map {
- f =>
- DataSourceStrategy
- .translateFilter(f, supportNestedPredicatePushdown = true)
- .getOrElse(throw new AnalysisException(s"Exec update failed:" +
- s" cannot translate expression to source filter: $f"))
- })
- .toArray
- t.canDeleteWhere(filters)
- case _ => false
- }
- }
-
- private def isPrimaryKeyUpdate(
- primaryKeys: util.List[String],
- assignments: Seq[Assignment]): Boolean = {
- assignments.exists(
- a => {
- a.key match {
- case attr: Attribute => primaryKeys.contains(attr.name)
- case _ => false
- }
- })
- }
-
-}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/RowLevelHelper.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/RowLevelHelper.scala
new file mode 100644
index 000000000..2f5b6574c
--- /dev/null
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/RowLevelHelper.scala
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.paimon.CoreOptions.MERGE_ENGINE
+import org.apache.paimon.options.Options
+import org.apache.paimon.spark.SparkTable
+import org.apache.paimon.table.{FileStoreTable, PrimaryKeyFileStoreTable,
Table}
+
+import org.apache.spark.sql.catalyst.SQLConfHelper
+import org.apache.spark.sql.catalyst.expressions.{Attribute,
AttributeReference, AttributeSet, BinaryExpression, EqualTo, Expression,
SubqueryExpression}
+import org.apache.spark.sql.catalyst.plans.logical.Assignment
+
+import scala.collection.JavaConverters._
+
+trait RowLevelHelper extends SQLConfHelper {
+
+ val operation: RowLevelOp
+
+ protected def checkPaimonTable(table: SparkTable): Unit = {
+ val paimonTable = table.getTable match {
+ case pkTable: PrimaryKeyFileStoreTable => pkTable
+ case _: FileStoreTable =>
+ throw new UnsupportedOperationException(
+ s"Only support to $operation table with primary keys.")
+ case _ =>
+ throw new UnsupportedOperationException(s"Can't $operation a non-file
store table.")
+ }
+
+ val options = Options.fromMap(paimonTable.options)
+ val mergeEngine = options.get(MERGE_ENGINE)
+ if (!operation.supportedMergeEngine.contains(mergeEngine)) {
+ throw new UnsupportedOperationException(
+ s"merge engine $mergeEngine can not support $operation, currently only
${operation.supportedMergeEngine
+ .mkString(", ")} can support $operation.")
+ }
+ }
+
+ protected def checkSubquery(condition: Expression): Unit = {
+ if (SubqueryExpression.hasSubquery(condition)) {
+ throw new RuntimeException(
+ s"Subqueries are not supported in $condition operation (condition =
$condition).")
+ }
+ }
+
+ protected def validUpdateAssignment(
+ output: AttributeSet,
+ primaryKeys: Seq[String],
+ assignments: Seq[Assignment]): Boolean = {
+ !primaryKeys.exists {
+ primaryKey => isUpdateExpressionToPrimaryKey(output, assignments,
primaryKey)
+ }
+ }
+
+ // Check whether there is an update expression related to the primary key.
+ protected def isUpdateExpressionToPrimaryKey(
+ output: AttributeSet,
+ expressions: Seq[Expression],
+ primaryKey: String): Boolean = {
+ val resolver = conf.resolver
+
+ // Check whether this attribute is the same as the primary key and is from the
target table.
+ def isTargetPrimaryKey(attr: AttributeReference): Boolean = {
+ resolver(primaryKey, attr.name) && output.contains(attr)
+ }
+
+ expressions
+ .filterNot {
+ case BinaryExpression(left, right) => left == right
+ case Assignment(key, value) => key == value
+ }
+ .exists {
+ case EqualTo(left: AttributeReference, _) =>
+ isTargetPrimaryKey(left)
+ case Assignment(key: AttributeReference, _) =>
+ isTargetPrimaryKey(key)
+ case _ => false
+ }
+ }
+}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/RowLevelOp.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/RowLevelOp.scala
similarity index 83%
rename from
paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/RowLevelOp.scala
rename to
paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/RowLevelOp.scala
index 620ab814a..0c5179b34 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/RowLevelOp.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/RowLevelOp.scala
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.spark.sql
+package org.apache.spark.sql.catalyst.analysis
import org.apache.paimon.CoreOptions.MergeEngine
@@ -35,3 +35,10 @@ case object Update extends RowLevelOp {
override val supportedMergeEngine: Seq[MergeEngine] =
Seq(MergeEngine.DEDUPLICATE, MergeEngine.PARTIAL_UPDATE)
}
+
+case object MergeInto extends RowLevelOp {
+ override def toString: String = "merge into"
+
+ override val supportedMergeEngine: Seq[MergeEngine] =
+ Seq(MergeEngine.DEDUPLICATE, MergeEngine.PARTIAL_UPDATE)
+}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/DeleteFromPaimonTableCommand.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/DeleteFromPaimonTableCommand.scala
deleted file mode 100644
index 92833cf1a..000000000
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/DeleteFromPaimonTableCommand.scala
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.catalyst.plans.logical
-
-import org.apache.paimon.options.Options
-import org.apache.paimon.spark.{InsertInto, SparkTable}
-import org.apache.paimon.spark.commands.WriteIntoPaimonTable
-import org.apache.paimon.spark.schema.SparkSystemColumns.ROW_KIND_COL
-import org.apache.paimon.table.FileStoreTable
-import org.apache.paimon.types.RowKind
-
-import org.apache.spark.sql.{Dataset, Row, SparkSession}
-import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
-import org.apache.spark.sql.execution.command.LeafRunnableCommand
-import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
-import org.apache.spark.sql.functions.lit
-
-case class DeleteFromPaimonTableCommand(d: DeleteFromTable) extends
LeafRunnableCommand {
-
- override def run(sparkSession: SparkSession): Seq[Row] = {
- val relation =
EliminateSubqueryAliases(d.table).asInstanceOf[DataSourceV2Relation]
- val condition = d.condition
-
- val filteredPlan = if (condition != null) {
- Filter(condition, relation)
- } else {
- relation
- }
-
- val df = Dataset
- .ofRows(sparkSession, filteredPlan)
- .withColumn(ROW_KIND_COL, lit(RowKind.DELETE.toByteValue))
-
- WriteIntoPaimonTable(
-
relation.table.asInstanceOf[SparkTable].getTable.asInstanceOf[FileStoreTable],
- InsertInto,
- df,
- Options.fromMap(relation.options)).run(sparkSession)
-
- Seq.empty[Row]
- }
-}
diff --git
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTest.scala
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTest.scala
index 7623e08ab..e121027b2 100644
---
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTest.scala
+++
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTest.scala
@@ -20,7 +20,7 @@ package org.apache.paimon.spark.sql
import org.apache.paimon.CoreOptions
import org.apache.paimon.spark.PaimonSparkTestBase
-import org.apache.spark.sql.Delete
+import org.apache.spark.sql.catalyst.analysis.Delete
import org.assertj.core.api.Assertions.{assertThat, assertThatThrownBy}
class DeleteFromTableTest extends PaimonSparkTestBase {
diff --git
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTest.scala
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTest.scala
index 52f83d757..603262d58 100644
---
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTest.scala
+++
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTest.scala
@@ -20,7 +20,7 @@ package org.apache.paimon.spark.sql
import org.apache.paimon.CoreOptions
import org.apache.paimon.spark.PaimonSparkTestBase
-import org.apache.spark.sql.Update
+import org.apache.spark.sql.catalyst.analysis.Update
import org.assertj.core.api.Assertions.{assertThat, assertThatThrownBy}
class UpdateTableTest extends PaimonSparkTestBase {
@@ -33,7 +33,7 @@ class UpdateTableTest extends PaimonSparkTestBase {
spark.sql("INSERT INTO T VALUES (1, 'a', '11'), (2, 'b', '22')")
assertThatThrownBy(() => spark.sql("UPDATE T SET name = 'a_new' WHERE id =
1"))
- .hasMessageContaining("can not support update, because there is no
primary key")
+ .hasMessageContaining("Only support to update table with primary keys.")
}
CoreOptions.MergeEngine.values().foreach {
@@ -72,7 +72,7 @@ class UpdateTableTest extends PaimonSparkTestBase {
spark.sql("INSERT INTO T VALUES (1, 'a', '11'), (2, 'b', '22'), (3, 'c',
'33')")
assertThatThrownBy(() => spark.sql("UPDATE T SET id = 11 WHERE name =
'a'"))
- .hasMessageContaining("update to primary keys is not supported")
+ .hasMessageContaining("Can't update the primary key column.")
}
test(s"test update with no where") {