This is an automated email from the ASF dual-hosted git repository.

zouxxyy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new aa61c69c2 [core] Refactor GlobalIndexAssigner to introduce endBoostrapWithoutEmit (#3978)
aa61c69c2 is described below

commit aa61c69c2f36613ff3a286537030d8b8cd9cd3e8
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Aug 16 18:23:03 2024 +0800

    [core] Refactor GlobalIndexAssigner to introduce endBoostrapWithoutEmit (#3978)
---
 .../paimon/crosspartition/GlobalIndexAssigner.java | 58 ++++++++++++++++++----
 1 file changed, 49 insertions(+), 9 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java b/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java
index 819937984..060e2428a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java
+++ b/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java
@@ -44,6 +44,7 @@ import org.apache.paimon.table.sink.RowPartitionKeyExtractor;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.CloseableIterator;
 import org.apache.paimon.utils.FileIOUtils;
 import org.apache.paimon.utils.IDMapping;
 import org.apache.paimon.utils.MutableObjectIterator;
@@ -63,6 +64,7 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
+import java.util.NoSuchElementException;
 import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.function.BiConsumer;
@@ -192,6 +194,15 @@ public class GlobalIndexAssigner implements Serializable, Closeable {
     }
 
     public void endBoostrap(boolean isEndInput) throws Exception {
+        try (CloseableIterator<BinaryRow> iterator = 
endBoostrapWithoutEmit(isEndInput)) {
+            while (iterator.hasNext()) {
+                processInput(iterator.next());
+            }
+        }
+    }
+
+    public CloseableIterator<BinaryRow> endBoostrapWithoutEmit(boolean 
isEndInput)
+            throws Exception {
         bootstrap = false;
         bootstrapRecords.complete();
         boolean isEmpty = true;
@@ -222,8 +233,9 @@ public class GlobalIndexAssigner implements Serializable, Closeable {
         if (isEmpty && isEndInput) {
             // optimization: bulk load mode
             bulkLoadBootstrapRecords();
+            return CloseableIterator.empty();
         } else {
-            loopBootstrapRecords();
+            return bootstrapRecords();
         }
     }
 
@@ -363,16 +375,44 @@ public class GlobalIndexAssigner implements Serializable, Closeable {
         keyIdBuffer.clear();
     }
 
-    /** Loop bootstrap records to get and put RocksDB. */
-    private void loopBootstrapRecords() throws Exception {
-        try (RowBuffer.RowBufferIterator iterator = 
bootstrapRecords.newIterator()) {
-            while (iterator.advanceNext()) {
-                processInput(iterator.getRow());
+    private CloseableIterator<BinaryRow> bootstrapRecords() {
+        RowBuffer.RowBufferIterator iterator = bootstrapRecords.newIterator();
+        return new CloseableIterator<BinaryRow>() {
+
+            boolean hasNext = false;
+            boolean advanced = false;
+
+            private void advanceIfNeeded() {
+                if (!advanced) {
+                    hasNext = iterator.advanceNext();
+                    advanced = true;
+                }
+            }
+
+            @Override
+            public boolean hasNext() {
+                advanceIfNeeded();
+                return hasNext;
+            }
+
+            @Override
+            public BinaryRow next() {
+                advanceIfNeeded();
+                if (!hasNext) {
+                    throw new NoSuchElementException();
+                }
+
+                advanced = false;
+                return iterator.getRow();
             }
-        }
 
-        bootstrapRecords.reset();
-        bootstrapRecords = null;
+            @Override
+            public void close() {
+                iterator.close();
+                bootstrapRecords.reset();
+                bootstrapRecords = null;
+            }
+        };
     }
 
     private void processNewRecord(BinaryRow partition, int partId, BinaryRow 
key, InternalRow value)

Reply via email to