This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 30bf5036d [flink] support multiple writers writing to the same partition when using kafka as logSystem in unaware bucket mode. (#4516)
30bf5036d is described below

commit 30bf5036d6117aedce7297dacec23155a7d5778c
Author: liming.1018 <[email protected]>
AuthorDate: Wed Nov 13 17:04:39 2024 +0800

    [flink] support multiple writers writing to the same partition when using kafka as logSystem in unaware bucket mode. (#4516)
---
 .../java/org/apache/paimon/manifest/ManifestCommittable.java   |  7 ++++---
 paimon-core/src/test/java/org/apache/paimon/TestFileStore.java |  3 ++-
 .../paimon/manifest/ManifestCommittableSerializerTest.java     |  2 +-
 .../main/java/org/apache/paimon/flink/sink/StoreCommitter.java | 10 +++++++++-
 .../java/org/apache/paimon/flink/sink/StoreMultiCommitter.java | 10 ++++++----
 .../flink/sink/WrappedManifestCommittableSerializerTest.java   |  2 +-
 6 files changed, 23 insertions(+), 11 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestCommittable.java 
b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestCommittable.java
index 61c4619bd..b4abd0e9e 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestCommittable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestCommittable.java
@@ -62,13 +62,14 @@ public class ManifestCommittable {
         commitMessages.add(commitMessage);
     }
 
-    public void addLogOffset(int bucket, long offset) {
-        if (logOffsets.containsKey(bucket)) {
+    public void addLogOffset(int bucket, long offset, boolean allowDuplicate) {
+        if (!allowDuplicate && logOffsets.containsKey(bucket)) {
             throw new RuntimeException(
                     String.format(
                             "bucket-%d appears multiple times, which is not 
possible.", bucket));
         }
-        logOffsets.put(bucket, offset);
+        long newOffset = Math.max(logOffsets.getOrDefault(bucket, offset), 
offset);
+        logOffsets.put(bucket, newOffset);
     }
 
     public long identifier() {
diff --git a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java 
b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
index 303879337..5218a515a 100644
--- a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
+++ b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
@@ -222,7 +222,8 @@ public class TestFileStore extends KeyValueFileStore {
                 null,
                 Collections.emptyList(),
                 (commit, committable) -> {
-                    logOffsets.forEach(committable::addLogOffset);
+                    logOffsets.forEach(
+                            (bucket, offset) -> 
committable.addLogOffset(bucket, offset, false));
                     commit.commit(committable, Collections.emptyMap());
                 });
     }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerTest.java
index c179a2c0a..8de8309bc 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerTest.java
@@ -83,7 +83,7 @@ public class ManifestCommittableSerializerTest {
 
         if (!committable.logOffsets().containsKey(bucket)) {
             int offset = ID.incrementAndGet();
-            committable.addLogOffset(bucket, offset);
+            committable.addLogOffset(bucket, offset, false);
             assertThat(committable.logOffsets().get(bucket)).isEqualTo(offset);
         }
     }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java
index d237f4da5..4908b9931 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java
@@ -23,6 +23,7 @@ import org.apache.paimon.flink.metrics.FlinkMetricRegistry;
 import org.apache.paimon.flink.sink.partition.PartitionListeners;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.manifest.ManifestCommittable;
+import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.sink.CommitMessage;
 import org.apache.paimon.table.sink.CommitMessageImpl;
@@ -44,6 +45,7 @@ public class StoreCommitter implements Committer<Committable, 
ManifestCommittabl
     private final TableCommitImpl commit;
     @Nullable private final CommitterMetrics committerMetrics;
     private final PartitionListeners partitionListeners;
+    private final boolean allowLogOffsetDuplicate;
 
     public StoreCommitter(FileStoreTable table, TableCommit commit, Context 
context) {
         this.commit = (TableCommitImpl) commit;
@@ -60,6 +62,7 @@ public class StoreCommitter implements Committer<Committable, 
ManifestCommittabl
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
+        allowLogOffsetDuplicate = table.bucketMode() == 
BucketMode.BUCKET_UNAWARE;
     }
 
     @VisibleForTesting
@@ -94,7 +97,8 @@ public class StoreCommitter implements Committer<Committable, 
ManifestCommittabl
                 case LOG_OFFSET:
                     LogOffsetCommittable offset =
                             (LogOffsetCommittable) 
committable.wrappedCommittable();
-                    manifestCommittable.addLogOffset(offset.bucket(), 
offset.offset());
+                    manifestCommittable.addLogOffset(
+                            offset.bucket(), offset.offset(), 
allowLogOffsetDuplicate);
                     break;
             }
         }
@@ -138,6 +142,10 @@ public class StoreCommitter implements 
Committer<Committable, ManifestCommittabl
         partitionListeners.close();
     }
 
+    public boolean allowLogOffsetDuplicate() {
+        return allowLogOffsetDuplicate;
+    }
+
     private void calcNumBytesAndRecordsOut(List<ManifestCommittable> 
committables) {
         if (committerMetrics == null) {
             return;
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java
index aeb3e1857..537a98f97 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java
@@ -92,11 +92,11 @@ public class StoreMultiCommitter
             WrappedManifestCommittable wrappedManifestCommittable,
             List<MultiTableCommittable> committables) {
         for (MultiTableCommittable committable : committables) {
+            Identifier identifier =
+                    Identifier.create(committable.getDatabase(), 
committable.getTable());
             ManifestCommittable manifestCommittable =
                     wrappedManifestCommittable.computeCommittableIfAbsent(
-                            Identifier.create(committable.getDatabase(), 
committable.getTable()),
-                            checkpointId,
-                            watermark);
+                            identifier, checkpointId, watermark);
 
             switch (committable.kind()) {
                 case FILE:
@@ -106,7 +106,9 @@ public class StoreMultiCommitter
                 case LOG_OFFSET:
                     LogOffsetCommittable offset =
                             (LogOffsetCommittable) 
committable.wrappedCommittable();
-                    manifestCommittable.addLogOffset(offset.bucket(), 
offset.offset());
+                    StoreCommitter committer = tableCommitters.get(identifier);
+                    manifestCommittable.addLogOffset(
+                            offset.bucket(), offset.offset(), 
committer.allowLogOffsetDuplicate());
                     break;
             }
         }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WrappedManifestCommittableSerializerTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WrappedManifestCommittableSerializerTest.java
index 298f3155b..b0aa76f15 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WrappedManifestCommittableSerializerTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WrappedManifestCommittableSerializerTest.java
@@ -98,7 +98,7 @@ class WrappedManifestCommittableSerializerTest {
 
         if (!committable.logOffsets().containsKey(bucket)) {
             int offset = ID.incrementAndGet();
-            committable.addLogOffset(bucket, offset);
+            committable.addLogOffset(bucket, offset, false);
             assertThat(committable.logOffsets().get(bucket)).isEqualTo(offset);
         }
     }

Reply via email to