This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new b3e9e906c [core] Introduce
cross-partition-upsert.bootstrap-parallelism (#2152)
b3e9e906c is described below
commit b3e9e906c28a8877cf5b94402af28ca59bead39c
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Oct 20 10:52:36 2023 +0800
[core] Introduce cross-partition-upsert.bootstrap-parallelism (#2152)
---
.../shortcodes/generated/core_configuration.html | 6 +
.../main/java/org/apache/paimon/CoreOptions.java | 11 ++
.../paimon/data/SimpleCollectingOutputView.java | 5 +
...orySegmentSource.java => ArraySegmentPool.java} | 48 +++--
.../apache/paimon/memory/MemorySegmentSource.java | 3 +
.../apache/paimon/utils/ExecutorThreadFactory.java | 0
.../paimon/utils/FatalExitExceptionHandler.java | 0
.../org/apache/paimon/utils/ParallelExecution.java | 202 +++++++++++++++++++++
.../utils/RowDataToObjectArrayConverter.java | 5 +
.../java/org/apache/paimon/utils/TypeUtils.java | 11 ++
.../apache/paimon/utils/ParallelExecutionTest.java | 182 +++++++++++++++++++
.../paimon/flink/sink/index/IndexBootstrap.java | 75 ++++++--
12 files changed, 520 insertions(+), 28 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html
b/docs/layouts/shortcodes/generated/core_configuration.html
index 3fb153a08..253e6414c 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -116,6 +116,12 @@ under the License.
<td>Duration</td>
<td>The discovery interval of continuous reading.</td>
</tr>
+ <tr>
+ <td><h5>cross-partition-upsert.bootstrap-parallelism</h5></td>
+ <td style="word-wrap: break-word;">10</td>
+ <td>Integer</td>
+ <td>The parallelism for bootstrap in a single task for cross partition upsert.</td>
+ </tr>
<tr>
<td><h5>cross-partition-upsert.index-ttl</h5></td>
<td style="word-wrap: break-word;">(none)</td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 942fa65b0..84890acbf 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -880,6 +880,13 @@ public class CoreOptions implements Serializable {
+ "this can avoid maintaining too many
indexes and lead to worse and worse performance, "
+ "but please note that this may also
cause data duplication.");
+    /**
+     * Option controlling the parallelism a single task uses while bootstrapping for cross
+     * partition upsert. Defaults to 10.
+     */
+    public static final ConfigOption<Integer> CROSS_PARTITION_UPSERT_BOOTSTRAP_PARALLELISM =
+            key("cross-partition-upsert.bootstrap-parallelism")
+                    .intType()
+                    .defaultValue(10)
+                    .withDescription(
+                            "The parallelism for bootstrap in a single task for cross partition upsert.");
+
public static final ConfigOption<Integer> ZORDER_VAR_LENGTH_CONTRIBUTION =
key("zorder.var-length-contribution")
.intType()
@@ -1327,6 +1334,10 @@ public class CoreOptions implements Serializable {
return options.get(CROSS_PARTITION_UPSERT_INDEX_TTL);
}
+    /** Returns the value of {@link #CROSS_PARTITION_UPSERT_BOOTSTRAP_PARALLELISM}. */
+    public int crossPartitionUpsertBootstrapParallelism() {
+        final int bootstrapParallelism = options.get(CROSS_PARTITION_UPSERT_BOOTSTRAP_PARALLELISM);
+        return bootstrapParallelism;
+    }
+
public int varTypeSize() {
return options.get(ZORDER_VAR_LENGTH_CONTRIBUTION);
}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/data/SimpleCollectingOutputView.java
b/paimon-common/src/main/java/org/apache/paimon/data/SimpleCollectingOutputView.java
index 6376365db..894d35d29 100644
---
a/paimon-common/src/main/java/org/apache/paimon/data/SimpleCollectingOutputView.java
+++
b/paimon-common/src/main/java/org/apache/paimon/data/SimpleCollectingOutputView.java
@@ -24,6 +24,7 @@ import org.apache.paimon.utils.MathUtils;
import java.io.EOFException;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
/**
@@ -40,6 +41,10 @@ public class SimpleCollectingOutputView extends
AbstractPagedOutputView {
private int segmentNum;
+    /**
+     * Creates a view that collects its full segments into a newly created, initially empty list.
+     *
+     * @param memSource source the segments are requested from
+     * @param segmentSize segment size handed on to the three-argument constructor
+     */
+    public SimpleCollectingOutputView(MemorySegmentSource memSource, int segmentSize) {
+        this(new ArrayList<MemorySegment>(), memSource, segmentSize);
+    }
+
public SimpleCollectingOutputView(
List<MemorySegment> fullSegmentTarget, MemorySegmentSource
memSource, int segmentSize) {
super(memSource.nextSegment(), segmentSize);
diff --git
a/paimon-common/src/main/java/org/apache/paimon/memory/MemorySegmentSource.java
b/paimon-common/src/main/java/org/apache/paimon/memory/ArraySegmentPool.java
similarity index 51%
copy from
paimon-common/src/main/java/org/apache/paimon/memory/MemorySegmentSource.java
copy to
paimon-common/src/main/java/org/apache/paimon/memory/ArraySegmentPool.java
index 0f9701150..0c589a371 100644
---
a/paimon-common/src/main/java/org/apache/paimon/memory/MemorySegmentSource.java
+++ b/paimon-common/src/main/java/org/apache/paimon/memory/ArraySegmentPool.java
@@ -18,20 +18,38 @@
package org.apache.paimon.memory;
-import org.apache.paimon.annotation.Public;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
-/**
- * Interface describing entities that can provide memory segments.
- *
- * @since 0.4.0
- */
-@Public
-public interface MemorySegmentSource {
-
- /**
- * Gets the next memory segment. If no more segments are available, it
returns null.
- *
- * @return The next memory segment, or null, if none is available.
- */
- MemorySegment nextSegment();
+/**
+ * A {@link MemorySegmentPool} for allocated segments.
+ *
+ * <p>The pool is seeded with an existing list of segments and hands them out in FIFO order.
+ * Segments given back through {@link #returnAll(List)} are appended to the end of the queue. The
+ * class performs no synchronization of its own.
+ */
+public class ArraySegmentPool implements MemorySegmentPool {
+
+    /** Segments currently available for {@link #nextSegment()}. */
+    private final Queue<MemorySegment> segments;
+
+    /** Page size reported by this pool; taken from the first seeded segment. */
+    private final int pageSize;
+
+    /**
+     * Creates a pool over a copy of the given segments.
+     *
+     * @param segments the initially available segments; must contain at least one element, whose
+     *     size becomes the page size of the pool
+     * @throws IllegalArgumentException if {@code segments} is empty
+     */
+    public ArraySegmentPool(List<MemorySegment> segments) {
+        // Fail with a descriptive message instead of an IndexOutOfBoundsException from get(0).
+        if (segments.isEmpty()) {
+            throw new IllegalArgumentException(
+                    "ArraySegmentPool requires at least one segment to derive the page size.");
+        }
+        this.segments = new LinkedList<>(segments);
+        this.pageSize = segments.get(0).size();
+    }
+
+    @Override
+    public int pageSize() {
+        return pageSize;
+    }
+
+    /** Makes the given segments available again by appending them to the queue. */
+    @Override
+    public void returnAll(List<MemorySegment> memory) {
+        segments.addAll(memory);
+    }
+
+    /** Returns the number of segments that can currently be handed out. */
+    @Override
+    public int freePages() {
+        return segments.size();
+    }
+
+    /** Returns the next available segment, or {@code null} if the pool is exhausted. */
+    @Override
+    public MemorySegment nextSegment() {
+        return segments.poll();
+    }
+}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/memory/MemorySegmentSource.java
b/paimon-common/src/main/java/org/apache/paimon/memory/MemorySegmentSource.java
index 0f9701150..f79a3500b 100644
---
a/paimon-common/src/main/java/org/apache/paimon/memory/MemorySegmentSource.java
+++
b/paimon-common/src/main/java/org/apache/paimon/memory/MemorySegmentSource.java
@@ -20,6 +20,8 @@ package org.apache.paimon.memory;
import org.apache.paimon.annotation.Public;
+import javax.annotation.Nullable;
+
/**
* Interface describing entities that can provide memory segments.
*
@@ -33,5 +35,6 @@ public interface MemorySegmentSource {
*
* @return The next memory segment, or null, if none is available.
*/
+ @Nullable
MemorySegment nextSegment();
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/utils/ExecutorThreadFactory.java
b/paimon-common/src/main/java/org/apache/paimon/utils/ExecutorThreadFactory.java
similarity index 100%
rename from
paimon-core/src/main/java/org/apache/paimon/utils/ExecutorThreadFactory.java
rename to
paimon-common/src/main/java/org/apache/paimon/utils/ExecutorThreadFactory.java
diff --git
a/paimon-core/src/main/java/org/apache/paimon/utils/FatalExitExceptionHandler.java
b/paimon-common/src/main/java/org/apache/paimon/utils/FatalExitExceptionHandler.java
similarity index 100%
rename from
paimon-core/src/main/java/org/apache/paimon/utils/FatalExitExceptionHandler.java
rename to
paimon-common/src/main/java/org/apache/paimon/utils/FatalExitExceptionHandler.java
diff --git
a/paimon-common/src/main/java/org/apache/paimon/utils/ParallelExecution.java
b/paimon-common/src/main/java/org/apache/paimon/utils/ParallelExecution.java
new file mode 100644
index 000000000..ab5866e42
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/ParallelExecution.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import org.apache.paimon.data.RandomAccessInputView;
+import org.apache.paimon.data.SimpleCollectingOutputView;
+import org.apache.paimon.data.serializer.Serializer;
+import org.apache.paimon.memory.ArraySegmentPool;
+import org.apache.paimon.memory.MemorySegment;
+import org.apache.paimon.reader.RecordReader;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
+
+/**
+ * A class to help parallel execution.
+ *
+ * <p>Every supplied {@link RecordReader} is drained by a task on an internal thread pool. A task
+ * serializes the records into a heap page taken from a bounded set of {@code parallelism * 2} idle
+ * pages and publishes a {@link ParallelBatch} whenever the page is full or the reader is
+ * exhausted. The consumer fetches batches with {@link #take()} and has to call {@link
+ * ParallelBatch#releaseBatch()} once it is done with a batch so that the page becomes idle again;
+ * otherwise the reading tasks end up blocked waiting for a page.
+ *
+ * @param <T> Record Type.
+ * @param <E> Extra message of one {@link RecordReader}.
+ */
+public class ParallelExecution<T, E> implements Closeable {
+
+    private final Serializer<T> serializer;
+    private final BlockingQueue<MemorySegment> idlePages;
+    private final BlockingQueue<ParallelBatch<T, E>> results;
+    private final ExecutorService executorService;
+
+    /** First failure raised by any reading task; surfaced to the consumer by {@link #take()}. */
+    private final AtomicReference<Throwable> exception;
+
+    /** Counts the readers that have not been fully read and closed yet. */
+    private final CountDownLatch latch;
+
+    /**
+     * Creates the execution and immediately submits one reading task per supplier.
+     *
+     * @param serializer serializer for the records; every task works on its own duplicate
+     * @param pageSize size of each heap page used to transfer records
+     * @param parallelism number of threads in the pool; {@code parallelism * 2} pages are allocated
+     * @param readers suppliers creating a reader together with its extra message
+     */
+    public ParallelExecution(
+            Serializer<T> serializer,
+            int pageSize,
+            int parallelism,
+            List<Supplier<Pair<RecordReader<T>, E>>> readers) {
+        this.serializer = serializer;
+        int totalPages = parallelism * 2;
+        this.idlePages = new ArrayBlockingQueue<>(totalPages);
+        for (int i = 0; i < totalPages; i++) {
+            idlePages.add(MemorySegment.allocateHeapMemory(pageSize));
+        }
+        this.executorService =
+                new ThreadPoolExecutor(
+                        parallelism,
+                        parallelism,
+                        1,
+                        TimeUnit.SECONDS,
+                        new LinkedBlockingQueue<>(),
+                        new ExecutorThreadFactory(
+                                Thread.currentThread().getName() + "-parallel"));
+        this.results = new LinkedBlockingQueue<>();
+        this.exception = new AtomicReference<>();
+        this.latch = new CountDownLatch(readers.size());
+
+        for (Supplier<Pair<RecordReader<T>, E>> readerSupplier : readers) {
+            Serializer<T> duplicate = this.serializer.duplicate();
+            executorService.submit(() -> asyncRead(readerSupplier, duplicate));
+        }
+    }
+
+    /**
+     * Waits for the next batch.
+     *
+     * @return the next batch, or {@code null} once all readers are finished and every batch has
+     *     been handed out
+     * @throws IOException wrapping the failure of a reading task
+     * @throws InterruptedException if the calling thread is interrupted while waiting
+     */
+    @Nullable
+    public ParallelBatch<T, E> take() throws InterruptedException, IOException {
+        ParallelBatch<T, E> element;
+        do {
+            // Report a failure before anything else, so it is neither delayed by the poll timeout
+            // nor hidden behind a "finished" result.
+            throwIfFailed();
+
+            // A task publishes its last batch before counting down, so once the latch is at zero
+            // an empty queue means there is nothing left to hand out.
+            if (latch.getCount() == 0 && results.isEmpty()) {
+                return null;
+            }
+
+            element = results.poll(2, TimeUnit.SECONDS);
+
+            throwIfFailed();
+        } while (element == null);
+        return element;
+    }
+
+    /** Throws an {@link IOException} wrapping the recorded task failure, if there is one. */
+    private void throwIfFailed() throws IOException {
+        Throwable failure = exception.get();
+        if (failure != null) {
+            throw new IOException(failure);
+        }
+    }
+
+    /**
+     * Body of one reading task: creates the reader, serializes all of its records into pages and
+     * publishes them as batches. Any failure is recorded for {@link #take()}.
+     */
+    private void asyncRead(
+            Supplier<Pair<RecordReader<T>, E>> readerSupplier, Serializer<T> serializer) {
+        try {
+            // The supplier is invoked inside the try block: a failure while creating the reader
+            // would otherwise only end up in the ignored Future and take() would wait forever.
+            Pair<RecordReader<T>, E> pair = readerSupplier.get();
+            try (CloseableIterator<T> iterator = pair.getLeft().toCloseableIterator()) {
+                int count = 0;
+                SimpleCollectingOutputView outputView = null;
+
+                while (iterator.hasNext()) {
+                    T next = iterator.next();
+
+                    while (true) {
+                        if (outputView == null) {
+                            outputView = newOutputView();
+                            count = 0;
+                        }
+
+                        try {
+                            serializer.serialize(next, outputView);
+                            count++;
+                            break;
+                        } catch (EOFException e) {
+                            if (count == 0) {
+                                // Nothing has been written to this fresh page, so the record
+                                // alone exceeds it. Retrying with another page of the same size
+                                // would loop forever.
+                                throw new IOException(
+                                        "A single record does not fit into one page of "
+                                                + outputView.getCurrentSegment().size()
+                                                + " bytes.",
+                                        e);
+                            }
+                            // The page is full: publish the complete records and retry the
+                            // current record on a new page.
+                            sendToResults(outputView, count, pair.getRight());
+                            outputView = null;
+                        }
+                    }
+                }
+
+                if (outputView != null) {
+                    sendToResults(outputView, count, pair.getRight());
+                }
+            }
+
+            // Only count the reader as finished after it has been closed successfully.
+            latch.countDown();
+        } catch (Throwable e) {
+            // Keep the first failure; later ones must not overwrite it.
+            this.exception.compareAndSet(null, e);
+        }
+    }
+
+    /** Blocks until a page is idle and wraps it into an output view backed by that single page. */
+    private SimpleCollectingOutputView newOutputView() throws InterruptedException {
+        MemorySegment page = idlePages.take();
+        return new SimpleCollectingOutputView(
+                new ArraySegmentPool(Collections.singletonList(page)), page.size());
+    }
+
+    /** Publishes the page of the given view as a batch of {@code count} records. */
+    private void sendToResults(SimpleCollectingOutputView outputView, int count, E extraMessage) {
+        results.add(iterator(outputView.getCurrentSegment(), count, extraMessage));
+    }
+
+    /** Shuts down the thread pool, interrupting the reading tasks. */
+    @Override
+    public void close() throws IOException {
+        this.executorService.shutdownNow();
+    }
+
+    /** Creates a batch that deserializes {@code numRecords} records from the given page. */
+    private ParallelBatch<T, E> iterator(MemorySegment page, int numRecords, E extraMessage) {
+        RandomAccessInputView inputView =
+                new RandomAccessInputView(
+                        new ArrayList<>(Collections.singletonList(page)), page.size());
+        return new ParallelBatch<T, E>() {
+
+            int numReturn = 0;
+
+            @Nullable
+            @Override
+            public T next() throws IOException {
+                if (numReturn >= numRecords) {
+                    return null;
+                }
+
+                numReturn++;
+                return serializer.deserialize(inputView);
+            }
+
+            @Override
+            public void releaseBatch() {
+                idlePages.add(page);
+            }
+
+            @Override
+            public E extraMesage() {
+                return extraMessage;
+            }
+        };
+    }
+
+    /** A batch provides next and extra message. */
+    public interface ParallelBatch<T, E> {
+
+        /** Returns the next record of this batch, or {@code null} if the batch is exhausted. */
+        @Nullable
+        T next() throws IOException;
+
+        /** Hands the page of this batch back so that it can be reused for further reading. */
+        void releaseBatch();
+
+        /**
+         * Returns the extra message of the reader this batch was read from. The spelling of the
+         * method name is kept as is because callers depend on it.
+         */
+        E extraMesage();
+    }
+}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/utils/RowDataToObjectArrayConverter.java
b/paimon-common/src/main/java/org/apache/paimon/utils/RowDataToObjectArrayConverter.java
index 502067566..a18ec4249 100644
---
a/paimon-common/src/main/java/org/apache/paimon/utils/RowDataToObjectArrayConverter.java
+++
b/paimon-common/src/main/java/org/apache/paimon/utils/RowDataToObjectArrayConverter.java
@@ -19,6 +19,7 @@
package org.apache.paimon.utils;
import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
@@ -56,6 +57,10 @@ public class RowDataToObjectArrayConverter implements
Serializable {
return fieldGetters.length;
}
+ public GenericRow toGenericRow(InternalRow rowData) {
+ return GenericRow.of(convert(rowData));
+ }
+
public Object[] convert(InternalRow rowData) {
Object[] result = new Object[fieldGetters.length];
for (int i = 0; i < fieldGetters.length; i++) {
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java
b/paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java
index 6ee72b0b4..bd938c028 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java
@@ -33,6 +33,7 @@ import org.apache.paimon.types.VarCharType;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.List;
@@ -45,6 +46,16 @@ import static
org.apache.paimon.types.DataTypeFamily.CHARACTER_STRING;
/** Type related helper functions. */
public class TypeUtils {
+    /**
+     * Builds a new {@link RowType} holding the fields of {@code left} followed by the fields of
+     * {@code right}. Every field is re-added to the builder by name, type and description.
+     */
+    public static RowType concat(RowType left, RowType right) {
+        List<DataField> leftFields = left.getFields();
+        List<DataField> rightFields = right.getFields();
+
+        RowType.Builder builder = RowType.builder();
+        for (DataField field : leftFields) {
+            builder.field(field.name(), field.type(), field.description());
+        }
+        for (DataField field : rightFields) {
+            builder.field(field.name(), field.type(), field.description());
+        }
+        return builder.build();
+    }
+
public static RowType project(RowType inputType, int[] mapping) {
List<DataField> fields = inputType.getFields();
return new RowType(
diff --git
a/paimon-core/src/test/java/org/apache/paimon/utils/ParallelExecutionTest.java
b/paimon-core/src/test/java/org/apache/paimon/utils/ParallelExecutionTest.java
new file mode 100644
index 000000000..01bb81f11
--- /dev/null
+++
b/paimon-core/src/test/java/org/apache/paimon/utils/ParallelExecutionTest.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import org.apache.paimon.data.serializer.IntSerializer;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.utils.ParallelExecution.ParallelBatch;
+
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+import java.util.function.Supplier;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test for {@link ParallelExecution}. */
+public class ParallelExecutionTest {
+
+ @Test
+ public void testNormal() {
+ Supplier<Pair<RecordReader<Integer>, Integer>> supplier1 =
+ () ->
+ Pair.of(
+ create(
+ new LinkedList<>(
+ Arrays.asList(
+ Arrays.asList(1, 5, 6),
+ Arrays.asList(2, 7)))),
+ 1);
+ Supplier<Pair<RecordReader<Integer>, Integer>> supplier2 =
+ () ->
+ Pair.of(
+ create(
+ new LinkedList<>(
+ Arrays.asList(
+ Arrays.asList(33, 55),
+ Arrays.asList(22,
77)))),
+ 2);
+ Supplier<Pair<RecordReader<Integer>, Integer>> supplier3 =
+ () ->
+ Pair.of(
+ create(
+ new LinkedList<>(
+ Arrays.asList(
+ Arrays.asList(333,
555),
+ Arrays.asList(222,
777)))),
+ 3);
+
+ ParallelExecution<Integer, Integer> execution =
+ new ParallelExecution<>(
+ new IntSerializer(),
+ 1024,
+ 2,
+ Arrays.asList(supplier1, supplier2, supplier3));
+ List<Pair<Integer, Integer>> result = collect(execution);
+ assertThat(result)
+ .containsExactlyInAnyOrder(
+ Pair.of(1, 1),
+ Pair.of(5, 1),
+ Pair.of(6, 1),
+ Pair.of(2, 1),
+ Pair.of(7, 1),
+ Pair.of(33, 2),
+ Pair.of(55, 2),
+ Pair.of(22, 2),
+ Pair.of(77, 2),
+ Pair.of(333, 3),
+ Pair.of(555, 3),
+ Pair.of(222, 3),
+ Pair.of(777, 3));
+ }
+
+ @Test
+ public void testException() {
+ String message = "Test Exception";
+
+ Supplier<Pair<RecordReader<Integer>, Integer>> supplier1 =
+ () ->
+ Pair.of(
+ create(
+ new LinkedList<>(
+ Arrays.asList(
+ Arrays.asList(1, 5, 6),
+ Arrays.asList(2, 7)))),
+ 1);
+ RecordReader<Integer> exReader =
+ new RecordReader<Integer>() {
+ @Nullable
+ @Override
+ public RecordIterator<Integer> readBatch() {
+ throw new RuntimeException(message);
+ }
+
+ @Override
+ public void close() {}
+ };
+
+ ParallelExecution<Integer, Integer> execution =
+ new ParallelExecution<>(
+ new IntSerializer(),
+ 1024,
+ 2,
+ Arrays.asList(supplier1, supplier1, () ->
Pair.of(exReader, 2)));
+ assertThatThrownBy(() ->
collect(execution)).hasMessageContaining(message);
+ }
+
+ private RecordReader<Integer> create(Queue<List<Integer>> queue) {
+ return new RecordReader<Integer>() {
+ @Nullable
+ @Override
+ public RecordIterator<Integer> readBatch() {
+ List<Integer> values = queue.poll();
+ if (values == null) {
+ return null;
+ }
+ Queue<Integer> vQueue = new LinkedList<>(values);
+ return new RecordIterator<Integer>() {
+ @Nullable
+ @Override
+ public Integer next() {
+ return vQueue.poll();
+ }
+
+ @Override
+ public void releaseBatch() {}
+ };
+ }
+
+ @Override
+ public void close() {}
+ };
+ }
+
+ private List<Pair<Integer, Integer>> collect(ParallelExecution<Integer,
Integer> execution) {
+ List<Pair<Integer, Integer>> result = new ArrayList<>();
+ while (true) {
+ try {
+ ParallelBatch<Integer, Integer> batch = execution.take();
+ if (batch == null) {
+ break;
+ }
+
+ while (true) {
+ Integer record = batch.next();
+ if (record == null) {
+ batch.releaseBatch();
+ break;
+ }
+
+ result.add(Pair.of(record, batch.extraMesage()));
+ }
+ } catch (InterruptedException | IOException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+ }
+ return result;
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/IndexBootstrap.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/IndexBootstrap.java
index db1a3318d..392b29622 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/IndexBootstrap.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/index/IndexBootstrap.java
@@ -23,6 +23,7 @@ import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.JoinedRow;
+import org.apache.paimon.data.serializer.InternalSerializers;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.TableSchema;
@@ -34,14 +35,21 @@ import org.apache.paimon.table.source.Split;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.Pair;
+import org.apache.paimon.utils.ParallelExecution;
+import org.apache.paimon.utils.ParallelExecution.ParallelBatch;
+import org.apache.paimon.utils.RowDataToObjectArrayConverter;
+import org.apache.paimon.utils.TypeUtils;
import java.io.IOException;
import java.io.Serializable;
+import java.io.UncheckedIOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -65,11 +73,8 @@ public class IndexBootstrap implements Serializable {
throws IOException {
RowType rowType = table.rowType();
List<String> fieldNames = rowType.getFieldNames();
- List<String> keyPartFields =
- Stream.concat(table.primaryKeys().stream(),
table.partitionKeys().stream())
- .collect(Collectors.toList());
- int[] projection =
- keyPartFields.stream()
+ int[] keyProjection =
+ table.primaryKeys().stream()
.map(fieldNames::indexOf)
.mapToInt(Integer::intValue)
.toArray();
@@ -78,7 +83,7 @@ public class IndexBootstrap implements Serializable {
ReadBuilder readBuilder =
table.copy(Collections.singletonMap(SCAN_MODE.key(),
LATEST.toString()))
.newReadBuilder()
- .withProjection(projection);
+ .withProjection(keyProjection);
AbstractInnerTableScan tableScan = (AbstractInnerTableScan)
readBuilder.newScan();
List<Split> splits =
@@ -87,7 +92,8 @@ public class IndexBootstrap implements Serializable {
.plan()
.splits();
- Duration indexTtl =
CoreOptions.fromMap(table.options()).crossPartitionUpsertIndexTtl();
+ CoreOptions options = CoreOptions.fromMap(table.options());
+ Duration indexTtl = options.crossPartitionUpsertIndexTtl();
if (indexTtl != null) {
long indexTtlMillis = indexTtl.toMillis();
long currentTime = System.currentTimeMillis();
@@ -97,13 +103,56 @@ public class IndexBootstrap implements Serializable {
.collect(Collectors.toList());
}
+ List<Supplier<Pair<RecordReader<InternalRow>, GenericRow>>> suppliers
= new ArrayList<>();
+ RowDataToObjectArrayConverter partBucketConverter =
+ new RowDataToObjectArrayConverter(
+ TypeUtils.concat(
+ TypeUtils.project(rowType,
table.partitionKeys()),
+ RowType.of(DataTypes.INT())));
for (Split split : splits) {
- try (RecordReader<InternalRow> reader =
readBuilder.newRead().createReader(split)) {
- int bucket = ((DataSplit) split).bucket();
- GenericRow bucketRow = GenericRow.of(bucket);
- JoinedRow joinedRow = new JoinedRow();
- reader.transform(row -> joinedRow.replace(row, bucketRow))
- .forEachRemaining(collector);
+ suppliers.add(
+ () -> {
+ try {
+ RecordReader<InternalRow> reader =
+ readBuilder.newRead().createReader(split);
+ DataSplit dataSplit = ((DataSplit) split);
+ int bucket = dataSplit.bucket();
+ GenericRow partAndBucket =
+ partBucketConverter.toGenericRow(
+ new JoinedRow(
+ dataSplit.partition(),
GenericRow.of(bucket)));
+ return Pair.of(reader, partAndBucket);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ });
+ }
+ ParallelExecution<InternalRow, GenericRow> execution =
+ new ParallelExecution<>(
+ InternalSerializers.create(TypeUtils.project(rowType,
keyProjection)),
+ options.pageSize(),
+ options.crossPartitionUpsertBootstrapParallelism(),
+ suppliers);
+ JoinedRow joinedRow = new JoinedRow();
+ while (true) {
+ try {
+ ParallelBatch<InternalRow, GenericRow> batch =
execution.take();
+ if (batch == null) {
+ break;
+ }
+
+ while (true) {
+ InternalRow row = batch.next();
+ if (row == null) {
+ batch.releaseBatch();
+ break;
+ }
+
+ collector.accept(joinedRow.replace(row,
batch.extraMesage()));
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
}
}
}