This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 70959c989 [spark] Enhance spark DELETE (#2183)
70959c989 is described below
commit 70959c989963eecba50669f6a2fac64ab1033631
Author: Zouxxyy <[email protected]>
AuthorDate: Thu Nov 2 13:59:06 2023 +0800
[spark] Enhance spark DELETE (#2183)
---
docs/content/how-to/writing-tables.md | 12 +-
.../java/org/apache/paimon/spark/SparkRow.java | 15 +-
.../java/org/apache/paimon/spark/SparkTable.java | 49 +++++-
.../paimon/spark/commands/PaimonCommand.scala | 2 -
.../spark/commands/WriteIntoPaimonTable.scala | 20 ++-
.../extensions/PaimonSparkSessionExtensions.scala | 6 +-
.../paimon/spark/schema/SparkSystemColumns.scala | 36 ++++
.../apache/paimon/spark/util/SparkRowUtils.scala | 44 +++++
.../optimizer/RewriteRowLeverCommands.scala | 82 +++++++++
.../logical/DeleteFromPaimonTableCommand.scala | 54 ++++++
.../org/apache/paimon/spark/SparkWriteITCase.java | 9 +-
.../paimon/spark/SparkWriteWithKyroITCase.java | 4 +
.../paimon/spark/sql/DeleteFromTableTest.scala | 183 +++++++++++++++++++++
13 files changed, 493 insertions(+), 23 deletions(-)
diff --git a/docs/content/how-to/writing-tables.md
b/docs/content/how-to/writing-tables.md
index 78bb35f9d..14379eb96 100644
--- a/docs/content/how-to/writing-tables.md
+++ b/docs/content/how-to/writing-tables.md
@@ -430,8 +430,18 @@ DELETE FROM MyTable WHERE currency = 'UNKNOWN';
{{< /tab >}}
{{< tab "Spark" >}}
+{{< hint info >}}
+Important table property requirements:
+1. Only primary key tables support this feature.
+2. The table's [MergeEngine]({{< ref
"concepts/primary-key-table#merge-engines" >}}) must be [deduplicate]({{<
ref "concepts/primary-key-table#deduplicate" >}}) to support this feature.
+ {{< /hint >}}
-Spark `DELETE` currently supports only a single point execution, for deleting
small amounts of data.
+Enabling `DELETE` requires the following configs:
+
+```text
+--conf
spark.sql.catalog.spark_catalog=org.apache.paimon.spark.SparkGenericCatalog
+--conf
spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions
+```
```sql
DELETE FROM MyTable WHERE currency = 'UNKNOWN';
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkRow.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkRow.java
index e4928974b..b46a10fb0 100644
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkRow.java
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkRow.java
@@ -54,10 +54,16 @@ public class SparkRow implements InternalRow, Serializable {
private final RowType type;
private final Row row;
+ private final RowKind rowKind;
public SparkRow(RowType type, Row row) {
+ this(type, row, RowKind.INSERT);
+ }
+
+ public SparkRow(RowType type, Row row, RowKind rowkind) {
this.type = type;
this.row = row;
+ this.rowKind = rowkind;
}
@Override
@@ -67,16 +73,13 @@ public class SparkRow implements InternalRow, Serializable {
@Override
public RowKind getRowKind() {
- return RowKind.INSERT;
+ return rowKind;
}
@Override
public void setRowKind(RowKind rowKind) {
- if (rowKind == RowKind.INSERT) {
- return;
- }
-
- throw new UnsupportedOperationException("Can not set row kind for this
row except INSERT.");
+ throw new UnsupportedOperationException(
+ "Spark row does not support modifying rowkind field");
}
@Override
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTable.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTable.java
index 674b4fbc2..77090f6ab 100644
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTable.java
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTable.java
@@ -19,12 +19,16 @@
package org.apache.paimon.spark;
import org.apache.paimon.CoreOptions;
+import org.apache.paimon.operation.FileStoreCommit;
import org.apache.paimon.options.Options;
+import org.apache.paimon.predicate.OnlyPartitionKeyEqualVisitor;
import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.table.AbstractFileStoreTable;
import org.apache.paimon.table.DataTable;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
-import org.apache.paimon.table.TableUtils;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.spark.sql.connector.catalog.SupportsDelete;
import org.apache.spark.sql.connector.catalog.SupportsRead;
@@ -36,10 +40,13 @@ import org.apache.spark.sql.connector.expressions.Transform;
import org.apache.spark.sql.connector.read.ScanBuilder;
import org.apache.spark.sql.connector.write.LogicalWriteInfo;
import org.apache.spark.sql.connector.write.WriteBuilder;
+import org.apache.spark.sql.sources.AlwaysTrue;
import org.apache.spark.sql.sources.Filter;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+import javax.annotation.Nullable;
+
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -47,6 +54,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.UUID;
/** A spark {@link org.apache.spark.sql.connector.catalog.Table} for paimon. */
public class SparkTable
@@ -57,6 +65,7 @@ public class SparkTable
PaimonPartitionManagement {
private final Table table;
+ @Nullable protected Predicate deletePredicate;
public SparkTable(Table table) {
this.table = table;
@@ -110,19 +119,47 @@ public class SparkTable
}
}
- @Override
- public void deleteWhere(Filter[] filters) {
+ public boolean canDeleteWhere(Filter[] filters) {
SparkFilterConverter converter = new
SparkFilterConverter(table.rowType());
List<Predicate> predicates = new ArrayList<>();
for (Filter filter : filters) {
- if ("AlwaysTrue()".equals(filter.toString())) {
+ if (filter.equals(new AlwaysTrue())) {
continue;
}
-
predicates.add(converter.convert(filter));
}
+ deletePredicate = predicates.isEmpty() ? null :
PredicateBuilder.and(predicates);
+ return deletePredicate == null || deleteIsDropPartition();
+ }
+
+    /**
+     * Deletes the data selected by {@code filters} through {@code purgeTable} (no effective
+     * filter) or {@code dropPartitions} (filters accepted as a partition drop).
+     *
+     * <p>The predicate is re-derived from {@code filters} on every call rather than trusting the
+     * {@code deletePredicate} field left behind by an earlier {@link #canDeleteWhere} call. A
+     * stale or never-initialised ({@code null}) field would otherwise purge the whole table
+     * regardless of the filters passed in.
+     *
+     * @param filters the delete condition as Spark source filters
+     * @throws UnsupportedOperationException if {@link #canDeleteWhere} rejects {@code filters}
+     */
+    @Override
+    public void deleteWhere(Filter[] filters) {
+        // Recompute deletePredicate from the filters actually given to this call.
+        if (!canDeleteWhere(filters)) {
+            throw new UnsupportedOperationException(
+                    "deleteWhere only supports deleting the whole table or whole partitions.");
+        }
+
+        FileStoreCommit commit =
+                ((AbstractFileStoreTable) table).store().newCommit(UUID.randomUUID().toString());
+        long identifier = BatchWriteBuilder.COMMIT_IDENTIFIER;
+        if (deletePredicate == null) {
+            commit.purgeTable(identifier);
+        } else {
+            // canDeleteWhere returned true with a non-null predicate, so it is a partition drop.
+            commit.dropPartitions(Collections.singletonList(deletePartitions()), identifier);
+        }
+    }
- TableUtils.deleteWhere(table, predicates);
+    /**
+     * Tells whether a delete predicate is currently set and is accepted by an {@link
+     * OnlyPartitionKeyEqualVisitor} built from the table's partition keys.
+     */
+    private boolean deleteIsDropPartition() {
+        if (deletePredicate == null) {
+            return false;
+        }
+        OnlyPartitionKeyEqualVisitor partitionOnly =
+                new OnlyPartitionKeyEqualVisitor(table.partitionKeys());
+        return deletePredicate.visit(partitionOnly);
+    }
+
+    /**
+     * Returns the partitions an {@link OnlyPartitionKeyEqualVisitor} gathers from the current
+     * delete predicate, or {@code null} when no predicate is set.
+     */
+    private Map<String, String> deletePartitions() {
+        Predicate predicate = deletePredicate;
+        if (predicate != null) {
+            OnlyPartitionKeyEqualVisitor collector =
+                    new OnlyPartitionKeyEqualVisitor(table.partitionKeys());
+            predicate.visit(collector);
+            return collector.partitions();
+        }
+        return null;
     }
@Override
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
index 6e88ec6c8..786afbba9 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
@@ -29,8 +29,6 @@ import java.io.IOException
/** Helper trait for all paimon commands. */
trait PaimonCommand extends WithFileStoreTable {
- val BUCKET_COL = "_bucket_"
-
lazy val bucketMode: BucketMode = table match {
case fileStoreTable: FileStoreTable =>
fileStoreTable.bucketMode
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
index 35167c6af..24af34062 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
@@ -23,7 +23,9 @@ import org.apache.paimon.index.PartitionIndex
import org.apache.paimon.options.Options
import org.apache.paimon.spark.{DynamicOverWrite, InsertInto, Overwrite,
SaveMode, SparkConnectorOptions, SparkRow}
import org.apache.paimon.spark.SparkUtils.createIOManager
-import org.apache.paimon.spark.util.EncoderUtils
+import org.apache.paimon.spark.schema.SparkSystemColumns
+import org.apache.paimon.spark.schema.SparkSystemColumns.{BUCKET_COL,
ROW_KIND_COL}
+import org.apache.paimon.spark.util.{EncoderUtils, SparkRowUtils}
import org.apache.paimon.table.{BucketMode, FileStoreTable}
import org.apache.paimon.table.sink.{BatchWriteBuilder,
CommitMessageSerializer, DynamicBucketRow, RowPartitionKeyExtractor}
import org.apache.paimon.types.RowType
@@ -32,7 +34,6 @@ import org.apache.spark.TaskContext
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
-import org.apache.spark.sql.catalyst.encoders.RowEncoder.encoderFor
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.command.RunnableCommand
import org.apache.spark.sql.functions._
@@ -59,13 +60,14 @@ case class WriteIntoPaimonTable(
private lazy val mergeSchema =
options.get(SparkConnectorOptions.MERGE_SCHEMA)
- /** \1. 2. */
override def run(sparkSession: SparkSession): Seq[Row] = {
import sparkSession.implicits._
+ val dataSchema = SparkSystemColumns.filterSparkSystemColumns(data.schema)
+
if (mergeSchema) {
val allowExplicitCast = options.get(SparkConnectorOptions.EXPLICIT_CAST)
- mergeAndCommitSchema(data.schema, allowExplicitCast)
+ mergeAndCommitSchema(dataSchema, allowExplicitCast)
}
val (dynamicPartitionOverwriteMode, overwritePartition) = parseSaveMode()
@@ -76,9 +78,11 @@ case class WriteIntoPaimonTable(
val primaryKeyCols = tableSchema.trimmedPrimaryKeys().asScala.map(col)
val partitionCols = tableSchema.partitionKeys().asScala.map(col)
- val dataEncoder = EncoderUtils.encode(data.schema).resolveAndBind()
+ val dataEncoder = EncoderUtils.encode(dataSchema).resolveAndBind()
val originFromRow = dataEncoder.createDeserializer()
+ val rowkindColIdx = SparkRowUtils.getFieldIndex(data.schema, ROW_KIND_COL)
+
// append _bucket_ column as placeholder
val withBucketCol = data.withColumn(BUCKET_COL, lit(-1))
val bucketColIdx = withBucketCol.schema.size - 1
@@ -132,7 +136,11 @@ case class WriteIntoPaimonTable(
row =>
val bucket = row.getInt(bucketColIdx)
val bucketColDropped = originFromRow(toRow(row))
- write.write(new DynamicBucketRow(new SparkRow(rowType,
bucketColDropped), bucket))
+ val sparkRow = new SparkRow(
+ rowType,
+ bucketColDropped,
+ SparkRowUtils.getRowKind(row, rowkindColIdx))
+ write.write(new DynamicBucketRow(sparkRow, bucket))
}
val serializer = new CommitMessageSerializer
write.prepareCommit().asScala.map(serializer.serialize).toIterator
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala
index dae604073..3c9f7a76b 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala
@@ -19,6 +19,7 @@ package org.apache.paimon.spark.extensions
import org.apache.spark.sql.SparkSessionExtensions
import org.apache.spark.sql.catalyst.analysis.{CoerceArguments,
PaimonAnalysis, ResolveProcedures}
+import org.apache.spark.sql.catalyst.optimizer.RewriteRowLeverCommands
import
org.apache.spark.sql.catalyst.parser.extensions.PaimonSparkSqlExtensionsParser
import org.apache.spark.sql.catalyst.plans.logical.PaimonTableValuedFunctions
import
org.apache.spark.sql.execution.datasources.v2.ExtendedDataSourceV2Strategy
@@ -31,10 +32,13 @@ class PaimonSparkSessionExtensions extends
(SparkSessionExtensions => Unit) {
extensions.injectParser { case (_, parser) => new
PaimonSparkSqlExtensionsParser(parser) }
// analyzer extensions
- // resolution rule extensions
extensions.injectResolutionRule(sparkSession => new
PaimonAnalysis(sparkSession))
extensions.injectResolutionRule(spark => ResolveProcedures(spark))
extensions.injectResolutionRule(_ => CoerceArguments)
+
+ // optimizer extensions
+ extensions.injectOptimizerRule(_ => RewriteRowLeverCommands)
+
// table function extensions
PaimonTableValuedFunctions.supportedFnNames.foreach {
fnName =>
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/schema/SparkSystemColumns.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/schema/SparkSystemColumns.scala
new file mode 100644
index 000000000..bae79127c
--- /dev/null
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/schema/SparkSystemColumns.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.paimon.spark.schema
+
+import org.apache.spark.sql.types.StructType
+
+/** Names of the helper columns Paimon attaches to Spark data, plus a schema filter for them. */
+object SparkSystemColumns {
+
+  // Column used for assigning the bucket while writing.
+  val BUCKET_COL = "_bucket_"
+
+  // Column used by row-level operations to carry the row kind.
+  val ROW_KIND_COL = "_row_kind_"
+
+  val SPARK_SYSTEM_COLUMNS_NAME: Seq[String] = Seq(BUCKET_COL, ROW_KIND_COL)
+
+  /** Returns `schema` without the fields whose name is listed in `SPARK_SYSTEM_COLUMNS_NAME`. */
+  def filterSparkSystemColumns(schema: StructType): StructType = {
+    val userFields = schema.fields.filter(f => !SPARK_SYSTEM_COLUMNS_NAME.contains(f.name))
+    StructType(userFields)
+  }
+}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/SparkRowUtils.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/SparkRowUtils.scala
new file mode 100644
index 000000000..f9ee33c43
--- /dev/null
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/SparkRowUtils.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.paimon.spark.util
+
+import org.apache.paimon.types.RowKind
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.StructType
+
+/** Helpers for locating and reading the row-kind column of a Spark row. */
+object SparkRowUtils {
+
+  /**
+   * Decodes the row kind from the byte stored at `rowkindColIdx`; an index of -1 means the row
+   * has no such column and yields `RowKind.INSERT`.
+   */
+  def getRowKind(row: Row, rowkindColIdx: Int): RowKind = rowkindColIdx match {
+    case -1 => RowKind.INSERT
+    case idx => RowKind.fromByteValue(row.getByte(idx))
+  }
+
+  /** Returns the index of `colName` in `schema`, or -1 when the schema has no such field. */
+  def getFieldIndex(schema: StructType, colName: String): Int =
+    if (schema.fieldNames.contains(colName)) schema.fieldIndex(colName) else -1
+
+}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteRowLeverCommands.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteRowLeverCommands.scala
new file mode 100644
index 000000000..807b851e8
--- /dev/null
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteRowLeverCommands.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.paimon.CoreOptions.{MERGE_ENGINE, MergeEngine}
+import org.apache.paimon.options.Options
+import org.apache.paimon.spark.SparkTable
+import org.apache.paimon.table.Table
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.expressions.{Expression, PredicateHelper,
SubqueryExpression}
+import
org.apache.spark.sql.catalyst.plans.logical.{DeleteFromPaimonTableCommand,
DeleteFromTable, LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.connector.catalog.SupportsDelete
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+
+/**
+ * Optimizer rule for `DELETE FROM` on Paimon tables.
+ *
+ * A delete that the table's `canDeleteWhere` accepts is left as the original `DeleteFromTable`
+ * node; every other delete is rewritten into [[DeleteFromPaimonTableCommand]]. Relations that are
+ * not backed by a Paimon [[SparkTable]] are left untouched.
+ */
+object RewriteRowLeverCommands extends Rule[LogicalPlan] with PredicateHelper {
+  override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
+    // Guard on SparkTable: this rule is injected session-wide, so it also sees deletes on
+    // non-Paimon V2 tables, where an unconditional cast would throw ClassCastException.
+    case d @ DeleteFromTable(r: DataSourceV2Relation, condition)
+        if r.table.isInstanceOf[SparkTable] =>
+      validateDeletable(r.table.asInstanceOf[SparkTable].getTable)
+      if (canDeleteWhere(r, condition)) {
+        d
+      } else {
+        DeleteFromPaimonTableCommand(r, condition)
+      }
+  }
+
+  /**
+   * Checks that `table` has primary keys and uses the deduplicate merge engine.
+   *
+   * @return `true` when the table passes both checks
+   * @throws UnsupportedOperationException if either check fails
+   */
+  private def validateDeletable(table: Table): Boolean = {
+    val options = Options.fromMap(table.options)
+    if (table.primaryKeys().isEmpty) {
+      // Report the table's name, not the Java class implementing it.
+      throw new UnsupportedOperationException(
+        s"table '${table.name()}' can not support delete, because there is no primary key.")
+    }
+    val mergeEngine = options.get(MERGE_ENGINE)
+    if (mergeEngine != MergeEngine.DEDUPLICATE) {
+      throw new UnsupportedOperationException(
+        s"merge engine '$mergeEngine' can not support delete, " +
+          s"currently only ${MergeEngine.DEDUPLICATE} can support delete.")
+    }
+    true
+  }
+
+  /**
+   * Translates `condition` into source filters and asks the table whether it can delete by them.
+   *
+   * Returns `false` when the table does not implement `SupportsDelete` or the condition contains
+   * a subquery.
+   *
+   * @throws AnalysisException if a conjunct cannot be translated into a source filter
+   */
+  private def canDeleteWhere(relation: DataSourceV2Relation, condition: Expression): Boolean = {
+    relation.table match {
+      case t: SupportsDelete if !SubqueryExpression.hasSubquery(condition) =>
+        // fail if any filter cannot be converted.
+        // correctness depends on removing all matching data.
+        val filters = DataSourceStrategy
+          .normalizeExprs(Seq(condition), relation.output)
+          .flatMap(splitConjunctivePredicates(_).map {
+            f =>
+              DataSourceStrategy
+                .translateFilter(f, supportNestedPredicatePushdown = true)
+                .getOrElse(throw new AnalysisException(s"Exec delete failed:" +
+                  s" cannot translate expression to source filter: $f"))
+          })
+          .toArray
+        t.canDeleteWhere(filters)
+      case _ => false
+    }
+  }
+
+}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/DeleteFromPaimonTableCommand.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/DeleteFromPaimonTableCommand.scala
new file mode 100644
index 000000000..f380533a8
--- /dev/null
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/DeleteFromPaimonTableCommand.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.catalyst.plans.logical
+
+import org.apache.paimon.options.Options
+import org.apache.paimon.spark.{InsertInto, SparkTable}
+import org.apache.paimon.spark.commands.WriteIntoPaimonTable
+import org.apache.paimon.spark.schema.SparkSystemColumns.ROW_KIND_COL
+import org.apache.paimon.table.FileStoreTable
+import org.apache.paimon.types.RowKind
+
+import org.apache.spark.sql.{Dataset, Row, SparkSession}
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.execution.command.LeafRunnableCommand
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.functions.lit
+
+/**
+ * Command that deletes rows of `relation` matching `condition`: it selects those rows, adds a
+ * row-kind column set to DELETE and hands them to [[WriteIntoPaimonTable]].
+ */
+case class DeleteFromPaimonTableCommand(relation: DataSourceV2Relation, condition: Expression)
+  extends LeafRunnableCommand {
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    // A null condition selects every row of the relation.
+    val rowsToDelete: LogicalPlan = condition match {
+      case null => relation
+      case cond => Filter(cond, relation)
+    }
+
+    // Tag each selected row with the DELETE row kind.
+    val deleteKind = lit(RowKind.DELETE.toByteValue)
+    val taggedRows = Dataset.ofRows(sparkSession, rowsToDelete).withColumn(ROW_KIND_COL, deleteKind)
+
+    val paimonTable = relation.table.asInstanceOf[SparkTable].getTable.asInstanceOf[FileStoreTable]
+    val writeOptions = Options.fromMap(relation.options)
+    WriteIntoPaimonTable(paimonTable, InsertInto, taggedRows, writeOptions).run(sparkSession)
+
+    Seq.empty[Row]
+  }
+}
diff --git
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java
index 5de73ba66..0e930c82f 100644
---
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java
+++
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java
@@ -19,6 +19,7 @@
package org.apache.paimon.spark;
import org.apache.paimon.fs.Path;
+import org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
@@ -43,7 +44,13 @@ public class SparkWriteITCase {
@BeforeAll
public void startMetastoreAndSpark(@TempDir java.nio.file.Path tempDir) {
Path warehousePath = new Path("file:///" + tempDir.toString());
- spark = SparkSession.builder().master("local[2]").getOrCreate();
+ spark =
+ SparkSession.builder()
+ .master("local[2]")
+ .config(
+ "spark.sql.extensions",
+ PaimonSparkSessionExtensions.class.getName())
+ .getOrCreate();
spark.conf().set("spark.sql.catalog.paimon",
SparkCatalog.class.getName());
spark.conf().set("spark.sql.catalog.paimon.warehouse",
warehousePath.toString());
spark.sql("CREATE DATABASE paimon.db");
diff --git
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkWriteWithKyroITCase.java
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkWriteWithKyroITCase.java
index c008657e5..ae3f92841 100644
---
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkWriteWithKyroITCase.java
+++
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkWriteWithKyroITCase.java
@@ -19,6 +19,7 @@
package org.apache.paimon.spark;
import org.apache.paimon.fs.Path;
+import org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.BeforeAll;
@@ -34,6 +35,9 @@ public class SparkWriteWithKyroITCase extends
SparkWriteITCase {
spark =
SparkSession.builder()
.config("spark.serializer",
"org.apache.spark.serializer.KryoSerializer")
+ .config(
+ "spark.sql.extensions",
+ PaimonSparkSessionExtensions.class.getName())
.master("local[2]")
.getOrCreate();
spark.conf().set("spark.sql.catalog.paimon",
SparkCatalog.class.getName());
diff --git
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTest.scala
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTest.scala
new file mode 100644
index 000000000..2008d7775
--- /dev/null
+++
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTest.scala
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.paimon.spark.sql
+
+import org.apache.paimon.CoreOptions
+import org.apache.paimon.spark.PaimonSparkTestBase
+
+import org.assertj.core.api.Assertions.{assertThat, assertThatThrownBy}
+
+/**
+ * Exercises Spark SQL `DELETE FROM` on Paimon tables: rejection for a table created without a
+ * primary key and for merge engines other than deduplicate, then deletes by primary key, by
+ * other columns, without a WHERE clause, with IN lists and subqueries, and on partition columns.
+ */
+class DeleteFromTableTest extends PaimonSparkTestBase {
+
+  private val insertThreeRows =
+    "INSERT INTO T VALUES (1, 'a', '11'), (2, 'b', '22'), (3, 'c', '33')"
+
+  private val deleteNamedA = "DELETE FROM T WHERE name = 'a'"
+
+  /** Creates `T (id, name, dt)` with primary key `id` and the deduplicate merge engine. */
+  private def createDeduplicateTable() =
+    spark.sql("""
+                |CREATE TABLE T (id INT, name STRING, dt STRING)
+                |TBLPROPERTIES ('primary-key' = 'id', 'merge-engine' = 'deduplicate')
+                |""".stripMargin)
+
+  /** Runs `query` and compares the string form of the collected rows with `expected`. */
+  private def assertRows(expected: String, query: String = "SELECT * FROM T") = {
+    val collected = spark.sql(query).collectAsList()
+    assertThat(collected.toString).isEqualTo(expected)
+  }
+
+  /** Asserts that running `deleteSql` throws an `UnsupportedOperationException`. */
+  private def assertDeleteRejected(deleteSql: String) =
+    assertThatThrownBy(() => spark.sql(deleteSql))
+      .isInstanceOf(classOf[UnsupportedOperationException])
+
+  test("test delete from append only table") {
+    spark.sql("""
+                |CREATE TABLE T (id INT, name STRING, dt STRING)
+                |""".stripMargin)
+
+    spark.sql("INSERT INTO T VALUES (1, 'a', '11'), (2, 'b', '22')")
+
+    assertDeleteRejected(deleteNamedA)
+  }
+
+  // One test per merge engine: only deduplicate is expected to accept DELETE.
+  for (mergeEngine <- CoreOptions.MergeEngine.values()) {
+    test(s"test delete with merge engine $mergeEngine") {
+      val engineName = mergeEngine.toString
+      val baseOptions = s"'primary-key' = 'id', 'merge-engine' = '$mergeEngine'"
+      val options =
+        if (engineName == "first-row") {
+          s"$baseOptions, 'changelog-producer' = 'lookup'"
+        } else {
+          baseOptions
+        }
+      spark.sql(s"""
+                   |CREATE TABLE T (id INT, name STRING, dt STRING)
+                   |TBLPROPERTIES ($options)
+                   |""".stripMargin)
+
+      spark.sql("INSERT INTO T VALUES (1, 'a', '11'), (2, 'b', '22')")
+
+      if (engineName == "deduplicate") {
+        spark.sql(deleteNamedA)
+      } else {
+        assertDeleteRejected(deleteNamedA)
+      }
+    }
+  }
+
+  test("test delete with primary key") {
+    createDeduplicateTable()
+    spark.sql(insertThreeRows)
+
+    spark.sql("DELETE FROM T WHERE id = 1")
+    assertRows("[[2,b,22], [3,c,33]]")
+
+    spark.sql("DELETE FROM T WHERE id < 3")
+    assertRows("[[3,c,33]]")
+  }
+
+  test("test delete with non-primary key") {
+    createDeduplicateTable()
+    spark.sql(
+      "INSERT INTO T VALUES (1, 'a', '11'), (2, 'b', '22'), (3, 'c', '33'), (4, 'a', '44')")
+
+    spark.sql(deleteNamedA)
+    assertRows("[[2,b,22], [3,c,33]]")
+
+    spark.sql("DELETE FROM T WHERE name < 'c'")
+    assertRows("[[3,c,33]]")
+  }
+
+  test("test delete with no where") {
+    createDeduplicateTable()
+    spark.sql(insertThreeRows)
+
+    spark.sql("DELETE FROM T")
+    assertRows("[]")
+  }
+
+  test("test delete with in condition") {
+    createDeduplicateTable()
+    spark.sql(insertThreeRows)
+
+    spark.sql("DELETE FROM T WHERE id IN (1, 2)")
+    assertRows("[[3,c,33]]")
+  }
+
+  test("test delete with in subquery") {
+    createDeduplicateTable()
+    spark.sql(insertThreeRows)
+
+    import testImplicits._
+    val idsToDelete = Seq(1, 2).toDF("id")
+    idsToDelete.createOrReplaceTempView("deleted_ids")
+    spark.sql("DELETE FROM T WHERE id IN (SELECT * FROM deleted_ids)")
+
+    assertRows("[[3,c,33]]")
+  }
+
+  test("test delete is drop partition") {
+    spark.sql("""
+                |CREATE TABLE T (id INT, name STRING, dt STRING, hh STRING)
+                |TBLPROPERTIES ('primary-key' = 'id, dt, hh', 'merge-engine' = 'deduplicate')
+                |PARTITIONED BY (dt, hh)
+                |""".stripMargin)
+
+    val insertedRows = Seq(
+      "(1, 'a', '2023-10-01', '12')",
+      "(2, 'b', '2023-10-01', '12')",
+      "(3, 'c', '2023-10-02', '12')",
+      "(4, 'd', '2023-10-02', '13')",
+      "(5, 'e', '2023-10-02', '14')",
+      "(6, 'f', '2023-10-02', '15')"
+    )
+    spark.sql(insertedRows.mkString("INSERT INTO T VALUES ", ",", ""))
+
+    val selectOrdered = "SELECT * FROM T ORDER BY id"
+
+    // The condition includes a non-partition column, so this is not a partition drop.
+    spark.sql("DELETE FROM T WHERE name = 'a' and hh = '12'")
+    assertRows(
+      "[[2,b,2023-10-01,12], [3,c,2023-10-02,12], [4,d,2023-10-02,13], " +
+        "[5,e,2023-10-02,14], [6,f,2023-10-02,15]]",
+      selectOrdered)
+
+    // The remaining conditions only compare partition columns for equality.
+    spark.sql("DELETE FROM T WHERE hh = '12'")
+    assertRows("[[4,d,2023-10-02,13], [5,e,2023-10-02,14], [6,f,2023-10-02,15]]", selectOrdered)
+
+    spark.sql("DELETE FROM T WHERE dt = '2023-10-02' and hh = '13'")
+    assertRows("[[5,e,2023-10-02,14], [6,f,2023-10-02,15]]", selectOrdered)
+
+    spark.sql("DELETE FROM T WHERE dt = '2023-10-02'")
+    assertRows("[]", selectOrdered)
+  }
+}