This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-0.5
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git

commit 52be707ed3e1bf38291b2539744a5dab8fc2fbc0
Author: Jingsong Lee <[email protected]>
AuthorDate: Thu Aug 31 19:07:37 2023 +0800

    [flink] Fix global dynamic bucket mode (#1914)
---
 .../data/serializer/InternalRowSerializer.java     |   2 +-
 .../paimon/disk/BufferFileReaderInputView.java     |  99 ++++++
 .../org/apache/paimon/disk/ExternalBuffer.java     | 383 +++++++++++++++++++++
 .../org/apache/paimon/disk/ExternalBufferTest.java | 209 +++++++++++
 .../paimon/flink/sink/cdc/CdcSinkBuilder.java      |   4 -
 .../sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java  |   3 -
 .../flink/sink/index/GlobalDynamicBucketSink.java  |  16 +-
 .../sink/index/GlobalDynamicCdcBucketSink.java     | 133 -------
 .../flink/sink/index/GlobalIndexAssigner.java      |  29 +-
 .../sink/index/GlobalIndexAssignerOperator.java    | 107 ++++--
 .../paimon/flink/sink/index/IndexBootstrap.java    |  48 ++-
 .../sink/index/KeyPartRowChannelComputer.java      |   6 +-
 .../flink/GlobalDynamicBucketTableITCase.java      |  19 +
 .../sink/cdc/FlinkCdcSyncTableSinkITCase.java      |   2 +
 .../flink/sink/index/IndexBootstrapTest.java       |  17 +-
 15 files changed, 875 insertions(+), 202 deletions(-)

diff --git a/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java b/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java
index b0701be48..0f573f10b 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java
@@ -72,7 +72,7 @@ public class InternalRowSerializer extends AbstractRowDataSerializer<InternalRow
     }
 
     @Override
-    public Serializer<InternalRow> duplicate() {
+    public InternalRowSerializer duplicate() {
         Serializer<?>[] duplicateFieldSerializers = new Serializer[fieldSerializers.length];
         for (int i = 0; i < fieldSerializers.length; i++) {
             duplicateFieldSerializers[i] = fieldSerializers[i].duplicate();
diff --git a/paimon-core/src/main/java/org/apache/paimon/disk/BufferFileReaderInputView.java b/paimon-core/src/main/java/org/apache/paimon/disk/BufferFileReaderInputView.java
new file mode 100644
index 000000000..ab91b8944
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/disk/BufferFileReaderInputView.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.disk;
+
+import org.apache.paimon.data.AbstractPagedInputView;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.serializer.BinaryRowSerializer;
+import org.apache.paimon.memory.Buffer;
+import org.apache.paimon.memory.MemorySegment;
+import org.apache.paimon.utils.MutableObjectIterator;
+
+import java.io.EOFException;
+import java.io.IOException;
+
+/** An {@link AbstractPagedInputView} which reads blocks from a channel without compression. */
+public class BufferFileReaderInputView extends AbstractPagedInputView {
+
+    private final BufferFileReader reader;
+    private final MemorySegment segment;
+
+    private int currentSegmentLimit;
+
+    public BufferFileReaderInputView(FileIOChannel.ID id, IOManager ioManager, int segmentSize)
+            throws IOException {
+        this.reader = ioManager.createBufferFileReader(id);
+        this.segment = MemorySegment.wrap(new byte[segmentSize]);
+    }
+
+    @Override
+    protected MemorySegment nextSegment(MemorySegment current) throws IOException {
+        if (reader.hasReachedEndOfFile()) {
+            throw new EOFException();
+        }
+
+        Buffer buffer = Buffer.create(segment);
+        reader.readInto(buffer);
+        this.currentSegmentLimit = buffer.getSize();
+        return segment;
+    }
+
+    @Override
+    protected int getLimitForSegment(MemorySegment segment) {
+        return currentSegmentLimit;
+    }
+
+    public void close() throws IOException {
+        reader.close();
+    }
+
+    public FileIOChannel getChannel() {
+        return reader;
+    }
+
+    public MutableObjectIterator<BinaryRow> createBinaryRowIterator(
+            BinaryRowSerializer serializer) {
+        return new BinaryRowChannelInputViewIterator(serializer);
+    }
+
+    private class BinaryRowChannelInputViewIterator implements MutableObjectIterator<BinaryRow> {
+
+        protected final BinaryRowSerializer serializer;
+
+        public BinaryRowChannelInputViewIterator(BinaryRowSerializer serializer) {
+            this.serializer = serializer;
+        }
+
+        @Override
+        public BinaryRow next(BinaryRow reuse) throws IOException {
+            try {
+                return this.serializer.deserializeFromPages(reuse, BufferFileReaderInputView.this);
+            } catch (EOFException e) {
+                close();
+                return null;
+            }
+        }
+
+        @Override
+        public BinaryRow next() throws IOException {
+            throw new UnsupportedOperationException(
+                    "This method is disabled due to performance issue!");
+        }
+    }
+}
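For readers tracking the new disk classes: a minimal sketch of how this view is
meant to be consumed, assuming a channel that was previously written through
IOManager#createBufferFileWriter with pages of segmentSize bytes (readAll and its
parameters are illustrative helpers, not part of this commit):

import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.serializer.BinaryRowSerializer;
import org.apache.paimon.disk.BufferFileReaderInputView;
import org.apache.paimon.disk.FileIOChannel;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.utils.MutableObjectIterator;

import java.io.IOException;

class ReadBackSketch {
    static void readAll(IOManager ioManager, FileIOChannel.ID channel, int segmentSize, int arity)
            throws IOException {
        BufferFileReaderInputView view =
                new BufferFileReaderInputView(channel, ioManager, segmentSize);
        BinaryRowSerializer serializer = new BinaryRowSerializer(arity);
        MutableObjectIterator<BinaryRow> iterator = view.createBinaryRowIterator(serializer);
        BinaryRow reuse = serializer.createInstance();
        // next(reuse) recycles a single row instance; it returns null at end of
        // file and closes the underlying reader (see the EOFException handling above).
        for (BinaryRow row = iterator.next(reuse); row != null; row = iterator.next(reuse)) {
            // consume row here
        }
    }
}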
diff --git a/paimon-core/src/main/java/org/apache/paimon/disk/ExternalBuffer.java b/paimon-core/src/main/java/org/apache/paimon/disk/ExternalBuffer.java
new file mode 100644
index 000000000..9ced8e83d
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/disk/ExternalBuffer.java
@@ -0,0 +1,383 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.disk;
+
+import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.RandomAccessInputView;
+import org.apache.paimon.data.SimpleCollectingOutputView;
+import org.apache.paimon.data.serializer.AbstractRowDataSerializer;
+import org.apache.paimon.data.serializer.BinaryRowSerializer;
+import org.apache.paimon.memory.Buffer;
+import org.apache.paimon.memory.MemorySegment;
+import org.apache.paimon.memory.MemorySegmentPool;
+import org.apache.paimon.utils.MutableObjectIterator;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.EOFException;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.paimon.utils.Preconditions.checkState;
+
+/** An external buffer for storing rows; it spills the data to disk when memory is full. */
+public class ExternalBuffer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ExternalBuffer.class);
+
+    private final IOManager ioManager;
+    private final MemorySegmentPool pool;
+    private final BinaryRowSerializer binaryRowSerializer;
+    private final InMemoryBuffer inMemoryBuffer;
+
+    // The size of each segment
+    private final int segmentSize;
+
+    private final List<ChannelWithMeta> spilledChannelIDs;
+    private int numRows;
+
+    private boolean addCompleted;
+
+    public ExternalBuffer(
+            IOManager ioManager, MemorySegmentPool pool, AbstractRowDataSerializer<?> serializer) {
+        this.ioManager = ioManager;
+        this.pool = pool;
+
+        this.binaryRowSerializer =
+                serializer instanceof BinaryRowSerializer
+                        ? (BinaryRowSerializer) serializer.duplicate()
+                        : new BinaryRowSerializer(serializer.getArity());
+
+        this.segmentSize = pool.pageSize();
+
+        this.spilledChannelIDs = new ArrayList<>();
+
+        this.numRows = 0;
+
+        this.addCompleted = false;
+
+        //noinspection unchecked
+        this.inMemoryBuffer =
+                new InMemoryBuffer((AbstractRowDataSerializer<InternalRow>) serializer);
+    }
+
+    public void reset() {
+        clearChannels();
+        inMemoryBuffer.reset();
+        numRows = 0;
+        addCompleted = false;
+    }
+
+    public void add(InternalRow row) throws IOException {
+        checkState(!addCompleted, "Adding to this buffer has already been completed.");
+        if (!inMemoryBuffer.write(row)) {
+            // Check if record is too big.
+            if (inMemoryBuffer.getCurrentDataBufferOffset() == 0) {
+                throwTooBigException(row);
+            }
+            spill();
+            if (!inMemoryBuffer.write(row)) {
+                throwTooBigException(row);
+            }
+        }
+
+        numRows++;
+    }
+
+    public void complete() {
+        addCompleted = true;
+    }
+
+    public BufferIterator newIterator() {
+        checkState(addCompleted, "Adding to this buffer has not been completed yet.");
+        return new BufferIterator();
+    }
+
+    private void throwTooBigException(InternalRow row) throws IOException {
+        int rowSize = inMemoryBuffer.serializer.toBinaryRow(row).toBytes().length;
+        throw new IOException(
+                "Record is too big, it can't be added to a empty 
InMemoryBuffer! "
+                        + "Record size: "
+                        + rowSize
+                        + ", Buffer: "
+                        + memorySize());
+    }
+
+    private void spill() throws IOException {
+        FileIOChannel.ID channel = ioManager.createChannel();
+
+        BufferFileWriter writer = ioManager.createBufferFileWriter(channel);
+        int numRecordBuffers = inMemoryBuffer.getNumRecordBuffers();
+        ArrayList<MemorySegment> segments = inMemoryBuffer.getRecordBufferSegments();
+        try {
+            // Spill the in-memory buffer to disk with zero-copy.
+            for (int i = 0; i < numRecordBuffers; i++) {
+                MemorySegment segment = segments.get(i);
+                int bufferSize =
+                        i == numRecordBuffers - 1
+                                ? inMemoryBuffer.getNumBytesInLastBuffer()
+                                : segment.size();
+                writer.writeBlock(Buffer.create(segment, bufferSize));
+            }
+            LOG.info(
+                    "here spill the reset buffer data with {} records {} 
bytes",
+                    inMemoryBuffer.numRecords,
+                    writer.getSize());
+            writer.close();
+        } catch (IOException e) {
+            writer.closeAndDelete();
+            throw e;
+        }
+
+        spilledChannelIDs.add(
+                new ChannelWithMeta(
+                        channel,
+                        inMemoryBuffer.getNumRecordBuffers(),
+                        inMemoryBuffer.getNumBytesInLastBuffer()));
+
+        inMemoryBuffer.reset();
+    }
+
+    public int size() {
+        return numRows;
+    }
+
+    private int memorySize() {
+        return pool.freePages() * segmentSize;
+    }
+
+    @SuppressWarnings("ResultOfMethodCallIgnored")
+    private void clearChannels() {
+        for (ChannelWithMeta meta : spilledChannelIDs) {
+            final File f = new File(meta.getChannel().getPath());
+            if (f.exists()) {
+                f.delete();
+            }
+        }
+        spilledChannelIDs.clear();
+    }
+
+    /** Iterator of external buffer. */
+    public class BufferIterator implements Closeable {
+
+        private MutableObjectIterator<BinaryRow> currentIterator;
+        private final BinaryRow reuse = binaryRowSerializer.createInstance();
+
+        private int currentChannelID = -1;
+        private BinaryRow row;
+        private boolean closed;
+        private BufferFileReaderInputView channelReader;
+
+        private BufferIterator() {
+            this.closed = false;
+        }
+
+        private void checkValidity() {
+            if (closed) {
+                throw new RuntimeException("This iterator is closed!");
+            }
+        }
+
+        @Override
+        public void close() {
+            if (closed) {
+                return;
+            }
+
+            try {
+                closeCurrentFileReader();
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+
+            closed = true;
+        }
+
+        public boolean advanceNext() {
+            checkValidity();
+
+            try {
+                // get from curr iterator or new iterator.
+                while (true) {
+                    if (currentIterator != null && (row = currentIterator.next(reuse)) != null) {
+                        return true;
+                    } else {
+                        if (!nextIterator()) {
+                            return false;
+                        }
+                    }
+                }
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        private boolean nextIterator() throws IOException {
+            if (currentChannelID == Integer.MAX_VALUE) {
+                return false;
+            } else if (currentChannelID < spilledChannelIDs.size() - 1) {
+                nextSpilledIterator();
+            } else {
+                newMemoryIterator();
+            }
+            return true;
+        }
+
+        public BinaryRow getRow() {
+            return row;
+        }
+
+        private void closeCurrentFileReader() throws IOException {
+            if (channelReader != null) {
+                channelReader.close();
+                channelReader = null;
+            }
+        }
+
+        private void nextSpilledIterator() throws IOException {
+            ChannelWithMeta channel = spilledChannelIDs.get(currentChannelID + 1);
+            currentChannelID++;
+
+            // close current reader first.
+            closeCurrentFileReader();
+
+            // new reader.
+            this.channelReader =
+                    new BufferFileReaderInputView(channel.getChannel(), ioManager, segmentSize);
+            this.currentIterator = channelReader.createBinaryRowIterator(binaryRowSerializer);
+        }
+
+        private void newMemoryIterator() {
+            this.currentChannelID = Integer.MAX_VALUE;
+            this.currentIterator = inMemoryBuffer.newIterator();
+        }
+    }
+
+    @VisibleForTesting
+    List<ChannelWithMeta> getSpillChannels() {
+        return spilledChannelIDs;
+    }
+
+    private class InMemoryBuffer {
+
+        private final AbstractRowDataSerializer<InternalRow> serializer;
+        private final ArrayList<MemorySegment> recordBufferSegments;
+        private final SimpleCollectingOutputView recordCollector;
+
+        private long currentDataBufferOffset;
+        private int numBytesInLastBuffer;
+        private int numRecords = 0;
+
+        private InMemoryBuffer(AbstractRowDataSerializer<InternalRow> serializer) {
+            // the serializer is stateful, so we must duplicate it
+            this.serializer = (AbstractRowDataSerializer<InternalRow>) serializer.duplicate();
+            this.recordBufferSegments = new ArrayList<>();
+            this.recordCollector =
+                    new SimpleCollectingOutputView(this.recordBufferSegments, pool, segmentSize);
+        }
+
+        private void reset() {
+            this.currentDataBufferOffset = 0;
+            this.numRecords = 0;
+            returnToSegmentPool();
+            this.recordCollector.reset();
+        }
+
+        private void returnToSegmentPool() {
+            pool.returnAll(this.recordBufferSegments);
+            this.recordBufferSegments.clear();
+        }
+
+        public boolean write(InternalRow row) throws IOException {
+            try {
+                this.serializer.serializeToPages(row, this.recordCollector);
+                currentDataBufferOffset = this.recordCollector.getCurrentOffset();
+                numBytesInLastBuffer = this.recordCollector.getCurrentPositionInSegment();
+                numRecords++;
+                return true;
+            } catch (EOFException e) {
+                return false;
+            }
+        }
+
+        private ArrayList<MemorySegment> getRecordBufferSegments() {
+            return recordBufferSegments;
+        }
+
+        private long getCurrentDataBufferOffset() {
+            return currentDataBufferOffset;
+        }
+
+        private int getNumRecordBuffers() {
+            int result = (int) (currentDataBufferOffset / segmentSize);
+            long mod = currentDataBufferOffset % segmentSize;
+            if (mod != 0) {
+                result += 1;
+            }
+            return result;
+        }
+
+        private int getNumBytesInLastBuffer() {
+            return numBytesInLastBuffer;
+        }
+
+        private InMemoryBufferIterator newIterator() {
+            RandomAccessInputView recordBuffer =
+                    new RandomAccessInputView(
+                            this.recordBufferSegments, segmentSize, numBytesInLastBuffer);
+            return new InMemoryBufferIterator(recordBuffer, serializer);
+        }
+    }
+
+    private static class InMemoryBufferIterator
+            implements MutableObjectIterator<BinaryRow>, Closeable {
+
+        private final RandomAccessInputView recordBuffer;
+        private final AbstractRowDataSerializer<InternalRow> serializer;
+
+        private InMemoryBufferIterator(
+                RandomAccessInputView recordBuffer,
+                AbstractRowDataSerializer<InternalRow> serializer) {
+            this.recordBuffer = recordBuffer;
+            this.serializer = serializer;
+        }
+
+        @Override
+        public BinaryRow next(BinaryRow reuse) throws IOException {
+            try {
+                return (BinaryRow) serializer.mapFromPages(reuse, recordBuffer);
+            } catch (EOFException e) {
+                return null;
+            }
+        }
+
+        @Override
+        public BinaryRow next() throws IOException {
+            throw new RuntimeException("Not supported!");
+        }
+
+        @Override
+        public void close() {}
+    }
+}
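To make the lifecycle concrete, a minimal sketch of the add/complete/iterate/reset
contract (the spill directory and single-long row layout are placeholders; this
mirrors the patterns in ExternalBufferTest below):

import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.BinaryRowWriter;
import org.apache.paimon.data.serializer.BinaryRowSerializer;
import org.apache.paimon.disk.ExternalBuffer;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.memory.HeapMemorySegmentPool;

import java.io.IOException;

class ExternalBufferSketch {
    static void roundTrip(String spillDir, int pageSize) throws IOException {
        ExternalBuffer buffer =
                new ExternalBuffer(
                        IOManager.create(spillDir),
                        new HeapMemorySegmentPool(2L * pageSize, pageSize), // two pages only
                        new BinaryRowSerializer(1));
        for (long i = 0; i < 10_000; i++) {
            BinaryRow row = new BinaryRow(1);
            BinaryRowWriter writer = new BinaryRowWriter(row);
            writer.writeLong(0, i);
            writer.complete();
            buffer.add(row); // spills once the two in-memory pages fill up
        }
        buffer.complete(); // no more adds; iterators are allowed from here on
        try (ExternalBuffer.BufferIterator iterator = buffer.newIterator()) {
            while (iterator.advanceNext()) {
                long value = iterator.getRow().getLong(0); // spilled channels replay first
            }
        }
        buffer.reset(); // returns pages to the pool and deletes spill files
    }
}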
diff --git a/paimon-core/src/test/java/org/apache/paimon/disk/ExternalBufferTest.java b/paimon-core/src/test/java/org/apache/paimon/disk/ExternalBufferTest.java
new file mode 100644
index 000000000..5bd9ad511
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/disk/ExternalBufferTest.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.disk;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryRowWriter;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.serializer.BinaryRowSerializer;
+import org.apache.paimon.memory.HeapMemorySegmentPool;
+
+import org.apache.commons.math3.random.RandomDataGenerator;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import static org.apache.paimon.memory.MemorySegmentPool.DEFAULT_PAGE_SIZE;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test for {@link ExternalBuffer}. */
+public class ExternalBufferTest {
+
+    @TempDir Path tempDir;
+
+    private IOManager ioManager;
+    private Random random;
+    private BinaryRowSerializer serializer;
+
+    @BeforeEach
+    public void before() {
+        this.ioManager = IOManager.create(tempDir.toString());
+        this.random = new Random();
+        this.serializer = new BinaryRowSerializer(1);
+    }
+
+    private ExternalBuffer newBuffer() {
+        return new ExternalBuffer(
+                ioManager,
+                new HeapMemorySegmentPool(2 * DEFAULT_PAGE_SIZE, DEFAULT_PAGE_SIZE),
+                this.serializer);
+    }
+
+    @Test
+    public void testLess() throws Exception {
+        ExternalBuffer buffer = newBuffer();
+
+        int number = 100;
+        List<Long> expected = insertMulti(buffer, number);
+        assertThat(number).isEqualTo(buffer.size());
+        assertBuffer(expected, buffer);
+        assertThat(0).isEqualTo(buffer.getSpillChannels().size());
+
+        // repeat read
+        assertBuffer(expected, buffer);
+        buffer.newIterator();
+        assertBuffer(expected, buffer);
+        buffer.reset();
+    }
+
+    @Test
+    public void testSpill() throws Exception {
+        ExternalBuffer buffer = newBuffer();
+
+        int number = 5000; // 16 * 5000
+        List<Long> expected = insertMulti(buffer, number);
+        assertThat(number).isEqualTo(buffer.size());
+        assertBuffer(expected, buffer);
+        assertThat(buffer.getSpillChannels().size()).isGreaterThan(0);
+
+        // repeat read
+        assertBuffer(expected, buffer);
+        buffer.newIterator();
+        assertBuffer(expected, buffer);
+        buffer.reset();
+    }
+
+    @Test
+    public void testBufferReset() throws Exception {
+        ExternalBuffer buffer = newBuffer();
+
+        // less
+        insertMulti(buffer, 10);
+        buffer.reset();
+        assertThat(0).isEqualTo(buffer.size());
+
+        // not spill
+        List<Long> expected = insertMulti(buffer, 100);
+        assertThat(100).isEqualTo(buffer.size());
+        assertBuffer(expected, buffer);
+        buffer.reset();
+
+        // spill
+        expected = insertMulti(buffer, 2500);
+        assertThat(2500).isEqualTo(buffer.size());
+        assertBuffer(expected, buffer);
+        buffer.reset();
+    }
+
+    @Test
+    public void testBufferResetWithSpill() throws Exception {
+        int inMemoryThreshold = 20;
+        ExternalBuffer buffer = newBuffer();
+
+        // spill
+        List<Long> expected = insertMulti(buffer, 5000);
+        assertThat(5000).isEqualTo(buffer.size());
+        assertBuffer(expected, buffer);
+        buffer.reset();
+
+        // spill, but not read the values
+        insertMulti(buffer, 5000);
+        buffer.newIterator();
+        assertThat(5000).isEqualTo(buffer.size());
+        buffer.reset();
+
+        // not spill
+        expected = insertMulti(buffer, inMemoryThreshold / 2);
+        assertBuffer(expected, buffer);
+        buffer.reset();
+        assertThat(0).isEqualTo(buffer.size());
+
+        // less
+        expected = insertMulti(buffer, 100);
+        assertThat(100).isEqualTo(buffer.size());
+        assertBuffer(expected, buffer);
+        buffer.reset();
+    }
+
+    @Test
+    public void testHugeRecord() {
+        ExternalBuffer buffer =
+                new ExternalBuffer(
+                        ioManager,
+                        new HeapMemorySegmentPool(3 * DEFAULT_PAGE_SIZE, DEFAULT_PAGE_SIZE),
+                        new BinaryRowSerializer(1));
+        assertThatThrownBy(() -> writeHuge(buffer)).isInstanceOf(IOException.class);
+        buffer.reset();
+    }
+
+    private void writeHuge(ExternalBuffer buffer) throws IOException {
+        BinaryRow row = new BinaryRow(1);
+        BinaryRowWriter writer = new BinaryRowWriter(row);
+        writer.reset();
+        RandomDataGenerator random = new RandomDataGenerator();
+        writer.writeString(0, BinaryString.fromString(random.nextHexString(500000)));
+        writer.complete();
+        buffer.add(row);
+    }
+
+    private void assertBuffer(List<Long> expected, ExternalBuffer buffer) {
+        ExternalBuffer.BufferIterator iterator = buffer.newIterator();
+        assertBuffer(expected, iterator);
+        iterator.close();
+    }
+
+    private void assertBuffer(List<Long> expected, ExternalBuffer.BufferIterator iterator) {
+        List<Long> values = new ArrayList<>();
+        while (iterator.advanceNext()) {
+            values.add(iterator.getRow().getLong(0));
+        }
+        assertThat(values).isEqualTo(expected);
+    }
+
+    private List<Long> insertMulti(ExternalBuffer buffer, int cnt) throws IOException {
+        ArrayList<Long> expected = new ArrayList<>(cnt);
+        insertMulti(buffer, cnt, expected);
+        buffer.complete();
+        return expected;
+    }
+
+    private void insertMulti(ExternalBuffer buffer, int cnt, List<Long> expected)
+            throws IOException {
+        for (int i = 0; i < cnt; i++) {
+            expected.add(randomInsert(buffer));
+        }
+    }
+
+    private long randomInsert(ExternalBuffer buffer) throws IOException {
+        long l = random.nextLong();
+        BinaryRow row = new BinaryRow(1);
+        BinaryRowWriter writer = new BinaryRowWriter(row);
+        writer.reset();
+        writer.writeLong(0, l);
+        writer.complete();
+        buffer.add(row);
+        return l;
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java
index 4f1745406..1bda75695 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcSinkBuilder.java
@@ -21,7 +21,6 @@ package org.apache.paimon.flink.sink.cdc;
 import org.apache.paimon.annotation.Experimental;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.flink.sink.index.GlobalDynamicCdcBucketSink;
 import org.apache.paimon.flink.utils.SingleOutputStreamOperatorUtils;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.table.BucketMode;
@@ -119,9 +118,6 @@ public class CdcSinkBuilder<T> {
                 return buildForFixedBucket(parsed);
             case DYNAMIC:
                 return new CdcDynamicBucketSink((FileStoreTable) table).build(parsed, parallelism);
-            case GLOBAL_DYNAMIC:
-                return new GlobalDynamicCdcBucketSink((FileStoreTable) table)
-                        .build(parsed, parallelism);
             case UNAWARE:
             default:
                 throw new UnsupportedOperationException("Unsupported bucket 
mode: " + bucketMode);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
index c5b716e45..a3dc7beeb 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncDatabaseSinkBuilder.java
@@ -22,7 +22,6 @@ import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.action.cdc.DatabaseSyncMode;
 import org.apache.paimon.flink.sink.FlinkStreamPartitioner;
-import org.apache.paimon.flink.sink.index.GlobalDynamicCdcBucketSink;
 import org.apache.paimon.flink.utils.SingleOutputStreamOperatorUtils;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.table.BucketMode;
@@ -196,8 +195,6 @@ public class FlinkCdcSyncDatabaseSinkBuilder<T> {
                     new CdcDynamicBucketSink(table).build(parsedForTable, parallelism);
                     break;
                 case GLOBAL_DYNAMIC:
-                    new GlobalDynamicCdcBucketSink(table).build(parsedForTable, parallelism);
-                    break;
                 case UNAWARE:
                 default:
                     throw new UnsupportedOperationException(
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalDynamicBucketSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalDynamicBucketSink.java
index 5f98f3b05..26cfc9b41 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalDynamicBucketSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalDynamicBucketSink.java
@@ -43,11 +43,10 @@ import javax.annotation.Nullable;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
 
 import static org.apache.paimon.flink.LogicalTypeConversion.toLogicalType;
 import static org.apache.paimon.flink.sink.FlinkStreamPartitioner.partition;
+import static org.apache.paimon.flink.sink.index.IndexBootstrap.bootstrapType;
 
 /** Sink for global dynamic bucket table. */
 public class GlobalDynamicBucketSink extends FlinkWriteSink<Tuple2<RowData, Integer>> {
@@ -71,13 +70,10 @@ public class GlobalDynamicBucketSink extends FlinkWriteSink<Tuple2<RowData, Inte
         TableSchema schema = table.schema();
         RowType rowType = schema.logicalRowType();
         List<String> primaryKeys = schema.primaryKeys();
-        List<String> partitionKeys = schema.partitionKeys();
         RowDataSerializer rowSerializer = new RowDataSerializer(toLogicalType(rowType));
-        RowType keyPartType =
-                schema.projectedLogicalRowType(
-                        Stream.concat(primaryKeys.stream(), partitionKeys.stream())
-                                .collect(Collectors.toList()));
-        RowDataSerializer keyPartSerializer = new RowDataSerializer(toLogicalType(keyPartType));
+
+        RowType bootstrapType = bootstrapType(schema);
+        RowDataSerializer bootstrapSerializer = new RowDataSerializer(toLogicalType(bootstrapType));
 
         // Topology:
         // input -- bootstrap -- shuffle by key hash --> bucket-assigner -- shuffle by bucket -->
@@ -88,7 +84,7 @@ public class GlobalDynamicBucketSink extends FlinkWriteSink<Tuple2<RowData, Inte
                                 "INDEX_BOOTSTRAP",
                                 new InternalTypeInfo<>(
                                         new KeyWithRowSerializer<>(
-                                                keyPartSerializer, rowSerializer)),
+                                                bootstrapSerializer, rowSerializer)),
                                 new IndexBootstrapOperator<>(
                                         new IndexBootstrap(table), FlinkRowData::new))
                         .setParallelism(input.getParallelism());
@@ -100,7 +96,7 @@ public class GlobalDynamicBucketSink extends FlinkWriteSink<Tuple2<RowData, Inte
         }
 
         KeyPartRowChannelComputer channelComputer =
-                new KeyPartRowChannelComputer(rowType, keyPartType, primaryKeys);
+                new KeyPartRowChannelComputer(rowType, bootstrapType, primaryKeys);
         DataStream<Tuple2<KeyPartOrRow, RowData>> partitionByKeyHash =
                 partition(bootstraped, channelComputer, assignerParallelism);
 
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalDynamicCdcBucketSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalDynamicCdcBucketSink.java
deleted file mode 100644
index 06bb5a147..000000000
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalDynamicCdcBucketSink.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.flink.sink.index;
-
-import org.apache.paimon.data.GenericRow;
-import org.apache.paimon.flink.sink.ChannelComputer;
-import org.apache.paimon.flink.sink.Committable;
-import org.apache.paimon.flink.sink.FlinkWriteSink;
-import org.apache.paimon.flink.sink.StoreSinkWrite;
-import org.apache.paimon.flink.sink.cdc.CdcDynamicBucketWriteOperator;
-import org.apache.paimon.flink.sink.cdc.CdcHashKeyChannelComputer;
-import org.apache.paimon.flink.sink.cdc.CdcRecord;
-import org.apache.paimon.flink.sink.cdc.CdcRecordUtils;
-import org.apache.paimon.flink.sink.cdc.CdcWithBucketChannelComputer;
-import org.apache.paimon.schema.TableSchema;
-import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.types.RowType;
-import org.apache.paimon.utils.RowDataToObjectArrayConverter;
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.EnumTypeInfo;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSink;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-
-import javax.annotation.Nullable;
-
-import java.util.List;
-import java.util.UUID;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-import static org.apache.paimon.flink.sink.FlinkStreamPartitioner.partition;
-
-/** Sink for global dynamic bucket table and {@link CdcRecord} inputs. */
-public class GlobalDynamicCdcBucketSink extends FlinkWriteSink<Tuple2<CdcRecord, Integer>> {
-
-    private static final long serialVersionUID = 1L;
-
-    public GlobalDynamicCdcBucketSink(FileStoreTable table) {
-        super(table, null);
-    }
-
-    @Override
-    protected OneInputStreamOperator<Tuple2<CdcRecord, Integer>, Committable> createWriteOperator(
-            StoreSinkWrite.Provider writeProvider, String commitUser) {
-        return new CdcDynamicBucketWriteOperator(table, writeProvider, commitUser);
-    }
-
-    public DataStreamSink<?> build(DataStream<CdcRecord> input, @Nullable Integer parallelism) {
-        String initialCommitUser = UUID.randomUUID().toString();
-
-        TableSchema schema = table.schema();
-        List<String> primaryKeys = schema.primaryKeys();
-        List<String> partitionKeys = schema.partitionKeys();
-        RowType keyPartType =
-                schema.projectedLogicalRowType(
-                        Stream.concat(primaryKeys.stream(), partitionKeys.stream())
-                                .collect(Collectors.toList()));
-        List<String> keyPartNames = keyPartType.getFieldNames();
-        RowDataToObjectArrayConverter keyPartConverter =
-                new RowDataToObjectArrayConverter(keyPartType);
-
-        // Topology:
-        // input -- bootstrap -- shuffle by key hash --> bucket-assigner -- shuffle by bucket -->
-        // writer --> committer
-
-        TupleTypeInfo<Tuple2<KeyPartOrRow, CdcRecord>> tuple2TupleType =
-                new TupleTypeInfo<>(new EnumTypeInfo<>(KeyPartOrRow.class), input.getType());
-        DataStream<Tuple2<KeyPartOrRow, CdcRecord>> bootstraped =
-                input.transform(
-                                "INDEX_BOOTSTRAP",
-                                tuple2TupleType,
-                                new IndexBootstrapOperator<>(
-                                        new IndexBootstrap(table),
-                                        row ->
-                                                CdcRecordUtils.fromGenericRow(
-                                                        GenericRow.ofKind(
-                                                                row.getRowKind(),
-                                                                keyPartConverter.convert(row)),
-                                                        keyPartNames)))
-                        .setParallelism(input.getParallelism());
-
-        // 1. shuffle by key hash
-        Integer assignerParallelism = table.coreOptions().dynamicBucketAssignerParallelism();
-        if (assignerParallelism == null) {
-            assignerParallelism = parallelism;
-        }
-
-        ChannelComputer<Tuple2<KeyPartOrRow, CdcRecord>> channelComputer =
-                ChannelComputer.transform(
-                        new CdcHashKeyChannelComputer(schema), tuple2 -> tuple2.f1);
-        DataStream<Tuple2<KeyPartOrRow, CdcRecord>> partitionByKeyHash =
-                partition(bootstraped, channelComputer, assignerParallelism);
-
-        // 2. bucket-assigner
-        TupleTypeInfo<Tuple2<CdcRecord, Integer>> rowWithBucketType =
-                new TupleTypeInfo<>(input.getType(), BasicTypeInfo.INT_TYPE_INFO);
-        DataStream<Tuple2<CdcRecord, Integer>> bucketAssigned =
-                partitionByKeyHash
-                        .transform(
-                                "dynamic-bucket-assigner",
-                                rowWithBucketType,
-                                GlobalIndexAssignerOperator.forCdcRecord(table))
-                        .setParallelism(partitionByKeyHash.getParallelism());
-
-        // 3. shuffle by bucket
-
-        DataStream<Tuple2<CdcRecord, Integer>> partitionByBucket =
-                partition(bucketAssigned, new CdcWithBucketChannelComputer(schema), parallelism);
-
-        // 4. writer and committer
-        return sinkFrom(partitionByBucket, initialCommitUser);
-    }
-}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssigner.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssigner.java
index 2d3be0a12..78340b246 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssigner.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssigner.java
@@ -57,7 +57,8 @@ public class GlobalIndexAssigner<T> implements Serializable {
     private final AbstractFileStoreTable table;
     private final SerializableFunction<TableSchema, PartitionKeyExtractor<T>> extractorFunction;
     private final SerializableFunction<TableSchema, PartitionKeyExtractor<T>>
-            keyPartExtractorFunction;
+            bootstrapExtractorFunction;
+    private final SerializableFunction<T, Integer> bootstrapBucketFunction;
     private final SerBiFunction<T, BinaryRow, T> setPartition;
     private final SerBiFunction<T, RowKind, T> setRowKind;
 
@@ -78,12 +79,14 @@ public class GlobalIndexAssigner<T> implements Serializable {
     public GlobalIndexAssigner(
             Table table,
             SerializableFunction<TableSchema, PartitionKeyExtractor<T>> extractorFunction,
-            SerializableFunction<TableSchema, PartitionKeyExtractor<T>> keyPartExtractorFunction,
+            SerializableFunction<TableSchema, PartitionKeyExtractor<T>> bootstrapExtractorFunction,
+            SerializableFunction<T, Integer> bootstrapBucketFunction,
             SerBiFunction<T, BinaryRow, T> setPartition,
             SerBiFunction<T, RowKind, T> setRowKind) {
         this.table = (AbstractFileStoreTable) table;
         this.extractorFunction = extractorFunction;
-        this.keyPartExtractorFunction = keyPartExtractorFunction;
+        this.bootstrapExtractorFunction = bootstrapExtractorFunction;
+        this.bootstrapBucketFunction = bootstrapBucketFunction;
         this.setPartition = setPartition;
         this.setRowKind = setRowKind;
     }
@@ -97,7 +100,7 @@ public class GlobalIndexAssigner<T> implements Serializable {
         CoreOptions coreOptions = table.coreOptions();
         this.targetBucketRowNumber = (int) coreOptions.dynamicBucketTargetRowNum();
         this.extractor = extractorFunction.apply(table.schema());
-        this.keyPartExtractor = keyPartExtractorFunction.apply(table.schema());
+        this.keyPartExtractor = bootstrapExtractorFunction.apply(table.schema());
 
         // state
         Options options = coreOptions.toConfiguration();
@@ -164,9 +167,12 @@ public class GlobalIndexAssigner<T> implements Serializable {
 
     public void bootstrap(T value) throws IOException {
         BinaryRow partition = keyPartExtractor.partition(value);
-        keyIndex.put(
-                keyPartExtractor.trimmedPrimaryKey(value),
-                new PositiveIntInt(partMapping.index(partition), assignBucket(partition)));
+        BinaryRow key = keyPartExtractor.trimmedPrimaryKey(value);
+        int partId = partMapping.index(partition);
+        int bucket = bootstrapBucketFunction.apply(value);
+        bucketAssigner.bootstrapBucket(partition, bucket);
+        PositiveIntInt partAndBucket = new PositiveIntInt(partId, bucket);
+        keyIndex.put(key, partAndBucket);
     }
 
     private void processNewRecord(BinaryRow partition, int partId, BinaryRow key, T value)
@@ -207,6 +213,15 @@ public class GlobalIndexAssigner<T> implements Serializable {
 
         private final Map<BinaryRow, TreeMap<Integer, Integer>> stats = new HashMap<>();
 
+        public void bootstrapBucket(BinaryRow part, int bucket) {
+            TreeMap<Integer, Integer> bucketMap = bucketMap(part);
+            Integer count = bucketMap.get(bucket);
+            if (count == null) {
+                count = 0;
+            }
+            bucketMap.put(bucket, count + 1);
+        }
+
         public int assignBucket(BinaryRow part, Filter<Integer> filter, int maxCount) {
             TreeMap<Integer, Integer> bucketMap = bucketMap(part);
             for (Map.Entry<Integer, Integer> entry : bucketMap.entrySet()) {
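The statistics behind this are plain per-partition row counts keyed by bucket:
bootstrapBucket pre-populates them from existing data so that assignBucket sees
already-used capacity. A stand-alone toy restatement (hasCapacity is a hypothetical
illustration of how the counts are consumed, not code from this commit):

import java.util.TreeMap;

class BucketStatsSketch {
    private final TreeMap<Integer, Integer> bucketMap = new TreeMap<>();

    // Same effect as the null-checked increment in bootstrapBucket above.
    void bootstrapBucket(int bucket) {
        bucketMap.merge(bucket, 1, Integer::sum);
    }

    // A bucket can absorb new keys while its recorded count stays below the target.
    boolean hasCapacity(int bucket, int targetRowNum) {
        return bucketMap.getOrDefault(bucket, 0) < targetRowNum;
    }
}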
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerOperator.java
index 5d5acd203..0c10b01ff 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/GlobalIndexAssignerOperator.java
@@ -18,56 +18,81 @@
 
 package org.apache.paimon.flink.sink.index;
 
-import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.disk.ExternalBuffer;
+import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.flink.FlinkRowData;
+import org.apache.paimon.flink.FlinkRowWrapper;
 import org.apache.paimon.flink.sink.RowDataPartitionKeyExtractor;
-import org.apache.paimon.flink.sink.cdc.CdcRecord;
-import org.apache.paimon.flink.sink.cdc.CdcRecordPartitionKeyExtractor;
-import org.apache.paimon.flink.sink.cdc.CdcRecordUtils;
 import org.apache.paimon.flink.utils.ProjectToRowDataFunction;
-import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.memory.HeapMemorySegmentPool;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.table.AbstractFileStoreTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.types.RowType;
-import org.apache.paimon.utils.RowDataToObjectArrayConverter;
+import org.apache.paimon.utils.SerializableFunction;
 
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.table.data.RowData;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
 import java.util.concurrent.ThreadLocalRandom;
 
 import static org.apache.paimon.flink.FlinkRowData.toFlinkRowKind;
 
 /** A {@link OneInputStreamOperator} for {@link GlobalIndexAssigner}. */
 public class GlobalIndexAssignerOperator<T> extends AbstractStreamOperator<Tuple2<T, Integer>>
-        implements OneInputStreamOperator<Tuple2<KeyPartOrRow, T>, Tuple2<T, Integer>> {
+        implements OneInputStreamOperator<Tuple2<KeyPartOrRow, T>, Tuple2<T, Integer>>,
+                BoundedOneInput {
 
     private static final long serialVersionUID = 1L;
 
+    private final Table table;
     private final GlobalIndexAssigner<T> assigner;
+    private final SerializableFunction<T, InternalRow> toRow;
+    private final SerializableFunction<InternalRow, T> fromRow;
 
-    public GlobalIndexAssignerOperator(GlobalIndexAssigner<T> assigner) {
+    private transient ExternalBuffer bootstrapBuffer;
+
+    public GlobalIndexAssignerOperator(
+            Table table,
+            GlobalIndexAssigner<T> assigner,
+            SerializableFunction<T, InternalRow> toRow,
+            SerializableFunction<InternalRow, T> fromRow) {
+        this.table = table;
         this.assigner = assigner;
+        this.toRow = toRow;
+        this.fromRow = fromRow;
     }
 
     @Override
     public void initializeState(StateInitializationContext context) throws 
Exception {
         super.initializeState(context);
-        File[] tmpDirs =
-                getContainingTask().getEnvironment().getIOManager().getSpillingDirectories();
+        org.apache.flink.runtime.io.disk.iomanager.IOManager ioManager =
+                getContainingTask().getEnvironment().getIOManager();
+        File[] tmpDirs = ioManager.getSpillingDirectories();
         File tmpDir = tmpDirs[ThreadLocalRandom.current().nextInt(tmpDirs.length)];
         assigner.open(
                 tmpDir,
                 getRuntimeContext().getNumberOfParallelSubtasks(),
                 getRuntimeContext().getIndexOfThisSubtask(),
                 this::collect);
+        Options options = Options.fromMap(table.options());
+        long bufferSize = options.get(CoreOptions.WRITE_BUFFER_SIZE).getBytes();
+        long pageSize = options.get(CoreOptions.PAGE_SIZE).getBytes();
+        bootstrapBuffer =
+                new ExternalBuffer(
+                        IOManager.create(ioManager.getSpillingDirectoriesPaths()),
+                        new HeapMemorySegmentPool(bufferSize, (int) pageSize),
+                        new InternalRowSerializer(table.rowType()));
     }
 
     @Override
@@ -80,11 +105,38 @@ public class GlobalIndexAssignerOperator<T> extends AbstractStreamOperator<Tuple
                 assigner.bootstrap(value);
                 break;
             case ROW:
-                assigner.process(value);
+                if (bootstrapBuffer != null) {
+                    bootstrapBuffer.add(toRow.apply(value));
+                } else {
+                    assigner.process(value);
+                }
                 break;
         }
     }
 
+    @Override
+    public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
+        endBootstrap();
+    }
+
+    @Override
+    public void endInput() throws Exception {
+        endBootstrap();
+    }
+
+    private void endBootstrap() throws Exception {
+        if (bootstrapBuffer != null) {
+            bootstrapBuffer.complete();
+            try (ExternalBuffer.BufferIterator iterator = bootstrapBuffer.newIterator()) {
+                while (iterator.advanceNext()) {
+                    assigner.process(fromRow.apply(iterator.getRow()));
+                }
+            }
+            bootstrapBuffer.reset();
+            bootstrapBuffer = null;
+        }
+    }
+
     private void collect(T value, int bucket) {
         output.collect(new StreamRecord<>(new Tuple2<>(value, bucket)));
     }
@@ -95,39 +147,22 @@ public class GlobalIndexAssignerOperator<T> extends AbstractStreamOperator<Tuple
     }
 
     public static GlobalIndexAssignerOperator<RowData> forRowData(Table table) {
-        return new GlobalIndexAssignerOperator<>(createRowDataAssigner(table));
+        return new GlobalIndexAssignerOperator<>(
+                table, createRowDataAssigner(table), FlinkRowWrapper::new, FlinkRowData::new);
     }
 
     public static GlobalIndexAssigner<RowData> createRowDataAssigner(Table t) {
+        RowType bootstrapType = IndexBootstrap.bootstrapType(((AbstractFileStoreTable) t).schema());
+        int bucketIndex = bootstrapType.getFieldCount() - 1;
         return new GlobalIndexAssigner<>(
                 t,
                 RowDataPartitionKeyExtractor::new,
                 KeyPartPartitionKeyExtractor::new,
+                row -> row.getInt(bucketIndex),
                 new ProjectToRowDataFunction(t.rowType(), t.partitionKeys()),
                 (rowData, rowKind) -> {
                     rowData.setRowKind(toFlinkRowKind(rowKind));
                     return rowData;
                 });
     }
-
-    public static GlobalIndexAssignerOperator<CdcRecord> forCdcRecord(Table table) {
-        RowType partitionType = ((FileStoreTable) table).schema().logicalPartitionType();
-        List<String> partitionNames = partitionType.getFieldNames();
-        RowDataToObjectArrayConverter converter = new RowDataToObjectArrayConverter(partitionType);
-        GlobalIndexAssigner<CdcRecord> assigner =
-                new GlobalIndexAssigner<>(
-                        table,
-                        CdcRecordPartitionKeyExtractor::new,
-                        CdcRecordPartitionKeyExtractor::new,
-                        (record, part) -> {
-                            CdcRecord partCdc =
-                                    CdcRecordUtils.fromGenericRow(
-                                            GenericRow.of(converter.convert(part)), partitionNames);
-                            Map<String, String> fields = new HashMap<>(record.fields());
-                            fields.putAll(partCdc.fields());
-                            return new CdcRecord(record.kind(), fields);
-                        },
-                        CdcRecord::setRowKind);
-        return new GlobalIndexAssignerOperator<>(assigner);
-    }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/IndexBootstrap.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/IndexBootstrap.java
index 2df812007..95b4c1a94 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/IndexBootstrap.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/IndexBootstrap.java
@@ -18,26 +18,40 @@
 
 package org.apache.paimon.flink.sink.index;
 
+import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.JoinedRow;
 import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.source.AbstractInnerTableScan;
+import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.TableScan;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static org.apache.paimon.CoreOptions.SCAN_MODE;
+import static org.apache.paimon.CoreOptions.StartupMode.LATEST;
+
 /** Bootstrap key index from Paimon table. */
 public class IndexBootstrap implements Serializable {
 
     private static final long serialVersionUID = 1L;
 
+    public static final String BUCKET_FIELD = "_BUCKET";
+
     private final Table table;
 
     public IndexBootstrap(Table table) {
@@ -56,14 +70,42 @@ public class IndexBootstrap implements Serializable {
                         .map(fieldNames::indexOf)
                         .mapToInt(Integer::intValue)
                         .toArray();
-        ReadBuilder readBuilder = table.newReadBuilder().withProjection(projection);
+
+        // force using the latest scan mode
+        ReadBuilder readBuilder =
+                table.copy(Collections.singletonMap(SCAN_MODE.key(), LATEST.toString()))
+                        .newReadBuilder()
+                        .withProjection(projection);
 
         AbstractInnerTableScan tableScan = (AbstractInnerTableScan) readBuilder.newScan();
         TableScan.Plan plan =
                 tableScan.withBucketFilter(bucket -> bucket % numAssigners == assignId).plan();
 
-        try (RecordReader<InternalRow> reader = readBuilder.newRead().createReader(plan)) {
-            reader.forEachRemaining(collector);
+        for (Split split : plan.splits()) {
+            try (RecordReader<InternalRow> reader = readBuilder.newRead().createReader(split)) {
+                int bucket = ((DataSplit) split).bucket();
+                GenericRow bucketRow = GenericRow.of(bucket);
+                JoinedRow joinedRow = new JoinedRow();
+                reader.transform(row -> joinedRow.replace(row, bucketRow))
+                        .forEachRemaining(collector);
+            }
         }
     }
+
+    public static RowType bootstrapType(TableSchema schema) {
+        List<String> primaryKeys = schema.primaryKeys();
+        List<String> partitionKeys = schema.partitionKeys();
+        List<DataField> bootstrapFields =
+                new ArrayList<>(
+                        schema.projectedLogicalRowType(
+                                        Stream.concat(primaryKeys.stream(), partitionKeys.stream())
+                                                .collect(Collectors.toList()))
+                                .getFields());
+        bootstrapFields.add(
+                new DataField(
+                        RowType.currentHighestFieldId(bootstrapFields) + 1,
+                        BUCKET_FIELD,
+                        DataTypes.INT().notNull()));
+        return new RowType(bootstrapFields);
+    }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/KeyPartRowChannelComputer.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/KeyPartRowChannelComputer.java
index 9535b3826..aff593007 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/KeyPartRowChannelComputer.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/KeyPartRowChannelComputer.java
@@ -45,7 +45,6 @@ public class KeyPartRowChannelComputer implements ChannelComputer<Tuple2<KeyPart
 
     public KeyPartRowChannelComputer(
             RowType rowType, RowType keyPartType, List<String> primaryKey) {
-
         this.rowType = rowType;
         this.keyPartType = keyPartType;
         this.primaryKey = primaryKey;
@@ -65,4 +64,9 @@ public class KeyPartRowChannelComputer implements ChannelComputer<Tuple2<KeyPart
                         .apply(new FlinkRowWrapper(record.f1));
         return Math.abs(key.hashCode() % numChannels);
     }
+
+    @Override
+    public String toString() {
+        return "shuffle by key hash";
+    }
 }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/GlobalDynamicBucketTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/GlobalDynamicBucketTableITCase.java
index 65c608ba4..794cb312f 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/GlobalDynamicBucketTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/GlobalDynamicBucketTableITCase.java
@@ -84,4 +84,23 @@ public class GlobalDynamicBucketTableITCase extends CatalogITCaseBase {
         assertThat(sql("SELECT DISTINCT bucket FROM T$files"))
                 .containsExactlyInAnyOrder(Row.of(0), Row.of(1), Row.of(2));
     }
+
+    @Test
+    public void testLargeRecords() {
+        sql(
+                "create table large_t (pt int, k int, v int, primary key (k) 
not enforced) partitioned by (pt) with ("
+                        + "'bucket'='-1', "
+                        + "'dynamic-bucket.target-row-num'='10000')");
+        sql(
+                "create temporary table src (pt int, k int, v int) with ("
+                        + "'connector'='datagen', "
+                        + "'number-of-rows'='100000', "
+                        + "'fields.k.min'='0', "
+                        + "'fields.k.max'='100000', "
+                        + "'fields.pt.min'='0', "
+                        + "'fields.pt.max'='1')");
+        sql("insert into large_t select * from src");
+        sql("insert into large_t select * from src");
+        assertThat(sql("select k, count(*) from large_t group by k having 
count(*) > 1")).isEmpty();
+    }
 }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java
index 81c9d1495..346e48b37 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java
@@ -46,6 +46,7 @@ import org.apache.paimon.utils.TraceableFileIO;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.api.io.TempDir;
@@ -76,6 +77,7 @@ public class FlinkCdcSyncTableSinkITCase extends AbstractTestBase {
         innerTestRandomCdcEvents(-1, false);
     }
 
+    @Disabled
     @Test
     @Timeout(120)
     public void testRandomCdcEventsGlobalDynamicBucket() throws Exception {
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/index/IndexBootstrapTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/index/IndexBootstrapTest.java
index cd185d1de..aadec017e 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/index/IndexBootstrapTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/index/IndexBootstrapTest.java
@@ -65,15 +65,24 @@ public class IndexBootstrapTest extends TableTestBase {
                 GenericRow.of(7, 7));
 
         IndexBootstrap indexBootstrap = new IndexBootstrap(table);
-        List<Integer> result = new ArrayList<>();
-        Consumer<InternalRow> consumer = row -> result.add(row.getInt(0));
+        List<GenericRow> result = new ArrayList<>();
+        Consumer<InternalRow> consumer =
+                row -> result.add(GenericRow.of(row.getInt(0), row.getInt(1)));
+
+        // output key and bucket
 
         indexBootstrap.bootstrap(2, 0, consumer);
-        assertThat(result).containsExactlyInAnyOrder(2, 3);
+        assertThat(result).containsExactlyInAnyOrder(GenericRow.of(2, 4), GenericRow.of(3, 0));
         result.clear();
 
         indexBootstrap.bootstrap(2, 1, consumer);
-        assertThat(result).containsExactlyInAnyOrder(1, 4, 5, 6, 7);
+        assertThat(result)
+                .containsExactlyInAnyOrder(
+                        GenericRow.of(1, 3),
+                        GenericRow.of(4, 1),
+                        GenericRow.of(5, 1),
+                        GenericRow.of(6, 3),
+                        GenericRow.of(7, 1));
         result.clear();
     }
 }
