This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new db3a6939f [spark] Delete supports all merge engines (#3294)
db3a6939f is described below
commit db3a6939fe155beb7d575f1d84d414dafd2c6f15
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon May 6 13:43:42 2024 +0800
[spark] Delete supports all merge engines (#3294)
---
.../paimon/spark/procedure/CompactProcedure.java | 8 ++---
.../spark/catalyst/analysis/RowLevelOp.scala | 6 +++-
.../commands/DeleteFromPaimonTableCommand.scala | 22 +++++++------
.../paimon/spark/commands/PaimonCommand.scala | 10 ++++++
.../paimon/spark/commands/PaimonSparkWriter.scala | 8 +++++
.../paimon/spark/sql/DeleteFromTableTest.scala | 36 +++++++++++++---------
6 files changed, 62 insertions(+), 28 deletions(-)
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
index 51fa6bd6e..b4cbcca06 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
@@ -41,7 +41,7 @@ import org.apache.paimon.table.sink.CommitMessageSerializer;
import org.apache.paimon.table.sink.CompactionTaskSerializer;
import org.apache.paimon.table.sink.TableCommitImpl;
import org.apache.paimon.table.source.DataSplit;
-import org.apache.paimon.table.source.InnerTableScan;
+import org.apache.paimon.table.source.snapshot.SnapshotReader;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.ParameterUtils;
import org.apache.paimon.utils.SerializationUtils;
@@ -221,13 +221,13 @@ public class CompactProcedure extends BaseProcedure {
private void compactAwareBucketTable(
FileStoreTable table, @Nullable Predicate filter, JavaSparkContext javaSparkContext) {
- InnerTableScan scan = table.newScan();
+ SnapshotReader snapshotReader = table.newSnapshotReader();
if (filter != null) {
- scan.withFilter(filter);
+ snapshotReader.withFilter(filter);
}
List<Pair<byte[], Integer>> partitionBuckets =
- scan.plan().splits().stream()
+ snapshotReader.read().splits().stream()
.map(split -> (DataSplit) split)
.map(dataSplit -> Pair.of(dataSplit.partition(), dataSplit.bucket()))
.distinct()
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/RowLevelOp.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/RowLevelOp.scala
index 2f5892a02..41881b7b7 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/RowLevelOp.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/RowLevelOp.scala
@@ -46,7 +46,11 @@ sealed trait RowLevelOp {
case object Delete extends RowLevelOp {
- override val supportedMergeEngine: Seq[MergeEngine] = Seq(MergeEngine.DEDUPLICATE)
+ override val supportedMergeEngine: Seq[MergeEngine] = Seq(
+ MergeEngine.DEDUPLICATE,
+ MergeEngine.PARTIAL_UPDATE,
+ MergeEngine.AGGREGATE,
+ MergeEngine.FIRST_ROW)
override val supportAppendOnlyTable: Boolean = true
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
index cbd365f88..7ce78e79c 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
@@ -19,9 +19,7 @@
package org.apache.paimon.spark.commands
import org.apache.paimon.CoreOptions
-import org.apache.paimon.options.Options
-import org.apache.paimon.partition.PartitionPredicate
-import org.apache.paimon.predicate.OnlyPartitionKeyEqualVisitor
+import org.apache.paimon.CoreOptions.MergeEngine
import org.apache.paimon.spark.{InsertInto, SparkTable}
import org.apache.paimon.spark.PaimonSplitScan
import org.apache.paimon.spark.catalyst.Compatibility
@@ -99,10 +97,10 @@ case class DeleteFromPaimonTableCommand(
writer.commit(Seq.empty)
}
} else {
- val commitMessages = if (withPrimaryKeys) {
- performDeleteForPkTable(sparkSession)
+ val commitMessages = if (usePrimaryKeyDelete()) {
+ performPrimaryKeyDelete(sparkSession)
} else {
- performDeleteForNonPkTable(sparkSession)
+ performDeleteCopyOnWrite(sparkSession)
}
writer.commit(commitMessages)
}
@@ -111,13 +109,17 @@ case class DeleteFromPaimonTableCommand(
Seq.empty[Row]
}
- def performDeleteForPkTable(sparkSession: SparkSession): Seq[CommitMessage] = {
+ def usePrimaryKeyDelete(): Boolean = {
+ withPrimaryKeys && table.coreOptions().mergeEngine() == MergeEngine.DEDUPLICATE
+ }
+
+ def performPrimaryKeyDelete(sparkSession: SparkSession): Seq[CommitMessage] = {
val df = createDataset(sparkSession, Filter(condition, relation))
.withColumn(ROW_KIND_COL, lit(RowKind.DELETE.toByteValue))
writer.write(df)
}
- def performDeleteForNonPkTable(sparkSession: SparkSession): Seq[CommitMessage] = {
+ def performDeleteCopyOnWrite(sparkSession: SparkSession): Seq[CommitMessage] = {
// Step1: the candidate data splits which are filtered by Paimon Predicate.
val candidateDataSplits = findCandidateDataSplits(condition, relation.output)
val fileNameToMeta = candidateFileMap(candidateDataSplits)
@@ -142,7 +144,9 @@ case class DeleteFromPaimonTableCommand(
PaimonSplitScan(table, touchedDataSplits),
relation.output))
val data = createDataset(sparkSession, toRewriteScanRelation)
- val addCommitMessage = writer.write(data)
+
+ // only write new files, should have no compaction
+ val addCommitMessage = writer.writeOnly().write(data)
// Step5: convert the deleted files that need to be wrote to commit message.
val deletedCommitMessage = buildDeletedCommitMessage(touchedFiles)
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
index da91ab1fb..863310e55 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
@@ -27,6 +27,7 @@ import org.apache.paimon.spark.commands.SparkDataFileMeta.convertToSparkDataFile
import org.apache.paimon.table.sink.{CommitMessage, CommitMessageImpl}
import org.apache.paimon.table.source.DataSplit
import org.apache.paimon.types.RowType
+import org.apache.paimon.utils.Preconditions
import org.apache.spark.sql.PaimonUtils.createDataset
import org.apache.spark.sql.SparkSession
@@ -95,6 +96,7 @@ trait PaimonCommand extends WithFileStoreTable with ExpressionHelper {
protected def findCandidateDataSplits(
condition: Expression,
output: Seq[Attribute]): Seq[DataSplit] = {
+ // low level snapshot reader, it can not be affected by 'scan.mode'
val snapshotReader = table.newSnapshotReader()
if (condition == TrueLiteral) {
val filter =
@@ -112,6 +114,14 @@ trait PaimonCommand extends WithFileStoreTable with ExpressionHelper {
sparkSession: SparkSession): Array[String] = {
import sparkSession.implicits._
+ // only raw convertible can generate input_file_name()
+ for (split <- candidateDataSplits) {
+ if (!split.rawConvertible()) {
+ throw new IllegalArgumentException(
+ "Only compacted table can generate touched files, please use 'COMPACT' procedure first.");
+ }
+ }
+
val scan = PaimonSplitScan(table, candidateDataSplits.toArray)
val filteredRelation =
FilterLogicalNode(
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
index ab42a4317..db7c4c0c0 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
@@ -18,6 +18,8 @@
package org.apache.paimon.spark.commands
+import org.apache.paimon.CoreOptions
+import org.apache.paimon.CoreOptions.WRITE_ONLY
import org.apache.paimon.index.BucketAssigner
import org.apache.paimon.spark.SparkRow
import org.apache.paimon.spark.SparkUtils.createIOManager
@@ -32,6 +34,8 @@ import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.apache.spark.sql.functions._
import java.io.IOException
+import java.util.Collections
+import java.util.Collections.singletonMap
import scala.collection.JavaConverters._
@@ -54,6 +58,10 @@ case class PaimonSparkWriter(table: FileStoreTable) {
val writeBuilder: BatchWriteBuilder = table.newBatchWriteBuilder()
+ def writeOnly(): PaimonSparkWriter = {
+ PaimonSparkWriter(table.copy(singletonMap(WRITE_ONLY.key(), "true")))
+ }
+
def write(data: Dataset[_]): Seq[CommitMessage] = {
val sparkSession = data.sparkSession
import sparkSession.implicits._
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTest.scala
index 0117c5f95..8c879e213 100644
--- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTest.scala
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTest.scala
@@ -19,9 +19,11 @@
package org.apache.paimon.spark.sql
import org.apache.paimon.CoreOptions
+import org.apache.paimon.CoreOptions.MergeEngine
import org.apache.paimon.spark.PaimonSparkTestBase
import org.apache.paimon.spark.catalyst.analysis.Delete
+import org.apache.spark.sql.Row
import org.assertj.core.api.Assertions.{assertThat, assertThatThrownBy}
abstract class DeleteFromTableTestBase extends PaimonSparkTestBase {
@@ -176,23 +178,30 @@ abstract class DeleteFromTableTestBase extends PaimonSparkTestBase {
mergeEngine =>
{
test(s"test delete with merge engine $mergeEngine") {
- val options = if ("first-row".equals(mergeEngine.toString)) {
- s"'primary-key' = 'id', 'merge-engine' = '$mergeEngine', 'changelog-producer' = 'lookup'"
- } else {
- s"'primary-key' = 'id', 'merge-engine' = '$mergeEngine'"
- }
+ val otherOptions =
+ if ("first-row".equals(mergeEngine.toString)) "'changelog-producer' = 'lookup'," else ""
spark.sql(s"""
- |CREATE TABLE T (id INT, name STRING, dt STRING)
- |TBLPROPERTIES ($options)
+ |CREATE TABLE T (id INT, name STRING, age INT)
+ |TBLPROPERTIES (
+ | $otherOptions
+ | 'primary-key' = 'id',
+ | 'merge-engine' = '$mergeEngine',
+ | 'write-only' = 'true')
|""".stripMargin)
- spark.sql("INSERT INTO T VALUES (1, 'a', '11'), (2, 'b', '22')")
+ spark.sql("INSERT INTO T VALUES (1, 'a', NULL)")
+ spark.sql("INSERT INTO T VALUES (2, 'b', NULL)")
+ spark.sql("INSERT INTO T VALUES (1, NULL, 16)")
+
+ if (mergeEngine != MergeEngine.DEDUPLICATE) {
+ assertThatThrownBy(() => spark.sql("DELETE FROM T WHERE id = 1"))
+ .hasMessageContaining("please use 'COMPACT' procedure first")
+ spark.sql("CALL sys.compact(table => 'T')")
+ }
- if (Delete.supportedMergeEngine.contains(mergeEngine)) {
- spark.sql("DELETE FROM T WHERE name = 'a'")
- } else
- assertThatThrownBy(() => spark.sql("DELETE FROM T WHERE name = 'a'"))
- .isInstanceOf(classOf[UnsupportedOperationException])
+ spark.sql("DELETE FROM T WHERE id = 1")
+ assertThat(spark.sql("SELECT * FROM T").collectAsList().toString)
+ .isEqualTo("[[2,b,null]]")
}
}
}
@@ -345,7 +354,6 @@ abstract class DeleteFromTableTestBase extends PaimonSparkTestBase {
spark.sql("DELETE FROM T WHERE hh = '12'")
assertThat(spark.sql("SELECT * FROM `T$audit_log` WHERE rowkind='-D'").collectAsList().size())
.isEqualTo(3)
-
}
}