This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new d0b66f272c [core] Extract a PostponeUtils to getKnownNumBuckets
d0b66f272c is described below

commit d0b66f272ca5cdfb4f8f588b37b2461f1c34e06c
Author: JingsongLi <[email protected]>
AuthorDate: Fri Oct 31 10:16:12 2025 +0800

    [core] Extract a PostponeUtils to getKnownNumBuckets
---
 .../org/apache/paimon/table/PostponeUtils.java     | 52 ++++++++++++++++++++++
 .../apache/paimon/flink/sink/FlinkSinkBuilder.java | 27 +----------
 .../sink/StatelessRowDataStoreWriteOperator.java   |  2 +-
 3 files changed, 55 insertions(+), 26 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/table/PostponeUtils.java b/paimon-core/src/main/java/org/apache/paimon/table/PostponeUtils.java
new file mode 100644
index 0000000000..280dd16f00
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/table/PostponeUtils.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.manifest.SimpleFileEntry;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Utils for postpone table. */
+public class PostponeUtils {
+
+    public static Map<BinaryRow, Integer> getKnownNumBuckets(FileStoreTable table) {
+        Map<BinaryRow, Integer> knownNumBuckets = new HashMap<>();
+        List<SimpleFileEntry> simpleFileEntries =
+                table.store().newScan().onlyReadRealBuckets().readSimpleEntries();
+        for (SimpleFileEntry entry : simpleFileEntries) {
+            if (entry.totalBuckets() >= 0) {
+                Integer oldTotalBuckets =
+                        knownNumBuckets.put(entry.partition(), entry.totalBuckets());
+                if (oldTotalBuckets != null && oldTotalBuckets != entry.totalBuckets()) {
+                    throw new IllegalStateException(
+                            "Partition "
+                                    + entry.partition()
+                                    + " has different totalBuckets "
+                                    + oldTotalBuckets
+                                    + " and "
+                                    + entry.totalBuckets());
+                }
+            }
+        }
+        return knownNumBuckets;
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
index 0d50178dd5..b7a3fc3ca1 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
@@ -29,9 +29,9 @@ import org.apache.paimon.flink.FlinkRowWrapper;
 import org.apache.paimon.flink.sink.index.GlobalDynamicBucketSink;
 import org.apache.paimon.flink.sorter.TableSortInfo;
 import org.apache.paimon.flink.sorter.TableSorter;
-import org.apache.paimon.manifest.SimpleFileEntry;
 import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.PostponeUtils;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.sink.ChannelComputer;
 
@@ -308,8 +308,7 @@ public class FlinkSinkBuilder {
             PostponeBucketSink sink = new PostponeBucketSink(table, overwritePartition);
             return sink.sinkFrom(partitioned);
         } else {
-            Map<BinaryRow, Integer> knownNumBuckets = getKnownNumBuckets();
-
+            Map<BinaryRow, Integer> knownNumBuckets = PostponeUtils.getKnownNumBuckets(table);
             DataStream<InternalRow> partitioned =
                     partition(
                             input,
@@ -329,28 +328,6 @@ public class FlinkSinkBuilder {
         }
     }
 
-    private Map<BinaryRow, Integer> getKnownNumBuckets() {
-        Map<BinaryRow, Integer> knownNumBuckets = new HashMap<>();
-        List<SimpleFileEntry> simpleFileEntries =
-                table.store().newScan().onlyReadRealBuckets().readSimpleEntries();
-        for (SimpleFileEntry entry : simpleFileEntries) {
-            if (entry.totalBuckets() >= 0) {
-                Integer oldTotalBuckets =
-                        knownNumBuckets.put(entry.partition(), entry.totalBuckets());
-                if (oldTotalBuckets != null && oldTotalBuckets != entry.totalBuckets()) {
-                    throw new IllegalStateException(
-                            "Partition "
-                                    + entry.partition()
-                                    + " has different totalBuckets "
-                                    + oldTotalBuckets
-                                    + " and "
-                                    + entry.totalBuckets());
-                }
-            }
-        }
-        return knownNumBuckets;
-    }
-
     private DataStreamSink<?> buildUnawareBucketSink(DataStream<InternalRow> input) {
         checkArgument(
                 table.primaryKeys().isEmpty(),
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StatelessRowDataStoreWriteOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StatelessRowDataStoreWriteOperator.java
index 3c8af5e629..49705ea805 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StatelessRowDataStoreWriteOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StatelessRowDataStoreWriteOperator.java
@@ -50,7 +50,7 @@ public class StatelessRowDataStoreWriteOperator extends RowDataStoreWriteOperato
     }
 
     @Override
-    protected String getCommitUser(StateInitializationContext context) throws Exception {
+    protected String getCommitUser(StateInitializationContext context) {
         // No conflicts will occur in append only unaware bucket writer, so
         // commitUser does not matter.
         return commitUser == null ? initialCommitUser : commitUser;

Reply via email to