This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git

The following commit(s) were added to refs/heads/master by this push:
     new f9cd6346d [core] Bulk load records for GlobalIndexAssigner when index is empty (#2184)
f9cd6346d is described below

commit f9cd6346d394a3b3da958f98732107d2800abcaf
Author: Jingsong Lee <jingsongl...@gmail.com>
AuthorDate: Fri Oct 27 13:44:49 2023 +0800

    [core] Bulk load records for GlobalIndexAssigner when index is empty (#2184)
---
 .../java/org/apache/paimon/disk/IOManager.java     |   2 +
 .../java/org/apache/paimon/disk/IOManagerImpl.java |   9 +
 .../flink/sink/index/GlobalIndexAssigner.java      | 253 +++++++++++++++------
 .../sink/index/GlobalIndexAssignerOperator.java    | 104 ++-------
 .../flink/sink/index/GlobalIndexAssignerTest.java  |  83 ++++---
 5 files changed, 257 insertions(+), 194 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/disk/IOManager.java b/paimon-core/src/main/java/org/apache/paimon/disk/IOManager.java
index 5b618f99b..8e6b67f83 100644
--- a/paimon-core/src/main/java/org/apache/paimon/disk/IOManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/disk/IOManager.java
@@ -34,6 +34,8 @@ public interface IOManager extends AutoCloseable {
 
     ID createChannel();
 
+    String[] tempDirs();
+
     Enumerator createChannelEnumerator();
 
     BufferFileWriter createBufferFileWriter(ID channelID) throws IOException;
diff --git a/paimon-core/src/main/java/org/apache/paimon/disk/IOManagerImpl.java b/paimon-core/src/main/java/org/apache/paimon/disk/IOManagerImpl.java
index cae471aa8..50b8621ec 100644
--- a/paimon-core/src/main/java/org/apache/paimon/disk/IOManagerImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/disk/IOManagerImpl.java
@@ -34,10 +34,13 @@ import java.util.stream.Collectors;
 
 /** The facade for the provided I/O manager services. */
 public class IOManagerImpl implements IOManager {
+
     protected static final Logger LOG = LoggerFactory.getLogger(IOManager.class);
 
     private static final String DIR_NAME_PREFIX = "io";
 
+    private final String[] tempDirs;
+
     private final FileChannelManager fileChannelManager;
 
     // -------------------------------------------------------------------------
@@ -50,6 +53,7 @@ public class IOManagerImpl implements IOManager {
      * @param tempDirs The basic directories for files underlying anonymous channels.
      */
     public IOManagerImpl(String... tempDirs) {
+        this.tempDirs = tempDirs;
         this.fileChannelManager =
                 new FileChannelManagerImpl(Preconditions.checkNotNull(tempDirs), DIR_NAME_PREFIX);
         if (LOG.isInfoEnabled()) {
@@ -73,6 +77,11 @@ public class IOManagerImpl implements IOManager {
         return fileChannelManager.createChannel();
     }
 
+    @Override
+    public String[] tempDirs() {
+        return tempDirs;
+    }
+
     @Override
     public Enumerator createChannelEnumerator() {
         return fileChannelManager.createChannelEnumerator();
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssigner.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssigner.java
index 6618ddc57..9f168d79e 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssigner.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssigner.java
@@ -23,17 +23,23 @@ import org.apache.paimon.CoreOptions.MergeEngine;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.JoinedRow;
+import org.apache.paimon.data.serializer.InternalRowSerializer;
 import org.apache.paimon.data.serializer.RowCompactedSerializer;
 import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.disk.RowBuffer;
 import org.apache.paimon.flink.RocksDBOptions;
 import org.apache.paimon.flink.lookup.RocksDBStateFactory;
 import org.apache.paimon.flink.lookup.RocksDBValueState;
+import org.apache.paimon.flink.utils.ProjectToRowDataFunction;
+import org.apache.paimon.memory.HeapMemorySegmentPool;
 import org.apache.paimon.options.Options;
-import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.sort.BinaryExternalSortBuffer;
 import org.apache.paimon.table.AbstractFileStoreTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.sink.PartitionKeyExtractor;
+import org.apache.paimon.table.sink.RowPartitionKeyExtractor;
+import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowKind;
 import org.apache.paimon.types.RowType;
@@ -41,51 +47,52 @@ import org.apache.paimon.utils.FileIOUtils;
 import org.apache.paimon.utils.Filter;
 import org.apache.paimon.utils.IDMapping;
 import org.apache.paimon.utils.MutableObjectIterator;
+import org.apache.paimon.utils.OffsetRow;
 import org.apache.paimon.utils.PositiveIntInt;
 import org.apache.paimon.utils.PositiveIntIntSerializer;
-import org.apache.paimon.utils.SerBiFunction;
-import org.apache.paimon.utils.SerializableFunction;
+import org.apache.paimon.utils.TypeUtils;
 
 import org.apache.flink.table.runtime.util.KeyValueIterator;
-import org.rocksdb.RocksDBException;
 
 import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
 import java.io.UncheckedIOException;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.function.BiConsumer;
 
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /** Assign UPDATE_BEFORE and bucket for the input record, output record with bucket. */
-public class GlobalIndexAssigner<T> implements Serializable, Closeable {
+public class GlobalIndexAssigner implements Serializable, Closeable {
 
     private static final long serialVersionUID = 1L;
 
     private static final String INDEX_NAME = "keyIndex";
 
     private final AbstractFileStoreTable table;
-    private final SerializableFunction<TableSchema, PartitionKeyExtractor<T>> extractorFunction;
-    private final SerializableFunction<TableSchema, PartitionKeyExtractor<T>>
-            bootstrapExtractorFunction;
-    private final SerializableFunction<T, Integer> bootstrapBucketFunction;
-    private final SerBiFunction<T, BinaryRow, T> setPartition;
-    private final SerBiFunction<T, RowKind, T> setRowKind;
+
+    private transient IOManager ioManager;
+
+    private transient int bucketIndex;
+    private transient ProjectToRowDataFunction setPartition;
 
     private transient boolean bootstrap;
-    private transient BinaryExternalSortBuffer bootstrapBuffer;
+    private transient BinaryExternalSortBuffer bootstrapKeys;
+    private transient RowBuffer bootstrapRecords;
 
     private transient int targetBucketRowNumber;
     private transient int assignId;
-    private transient BiConsumer<T, Integer> collector;
+    private transient BiConsumer<InternalRow, Integer> collector;
     private transient int numAssigners;
-    private transient PartitionKeyExtractor<T> extractor;
-    private transient PartitionKeyExtractor<T> keyPartExtractor;
+    private transient PartitionKeyExtractor<InternalRow> extractor;
+    private transient PartitionKeyExtractor<InternalRow> keyPartExtractor;
     private transient File path;
     private transient RocksDBStateFactory stateFactory;
     private transient RocksDBValueState<InternalRow, PositiveIntInt> keyIndex;
@@ -94,42 +101,39 @@ public class GlobalIndexAssigner<T> implements Serializable, Closeable {
     private transient BucketAssigner bucketAssigner;
     private transient ExistsAction existsAction;
 
-    public GlobalIndexAssigner(
-            Table table,
-            SerializableFunction<TableSchema, PartitionKeyExtractor<T>> extractorFunction,
-            SerializableFunction<TableSchema, PartitionKeyExtractor<T>> bootstrapExtractorFunction,
-            SerializableFunction<T, Integer> bootstrapBucketFunction,
-            SerBiFunction<T, BinaryRow, T> setPartition,
-            SerBiFunction<T, RowKind, T> setRowKind) {
+    public GlobalIndexAssigner(Table table) {
         this.table = (AbstractFileStoreTable) table;
-        this.extractorFunction = extractorFunction;
-        this.bootstrapExtractorFunction = bootstrapExtractorFunction;
-        this.bootstrapBucketFunction = bootstrapBucketFunction;
-        this.setPartition = setPartition;
-        this.setRowKind = setRowKind;
     }
 
     // ================== Start Public API ===================
 
     public void open(
             IOManager ioManager,
-            File tmpDir,
             int numAssigners,
             int assignId,
-            BiConsumer<T, Integer> collector)
+            BiConsumer<InternalRow, Integer> collector)
             throws Exception {
+        this.ioManager = ioManager;
        this.numAssigners = numAssigners;
         this.assignId = assignId;
         this.collector = collector;
 
+        RowType bootstrapType = IndexBootstrap.bootstrapType(table.schema());
+        this.bucketIndex = bootstrapType.getFieldCount() - 1;
+        this.setPartition = new ProjectToRowDataFunction(table.rowType(), table.partitionKeys());
+
         CoreOptions coreOptions = table.coreOptions();
         this.targetBucketRowNumber = (int) coreOptions.dynamicBucketTargetRowNum();
-        this.extractor = extractorFunction.apply(table.schema());
-        this.keyPartExtractor = bootstrapExtractorFunction.apply(table.schema());
+        this.extractor = new RowPartitionKeyExtractor(table.schema());
+        this.keyPartExtractor = new KeyPartPartitionKeyExtractor(table.schema());
 
         // state
         Options options = coreOptions.toConfiguration();
-        this.path = new File(tmpDir, "lookup-" + UUID.randomUUID());
+        String rocksDBDir =
+                ioManager
+                        .tempDirs()[
+                        ThreadLocalRandom.current().nextInt(ioManager.tempDirs().length)];
+        this.path = new File(rocksDBDir, "rocksdb-" + UUID.randomUUID());
         this.stateFactory =
                 new RocksDBStateFactory(
@@ -148,7 +152,7 @@ public class GlobalIndexAssigner<T> implements Serializable, Closeable {
 
         // create bootstrap sort buffer
         this.bootstrap = true;
-        this.bootstrapBuffer =
+        this.bootstrapKeys =
                 BinaryExternalSortBuffer.create(
                         ioManager,
                         RowType.of(DataTypes.BYTES()),
@@ -156,57 +160,86 @@ public class GlobalIndexAssigner<T> implements Serializable, Closeable {
                         coreOptions.writeBufferSize() / 2,
                         coreOptions.pageSize(),
                         coreOptions.localSortMaxNumFileHandles());
+
+        this.bootstrapRecords =
+                RowBuffer.getBuffer(
+                        ioManager,
+                        new HeapMemorySegmentPool(
+                                coreOptions.writeBufferSize() / 2, coreOptions.pageSize()),
+                        new InternalRowSerializer(table.rowType()),
+                        true);
     }
 
-    public void bootstrap(T value) throws IOException {
-        checkArgument(bootstrap);
+    public void bootstrapKey(InternalRow value) throws IOException {
+        checkArgument(inBoostrap());
         BinaryRow partition = keyPartExtractor.partition(value);
         BinaryRow key = keyPartExtractor.trimmedPrimaryKey(value);
         int partId = partMapping.index(partition);
-        int bucket = bootstrapBucketFunction.apply(value);
+        int bucket = value.getInt(bucketIndex);
         bucketAssigner.bootstrapBucket(partition, bucket);
         PositiveIntInt partAndBucket = new PositiveIntInt(partId, bucket);
-        bootstrapBuffer.write(
+        bootstrapKeys.write(
                 GenericRow.of(keyIndex.serializeKey(key), keyIndex.serializeValue(partAndBucket)));
     }
 
-    public void endBoostrap() throws IOException, RocksDBException {
+    public boolean inBoostrap() {
+        return bootstrap;
+    }
+
+    public void endBoostrap(boolean isEndInput) throws Exception {
         bootstrap = false;
-        MutableObjectIterator<BinaryRow> iterator = bootstrapBuffer.sortedIterator();
-        BinaryRow row = new BinaryRow(2);
-        KeyValueIterator<byte[], byte[]> kvIter =
-                new KeyValueIterator<byte[], byte[]>() {
-
-                    private BinaryRow current;
-
-                    @Override
-                    public boolean advanceNext() {
-                        try {
-                            current = iterator.next(row);
-                        } catch (IOException e) {
-                            throw new UncheckedIOException(e);
-                        }
-                        return current != null;
-                    }
-
-                    @Override
-                    public byte[] getKey() {
-                        return current.getBinary(0);
-                    }
-
-                    @Override
-                    public byte[] getValue() {
-                        return current.getBinary(1);
-                    }
-                };
-
-        stateFactory.bulkLoad(keyIndex.columnFamily(), kvIter);
-        bootstrapBuffer.clear();
-        bootstrapBuffer = null;
+        bootstrapRecords.complete();
+        boolean isEmpty = true;
+        if (bootstrapKeys.size() > 0) {
+            MutableObjectIterator<BinaryRow> keyIterator = bootstrapKeys.sortedIterator();
+            BinaryRow row = new BinaryRow(2);
+            KeyValueIterator<byte[], byte[]> kvIter =
+                    new KeyValueIterator<byte[], byte[]>() {
+
+                        private BinaryRow current;
+
+                        @Override
+                        public boolean advanceNext() {
+                            try {
+                                current = keyIterator.next(row);
+                            } catch (IOException e) {
+                                throw new UncheckedIOException(e);
+                            }
+                            return current != null;
+                        }
+
+                        @Override
+                        public byte[] getKey() {
+                            return current.getBinary(0);
+                        }
+
+                        @Override
+                        public byte[] getValue() {
+                            return current.getBinary(1);
+                        }
+                    };
+
+            stateFactory.bulkLoad(keyIndex.columnFamily(), kvIter);
+            isEmpty = false;
+        }
+
+        bootstrapKeys.clear();
+        bootstrapKeys = null;
+
+        if (isEmpty && isEndInput) {
+            // optimization: bulk load mode
+            bulkLoadBootstrapRecords();
+        } else {
+            loopBootstrapRecords();
+        }
     }
 
-    public void processInput(T value) throws Exception {
-        checkArgument(!bootstrap);
+    public void processInput(InternalRow value) throws Exception {
+        if (inBoostrap()) {
+            bootstrapRecords.put(value);
+            return;
+        }
+
         BinaryRow partition = extractor.partition(value);
         BinaryRow key = extractor.trimmedPrimaryKey(value);
@@ -224,8 +257,8 @@ public class GlobalIndexAssigner<T> implements Serializable, Closeable {
                 {
                     // retract old record
                     BinaryRow previousPart = partMapping.get(previousPartId);
-                    T retract = setPartition.apply(value, previousPart);
-                    retract = setRowKind.apply(retract, RowKind.DELETE);
+                    InternalRow retract = setPartition.apply(value, previousPart);
+                    retract.setRowKind(RowKind.DELETE);
                     collect(retract, previousBucket);
 
                     bucketAssigner.decrement(previousPart, previousBucket);
@@ -236,7 +269,7 @@ public class GlobalIndexAssigner<T> implements Serializable, Closeable {
             case USE_OLD:
                 {
                     BinaryRow previousPart = partMapping.get(previousPartId);
-                    T newValue = setPartition.apply(value, previousPart);
+                    InternalRow newValue = setPartition.apply(value, previousPart);
                     collect(newValue, previousBucket);
                     break;
                 }
@@ -265,7 +298,77 @@ public class GlobalIndexAssigner<T> implements Serializable, Closeable {
 
     // ================== End Public API ===================
 
-    private void processNewRecord(BinaryRow partition, int partId, BinaryRow key, T value)
+    /** Sort bootstrap records and assign bucket without RocksDB. */
+    private void bulkLoadBootstrapRecords() throws Exception {
+        RowType rowType = table.rowType();
+        List<DataType> fields =
+                new ArrayList<>(TypeUtils.project(rowType, table.primaryKeys()).getFieldTypes());
+        fields.add(DataTypes.INT());
+        RowType keyWithIdType = DataTypes.ROW(fields.toArray(new DataType[0]));
+
+        fields.addAll(rowType.getFieldTypes());
+        RowType keyWithRowType = DataTypes.ROW(fields.toArray(new DataType[0]));
+
+        // 1. insert into external sort buffer
+        CoreOptions coreOptions = table.coreOptions();
+        BinaryExternalSortBuffer keyIdBuffer =
+                BinaryExternalSortBuffer.create(
+                        ioManager,
+                        keyWithIdType,
+                        keyWithRowType,
+                        coreOptions.writeBufferSize() / 2,
+                        coreOptions.pageSize(),
+                        coreOptions.localSortMaxNumFileHandles());
+        int id = Integer.MAX_VALUE;
+        GenericRow idRow = new GenericRow(1);
+        JoinedRow keyAndId = new JoinedRow();
+        JoinedRow keyAndRow = new JoinedRow();
+        try (RowBuffer.RowBufferIterator iterator = bootstrapRecords.newIterator()) {
+            while (iterator.advanceNext()) {
+                BinaryRow row = iterator.getRow();
+                BinaryRow key = extractor.trimmedPrimaryKey(row);
+                idRow.setField(0, id);
+                keyAndId.replace(key, idRow);
+                keyAndRow.replace(keyAndId, row);
+                keyIdBuffer.write(keyAndRow);
+                id--;
+            }
+        }
+        bootstrapRecords.reset();
+        bootstrapRecords = null;
+
+        // 2. loop sorted iterator to assign bucket
+        MutableObjectIterator<BinaryRow> iterator = keyIdBuffer.sortedIterator();
+        BinaryRow keyWithRow = new BinaryRow(keyWithRowType.getFieldCount());
+        OffsetRow row = new OffsetRow(rowType.getFieldCount(), keyWithIdType.getFieldCount());
+        BinaryRow currentKey = null;
+        while ((keyWithRow = iterator.next(keyWithRow)) != null) {
+            row.replace(keyWithRow);
+            BinaryRow key = extractor.trimmedPrimaryKey(row);
+            if (currentKey == null || !currentKey.equals(key)) {
+                // output first record
+                BinaryRow partition = extractor.partition(row);
+                collect(row, assignBucket(partition));
+                currentKey = key.copy();
+            }
+        }
+
+        keyIdBuffer.clear();
+    }
+
+    /** Loop bootstrap records to get and put RocksDB. */
+    private void loopBootstrapRecords() throws Exception {
+        try (RowBuffer.RowBufferIterator iterator = bootstrapRecords.newIterator()) {
+            while (iterator.advanceNext()) {
+                processInput(iterator.getRow());
+            }
+        }
+
+        bootstrapRecords.reset();
+        bootstrapRecords = null;
+    }
+
+    private void processNewRecord(BinaryRow partition, int partId, BinaryRow key, InternalRow value)
             throws IOException {
         int bucket = assignBucket(partition);
         keyIndex.put(key, new PositiveIntInt(partId, bucket));
@@ -284,7 +387,7 @@ public class GlobalIndexAssigner<T> implements Serializable, Closeable {
         return Math.abs(hash % numAssigners);
     }
 
-    private void collect(T value, int bucket) {
+    private void collect(InternalRow value, int bucket) {
         collector.accept(value, bucket);
     }
 
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerOperator.java
index 19428155f..8133177a9 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerOperator.java
@@ -18,19 +18,9 @@
 
 package org.apache.paimon.flink.sink.index;
 
-import org.apache.paimon.CoreOptions;
 import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.data.serializer.InternalRowSerializer;
 import org.apache.paimon.disk.IOManager;
-import org.apache.paimon.disk.RowBuffer;
-import org.apache.paimon.flink.utils.ProjectToRowDataFunction;
-import org.apache.paimon.memory.HeapMemorySegmentPool;
-import org.apache.paimon.options.Options;
-import org.apache.paimon.table.AbstractFileStoreTable;
 import org.apache.paimon.table.Table;
-import org.apache.paimon.table.sink.RowPartitionKeyExtractor;
-import org.apache.paimon.types.RowType;
-import org.apache.paimon.utils.SerializableFunction;
 
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.state.StateInitializationContext;
@@ -39,33 +29,21 @@ import org.apache.flink.streaming.api.operators.BoundedOneInput;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
-import java.io.File;
-import java.util.concurrent.ThreadLocalRandom;
-
 /** A {@link OneInputStreamOperator} for {@link GlobalIndexAssigner}. */
-public class GlobalIndexAssignerOperator<T> extends AbstractStreamOperator<Tuple2<T, Integer>>
-        implements OneInputStreamOperator<Tuple2<KeyPartOrRow, T>, Tuple2<T, Integer>>,
+public class GlobalIndexAssignerOperator
+        extends AbstractStreamOperator<Tuple2<InternalRow, Integer>>
+        implements OneInputStreamOperator<
+                        Tuple2<KeyPartOrRow, InternalRow>, Tuple2<InternalRow, Integer>>,
                 BoundedOneInput {
 
     private static final long serialVersionUID = 1L;
 
-    private final Table table;
-    private final GlobalIndexAssigner<T> assigner;
-    private final SerializableFunction<T, InternalRow> toRow;
-    private final SerializableFunction<InternalRow, T> fromRow;
+    private final GlobalIndexAssigner assigner;
+
     private transient IOManager ioManager;
-    private transient RowBuffer bootstrapBuffer;
-
-    public GlobalIndexAssignerOperator(
-            Table table,
-            GlobalIndexAssigner<T> assigner,
-            SerializableFunction<T, InternalRow> toRow,
-            SerializableFunction<InternalRow, T> fromRow) {
-        this.table = table;
+
+    public GlobalIndexAssignerOperator(GlobalIndexAssigner assigner) {
         this.assigner = assigner;
-        this.toRow = toRow;
-        this.fromRow = fromRow;
     }
 
     @Override
@@ -73,72 +51,46 @@ public class GlobalIndexAssignerOperator<T> extends AbstractStreamOperator<Tuple
         super.initializeState(context);
         org.apache.flink.runtime.io.disk.iomanager.IOManager flinkIoManager =
                 getContainingTask().getEnvironment().getIOManager();
-        File[] tmpDirs = flinkIoManager.getSpillingDirectories();
-        File tmpDir = tmpDirs[ThreadLocalRandom.current().nextInt(tmpDirs.length)];
         ioManager = IOManager.create(flinkIoManager.getSpillingDirectoriesPaths());
         assigner.open(
                 ioManager,
-                tmpDir,
                 getRuntimeContext().getNumberOfParallelSubtasks(),
                 getRuntimeContext().getIndexOfThisSubtask(),
                 this::collect);
-        Options options = Options.fromMap(table.options());
-        long bufferSize = options.get(CoreOptions.WRITE_BUFFER_SIZE).getBytes() / 2;
-        long pageSize = options.get(CoreOptions.PAGE_SIZE).getBytes();
-        bootstrapBuffer =
-                RowBuffer.getBuffer(
-                        ioManager,
-                        new HeapMemorySegmentPool(bufferSize, (int) pageSize),
-                        new InternalRowSerializer(table.rowType()),
-                        true);
     }
 
     @Override
-    public void processElement(StreamRecord<Tuple2<KeyPartOrRow, T>> streamRecord)
+    public void processElement(StreamRecord<Tuple2<KeyPartOrRow, InternalRow>> streamRecord)
             throws Exception {
-        Tuple2<KeyPartOrRow, T> tuple2 = streamRecord.getValue();
-        T value = tuple2.f1;
+        Tuple2<KeyPartOrRow, InternalRow> tuple2 = streamRecord.getValue();
+        InternalRow value = tuple2.f1;
         switch (tuple2.f0) {
             case KEY_PART:
-                assigner.bootstrap(value);
+                assigner.bootstrapKey(value);
                 break;
             case ROW:
-                if (bootstrapBuffer != null) {
-                    // ignore return value, we must enable spillable for bootstrapBuffer, so return
-                    // is always true
-                    bootstrapBuffer.put(toRow.apply(value));
-                } else {
-                    assigner.processInput(value);
-                }
+                assigner.processInput(value);
                 break;
         }
     }
 
     @Override
     public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
-        endBootstrap();
+        endBootstrap(false);
     }
 
     @Override
     public void endInput() throws Exception {
-        endBootstrap();
+        endBootstrap(true);
     }
 
-    private void endBootstrap() throws Exception {
-        if (bootstrapBuffer != null) {
-            assigner.endBoostrap();
-            bootstrapBuffer.complete();
-            try (RowBuffer.RowBufferIterator iterator = bootstrapBuffer.newIterator()) {
-                while (iterator.advanceNext()) {
-                    assigner.processInput(fromRow.apply(iterator.getRow()));
-                }
-            }
-            bootstrapBuffer.reset();
-            bootstrapBuffer = null;
+    private void endBootstrap(boolean isEndInput) throws Exception {
+        if (assigner.inBoostrap()) {
+            assigner.endBoostrap(isEndInput);
         }
     }
 
-    private void collect(T value, int bucket) {
+    private void collect(InternalRow value, int bucket) {
         output.collect(new StreamRecord<>(new Tuple2<>(value, bucket)));
     }
 
@@ -150,23 +102,11 @@ public class GlobalIndexAssignerOperator<T> extends AbstractStreamOperator<Tuple
         }
     }
 
-    public static GlobalIndexAssignerOperator<InternalRow> forRowData(Table table) {
-        return new GlobalIndexAssignerOperator<>(
-                table, createRowDataAssigner(table), r -> r, r -> r);
+    public static GlobalIndexAssignerOperator forRowData(Table table) {
+        return new GlobalIndexAssignerOperator(createRowDataAssigner(table));
     }
 
-    public static GlobalIndexAssigner<InternalRow> createRowDataAssigner(Table t) {
-        RowType bootstrapType = IndexBootstrap.bootstrapType(((AbstractFileStoreTable) t).schema());
-        int bucketIndex = bootstrapType.getFieldCount() - 1;
-        return new GlobalIndexAssigner<>(
-                t,
-                RowPartitionKeyExtractor::new,
-                KeyPartPartitionKeyExtractor::new,
-                row -> row.getInt(bucketIndex),
-                new ProjectToRowDataFunction(t.rowType(), t.partitionKeys()),
-                (rowData, rowKind) -> {
-                    rowData.setRowKind(rowKind);
-                    return rowData;
-                });
+    public static GlobalIndexAssigner createRowDataAssigner(Table t) {
+        return new GlobalIndexAssigner(t);
     }
 }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerTest.java
index 4d12f1e93..b2e8d0871 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerTest.java
@@ -36,6 +36,7 @@ import org.junit.jupiter.api.Test;
 import java.io.File;
 import java.time.Duration;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.ThreadLocalRandom;
 
@@ -45,13 +46,12 @@ import static org.assertj.core.api.Assertions.assertThat;
 /** Test for {@link GlobalIndexAssigner}. */
 public class GlobalIndexAssignerTest extends TableTestBase {
 
-    private GlobalIndexAssigner<InternalRow> createAssigner(MergeEngine mergeEngine)
-            throws Exception {
+    private GlobalIndexAssigner createAssigner(MergeEngine mergeEngine) throws Exception {
         return createAssigner(mergeEngine, false);
     }
 
-    private GlobalIndexAssigner<InternalRow> createAssigner(
-            MergeEngine mergeEngine, boolean enableTtl) throws Exception {
+    private GlobalIndexAssigner createAssigner(MergeEngine mergeEngine, boolean enableTtl)
+            throws Exception {
         Identifier identifier = identifier("T");
         Options options = new Options();
         options.set(CoreOptions.MERGE_ENGINE, mergeEngine);
@@ -91,16 +91,10 @@ public class GlobalIndexAssignerTest extends TableTestBase {
     }
 
     private void innerTestBucketAssign(boolean enableTtl) throws Exception {
-        GlobalIndexAssigner<InternalRow> assigner =
-                createAssigner(MergeEngine.DEDUPLICATE, enableTtl);
+        GlobalIndexAssigner assigner = createAssigner(MergeEngine.DEDUPLICATE, enableTtl);
         List<Integer> output = new ArrayList<>();
-        assigner.open(
-                ioManager(),
-                new File(warehouse.getPath()),
-                2,
-                0,
-                (row, bucket) -> output.add(bucket));
-        assigner.endBoostrap();
+        assigner.open(ioManager(), 2, 0, (row, bucket) -> output.add(bucket));
+        assigner.endBoostrap(false);
 
         // assign
         assigner.processInput(GenericRow.of(1, 1, 1));
@@ -131,15 +125,10 @@ public class GlobalIndexAssignerTest extends TableTestBase {
 
     @Test
     public void testUpsert() throws Exception {
-        GlobalIndexAssigner<InternalRow> assigner = createAssigner(MergeEngine.DEDUPLICATE);
+        GlobalIndexAssigner assigner = createAssigner(MergeEngine.DEDUPLICATE);
         List<Tuple2<InternalRow, Integer>> output = new ArrayList<>();
-        assigner.open(
-                ioManager(),
-                new File(warehouse.getPath()),
-                2,
-                0,
-                (row, bucket) -> output.add(new Tuple2<>(row, bucket)));
-        assigner.endBoostrap();
+        assigner.open(ioManager(), 2, 0, (row, bucket) -> output.add(new Tuple2<>(row, bucket)));
+        assigner.endBoostrap(false);
 
         // change partition
         assigner.processInput(GenericRow.of(1, 1, 1));
@@ -180,15 +169,10 @@ public class GlobalIndexAssignerTest extends TableTestBase {
                 ThreadLocalRandom.current().nextBoolean()
                         ? MergeEngine.PARTIAL_UPDATE
                         : MergeEngine.AGGREGATE;
-        GlobalIndexAssigner<InternalRow> assigner = createAssigner(mergeEngine);
+        GlobalIndexAssigner assigner = createAssigner(mergeEngine);
         List<Tuple2<InternalRow, Integer>> output = new ArrayList<>();
-        assigner.open(
-                ioManager(),
-                new File(warehouse.getPath()),
-                2,
-                0,
-                (row, bucket) -> output.add(new Tuple2<>(row, bucket)));
-        assigner.endBoostrap();
+        assigner.open(ioManager(), 2, 0, (row, bucket) -> output.add(new Tuple2<>(row, bucket)));
+        assigner.endBoostrap(false);
 
         // change partition
         assigner.processInput(GenericRow.of(1, 1, 1));
@@ -210,15 +194,10 @@ public class GlobalIndexAssignerTest extends TableTestBase {
 
     @Test
     public void testFirstRow() throws Exception {
-        GlobalIndexAssigner<InternalRow> assigner = createAssigner(MergeEngine.FIRST_ROW);
+        GlobalIndexAssigner assigner = createAssigner(MergeEngine.FIRST_ROW);
         List<Tuple2<InternalRow, Integer>> output = new ArrayList<>();
-        assigner.open(
-                ioManager(),
-                new File(warehouse.getPath()),
-                2,
-                0,
-                (row, bucket) -> output.add(new Tuple2<>(row, bucket)));
-        assigner.endBoostrap();
+        assigner.open(ioManager(), 2, 0, (row, bucket) -> output.add(new Tuple2<>(row, bucket)));
+        assigner.endBoostrap(false);
 
         // change partition
         assigner.processInput(GenericRow.of(1, 1, 1));
@@ -234,4 +213,34 @@ public class GlobalIndexAssignerTest extends TableTestBase {
         output.clear();
         assigner.close();
     }
+
+    @Test
+    public void testBootstrapRecords() throws Exception {
+        GlobalIndexAssigner assigner = createAssigner(MergeEngine.DEDUPLICATE);
+        List<List<Integer>> output = new ArrayList<>();
+        assigner.open(
+                ioManager(),
+                2,
+                0,
+                (row, bucket) ->
+                        output.add(
+                                Arrays.asList(
+                                        row.getInt(0), row.getInt(1), row.getInt(2), bucket)));
+
+        assigner.processInput(GenericRow.of(1, 1, 1));
+        assigner.processInput(GenericRow.of(2, 1, 2));
+        assigner.processInput(GenericRow.of(2, 2, 2));
+        assigner.processInput(GenericRow.of(2, 3, 3));
+        assigner.processInput(GenericRow.of(2, 4, 4));
+        assigner.endBoostrap(true);
+
+        assertThat(output)
+                .containsExactlyInAnyOrder(
+                        Arrays.asList(2, 1, 2, 0),
+                        Arrays.asList(2, 2, 2, 0),
+                        Arrays.asList(2, 3, 3, 0),
+                        Arrays.asList(2, 4, 4, 2));
+        output.clear();
+        assigner.close();
+    }
 }
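
---

For readers skimming the diff: the heart of the change is `bulkLoadBootstrapRecords()`. When the key index is empty and the input has ended, the assigner skips per-record RocksDB lookups entirely; it tags each buffered record with a strictly decreasing id, sorts the buffer by (key, id), and emits only the first row of each key group. The toy program below illustrates that sort-then-deduplicate idea in isolation; the class and all names in it are invented for this note and are not Paimon API (the real code sorts spillable binary rows with BinaryExternalSortBuffer and assigns buckets from partition hashes and per-bucket counters).

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.BiConsumer;

/** Toy model of the bulk-load path: buffer, sort by (key, id), first row per key wins. */
public class BulkLoadPathSketch {

    /** One buffered record: primary key, arrival id, and a payload. */
    static final class Buffered {
        final String key;
        final int id;
        final String row;

        Buffered(String key, int id, String row) {
            this.key = key;
            this.id = id;
            this.row = row;
        }
    }

    public static void main(String[] args) {
        // 1. buffer the records; later arrivals get smaller ids, mirroring the
        // counter that starts at Integer.MAX_VALUE and decrements in the real code
        List<Buffered> buffer = new ArrayList<>();
        int id = Integer.MAX_VALUE;
        buffer.add(new Buffered("k1", id--, "a"));
        buffer.add(new Buffered("k2", id--, "b"));
        buffer.add(new Buffered("k1", id--, "c")); // supersedes "a" for k1

        // 2. sort by key, then id ascending: within a key, the newest row comes first
        buffer.sort(Comparator.comparing((Buffered b) -> b.key).thenComparingInt(b -> b.id));

        // 3. emit only the first row of each key group and assign it a bucket;
        // the real code hashes the partition and tracks per-bucket row counts
        BiConsumer<String, Integer> collector =
                (row, bucket) -> System.out.println(row + " -> bucket " + bucket);
        String currentKey = null;
        int bucket = 0;
        for (Buffered b : buffer) {
            if (!b.key.equals(currentKey)) {
                collector.accept(b.row, bucket++);
                currentKey = b.key;
            }
        }
        // prints: c -> bucket 0, b -> bucket 1 (the newest row per key wins)
    }
}
```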
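The non-empty-index path still explains why `endBoostrap` sorts the bootstrap keys before calling `stateFactory.bulkLoad(keyIndex.columnFamily(), kvIter)`: RocksDB can only ingest pre-built SST files whose keys arrive in strictly ascending order. The diff does not show `RocksDBStateFactory.bulkLoad` itself, so the following sketch is only an assumption about the mechanism, written against the plain `org.rocksdb` JNI API (`SstFileWriter` plus `ingestExternalFile`); the paths, keys, and values are placeholders.

```java
import org.rocksdb.EnvOptions;
import org.rocksdb.IngestExternalFileOptions;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.SstFileWriter;

import java.nio.charset.StandardCharsets;
import java.util.Collections;

/** Hypothetical bulk load via SST ingestion; not the actual RocksDBStateFactory code. */
public class SstIngestSketch {

    public static void main(String[] args) throws Exception {
        RocksDB.loadLibrary();
        try (Options options = new Options().setCreateIfMissing(true);
                EnvOptions envOptions = new EnvOptions();
                RocksDB db = RocksDB.open(options, "/tmp/sketch-db");
                SstFileWriter writer = new SstFileWriter(envOptions, options)) {
            writer.open("/tmp/sketch.sst");
            // SstFileWriter rejects out-of-order keys, which is why the assigner
            // pushes bootstrap keys through an external sort before bulk loading.
            writer.put(bytes("k1"), bytes("part0|bucket0"));
            writer.put(bytes("k2"), bytes("part0|bucket1"));
            writer.finish();

            // Ingesting the finished file links the SST into the LSM tree
            // directly, bypassing the memtable and write-ahead log.
            try (IngestExternalFileOptions ingest = new IngestExternalFileOptions()) {
                db.ingestExternalFile(Collections.singletonList("/tmp/sketch.sst"), ingest);
            }
        }
    }

    private static byte[] bytes(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }
}
```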