This is an automated email from the ASF dual-hosted git repository.

zouxxyy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 98b7598f4 [spak] unify the writer logic of deletion vector (#3441)
98b7598f4 is described below

commit 98b7598f4ac8f3e759c8a4c0acd04d186a5db59c
Author: Yann Byron <[email protected]>
AuthorDate: Fri May 31 10:05:27 2024 +0800

    [spak] unify the writer logic of deletion vector (#3441)
---
 .../paimon/spark/commands/PaimonSparkWriter.scala  | 31 ++++++++--------------
 1 file changed, 11 insertions(+), 20 deletions(-)

diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
index e97d88a0a..cc6a33cf2 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
@@ -243,27 +243,18 @@ case class PaimonSparkWriter(table: FileStoreTable) {
       serializer.serialize(commitMessage)
     }
 
-    val serializedCommits = fileStore.bucketMode() match {
-      case BucketMode.BUCKET_UNAWARE =>
-        deletionVectors.mapPartitions {
-          iter: Iterator[SparkDeletionVectors] =>
-            val serializer = new CommitMessageSerializer
-            iter.map(commitDeletionVector(_, serializer))
-        }
-      case _ =>
-        deletionVectors
-          .groupByKey(_.partitionAndBucket)
-          .mapGroups {
-            case (_, iter: Iterator[SparkDeletionVectors]) =>
-              val serializer = new CommitMessageSerializer
-              val grouped = iter
-                .reduce(
-                  (sdv1, sdv2) =>
-                    sdv1.copy(dataFileAndDeletionVector =
-                      sdv1.dataFileAndDeletionVector ++ 
sdv2.dataFileAndDeletionVector))
-              commitDeletionVector(grouped, serializer)
+    val serializedCommits = deletionVectors
+      .groupByKey(_.partitionAndBucket)
+      .mapGroups {
+        case (_, iter: Iterator[SparkDeletionVectors]) =>
+          val serializer = new CommitMessageSerializer
+          val grouped = iter.reduce {
+            (sdv1, sdv2) =>
+              sdv1.copy(dataFileAndDeletionVector =
+                sdv1.dataFileAndDeletionVector ++ 
sdv2.dataFileAndDeletionVector)
           }
-    }
+          commitDeletionVector(grouped, serializer)
+      }
     serializedCommits
       .collect()
       .map(deserializeCommitMessage(serializer, _))

Reply via email to