This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new f8402328b4 [core] Optimize Flink BTree index topology (#7852)
f8402328b4 is described below
commit f8402328b486a51b01d801025b26b73483924ad3
Author: YeJunHao <[email protected]>
AuthorDate: Sat May 23 22:48:16 2026 +0800
[core] Optimize Flink BTree index topology (#7852)
- Reworked Flink BTree global index building to use one task-driven
topology for all contiguous row ranges instead of building one topology
per range.
- Added an internal build task id to the sort key so each range keeps
its own row-range metadata while sharing the same Flink
source/read/sort/write chain.
- Added test coverage for the parallelism calculation, many small ranges,
and a single large range split across multiple writer subtasks.
## Why
When row ranges are highly fragmented, the old implementation creates a
separate Flink topology for each range. As a result, the create-index
procedure can spend a long time constructing the JobGraph, and the
resulting topology can become oversized.
---
.../paimon/flink/btree/BTreeIndexTopoBuilder.java | 242 +++++++++++++++------
.../paimon/flink/BTreeGlobalIndexITCase.java | 34 +++
.../flink/btree/BTreeIndexTopoBuilderTest.java | 61 ++++++
3 files changed, 271 insertions(+), 66 deletions(-)
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilder.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilder.java
index b03a6cdf7a..e3cd198900 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilder.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilder.java
@@ -20,7 +20,9 @@ package org.apache.paimon.flink.btree;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.JoinedRow;
import org.apache.paimon.data.serializer.BinaryRowSerializer;
import org.apache.paimon.flink.FlinkRowData;
import org.apache.paimon.flink.FlinkRowWrapper;
@@ -49,7 +51,9 @@ import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.TableRead;
+import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.Range;
@@ -63,8 +67,10 @@ import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import java.io.IOException;
+import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -77,6 +83,9 @@ import static
org.apache.paimon.globalindex.btree.BTreeGlobalIndexBuilder.splitB
/** The {@link BTreeIndexTopoBuilder} for BTree index in Flink. */
public class BTreeIndexTopoBuilder {
+ private static final String BUILD_TASK_ID_FIELD = "_BTREE_BUILD_TASK_ID";
+ private static final int BUILD_TASK_ID_FIELD_ID = -1;
+
public static boolean buildIndex(
StreamExecutionEnvironment env,
Supplier<BTreeGlobalIndexBuilder> indexBuilderSupplier,
@@ -114,27 +123,34 @@ public class BTreeIndexTopoBuilder {
List<String> selectedColumns = new ArrayList<>();
selectedColumns.add(indexColumn);
- RowType readType =
+ RowType dataReadType =
SpecialFields.rowTypeWithRowId(table.rowType().project(selectedColumns));
- int indexFieldPos = readType.getFieldIndex(indexColumn);
- int rowIdPos = readType.getFieldIndex(SpecialFields.ROW_ID.name());
- DataType indexFieldType = readType.getTypeAt(indexFieldPos);
+ String buildTaskIdField = buildTaskIdFieldName(dataReadType);
+ RowType sortReadType = withBuildTaskId(dataReadType,
buildTaskIdField);
+ int taskIdPos = sortReadType.getFieldIndex(buildTaskIdField);
+ int indexFieldPos = sortReadType.getFieldIndex(indexColumn);
+ int rowIdPos =
sortReadType.getFieldIndex(SpecialFields.ROW_ID.name());
+ DataType indexFieldType = sortReadType.getTypeAt(indexFieldPos);
// 3. Calculate maximum parallelism bound
long recordsPerRange =
userOptions.get(BTreeIndexOptions.BTREE_INDEX_RECORDS_PER_RANGE);
int maxParallelism =
userOptions.get(BTreeIndexOptions.BTREE_INDEX_BUILD_MAX_PARALLELISM);
- // 4. Build one topology per contiguous row range
+ // 4. Build one topology for all contiguous row ranges
CoreOptions coreOptions = table.coreOptions();
- ReadBuilder readBuilder =
table.newReadBuilder().withReadType(readType);
+ ReadBuilder readBuilder =
table.newReadBuilder().withReadType(dataReadType);
List<String> sortColumns = new ArrayList<>();
+ sortColumns.add(buildTaskIdField);
sortColumns.add(indexColumn);
int partitionFieldSize = table.partitionKeys().size();
BinaryRowSerializer binaryRowSerializer = new
BinaryRowSerializer(partitionFieldSize);
+ List<BTreeBuildTask> buildTasks = new ArrayList<>();
+ List<BTreeSplitTask> splitTasks = new ArrayList<>();
for (Map.Entry<BinaryRow, Map<Range, List<Split>>> partitionEntry :
partitionRangeSplits.entrySet()) {
BinaryRow partition = partitionEntry.getKey();
+ byte[] partitionBytes =
binaryRowSerializer.serializeToBytes(partition);
for (Map.Entry<Range, List<Split>> entry :
partitionEntry.getValue().entrySet()) {
Range range = entry.getKey();
List<Split> rangeSplits = entry.getValue();
@@ -142,27 +158,37 @@ public class BTreeIndexTopoBuilder {
continue;
}
- DataStream<Committable> commitMessages =
- executeForPartitionRange(
- env,
- range,
- rangeSplits,
- readBuilder,
- indexBuilder,
- partitionFieldSize,
-
binaryRowSerializer.serializeToBytes(partition),
- indexFieldPos,
- rowIdPos,
- indexFieldType,
- sortColumns,
- coreOptions,
- readType,
- recordsPerRange,
- maxParallelism);
-
- allStreams.add(commitMessages);
+ int taskId = buildTasks.size();
+ buildTasks.add(new BTreeBuildTask(taskId, range,
partitionBytes));
+ for (Split split : rangeSplits) {
+ splitTasks.add(new BTreeSplitTask(taskId, split));
+ }
}
}
+
+ if (buildTasks.isEmpty()) {
+ return false;
+ }
+
+ DataStream<Committable> commitMessages =
+ executeForBuildTasks(
+ env,
+ buildTasks,
+ splitTasks,
+ readBuilder,
+ indexBuilder,
+ partitionFieldSize,
+ taskIdPos,
+ indexFieldPos,
+ rowIdPos,
+ indexFieldType,
+ sortColumns,
+ coreOptions,
+ sortReadType,
+ recordsPerRange,
+ maxParallelism);
+
+ allStreams.add(commitMessages);
}
if (!allStreams.isEmpty()) {
@SuppressWarnings("unchecked")
@@ -192,14 +218,14 @@ public class BTreeIndexTopoBuilder {
}
}
- protected static DataStream<Committable> executeForPartitionRange(
+ protected static DataStream<Committable> executeForBuildTasks(
StreamExecutionEnvironment env,
- Range range,
- List<Split> rangeSplits,
+ List<BTreeBuildTask> buildTasks,
+ List<BTreeSplitTask> splitTasks,
ReadBuilder readBuilder,
BTreeGlobalIndexBuilder indexBuilder,
int partitionFieldSize,
- byte[] partition,
+ int taskIdPos,
int indexFieldPos,
int rowIdPos,
DataType indexFieldType,
@@ -208,21 +234,18 @@ public class BTreeIndexTopoBuilder {
RowType readType,
long recordsPerRange,
int maxParallelism) {
- int parallelism = Math.max((int) (range.count() / recordsPerRange), 1);
- parallelism = Math.min(parallelism, maxParallelism);
+ int parallelism = calculateParallelism(buildTasks, recordsPerRange,
maxParallelism);
- DataStream<Split> sourceStream =
+ DataStream<BTreeSplitTask> sourceStream =
StreamExecutionEnvironmentUtils.fromData(
- env,
- new JavaTypeInfo<>(Split.class),
- rangeSplits.toArray(new Split[0]))
- .name("Global Index Source " + " range=" + range)
+ env, splitTasks, new
JavaTypeInfo<>(BTreeSplitTask.class))
+ .name("Global Index Source")
.setParallelism(1);
DataStream<RowData> rowDataStream =
sourceStream
.transform(
- "Read Data " + range,
+ "Read Data",
InternalTypeInfo.of(LogicalTypeConversion.toLogicalType(readType)),
new ReadDataOperator(readBuilder))
.setParallelism(parallelism);
@@ -243,19 +266,51 @@ public class BTreeIndexTopoBuilder {
return sortedStream
.transform(
- "write-btree-index " + range,
+ "write-btree-index",
new CommittableTypeInfo(),
new WriteIndexOperator(
- range,
+ buildTasks,
partitionFieldSize,
- partition,
indexBuilder,
+ taskIdPos,
indexFieldPos,
rowIdPos,
indexFieldType))
.setParallelism(parallelism);
}
+ static int calculateParallelism(
+ List<BTreeBuildTask> buildTasks, long recordsPerRange, int
maxParallelism) {
+ long totalRecords = 0;
+ for (BTreeBuildTask task : buildTasks) {
+ long count = task.rowRange.count();
+ if (Long.MAX_VALUE - totalRecords < count) {
+ totalRecords = Long.MAX_VALUE;
+ } else {
+ totalRecords += count;
+ }
+ }
+
+ long parallelism = Math.max(totalRecords / recordsPerRange, 1);
+ return (int) Math.min(parallelism, maxParallelism);
+ }
+
+ private static String buildTaskIdFieldName(RowType readType) {
+ String fieldName = BUILD_TASK_ID_FIELD;
+ while (readType.containsField(fieldName)) {
+ fieldName = "_" + fieldName;
+ }
+ return fieldName;
+ }
+
+ private static RowType withBuildTaskId(RowType readType, String
buildTaskIdField) {
+ List<DataField> fields = new ArrayList<>();
+ fields.add(
+ new DataField(BUILD_TASK_ID_FIELD_ID, buildTaskIdField,
DataTypes.INT().notNull()));
+ fields.addAll(readType.getFields());
+ return new RowType(readType.isNullable(), fields);
+ }
+
private static void commit(FileStoreTable table, DataStream<Committable>
written) {
OneInputStreamOperatorFactory<Committable, Committable>
committerOperator =
new CommitterOperatorFactory<>(
@@ -276,7 +331,7 @@ public class BTreeIndexTopoBuilder {
private static class ReadDataOperator
extends
org.apache.flink.table.runtime.operators.TableStreamOperator<RowData>
implements
org.apache.flink.streaming.api.operators.OneInputStreamOperator<
- Split, RowData> {
+ BTreeSplitTask, RowData> {
private static final long serialVersionUID = 1L;
@@ -295,43 +350,50 @@ public class BTreeIndexTopoBuilder {
}
@Override
- public void processElement(StreamRecord<Split> element) throws
Exception {
- Split split = element.getValue();
- try (RecordReader<InternalRow> reader =
tableRead.createReader(split)) {
+ public void processElement(StreamRecord<BTreeSplitTask> element)
throws Exception {
+ BTreeSplitTask buildTask = element.getValue();
+ GenericRow taskId = GenericRow.of(buildTask.taskId);
+ try (RecordReader<InternalRow> reader =
tableRead.createReader(buildTask.split)) {
reader.forEachRemaining(
- row -> output.collect(new StreamRecord<>(new
FlinkRowData(row))));
+ row ->
+ output.collect(
+ new StreamRecord<>(
+ new FlinkRowData(new
JoinedRow(taskId, row)))));
}
}
}
private static class WriteIndexOperator extends
BoundedOneInputOperator<RowData, Committable> {
- private final Range rowRange;
- private final byte[] partition;
+ private final List<BTreeBuildTask> buildTasks;
private final int partitionFieldSize;
private final BTreeGlobalIndexBuilder builder;
+ private final int taskIdPos;
private final int indexFieldPos;
private final int rowIdPos;
private final DataType indexFieldType;
private transient long counter;
+ private transient BTreeBuildTask currentTask;
+ private transient BinaryRow currentPartition;
private transient GlobalIndexParallelWriter currentWriter;
private transient List<CommitMessage> commitMessages;
+ private transient Map<Integer, BTreeBuildTask> buildTasksById;
private transient InternalRow.FieldGetter indexFieldGetter;
private transient BinaryRowSerializer binaryRowSerializer;
public WriteIndexOperator(
- Range rowRange,
+ List<BTreeBuildTask> buildTasks,
int partitionFieldSize,
- byte[] partition,
BTreeGlobalIndexBuilder builder,
+ int taskIdPos,
int indexFieldPos,
int rowIdPos,
DataType indexFieldType) {
- this.rowRange = rowRange;
+ this.buildTasks = buildTasks;
this.partitionFieldSize = partitionFieldSize;
- this.partition = partition;
this.builder = builder;
+ this.taskIdPos = taskIdPos;
this.indexFieldPos = indexFieldPos;
this.rowIdPos = rowIdPos;
this.indexFieldType = indexFieldType;
@@ -341,6 +403,10 @@ public class BTreeIndexTopoBuilder {
public void open() throws Exception {
super.open();
commitMessages = new ArrayList<>();
+ buildTasksById = new HashMap<>();
+ for (BTreeBuildTask task : buildTasks) {
+ buildTasksById.put(task.taskId, task);
+ }
indexFieldGetter = InternalRow.createFieldGetter(indexFieldType,
indexFieldPos);
this.binaryRowSerializer = new
BinaryRowSerializer(partitionFieldSize);
}
@@ -348,14 +414,20 @@ public class BTreeIndexTopoBuilder {
@Override
public void processElement(StreamRecord<RowData> element) throws
IOException {
InternalRow row = new FlinkRowWrapper(element.getValue());
+ int taskId = row.getInt(taskIdPos);
+ BTreeBuildTask task = buildTasksById.get(taskId);
+ if (task == null) {
+ throw new IllegalArgumentException("Unknown BTree build task
id: " + taskId);
+ }
+
+ if (currentTask == null || currentTask.taskId != taskId) {
+ flushCurrentWriter();
+ currentTask = task;
+ currentPartition =
binaryRowSerializer.deserializeFromBytes(task.partition);
+ }
+
if (currentWriter != null && counter >= builder.recordsPerRange())
{
- commitMessages.add(
- builder.flushIndex(
- rowRange,
- currentWriter.finish(),
-
binaryRowSerializer.deserializeFromBytes(partition)));
- currentWriter = null;
- counter = 0;
+ flushCurrentWriter();
}
counter++;
@@ -364,19 +436,13 @@ public class BTreeIndexTopoBuilder {
currentWriter = builder.createWriter();
}
- long localRowId = row.getLong(rowIdPos) - rowRange.from;
+ long localRowId = row.getLong(rowIdPos) -
currentTask.rowRange.from;
currentWriter.write(indexFieldGetter.getFieldOrNull(row),
localRowId);
}
@Override
public void endInput() throws IOException {
- if (counter > 0) {
- commitMessages.add(
- builder.flushIndex(
- rowRange,
- currentWriter.finish(),
-
binaryRowSerializer.deserializeFromBytes(partition)));
- }
+ flushCurrentWriter();
for (CommitMessage message : commitMessages) {
output.collect(
new StreamRecord<>(
@@ -384,5 +450,49 @@ public class BTreeIndexTopoBuilder {
}
commitMessages.clear();
}
+
+ private void flushCurrentWriter() throws IOException {
+ if (counter > 0 && currentWriter != null) {
+ commitMessages.add(
+ builder.flushIndex(
+ currentTask.rowRange, currentWriter.finish(),
currentPartition));
+ }
+ currentWriter = null;
+ counter = 0;
+ }
+ }
+
+ /** Metadata for one BTree index build range. */
+ public static class BTreeBuildTask implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private int taskId;
+ private Range rowRange;
+ private byte[] partition;
+
+ public BTreeBuildTask() {}
+
+ BTreeBuildTask(int taskId, Range rowRange, byte[] partition) {
+ this.taskId = taskId;
+ this.rowRange = rowRange;
+ this.partition = partition;
+ }
+ }
+
+ /** Split assigned to one BTree index build task. */
+ public static class BTreeSplitTask implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private int taskId;
+ private Split split;
+
+ public BTreeSplitTask() {}
+
+ BTreeSplitTask(int taskId, Split split) {
+ this.taskId = taskId;
+ this.split = split;
+ }
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BTreeGlobalIndexITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BTreeGlobalIndexITCase.java
index 9aaa39e446..9433034d39 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BTreeGlobalIndexITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BTreeGlobalIndexITCase.java
@@ -144,6 +144,40 @@ public class BTreeGlobalIndexITCase extends
CatalogITCaseBase {
tableName, indexColumn);
}
+ @Test
+ void testBTreeIndexWithSingleRangeAndParallelWriters() throws
Catalog.TableNotExistException {
+ sql(
+ "CREATE TABLE T_SINGLE_RANGE_PARALLEL (id INT, name STRING)
WITH ("
+ + "'global-index.enabled' = 'true', "
+ + "'row-tracking.enabled' = 'true', "
+ + "'data-evolution.enabled' = 'true'"
+ + ")");
+ String values =
+ IntStream.range(0, 2_000)
+ .mapToObj(i -> String.format("(%s, %s)", i, "'name_" +
i + "'"))
+ .collect(Collectors.joining(","));
+ sql("INSERT INTO T_SINGLE_RANGE_PARALLEL VALUES " + values);
+ sql(
+ "CALL sys.create_global_index(`table` =>
'default.T_SINGLE_RANGE_PARALLEL', "
+ + "index_column => 'id', index_type => 'btree', "
+ + "options => 'btree-index.records-per-range=100;"
+ + "btree-index.build.max-parallelism=4')");
+
+ FileStoreTable table = paimonTable("T_SINGLE_RANGE_PARALLEL");
+ List<IndexFileMeta> btreeEntries =
+ table.store().newIndexFileHandler().scanEntries().stream()
+ .map(IndexManifestEntry::indexFile)
+ .filter(f -> "btree".equals(f.indexType()))
+ .collect(Collectors.toList());
+
+ long totalRowCount =
btreeEntries.stream().mapToLong(IndexFileMeta::rowCount).sum();
+ assertThat(btreeEntries).hasSizeGreaterThan(1);
+ assertThat(totalRowCount).isEqualTo(2_000L);
+
+ assertThat(sql("SELECT * FROM T_SINGLE_RANGE_PARALLEL WHERE id =
1500"))
+ .containsOnly(Row.of(1500, "name_1500"));
+ }
+
@Test
void testBTreeIndexWithManyPartitions() throws
Catalog.TableNotExistException {
int numPartitions = 50;
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilderTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilderTest.java
new file mode 100644
index 0000000000..107e0ac048
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilderTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.btree;
+
+import org.apache.paimon.flink.btree.BTreeIndexTopoBuilder.BTreeBuildTask;
+import org.apache.paimon.utils.Range;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link BTreeIndexTopoBuilder}. */
+public class BTreeIndexTopoBuilderTest {
+
+ @Test
+ public void testCalculateParallelismByTotalRowsInsteadOfRangeCount() {
+ List<BTreeBuildTask> tasks = new ArrayList<>();
+ for (int i = 0; i < 100; i++) {
+ tasks.add(new BTreeBuildTask(i, new Range(i * 10L, i * 10L + 9),
new byte[0]));
+ }
+
+ assertThat(BTreeIndexTopoBuilder.calculateParallelism(tasks, 1000L,
4096)).isEqualTo(1);
+ }
+
+ @Test
+ public void testCalculateParallelismHonorsMaxParallelism() {
+ List<BTreeBuildTask> tasks = new ArrayList<>();
+ for (int i = 0; i < 100; i++) {
+ tasks.add(new BTreeBuildTask(i, new Range(i * 1000L, i * 1000L +
999), new byte[0]));
+ }
+
+ assertThat(BTreeIndexTopoBuilder.calculateParallelism(tasks, 1000L,
16)).isEqualTo(16);
+ }
+
+ @Test
+ public void testCalculateParallelismKeepsSingleRangeBehavior() {
+ List<BTreeBuildTask> tasks = new ArrayList<>();
+ tasks.add(new BTreeBuildTask(0, new Range(0, 1499), new byte[0]));
+
+ assertThat(BTreeIndexTopoBuilder.calculateParallelism(tasks, 1000L,
16)).isEqualTo(1);
+ }
+}