This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new b3e9e906c [core] Introduce 
cross-partition-upsert.bootstrap-parallelism (#2152)
b3e9e906c is described below

commit b3e9e906c28a8877cf5b94402af28ca59bead39c
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Oct 20 10:52:36 2023 +0800

    [core] Introduce cross-partition-upsert.bootstrap-parallelism (#2152)
---
 .../shortcodes/generated/core_configuration.html   |   6 +
 .../main/java/org/apache/paimon/CoreOptions.java   |  11 ++
 .../paimon/data/SimpleCollectingOutputView.java    |   5 +
 ...orySegmentSource.java => ArraySegmentPool.java} |  48 +++--
 .../apache/paimon/memory/MemorySegmentSource.java  |   3 +
 .../apache/paimon/utils/ExecutorThreadFactory.java |   0
 .../paimon/utils/FatalExitExceptionHandler.java    |   0
 .../org/apache/paimon/utils/ParallelExecution.java | 202 +++++++++++++++++++++
 .../utils/RowDataToObjectArrayConverter.java       |   5 +
 .../java/org/apache/paimon/utils/TypeUtils.java    |  11 ++
 .../apache/paimon/utils/ParallelExecutionTest.java | 182 +++++++++++++++++++
 .../paimon/flink/sink/index/IndexBootstrap.java    |  75 ++++++--
 12 files changed, 520 insertions(+), 28 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html 
b/docs/layouts/shortcodes/generated/core_configuration.html
index 3fb153a08..253e6414c 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -116,6 +116,12 @@ under the License.
             <td>Duration</td>
             <td>The discovery interval of continuous reading.</td>
         </tr>
+        <tr>
+            <td><h5>cross-partition-upsert.bootstrap-parallelism</h5></td>
+            <td style="word-wrap: break-word;">10</td>
+            <td>Integer</td>
+            <td>The parallelism for bootstrap in a single task for cross 
partition upsert.</td>
+        </tr>
         <tr>
             <td><h5>cross-partition-upsert.index-ttl</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 942fa65b0..84890acbf 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -880,6 +880,13 @@ public class CoreOptions implements Serializable {
                                     + "this can avoid maintaining too many 
indexes and lead to worse and worse performance, "
                                     + "but please note that this may also 
cause data duplication.");
 
+    public static final ConfigOption<Integer> 
CROSS_PARTITION_UPSERT_BOOTSTRAP_PARALLELISM =
+            key("cross-partition-upsert.bootstrap-parallelism")
+                    .intType()
+                    .defaultValue(10)
+                    .withDescription(
+                            "The parallelism for bootstrap in a single task 
for cross partition upsert.");
+
     public static final ConfigOption<Integer> ZORDER_VAR_LENGTH_CONTRIBUTION =
             key("zorder.var-length-contribution")
                     .intType()
@@ -1327,6 +1334,10 @@ public class CoreOptions implements Serializable {
         return options.get(CROSS_PARTITION_UPSERT_INDEX_TTL);
     }
 
    /** Returns the per-task bootstrap parallelism for cross-partition upsert. */
    public int crossPartitionUpsertBootstrapParallelism() {
        return options.get(CROSS_PARTITION_UPSERT_BOOTSTRAP_PARALLELISM);
    }
+
     public int varTypeSize() {
         return options.get(ZORDER_VAR_LENGTH_CONTRIBUTION);
     }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/data/SimpleCollectingOutputView.java
 
b/paimon-common/src/main/java/org/apache/paimon/data/SimpleCollectingOutputView.java
index 6376365db..894d35d29 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/data/SimpleCollectingOutputView.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/data/SimpleCollectingOutputView.java
@@ -24,6 +24,7 @@ import org.apache.paimon.utils.MathUtils;
 
 import java.io.EOFException;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 
 /**
@@ -40,6 +41,10 @@ public class SimpleCollectingOutputView extends 
AbstractPagedOutputView {
 
     private int segmentNum;
 
    /**
     * Creates a collecting output view that appends filled segments to a fresh internal list.
     *
     * @param memSource source of memory segments; its first segment backs the view
     * @param segmentSize size in bytes of each segment
     */
    public SimpleCollectingOutputView(MemorySegmentSource memSource, int segmentSize) {
        this(new ArrayList<>(), memSource, segmentSize);
    }
+
     public SimpleCollectingOutputView(
             List<MemorySegment> fullSegmentTarget, MemorySegmentSource 
memSource, int segmentSize) {
         super(memSource.nextSegment(), segmentSize);
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/memory/MemorySegmentSource.java 
b/paimon-common/src/main/java/org/apache/paimon/memory/ArraySegmentPool.java
similarity index 51%
copy from 
paimon-common/src/main/java/org/apache/paimon/memory/MemorySegmentSource.java
copy to 
paimon-common/src/main/java/org/apache/paimon/memory/ArraySegmentPool.java
index 0f9701150..0c589a371 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/memory/MemorySegmentSource.java
+++ b/paimon-common/src/main/java/org/apache/paimon/memory/ArraySegmentPool.java
@@ -18,20 +18,38 @@
 
 package org.apache.paimon.memory;
 
-import org.apache.paimon.annotation.Public;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
 
-/**
- * Interface describing entities that can provide memory segments.
- *
- * @since 0.4.0
- */
-@Public
-public interface MemorySegmentSource {
-
-    /**
-     * Gets the next memory segment. If no more segments are available, it 
returns null.
-     *
-     * @return The next memory segment, or null, if none is available.
-     */
-    MemorySegment nextSegment();
+/** A {@link MemorySegmentPool} for allocated segments. */
+public class ArraySegmentPool implements MemorySegmentPool {
+
+    private final Queue<MemorySegment> segments;
+    private final int pageSize;
+
+    public ArraySegmentPool(List<MemorySegment> segments) {
+        this.segments = new LinkedList<>(segments);
+        this.pageSize = segments.get(0).size();
+    }
+
+    @Override
+    public int pageSize() {
+        return pageSize;
+    }
+
+    @Override
+    public void returnAll(List<MemorySegment> memory) {
+        segments.addAll(memory);
+    }
+
+    @Override
+    public int freePages() {
+        return segments.size();
+    }
+
+    @Override
+    public MemorySegment nextSegment() {
+        return segments.poll();
+    }
 }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/memory/MemorySegmentSource.java 
b/paimon-common/src/main/java/org/apache/paimon/memory/MemorySegmentSource.java
index 0f9701150..f79a3500b 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/memory/MemorySegmentSource.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/memory/MemorySegmentSource.java
@@ -20,6 +20,8 @@ package org.apache.paimon.memory;
 
 import org.apache.paimon.annotation.Public;
 
+import javax.annotation.Nullable;
+
 /**
  * Interface describing entities that can provide memory segments.
  *
@@ -33,5 +35,6 @@ public interface MemorySegmentSource {
      *
      * @return The next memory segment, or null, if none is available.
      */
+    @Nullable
     MemorySegment nextSegment();
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/utils/ExecutorThreadFactory.java 
b/paimon-common/src/main/java/org/apache/paimon/utils/ExecutorThreadFactory.java
similarity index 100%
rename from 
paimon-core/src/main/java/org/apache/paimon/utils/ExecutorThreadFactory.java
rename to 
paimon-common/src/main/java/org/apache/paimon/utils/ExecutorThreadFactory.java
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/utils/FatalExitExceptionHandler.java
 
b/paimon-common/src/main/java/org/apache/paimon/utils/FatalExitExceptionHandler.java
similarity index 100%
rename from 
paimon-core/src/main/java/org/apache/paimon/utils/FatalExitExceptionHandler.java
rename to 
paimon-common/src/main/java/org/apache/paimon/utils/FatalExitExceptionHandler.java
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/utils/ParallelExecution.java 
b/paimon-common/src/main/java/org/apache/paimon/utils/ParallelExecution.java
new file mode 100644
index 000000000..ab5866e42
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/ParallelExecution.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import org.apache.paimon.data.RandomAccessInputView;
+import org.apache.paimon.data.SimpleCollectingOutputView;
+import org.apache.paimon.data.serializer.Serializer;
+import org.apache.paimon.memory.ArraySegmentPool;
+import org.apache.paimon.memory.MemorySegment;
+import org.apache.paimon.reader.RecordReader;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
+
+/**
+ * A class to help parallel execution.
+ *
+ * @param <T> Record Type.
+ * @param <E> Extra message of one {@link RecordReader}.
+ */
+public class ParallelExecution<T, E> implements Closeable {
+
+    private final Serializer<T> serializer;
+    private final BlockingQueue<MemorySegment> idlePages;
+    private final BlockingQueue<ParallelBatch<T, E>> results;
+    private final ExecutorService executorService;
+
+    private final AtomicReference<Throwable> exception;
+
+    private final CountDownLatch latch;
+
+    public ParallelExecution(
+            Serializer<T> serializer,
+            int pageSize,
+            int parallelism,
+            List<Supplier<Pair<RecordReader<T>, E>>> readers) {
+        this.serializer = serializer;
+        int totalPages = parallelism * 2;
+        this.idlePages = new ArrayBlockingQueue<>(totalPages);
+        for (int i = 0; i < totalPages; i++) {
+            idlePages.add(MemorySegment.allocateHeapMemory(pageSize));
+        }
+        this.executorService =
+                new ThreadPoolExecutor(
+                        parallelism,
+                        parallelism,
+                        1,
+                        TimeUnit.SECONDS,
+                        new LinkedBlockingQueue<>(),
+                        new 
ExecutorThreadFactory(Thread.currentThread().getName() + "-parallel"));
+        this.results = new LinkedBlockingQueue<>();
+        this.exception = new AtomicReference<>();
+        this.latch = new CountDownLatch(readers.size());
+
+        for (Supplier<Pair<RecordReader<T>, E>> readerSupplier : readers) {
+            Serializer<T> duplicate = this.serializer.duplicate();
+            executorService.submit(() -> asyncRead(readerSupplier, duplicate));
+        }
+    }
+
+    @Nullable
+    public ParallelBatch<T, E> take() throws InterruptedException, IOException 
{
+        ParallelBatch<T, E> element;
+        do {
+            if (latch.getCount() == 0 && results.isEmpty()) {
+                return null;
+            }
+
+            element = results.poll(2, TimeUnit.SECONDS);
+
+            if (exception.get() != null) {
+                throw new IOException(exception.get());
+            }
+        } while (element == null);
+        return element;
+    }
+
+    private void asyncRead(
+            Supplier<Pair<RecordReader<T>, E>> readerSupplier, Serializer<T> 
serializer) {
+        Pair<RecordReader<T>, E> pair = readerSupplier.get();
+        try (CloseableIterator<T> iterator = 
pair.getLeft().toCloseableIterator()) {
+            int count = 0;
+            SimpleCollectingOutputView outputView = null;
+
+            while (iterator.hasNext()) {
+                T next = iterator.next();
+
+                while (true) {
+                    if (outputView == null) {
+                        outputView = newOutputView();
+                        count = 0;
+                    }
+
+                    try {
+                        serializer.serialize(next, outputView);
+                        count++;
+                        break;
+                    } catch (EOFException e) {
+                        sendToResults(outputView, count, pair.getRight());
+                        outputView = null;
+                    }
+                }
+            }
+
+            if (outputView != null) {
+                sendToResults(outputView, count, pair.getRight());
+            }
+
+            latch.countDown();
+        } catch (Throwable e) {
+            this.exception.set(e);
+        }
+    }
+
+    private SimpleCollectingOutputView newOutputView() throws 
InterruptedException {
+        MemorySegment page = idlePages.take();
+        return new SimpleCollectingOutputView(
+                new ArraySegmentPool(Collections.singletonList(page)), 
page.size());
+    }
+
+    private void sendToResults(SimpleCollectingOutputView outputView, int 
count, E extraMessage) {
+        results.add(iterator(outputView.getCurrentSegment(), count, 
extraMessage));
+    }
+
+    @Override
+    public void close() throws IOException {
+        this.executorService.shutdownNow();
+    }
+
+    private ParallelBatch<T, E> iterator(MemorySegment page, int numRecords, E 
extraMessage) {
+        RandomAccessInputView inputView =
+                new RandomAccessInputView(
+                        new ArrayList<>(Collections.singletonList(page)), 
page.size());
+        return new ParallelBatch<T, E>() {
+
+            int numReturn = 0;
+
+            @Nullable
+            @Override
+            public T next() throws IOException {
+                if (numReturn >= numRecords) {
+                    return null;
+                }
+
+                numReturn++;
+                return serializer.deserialize(inputView);
+            }
+
+            @Override
+            public void releaseBatch() {
+                idlePages.add(page);
+            }
+
+            @Override
+            public E extraMesage() {
+                return extraMessage;
+            }
+        };
+    }
+
+    /** A batch provides next and extra message. */
+    public interface ParallelBatch<T, E> {
+
+        @Nullable
+        T next() throws IOException;
+
+        void releaseBatch();
+
+        E extraMesage();
+    }
+}
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/utils/RowDataToObjectArrayConverter.java
 
b/paimon-common/src/main/java/org/apache/paimon/utils/RowDataToObjectArrayConverter.java
index 502067566..a18ec4249 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/utils/RowDataToObjectArrayConverter.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/utils/RowDataToObjectArrayConverter.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.utils;
 
 import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
@@ -56,6 +57,10 @@ public class RowDataToObjectArrayConverter implements 
Serializable {
         return fieldGetters.length;
     }
 
    /** Converts the given row to a {@link GenericRow} holding the extracted field objects. */
    public GenericRow toGenericRow(InternalRow rowData) {
        return GenericRow.of(convert(rowData));
    }
+
     public Object[] convert(InternalRow rowData) {
         Object[] result = new Object[fieldGetters.length];
         for (int i = 0; i < fieldGetters.length; i++) {
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java 
b/paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java
index 6ee72b0b4..bd938c028 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java
@@ -33,6 +33,7 @@ import org.apache.paimon.types.VarCharType;
 
 import java.math.BigDecimal;
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Base64;
 import java.util.List;
@@ -45,6 +46,16 @@ import static 
org.apache.paimon.types.DataTypeFamily.CHARACTER_STRING;
 /** Type related helper functions. */
 public class TypeUtils {
 
+    public static RowType concat(RowType left, RowType right) {
+        RowType.Builder builder = RowType.builder();
+        List<DataField> fields = new ArrayList<>(left.getFields());
+        fields.addAll(right.getFields());
+        fields.forEach(
+                dataField ->
+                        builder.field(dataField.name(), dataField.type(), 
dataField.description()));
+        return builder.build();
+    }
+
     public static RowType project(RowType inputType, int[] mapping) {
         List<DataField> fields = inputType.getFields();
         return new RowType(
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/utils/ParallelExecutionTest.java 
b/paimon-core/src/test/java/org/apache/paimon/utils/ParallelExecutionTest.java
new file mode 100644
index 000000000..01bb81f11
--- /dev/null
+++ 
b/paimon-core/src/test/java/org/apache/paimon/utils/ParallelExecutionTest.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import org.apache.paimon.data.serializer.IntSerializer;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.utils.ParallelExecution.ParallelBatch;
+
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+import java.util.function.Supplier;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test for {@link ParallelExecution}. */
+public class ParallelExecutionTest {
+
+    @Test
+    public void testNormal() {
+        Supplier<Pair<RecordReader<Integer>, Integer>> supplier1 =
+                () ->
+                        Pair.of(
+                                create(
+                                        new LinkedList<>(
+                                                Arrays.asList(
+                                                        Arrays.asList(1, 5, 6),
+                                                        Arrays.asList(2, 7)))),
+                                1);
+        Supplier<Pair<RecordReader<Integer>, Integer>> supplier2 =
+                () ->
+                        Pair.of(
+                                create(
+                                        new LinkedList<>(
+                                                Arrays.asList(
+                                                        Arrays.asList(33, 55),
+                                                        Arrays.asList(22, 
77)))),
+                                2);
+        Supplier<Pair<RecordReader<Integer>, Integer>> supplier3 =
+                () ->
+                        Pair.of(
+                                create(
+                                        new LinkedList<>(
+                                                Arrays.asList(
+                                                        Arrays.asList(333, 
555),
+                                                        Arrays.asList(222, 
777)))),
+                                3);
+
+        ParallelExecution<Integer, Integer> execution =
+                new ParallelExecution<>(
+                        new IntSerializer(),
+                        1024,
+                        2,
+                        Arrays.asList(supplier1, supplier2, supplier3));
+        List<Pair<Integer, Integer>> result = collect(execution);
+        assertThat(result)
+                .containsExactlyInAnyOrder(
+                        Pair.of(1, 1),
+                        Pair.of(5, 1),
+                        Pair.of(6, 1),
+                        Pair.of(2, 1),
+                        Pair.of(7, 1),
+                        Pair.of(33, 2),
+                        Pair.of(55, 2),
+                        Pair.of(22, 2),
+                        Pair.of(77, 2),
+                        Pair.of(333, 3),
+                        Pair.of(555, 3),
+                        Pair.of(222, 3),
+                        Pair.of(777, 3));
+    }
+
+    @Test
+    public void testException() {
+        String message = "Test Exception";
+
+        Supplier<Pair<RecordReader<Integer>, Integer>> supplier1 =
+                () ->
+                        Pair.of(
+                                create(
+                                        new LinkedList<>(
+                                                Arrays.asList(
+                                                        Arrays.asList(1, 5, 6),
+                                                        Arrays.asList(2, 7)))),
+                                1);
+        RecordReader<Integer> exReader =
+                new RecordReader<Integer>() {
+                    @Nullable
+                    @Override
+                    public RecordIterator<Integer> readBatch() {
+                        throw new RuntimeException(message);
+                    }
+
+                    @Override
+                    public void close() {}
+                };
+
+        ParallelExecution<Integer, Integer> execution =
+                new ParallelExecution<>(
+                        new IntSerializer(),
+                        1024,
+                        2,
+                        Arrays.asList(supplier1, supplier1, () -> 
Pair.of(exReader, 2)));
+        assertThatThrownBy(() -> 
collect(execution)).hasMessageContaining(message);
+    }
+
+    private RecordReader<Integer> create(Queue<List<Integer>> queue) {
+        return new RecordReader<Integer>() {
+            @Nullable
+            @Override
+            public RecordIterator<Integer> readBatch() {
+                List<Integer> values = queue.poll();
+                if (values == null) {
+                    return null;
+                }
+                Queue<Integer> vQueue = new LinkedList<>(values);
+                return new RecordIterator<Integer>() {
+                    @Nullable
+                    @Override
+                    public Integer next() {
+                        return vQueue.poll();
+                    }
+
+                    @Override
+                    public void releaseBatch() {}
+                };
+            }
+
+            @Override
+            public void close() {}
+        };
+    }
+
+    private List<Pair<Integer, Integer>> collect(ParallelExecution<Integer, 
Integer> execution) {
+        List<Pair<Integer, Integer>> result = new ArrayList<>();
+        while (true) {
+            try {
+                ParallelBatch<Integer, Integer> batch = execution.take();
+                if (batch == null) {
+                    break;
+                }
+
+                while (true) {
+                    Integer record = batch.next();
+                    if (record == null) {
+                        batch.releaseBatch();
+                        break;
+                    }
+
+                    result.add(Pair.of(record, batch.extraMesage()));
+                }
+            } catch (InterruptedException | IOException e) {
+                Thread.currentThread().interrupt();
+                throw new RuntimeException(e);
+            }
+        }
+        return result;
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/IndexBootstrap.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/IndexBootstrap.java
index db1a3318d..392b29622 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/IndexBootstrap.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/IndexBootstrap.java
@@ -23,6 +23,7 @@ import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.JoinedRow;
+import org.apache.paimon.data.serializer.InternalSerializers;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.schema.TableSchema;
@@ -34,14 +35,21 @@ import org.apache.paimon.table.source.Split;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.Pair;
+import org.apache.paimon.utils.ParallelExecution;
+import org.apache.paimon.utils.ParallelExecution.ParallelBatch;
+import org.apache.paimon.utils.RowDataToObjectArrayConverter;
+import org.apache.paimon.utils.TypeUtils;
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.io.UncheckedIOException;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.function.Consumer;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -65,11 +73,8 @@ public class IndexBootstrap implements Serializable {
             throws IOException {
         RowType rowType = table.rowType();
         List<String> fieldNames = rowType.getFieldNames();
-        List<String> keyPartFields =
-                Stream.concat(table.primaryKeys().stream(), 
table.partitionKeys().stream())
-                        .collect(Collectors.toList());
-        int[] projection =
-                keyPartFields.stream()
+        int[] keyProjection =
+                table.primaryKeys().stream()
                         .map(fieldNames::indexOf)
                         .mapToInt(Integer::intValue)
                         .toArray();
@@ -78,7 +83,7 @@ public class IndexBootstrap implements Serializable {
         ReadBuilder readBuilder =
                 table.copy(Collections.singletonMap(SCAN_MODE.key(), 
LATEST.toString()))
                         .newReadBuilder()
-                        .withProjection(projection);
+                        .withProjection(keyProjection);
 
         AbstractInnerTableScan tableScan = (AbstractInnerTableScan) 
readBuilder.newScan();
         List<Split> splits =
@@ -87,7 +92,8 @@ public class IndexBootstrap implements Serializable {
                         .plan()
                         .splits();
 
-        Duration indexTtl = 
CoreOptions.fromMap(table.options()).crossPartitionUpsertIndexTtl();
+        CoreOptions options = CoreOptions.fromMap(table.options());
+        Duration indexTtl = options.crossPartitionUpsertIndexTtl();
         if (indexTtl != null) {
             long indexTtlMillis = indexTtl.toMillis();
             long currentTime = System.currentTimeMillis();
@@ -97,13 +103,56 @@ public class IndexBootstrap implements Serializable {
                             .collect(Collectors.toList());
         }
 
+        List<Supplier<Pair<RecordReader<InternalRow>, GenericRow>>> suppliers 
= new ArrayList<>();
+        RowDataToObjectArrayConverter partBucketConverter =
+                new RowDataToObjectArrayConverter(
+                        TypeUtils.concat(
+                                TypeUtils.project(rowType, 
table.partitionKeys()),
+                                RowType.of(DataTypes.INT())));
         for (Split split : splits) {
-            try (RecordReader<InternalRow> reader = 
readBuilder.newRead().createReader(split)) {
-                int bucket = ((DataSplit) split).bucket();
-                GenericRow bucketRow = GenericRow.of(bucket);
-                JoinedRow joinedRow = new JoinedRow();
-                reader.transform(row -> joinedRow.replace(row, bucketRow))
-                        .forEachRemaining(collector);
+            suppliers.add(
+                    () -> {
+                        try {
+                            RecordReader<InternalRow> reader =
+                                    readBuilder.newRead().createReader(split);
+                            DataSplit dataSplit = ((DataSplit) split);
+                            int bucket = dataSplit.bucket();
+                            GenericRow partAndBucket =
+                                    partBucketConverter.toGenericRow(
+                                            new JoinedRow(
+                                                    dataSplit.partition(), 
GenericRow.of(bucket)));
+                            return Pair.of(reader, partAndBucket);
+                        } catch (IOException e) {
+                            throw new UncheckedIOException(e);
+                        }
+                    });
+        }
+        ParallelExecution<InternalRow, GenericRow> execution =
+                new ParallelExecution<>(
+                        InternalSerializers.create(TypeUtils.project(rowType, 
keyProjection)),
+                        options.pageSize(),
+                        options.crossPartitionUpsertBootstrapParallelism(),
+                        suppliers);
+        JoinedRow joinedRow = new JoinedRow();
+        while (true) {
+            try {
+                ParallelBatch<InternalRow, GenericRow> batch = 
execution.take();
+                if (batch == null) {
+                    break;
+                }
+
+                while (true) {
+                    InternalRow row = batch.next();
+                    if (row == null) {
+                        batch.releaseBatch();
+                        break;
+                    }
+
+                    collector.accept(joinedRow.replace(row, 
batch.extraMesage()));
+                }
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new RuntimeException(e);
             }
         }
     }

Reply via email to