This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new b3fab720d [core] Introduce Index BulkLoading for cross partition update (#2154)
b3fab720d is described below

commit b3fab720d2d524fc0255472a3552c637ecbeb09c
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Oct 20 11:13:28 2023 +0800

    [core] Introduce Index BulkLoading for cross partition update (#2154)
---
 .../paimon/sort/BinaryInMemorySortBuffer.java      |   4 +-
 .../org/apache/paimon/table/TableTestBase.java     |   2 +-
 .../apache/paimon/flink/lookup/RocksDBState.java   |   6 +-
 .../paimon/flink/lookup/RocksDBStateFactory.java   |  53 +++++++-
 .../paimon/flink/lookup/RocksDBValueState.java     |   4 +-
 .../flink/sink/index/GlobalIndexAssigner.java      | 143 +++++++++++++++++----
 .../sink/index/GlobalIndexAssignerOperator.java    |  15 ++-
 .../flink/GlobalDynamicBucketTableITCase.java      |   1 +
 .../flink/sink/index/GlobalIndexAssignerTest.java  |  69 ++++++----
 9 files changed, 234 insertions(+), 63 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/sort/BinaryInMemorySortBuffer.java
 
b/paimon-core/src/main/java/org/apache/paimon/sort/BinaryInMemorySortBuffer.java
index 777effe55..2fe3ebdba 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/sort/BinaryInMemorySortBuffer.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/sort/BinaryInMemorySortBuffer.java
@@ -244,7 +244,9 @@ public class BinaryInMemorySortBuffer extends 
BinaryIndexedSortable implements S
 
     @Override
     public final MutableObjectIterator<BinaryRow> sortedIterator() {
-        new QuickSort().sort(this);
+        if (numRecords > 0) {
+            new QuickSort().sort(this);
+        }
         return iterator();
     }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java 
b/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java
index 0e597b815..5465d5c4b 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java
@@ -71,7 +71,7 @@ public abstract class TableTestBase {
     protected Path warehouse;
     protected Catalog catalog;
     protected String database;
-    @TempDir java.nio.file.Path tempPath;
+    @TempDir public java.nio.file.Path tempPath;
 
     @BeforeEach
     public void beforeEach() throws Catalog.DatabaseAlreadyExistException {
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBState.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBState.java
index 784b61301..3632069d7 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBState.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBState.java
@@ -77,7 +77,11 @@ public abstract class RocksDBState<K, V, CacheV> {
                         .build();
     }
 
-    protected byte[] serializeKey(K key) throws IOException {
+    public ColumnFamilyHandle columnFamily() {
+        return columnFamily;
+    }
+
+    public byte[] serializeKey(K key) throws IOException {
         keyOutView.clear();
         keySerializer.serialize(key, keyOutView);
         return keyOutView.getCopyOfBuffer();
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBStateFactory.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBStateFactory.java
index 9c930f464..757d08776 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBStateFactory.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBStateFactory.java
@@ -21,31 +21,41 @@ package org.apache.paimon.flink.lookup;
 import org.apache.paimon.data.serializer.Serializer;
 import org.apache.paimon.flink.RocksDBOptions;
 
+import org.apache.flink.table.runtime.util.KeyValueIterator;
 import org.rocksdb.ColumnFamilyDescriptor;
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.DBOptions;
+import org.rocksdb.EnvOptions;
+import org.rocksdb.IngestExternalFileOptions;
 import org.rocksdb.Options;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
+import org.rocksdb.SstFileWriter;
 import org.rocksdb.TtlDB;
 
 import javax.annotation.Nullable;
 
 import java.io.Closeable;
+import java.io.File;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
 
 /** Factory to create state. */
 public class RocksDBStateFactory implements Closeable {
 
     public static final String MERGE_OPERATOR_NAME = "stringappendtest";
 
-    private RocksDB db;
-
+    private final Options options;
+    private final String path;
     private final ColumnFamilyOptions columnFamilyOptions;
 
+    private RocksDB db;
+    private int sstIndex = 0;
+
     public RocksDBStateFactory(
             String path, org.apache.paimon.options.Options conf, @Nullable 
Duration ttlSecs)
             throws IOException {
@@ -56,11 +66,12 @@ public class RocksDBStateFactory implements Closeable {
                                 .setStatsDumpPeriodSec(0)
                                 .setCreateIfMissing(true),
                         conf);
+        this.path = path;
         this.columnFamilyOptions =
                 RocksDBOptions.createColumnOptions(new ColumnFamilyOptions(), 
conf)
                         .setMergeOperatorName(MERGE_OPERATOR_NAME);
 
-        Options options = new Options(dbOptions, columnFamilyOptions);
+        this.options = new Options(dbOptions, columnFamilyOptions);
         try {
             this.db =
                     ttlSecs == null
@@ -71,6 +82,63 @@ public class RocksDBStateFactory implements Closeable {
         }
     }
 
+    /**
+     * Bulk loads key-value pairs into {@code columnFamily} by writing them to SST files under this
+     * factory's path and ingesting those files into the database.
+     *
+     * <p>Keys must arrive in strictly ascending order, as required by {@link SstFileWriter}. A new
+     * SST file is started once the current one reaches {@code targetFileSizeBase}.
+     */
+    public void bulkLoad(ColumnFamilyHandle columnFamily, KeyValueIterator<byte[], byte[]> iterator)
+            throws RocksDBException, IOException {
+        long targetFileSize = options.targetFileSizeBase();
+
+        List<String> files = new ArrayList<>();
+        // EnvOptions, SstFileWriter and IngestExternalFileOptions hold native RocksDB handles, so
+        // they are closed explicitly instead of being left for the garbage collector.
+        try (EnvOptions envOptions = new EnvOptions()) {
+            SstFileWriter writer = null;
+            try {
+                long recordNum = 0;
+                while (iterator.advanceNext()) {
+                    byte[] key = iterator.getKey();
+                    byte[] value = iterator.getValue();
+
+                    if (writer == null) {
+                        writer = new SstFileWriter(envOptions, options);
+                        String sstFile = new File(path, "sst-" + (sstIndex++)).getPath();
+                        writer.open(sstFile);
+                        files.add(sstFile);
+                    }
+
+                    writer.put(key, value);
+                    recordNum++;
+                    // The file size is only checked every 1000 records.
+                    if (recordNum % 1000 == 0 && writer.fileSize() >= targetFileSize) {
+                        writer.finish();
+                        writer.close();
+                        writer = null;
+                        recordNum = 0;
+                    }
+                }
+
+                if (writer != null) {
+                    writer.finish();
+                }
+            } finally {
+                if (writer != null) {
+                    writer.close();
+                }
+            }
+        }
+
+        if (!files.isEmpty()) {
+            try (IngestExternalFileOptions ingestOptions = new IngestExternalFileOptions()) {
+                db.ingestExternalFile(columnFamily, files, ingestOptions);
+            }
+        }
+    }
+
     public <K, V> RocksDBValueState<K, V> valueState(
             String name,
             Serializer<K> keySerializer,
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBValueState.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBValueState.java
index 077906403..a12855d88 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBValueState.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/RocksDBValueState.java
@@ -88,12 +88,12 @@ public class RocksDBValueState<K, V> extends 
RocksDBState<K, V, RocksDBState.Ref
         }
     }
 
-    private V deserializeValue(byte[] valueBytes) throws IOException {
+    public V deserializeValue(byte[] valueBytes) throws IOException {
         valueInputView.setBuffer(valueBytes);
         return valueSerializer.deserialize(valueInputView);
     }
 
-    private byte[] serializeValue(V value) throws IOException {
+    public byte[] serializeValue(V value) throws IOException {
         valueOutputView.clear();
         valueSerializer.serialize(value, valueOutputView);
         return valueOutputView.getCopyOfBuffer();
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssigner.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssigner.java
index d35ef6ed4..545b0cbd3 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssigner.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssigner.java
@@ -20,41 +20,63 @@ package org.apache.paimon.flink.sink.index;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.CoreOptions.MergeEngine;
+import org.apache.paimon.codegen.RecordComparator;
 import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.serializer.BinaryRowSerializer;
+import org.apache.paimon.data.serializer.InternalRowSerializer;
 import org.apache.paimon.data.serializer.RowCompactedSerializer;
+import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.flink.RocksDBOptions;
 import org.apache.paimon.flink.lookup.RocksDBStateFactory;
 import org.apache.paimon.flink.lookup.RocksDBValueState;
+import org.apache.paimon.memory.HeapMemorySegmentPool;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.sort.BinaryExternalSortBuffer;
+import org.apache.paimon.sort.BinaryInMemorySortBuffer;
 import org.apache.paimon.table.AbstractFileStoreTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.sink.PartitionKeyExtractor;
+import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowKind;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.FileIOUtils;
 import org.apache.paimon.utils.Filter;
 import org.apache.paimon.utils.IDMapping;
+import org.apache.paimon.utils.MutableObjectIterator;
 import org.apache.paimon.utils.PositiveIntInt;
 import org.apache.paimon.utils.PositiveIntIntSerializer;
 import org.apache.paimon.utils.SerBiFunction;
 import org.apache.paimon.utils.SerializableFunction;
 
+import org.apache.flink.table.runtime.util.KeyValueIterator;
+import org.rocksdb.RocksDBException;
+
+import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
+import java.io.UncheckedIOException;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.UUID;
 import java.util.function.BiConsumer;
 
+import static org.apache.paimon.codegen.CodeGenUtils.newNormalizedKeyComputer;
+import static org.apache.paimon.codegen.CodeGenUtils.newRecordComparator;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
 /** Assign UPDATE_BEFORE and bucket for the input record, output record with 
bucket. */
-public class GlobalIndexAssigner<T> implements Serializable {
+public class GlobalIndexAssigner<T> implements Serializable, Closeable {
 
     private static final long serialVersionUID = 1L;
 
+    private static final String INDEX_NAME = "keyIndex";
+
     private final AbstractFileStoreTable table;
     private final SerializableFunction<TableSchema, PartitionKeyExtractor<T>> 
extractorFunction;
     private final SerializableFunction<TableSchema, PartitionKeyExtractor<T>>
@@ -63,6 +85,9 @@ public class GlobalIndexAssigner<T> implements Serializable {
     private final SerBiFunction<T, BinaryRow, T> setPartition;
     private final SerBiFunction<T, RowKind, T> setRowKind;
 
+    private transient boolean bootstrap;
+    private transient BinaryExternalSortBuffer bootstrapBuffer;
+
     private transient int targetBucketRowNumber;
     private transient int assignId;
     private transient BiConsumer<T, Integer> collector;
@@ -92,7 +117,14 @@ public class GlobalIndexAssigner<T> implements Serializable 
{
         this.setRowKind = setRowKind;
     }
 
-    public void open(File tmpDir, int numAssigners, int assignId, 
BiConsumer<T, Integer> collector)
+    // ================== Start Public API ===================
+
+    public void open(
+            IOManager ioManager,
+            File tmpDir,
+            int numAssigners,
+            int assignId,
+            BiConsumer<T, Integer> collector)
             throws Exception {
         this.numAssigners = numAssigners;
         this.assignId = assignId;
@@ -113,7 +145,7 @@ public class GlobalIndexAssigner<T> implements Serializable 
{
         RowType keyType = table.schema().logicalTrimmedPrimaryKeysType();
         this.keyIndex =
                 stateFactory.valueState(
-                        "keyIndex",
+                        INDEX_NAME,
                         new RowCompactedSerializer(keyType),
                         new PositiveIntIntSerializer(),
                         options.get(RocksDBOptions.LOOKUP_CACHE_ROWS));
@@ -121,9 +153,81 @@ public class GlobalIndexAssigner<T> implements 
Serializable {
         this.partMapping = new IDMapping<>(BinaryRow::copy);
         this.bucketAssigner = new BucketAssigner();
         this.existsAction = fromMergeEngine(coreOptions.mergeEngine());
+
+        // create bootstrap sort buffer
+        this.bootstrap = true;
+        int pageSize = coreOptions.pageSize();
+        long bufferSize = coreOptions.writeBufferSize() / 2;
+        RecordComparator comparator =
+                newRecordComparator(
+                        Collections.singletonList(DataTypes.BYTES()), 
"binary_comparator");
+        BinaryInMemorySortBuffer sortBuffer =
+                BinaryInMemorySortBuffer.createBuffer(
+                        newNormalizedKeyComputer(
+                                Collections.singletonList(DataTypes.BYTES()),
+                                "binary_normalized_key"),
+                        new InternalRowSerializer(DataTypes.BYTES(), 
DataTypes.BYTES()),
+                        comparator,
+                        new HeapMemorySegmentPool(bufferSize, pageSize));
+        this.bootstrapBuffer =
+                new BinaryExternalSortBuffer(
+                        new BinaryRowSerializer(2),
+                        comparator,
+                        pageSize,
+                        sortBuffer,
+                        ioManager,
+                        coreOptions.localSortMaxNumFileHandles());
     }
 
-    public void process(T value) throws Exception {
+    public void bootstrap(T value) throws IOException {
+        checkArgument(bootstrap);
+        BinaryRow partition = keyPartExtractor.partition(value);
+        BinaryRow key = keyPartExtractor.trimmedPrimaryKey(value);
+        int partId = partMapping.index(partition);
+        int bucket = bootstrapBucketFunction.apply(value);
+        bucketAssigner.bootstrapBucket(partition, bucket);
+        PositiveIntInt partAndBucket = new PositiveIntInt(partId, bucket);
+        bootstrapBuffer.write(
+                GenericRow.of(keyIndex.serializeKey(key), 
keyIndex.serializeValue(partAndBucket)));
+    }
+
+    public void endBoostrap() throws IOException, RocksDBException {
+        bootstrap = false;
+        MutableObjectIterator<BinaryRow> iterator = 
bootstrapBuffer.sortedIterator();
+        BinaryRow row = new BinaryRow(2);
+        KeyValueIterator<byte[], byte[]> kvIter =
+                new KeyValueIterator<byte[], byte[]>() {
+
+                    private BinaryRow current;
+
+                    @Override
+                    public boolean advanceNext() {
+                        try {
+                            current = iterator.next(row);
+                        } catch (IOException e) {
+                            throw new UncheckedIOException(e);
+                        }
+                        return current != null;
+                    }
+
+                    @Override
+                    public byte[] getKey() {
+                        return current.getBinary(0);
+                    }
+
+                    @Override
+                    public byte[] getValue() {
+                        return current.getBinary(1);
+                    }
+                };
+
+        stateFactory.bulkLoad(keyIndex.columnFamily(), kvIter);
+        bootstrapBuffer.clear();
+        bootstrapBuffer = null;
+    }
+
+    public void processInput(T value) throws Exception {
+        checkArgument(!bootstrap);
         BinaryRow partition = extractor.partition(value);
         BinaryRow key = extractor.trimmedPrimaryKey(value);
 
@@ -168,16 +272,20 @@ public class GlobalIndexAssigner<T> implements 
Serializable {
         }
     }
 
-    public void bootstrap(T value) throws IOException {
-        BinaryRow partition = keyPartExtractor.partition(value);
-        BinaryRow key = keyPartExtractor.trimmedPrimaryKey(value);
-        int partId = partMapping.index(partition);
-        int bucket = bootstrapBucketFunction.apply(value);
-        bucketAssigner.bootstrapBucket(partition, bucket);
-        PositiveIntInt partAndBucket = new PositiveIntInt(partId, bucket);
-        keyIndex.put(key, partAndBucket);
+    @Override
+    public void close() throws IOException {
+        if (stateFactory != null) {
+            stateFactory.close();
+            stateFactory = null;
+        }
+
+        if (path != null) {
+            FileIOUtils.deleteDirectoryQuietly(path);
+        }
     }
 
+    // ================== End Public API ===================
+
     private void processNewRecord(BinaryRow partition, int partId, BinaryRow 
key, T value)
             throws IOException {
         int bucket = assignBucket(partition);
@@ -201,17 +309,6 @@ public class GlobalIndexAssigner<T> implements 
Serializable {
         collector.accept(value, bucket);
     }
 
-    public void close() throws IOException {
-        if (stateFactory != null) {
-            stateFactory.close();
-            stateFactory = null;
-        }
-
-        if (path != null) {
-            FileIOUtils.deleteDirectoryQuietly(path);
-        }
-    }
-
     private static class BucketAssigner {
 
         private final Map<BinaryRow, TreeMap<Integer, Integer>> stats = new 
HashMap<>();
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerOperator.java
index 8e0e08799..c04eb13cf 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerOperator.java
@@ -71,21 +71,23 @@ public class GlobalIndexAssignerOperator<T> extends 
AbstractStreamOperator<Tuple
     @Override
     public void initializeState(StateInitializationContext context) throws 
Exception {
         super.initializeState(context);
-        org.apache.flink.runtime.io.disk.iomanager.IOManager ioManager =
+        org.apache.flink.runtime.io.disk.iomanager.IOManager flinkIoManager =
                 getContainingTask().getEnvironment().getIOManager();
-        File[] tmpDirs = ioManager.getSpillingDirectories();
+        File[] tmpDirs = flinkIoManager.getSpillingDirectories();
         File tmpDir = 
tmpDirs[ThreadLocalRandom.current().nextInt(tmpDirs.length)];
+        IOManager ioManager = 
IOManager.create(flinkIoManager.getSpillingDirectoriesPaths());
         assigner.open(
+                ioManager,
                 tmpDir,
                 getRuntimeContext().getNumberOfParallelSubtasks(),
                 getRuntimeContext().getIndexOfThisSubtask(),
                 this::collect);
         Options options = Options.fromMap(table.options());
-        long bufferSize = 
options.get(CoreOptions.WRITE_BUFFER_SIZE).getBytes();
+        long bufferSize = 
options.get(CoreOptions.WRITE_BUFFER_SIZE).getBytes() / 2;
         long pageSize = options.get(CoreOptions.PAGE_SIZE).getBytes();
         bootstrapBuffer =
                 RowBuffer.getBuffer(
-                        
IOManager.create(ioManager.getSpillingDirectoriesPaths()),
+                        ioManager,
                         new HeapMemorySegmentPool(bufferSize, (int) pageSize),
                         new InternalRowSerializer(table.rowType()),
                         true);
@@ -106,7 +108,7 @@ public class GlobalIndexAssignerOperator<T> extends 
AbstractStreamOperator<Tuple
                     // is always true
                     bootstrapBuffer.put(toRow.apply(value));
                 } else {
-                    assigner.process(value);
+                    assigner.processInput(value);
                 }
                 break;
         }
@@ -124,10 +126,11 @@ public class GlobalIndexAssignerOperator<T> extends 
AbstractStreamOperator<Tuple
 
     private void endBootstrap() throws Exception {
         if (bootstrapBuffer != null) {
+            assigner.endBoostrap();
             bootstrapBuffer.complete();
             try (RowBuffer.RowBufferIterator iterator = 
bootstrapBuffer.newIterator()) {
                 while (iterator.advanceNext()) {
-                    assigner.process(fromRow.apply(iterator.getRow()));
+                    assigner.processInput(fromRow.apply(iterator.getRow()));
                 }
             }
             bootstrapBuffer.reset();
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/GlobalDynamicBucketTableITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/GlobalDynamicBucketTableITCase.java
index 794cb312f..7d5d70b48 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/GlobalDynamicBucketTableITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/GlobalDynamicBucketTableITCase.java
@@ -90,6 +90,7 @@ public class GlobalDynamicBucketTableITCase extends 
CatalogITCaseBase {
         sql(
                 "create table large_t (pt int, k int, v int, primary key (k) 
not enforced) partitioned by (pt) with ("
                         + "'bucket'='-1', "
+                        + "'rocksdb.compaction.level.target-file-size-base'='2 
kb', "
                         + "'dynamic-bucket.target-row-num'='10000')");
         sql(
                 "create temporary table src (pt int, k int, v int) with ("
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerTest.java
index 97503079b..4d12f1e93 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerTest.java
@@ -23,6 +23,7 @@ import org.apache.paimon.CoreOptions.MergeEngine;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.table.TableTestBase;
@@ -85,33 +86,43 @@ public class GlobalIndexAssignerTest extends TableTestBase {
         innerTestBucketAssign(true);
     }
 
+    private IOManager ioManager() {
+        return IOManager.create(new File(tempPath.toFile(), "io").getPath());
+    }
+
     private void innerTestBucketAssign(boolean enableTtl) throws Exception {
         GlobalIndexAssigner<InternalRow> assigner =
                 createAssigner(MergeEngine.DEDUPLICATE, enableTtl);
         List<Integer> output = new ArrayList<>();
-        assigner.open(new File(warehouse.getPath()), 2, 0, (row, bucket) -> 
output.add(bucket));
+        assigner.open(
+                ioManager(),
+                new File(warehouse.getPath()),
+                2,
+                0,
+                (row, bucket) -> output.add(bucket));
+        assigner.endBoostrap();
 
         // assign
-        assigner.process(GenericRow.of(1, 1, 1));
-        assigner.process(GenericRow.of(1, 2, 2));
-        assigner.process(GenericRow.of(1, 3, 3));
+        assigner.processInput(GenericRow.of(1, 1, 1));
+        assigner.processInput(GenericRow.of(1, 2, 2));
+        assigner.processInput(GenericRow.of(1, 3, 3));
         assertThat(output).containsExactly(0, 0, 0);
         output.clear();
 
         // full
-        assigner.process(GenericRow.of(1, 4, 4));
+        assigner.processInput(GenericRow.of(1, 4, 4));
         assertThat(output).containsExactly(2);
         output.clear();
 
         // another partition
-        assigner.process(GenericRow.of(2, 5, 5));
+        assigner.processInput(GenericRow.of(2, 5, 5));
         assertThat(output).containsExactly(0);
         output.clear();
 
         // read assigned
-        assigner.process(GenericRow.of(1, 4, 4));
-        assigner.process(GenericRow.of(1, 2, 2));
-        assigner.process(GenericRow.of(1, 3, 3));
+        assigner.processInput(GenericRow.of(1, 4, 4));
+        assigner.processInput(GenericRow.of(1, 2, 2));
+        assigner.processInput(GenericRow.of(1, 3, 3));
         assertThat(output).containsExactly(2, 0, 0);
         output.clear();
 
@@ -123,14 +134,16 @@ public class GlobalIndexAssignerTest extends 
TableTestBase {
         GlobalIndexAssigner<InternalRow> assigner = 
createAssigner(MergeEngine.DEDUPLICATE);
         List<Tuple2<InternalRow, Integer>> output = new ArrayList<>();
         assigner.open(
+                ioManager(),
                 new File(warehouse.getPath()),
                 2,
                 0,
                 (row, bucket) -> output.add(new Tuple2<>(row, bucket)));
+        assigner.endBoostrap();
 
         // change partition
-        assigner.process(GenericRow.of(1, 1, 1));
-        assigner.process(GenericRow.of(2, 1, 2));
+        assigner.processInput(GenericRow.of(1, 1, 1));
+        assigner.processInput(GenericRow.of(2, 1, 2));
         assertThat(output)
                 .containsExactly(
                         new Tuple2<>(GenericRow.of(1, 1, 1), 0),
@@ -139,14 +152,14 @@ public class GlobalIndexAssignerTest extends 
TableTestBase {
         output.clear();
 
         // test partition 1 deleted
-        assigner.process(GenericRow.of(1, 2, 2));
-        assigner.process(GenericRow.of(1, 3, 3));
-        assigner.process(GenericRow.of(1, 4, 4));
+        assigner.processInput(GenericRow.of(1, 2, 2));
+        assigner.processInput(GenericRow.of(1, 3, 3));
+        assigner.processInput(GenericRow.of(1, 4, 4));
         assertThat(output.stream().map(t -> t.f1)).containsExactly(0, 0, 0);
         output.clear();
 
         // move from full bucket
-        assigner.process(GenericRow.of(2, 4, 4));
+        assigner.processInput(GenericRow.of(2, 4, 4));
         assertThat(output)
                 .containsExactly(
                         new Tuple2<>(GenericRow.ofKind(RowKind.DELETE, 1, 4, 
4), 0),
@@ -154,7 +167,7 @@ public class GlobalIndexAssignerTest extends TableTestBase {
         output.clear();
 
         // test partition 1 deleted
-        assigner.process(GenericRow.of(1, 5, 5));
+        assigner.processInput(GenericRow.of(1, 5, 5));
         assertThat(output.stream().map(t -> t.f1)).containsExactly(0);
         output.clear();
 
@@ -170,14 +183,16 @@ public class GlobalIndexAssignerTest extends 
TableTestBase {
         GlobalIndexAssigner<InternalRow> assigner = 
createAssigner(mergeEngine);
         List<Tuple2<InternalRow, Integer>> output = new ArrayList<>();
         assigner.open(
+                ioManager(),
                 new File(warehouse.getPath()),
                 2,
                 0,
                 (row, bucket) -> output.add(new Tuple2<>(row, bucket)));
+        assigner.endBoostrap();
 
         // change partition
-        assigner.process(GenericRow.of(1, 1, 1));
-        assigner.process(GenericRow.of(2, 1, 2));
+        assigner.processInput(GenericRow.of(1, 1, 1));
+        assigner.processInput(GenericRow.of(2, 1, 2));
         assertThat(output)
                 .containsExactly(
                         new Tuple2<>(GenericRow.of(1, 1, 1), 0),
@@ -185,9 +200,9 @@ public class GlobalIndexAssignerTest extends TableTestBase {
         output.clear();
 
         // test partition 2 no effect
-        assigner.process(GenericRow.of(2, 2, 2));
-        assigner.process(GenericRow.of(2, 3, 3));
-        assigner.process(GenericRow.of(2, 4, 4));
+        assigner.processInput(GenericRow.of(2, 2, 2));
+        assigner.processInput(GenericRow.of(2, 3, 3));
+        assigner.processInput(GenericRow.of(2, 4, 4));
         assertThat(output.stream().map(t -> t.f1)).containsExactly(0, 0, 0);
         output.clear();
         assigner.close();
@@ -198,21 +213,23 @@ public class GlobalIndexAssignerTest extends 
TableTestBase {
         GlobalIndexAssigner<InternalRow> assigner = 
createAssigner(MergeEngine.FIRST_ROW);
         List<Tuple2<InternalRow, Integer>> output = new ArrayList<>();
         assigner.open(
+                ioManager(),
                 new File(warehouse.getPath()),
                 2,
                 0,
                 (row, bucket) -> output.add(new Tuple2<>(row, bucket)));
+        assigner.endBoostrap();
 
         // change partition
-        assigner.process(GenericRow.of(1, 1, 1));
-        assigner.process(GenericRow.of(2, 1, 2));
+        assigner.processInput(GenericRow.of(1, 1, 1));
+        assigner.processInput(GenericRow.of(2, 1, 2));
         assertThat(output).containsExactly(new Tuple2<>(GenericRow.of(1, 1, 
1), 0));
         output.clear();
 
         // test partition 2 no effect
-        assigner.process(GenericRow.of(2, 2, 2));
-        assigner.process(GenericRow.of(2, 3, 3));
-        assigner.process(GenericRow.of(2, 4, 4));
+        assigner.processInput(GenericRow.of(2, 2, 2));
+        assigner.processInput(GenericRow.of(2, 3, 3));
+        assigner.processInput(GenericRow.of(2, 4, 4));
         assertThat(output.stream().map(t -> t.f1)).containsExactly(0, 0, 0);
         output.clear();
         assigner.close();

Reply via email to