This is an automated email from the ASF dual-hosted git repository.
zouxxyy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 98b7598f4 [spark] unify the writer logic of deletion vector (#3441)
98b7598f4 is described below
commit 98b7598f4ac8f3e759c8a4c0acd04d186a5db59c
Author: Yann Byron <[email protected]>
AuthorDate: Fri May 31 10:05:27 2024 +0800
[spark] unify the writer logic of deletion vector (#3441)
---
.../paimon/spark/commands/PaimonSparkWriter.scala | 31 ++++++++--------------
1 file changed, 11 insertions(+), 20 deletions(-)
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
index e97d88a0a..cc6a33cf2 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
@@ -243,27 +243,18 @@ case class PaimonSparkWriter(table: FileStoreTable) {
serializer.serialize(commitMessage)
}
- val serializedCommits = fileStore.bucketMode() match {
- case BucketMode.BUCKET_UNAWARE =>
- deletionVectors.mapPartitions {
- iter: Iterator[SparkDeletionVectors] =>
- val serializer = new CommitMessageSerializer
- iter.map(commitDeletionVector(_, serializer))
- }
- case _ =>
- deletionVectors
- .groupByKey(_.partitionAndBucket)
- .mapGroups {
- case (_, iter: Iterator[SparkDeletionVectors]) =>
- val serializer = new CommitMessageSerializer
- val grouped = iter
- .reduce(
- (sdv1, sdv2) =>
- sdv1.copy(dataFileAndDeletionVector =
- sdv1.dataFileAndDeletionVector ++
sdv2.dataFileAndDeletionVector))
- commitDeletionVector(grouped, serializer)
+ val serializedCommits = deletionVectors
+ .groupByKey(_.partitionAndBucket)
+ .mapGroups {
+ case (_, iter: Iterator[SparkDeletionVectors]) =>
+ val serializer = new CommitMessageSerializer
+ val grouped = iter.reduce {
+ (sdv1, sdv2) =>
+ sdv1.copy(dataFileAndDeletionVector =
+ sdv1.dataFileAndDeletionVector ++
sdv2.dataFileAndDeletionVector)
}
- }
+ commitDeletionVector(grouped, serializer)
+ }
serializedCommits
.collect()
.map(deserializeCommitMessage(serializer, _))