This is an automated email from the ASF dual-hosted git repository.
zouxxyy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new aa61c69c2 [core] Refactor GlobalIndexAssigner to introduce
endBoostrapWithoutEmit (#3978)
aa61c69c2 is described below
commit aa61c69c2f36613ff3a286537030d8b8cd9cd3e8
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Aug 16 18:23:03 2024 +0800
[core] Refactor GlobalIndexAssigner to introduce endBoostrapWithoutEmit
(#3978)
---
.../paimon/crosspartition/GlobalIndexAssigner.java | 58 ++++++++++++++++++----
1 file changed, 49 insertions(+), 9 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java
b/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java
index 819937984..060e2428a 100644
---
a/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java
+++
b/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java
@@ -44,6 +44,7 @@ import org.apache.paimon.table.sink.RowPartitionKeyExtractor;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.CloseableIterator;
import org.apache.paimon.utils.FileIOUtils;
import org.apache.paimon.utils.IDMapping;
import org.apache.paimon.utils.MutableObjectIterator;
@@ -63,6 +64,7 @@ import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
+import java.util.NoSuchElementException;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.BiConsumer;
@@ -192,6 +194,15 @@ public class GlobalIndexAssigner implements Serializable,
Closeable {
}
public void endBoostrap(boolean isEndInput) throws Exception {
+ try (CloseableIterator<BinaryRow> iterator =
endBoostrapWithoutEmit(isEndInput)) {
+ while (iterator.hasNext()) {
+ processInput(iterator.next());
+ }
+ }
+ }
+
+ public CloseableIterator<BinaryRow> endBoostrapWithoutEmit(boolean
isEndInput)
+ throws Exception {
bootstrap = false;
bootstrapRecords.complete();
boolean isEmpty = true;
@@ -222,8 +233,9 @@ public class GlobalIndexAssigner implements Serializable,
Closeable {
if (isEmpty && isEndInput) {
// optimization: bulk load mode
bulkLoadBootstrapRecords();
+ return CloseableIterator.empty();
} else {
- loopBootstrapRecords();
+ return bootstrapRecords();
}
}
@@ -363,16 +375,44 @@ public class GlobalIndexAssigner implements Serializable,
Closeable {
keyIdBuffer.clear();
}
- /** Loop bootstrap records to get and put RocksDB. */
- private void loopBootstrapRecords() throws Exception {
- try (RowBuffer.RowBufferIterator iterator =
bootstrapRecords.newIterator()) {
- while (iterator.advanceNext()) {
- processInput(iterator.getRow());
+ private CloseableIterator<BinaryRow> bootstrapRecords() {
+ RowBuffer.RowBufferIterator iterator = bootstrapRecords.newIterator();
+ return new CloseableIterator<BinaryRow>() {
+
+ boolean hasNext = false;
+ boolean advanced = false;
+
+ private void advanceIfNeeded() {
+ if (!advanced) {
+ hasNext = iterator.advanceNext();
+ advanced = true;
+ }
+ }
+
+ @Override
+ public boolean hasNext() {
+ advanceIfNeeded();
+ return hasNext;
+ }
+
+ @Override
+ public BinaryRow next() {
+ advanceIfNeeded();
+ if (!hasNext) {
+ throw new NoSuchElementException();
+ }
+
+ advanced = false;
+ return iterator.getRow();
}
- }
- bootstrapRecords.reset();
- bootstrapRecords = null;
+ @Override
+ public void close() {
+ iterator.close();
+ bootstrapRecords.reset();
+ bootstrapRecords = null;
+ }
+ };
}
private void processNewRecord(BinaryRow partition, int partId, BinaryRow
key, InternalRow value)