This is an automated email from the ASF dual-hosted git repository.
junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 1d5c4ebd4c [flink] Enable BTreeIndexTopoBuilder to build multiple
columns index in one job (#7413)
1d5c4ebd4c is described below
commit 1d5c4ebd4cae28f821c476d6d7cd9933833c779a
Author: YeJunHao <[email protected]>
AuthorDate: Thu Mar 12 16:25:54 2026 +0800
[flink] Enable BTreeIndexTopoBuilder to build multiple columns index in one
job (#7413)
---
.../globalindex/btree/BTreeGlobalIndexBuilder.java | 17 +--
.../test/java/org/apache/paimon/JavaPyE2ETest.java | 9 +-
.../btree/BTreeGlobalIndexBuilderTest.java | 1 -
.../paimon/table/BtreeGlobalIndexTableTest.java | 5 +-
.../paimon/flink/btree/BTreeIndexTopoBuilder.java | 163 ++++++++++++---------
.../globalindex/btree/BTreeIndexTopoBuilder.java | 4 +-
6 files changed, 100 insertions(+), 99 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexBuilder.java
b/paimon-core/src/main/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexBuilder.java
index 4c051741b2..e9ca904bc8 100644
---
a/paimon-core/src/main/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexBuilder.java
+++
b/paimon-core/src/main/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexBuilder.java
@@ -48,7 +48,6 @@ import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.CloseableIterator;
import org.apache.paimon.utils.MutableObjectIteratorAdapter;
import org.apache.paimon.utils.Pair;
-import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.Range;
import org.apache.paimon.utils.RangeHelper;
@@ -76,15 +75,14 @@ import static
org.apache.paimon.utils.Preconditions.checkArgument;
public class BTreeGlobalIndexBuilder implements Serializable {
private static final long serialVersionUID = 1L;
-
private static final double FLOATING = 1.2;
+ private static final String INDEX_TYPE = "btree";
private final FileStoreTable table;
private final RowType rowType;
private final Options options;
private final long recordsPerRange;
- private String indexType;
private DataField indexField;
// readRowType is composed by partition fields, indexed field and _ROW_ID
field
@@ -100,15 +98,6 @@ public class BTreeGlobalIndexBuilder implements
Serializable {
(long)
(options.get(BTreeIndexOptions.BTREE_INDEX_RECORDS_PER_RANGE) * FLOATING);
}
- public BTreeGlobalIndexBuilder withIndexType(String indexType) {
- this.indexType = indexType;
- Preconditions.checkArgument(
- BTreeGlobalIndexerFactory.IDENTIFIER.equals(indexType),
- "BTreeGlobalInderBuilder only supports %s index type",
- BTreeGlobalIndexerFactory.IDENTIFIER);
- return this;
- }
-
public BTreeGlobalIndexBuilder withIndexField(String indexField) {
checkArgument(
rowType.containsField(indexField),
@@ -216,7 +205,7 @@ public class BTreeGlobalIndexBuilder implements
Serializable {
public GlobalIndexParallelWriter createWriter() throws IOException {
GlobalIndexParallelWriter currentWriter;
- GlobalIndexWriter indexWriter = createIndexWriter(table, indexType,
indexField, options);
+ GlobalIndexWriter indexWriter = createIndexWriter(table, INDEX_TYPE,
indexField, options);
if (!(indexWriter instanceof GlobalIndexParallelWriter)) {
throw new RuntimeException(
"Unexpected implementation, the index writer of BTree
should be an instance of GlobalIndexParallelWriter, but found: "
@@ -236,7 +225,7 @@ public class BTreeGlobalIndexBuilder implements
Serializable {
table.coreOptions(),
rowRange,
indexField.id(),
- indexType,
+ INDEX_TYPE,
resultEntries);
DataIncrement dataIncrement =
DataIncrement.indexIncrement(indexFileMetas);
return new CommitMessageImpl(
diff --git a/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java
b/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java
index a9bcfc9bbf..6e07a1d9f8 100644
--- a/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java
@@ -512,8 +512,7 @@ public class JavaPyE2ETest {
}
// build index
- BTreeGlobalIndexBuilder builder =
- new
BTreeGlobalIndexBuilder(table).withIndexType("btree").withIndexField("k");
+ BTreeGlobalIndexBuilder builder = new
BTreeGlobalIndexBuilder(table).withIndexField("k");
try (BatchTableCommit commit = writeBuilder.newCommit()) {
commit.commit(
builder.build(builder.scan().get(0),
IOManager.create(warehouse.toString())));
@@ -581,8 +580,7 @@ public class JavaPyE2ETest {
}
// build index
- BTreeGlobalIndexBuilder builder =
- new
BTreeGlobalIndexBuilder(table).withIndexType("btree").withIndexField("k");
+ BTreeGlobalIndexBuilder builder = new
BTreeGlobalIndexBuilder(table).withIndexField("k");
try (BatchTableCommit commit = writeBuilder.newCommit()) {
commit.commit(
builder.build(builder.scan().get(0),
IOManager.create(warehouse.toString())));
@@ -652,8 +650,7 @@ public class JavaPyE2ETest {
}
// build index
- BTreeGlobalIndexBuilder builder =
- new
BTreeGlobalIndexBuilder(table).withIndexType("btree").withIndexField("k");
+ BTreeGlobalIndexBuilder builder = new
BTreeGlobalIndexBuilder(table).withIndexField("k");
try (BatchTableCommit commit = writeBuilder.newCommit()) {
commit.commit(
builder.build(builder.scan().get(0),
IOManager.create(warehouse.toString())));
diff --git
a/paimon-core/src/test/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexBuilderTest.java
b/paimon-core/src/test/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexBuilderTest.java
index bd748edb53..fb1fbcb4ff 100644
---
a/paimon-core/src/test/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexBuilderTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexBuilderTest.java
@@ -106,7 +106,6 @@ public class BTreeGlobalIndexBuilderTest extends
TableTestBase {
BTreeGlobalIndexBuilder builder = new BTreeGlobalIndexBuilder(table);
builder.withIndexField("f0");
- builder.withIndexType("btree");
builder.withPartitionPredicate(partitionPredicate);
List<DataSplit> dataSplits = builder.scan();
List<CommitMessage> commitMessages = new ArrayList<>();
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/BtreeGlobalIndexTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/BtreeGlobalIndexTableTest.java
index 9b1ace20a0..145997206a 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/BtreeGlobalIndexTableTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/BtreeGlobalIndexTableTest.java
@@ -24,7 +24,6 @@ import org.apache.paimon.globalindex.GlobalIndexResult;
import org.apache.paimon.globalindex.GlobalIndexScanBuilder;
import org.apache.paimon.globalindex.RowRangeGlobalIndexScanner;
import org.apache.paimon.globalindex.btree.BTreeGlobalIndexBuilder;
-import org.apache.paimon.globalindex.btree.BTreeGlobalIndexerFactory;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.table.sink.BatchTableCommit;
@@ -167,9 +166,7 @@ public class BtreeGlobalIndexTableTest extends
DataEvolutionTestBase {
private void createIndex(String fieldName) throws Exception {
FileStoreTable table = (FileStoreTable) catalog.getTable(identifier());
BTreeGlobalIndexBuilder builder =
- new BTreeGlobalIndexBuilder(table)
- .withIndexType(BTreeGlobalIndexerFactory.IDENTIFIER)
- .withIndexField(fieldName);
+ new BTreeGlobalIndexBuilder(table).withIndexField(fieldName);
List<DataSplit> dataSplits = builder.scan();
List<CommitMessage> commitMessages = new ArrayList<>();
for (DataSplit dataSplit : dataSplits) {
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilder.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilder.java
index f69b0d704c..0a8cda3baf 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilder.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilder.java
@@ -60,9 +60,11 @@ import
org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import java.util.function.Supplier;
import static
org.apache.paimon.globalindex.btree.BTreeGlobalIndexBuilder.groupSplitsByRange;
import static
org.apache.paimon.globalindex.btree.BTreeGlobalIndexBuilder.splitByContiguousRowRange;
@@ -72,83 +74,86 @@ public class BTreeIndexTopoBuilder {
public static void buildIndex(
StreamExecutionEnvironment env,
+ Supplier<BTreeGlobalIndexBuilder> indexBuilderSupplier,
FileStoreTable table,
- String indexColumn,
+ List<String> indexColumns,
PartitionPredicate partitionPredicate,
Options userOptions)
throws Exception {
- // 1. Create BTree index builder and scan splits
- BTreeGlobalIndexBuilder indexBuilder =
- new BTreeGlobalIndexBuilder(table)
- .withIndexType("btree")
- .withIndexField(indexColumn);
- if (partitionPredicate != null) {
- indexBuilder =
indexBuilder.withPartitionPredicate(partitionPredicate);
- }
-
- List<DataSplit> splits =
splitByContiguousRowRange(indexBuilder.scan());
- if (splits.isEmpty()) {
- return;
- }
- Map<BinaryRow, Map<Range, List<DataSplit>>> partitionRangeSplits =
- groupSplitsByRange(splits);
- if (partitionRangeSplits.isEmpty()) {
- return;
- }
-
- // 2. Select necessary columns (index field + ROW_ID)
- List<String> selectedColumns = new ArrayList<>();
- selectedColumns.add(indexColumn);
-
- RowType readType =
SpecialFields.rowTypeWithRowId(table.rowType().project(selectedColumns));
- int indexFieldPos = readType.getFieldIndex(indexColumn);
- int rowIdPos = readType.getFieldIndex(SpecialFields.ROW_ID.name());
- DataType indexFieldType = readType.getTypeAt(indexFieldPos);
+ DataStream<Committable> allCommitMessages = null;
+ for (String indexColumn : indexColumns) {
+ BTreeGlobalIndexBuilder indexBuilder =
+ indexBuilderSupplier.get().withIndexField(indexColumn);
+ if (partitionPredicate != null) {
+ indexBuilder =
indexBuilder.withPartitionPredicate(partitionPredicate);
+ }
- // 3. Calculate maximum parallelism bound
- long recordsPerRange =
userOptions.get(BTreeIndexOptions.BTREE_INDEX_RECORDS_PER_RANGE);
- int maxParallelism =
userOptions.get(BTreeIndexOptions.BTREE_INDEX_BUILD_MAX_PARALLELISM);
+ List<DataSplit> splits =
splitByContiguousRowRange(indexBuilder.scan());
+ if (splits.isEmpty()) {
+ return;
+ }
+ Map<BinaryRow, Map<Range, List<DataSplit>>> partitionRangeSplits =
+ groupSplitsByRange(splits);
+ if (partitionRangeSplits.isEmpty()) {
+ return;
+ }
- // 4. Build one topology per contiguous row range
- CoreOptions coreOptions = table.coreOptions();
- ReadBuilder readBuilder =
table.newReadBuilder().withReadType(readType);
- List<String> sortColumns = new ArrayList<>();
- sortColumns.add(indexColumn);
- DataStream<Committable> allCommitMessages = null;
- int partitionFieldSize = table.partitionKeys().size();
- BinaryRowSerializer binaryRowSerializer = new
BinaryRowSerializer(partitionFieldSize);
- for (Map.Entry<BinaryRow, Map<Range, List<DataSplit>>> partitionEntry :
- partitionRangeSplits.entrySet()) {
- BinaryRow partition = partitionEntry.getKey();
- for (Map.Entry<Range, List<DataSplit>> entry :
partitionEntry.getValue().entrySet()) {
- Range range = entry.getKey();
- List<DataSplit> rangeSplits = entry.getValue();
- if (rangeSplits.isEmpty()) {
- continue;
+ // 2. Select necessary columns (index field + ROW_ID)
+ List<String> selectedColumns = new ArrayList<>();
+ selectedColumns.add(indexColumn);
+
+ RowType readType =
+
SpecialFields.rowTypeWithRowId(table.rowType().project(selectedColumns));
+ int indexFieldPos = readType.getFieldIndex(indexColumn);
+ int rowIdPos = readType.getFieldIndex(SpecialFields.ROW_ID.name());
+ DataType indexFieldType = readType.getTypeAt(indexFieldPos);
+
+ // 3. Calculate maximum parallelism bound
+ long recordsPerRange =
userOptions.get(BTreeIndexOptions.BTREE_INDEX_RECORDS_PER_RANGE);
+ int maxParallelism =
+
userOptions.get(BTreeIndexOptions.BTREE_INDEX_BUILD_MAX_PARALLELISM);
+
+ // 4. Build one topology per contiguous row range
+ CoreOptions coreOptions = table.coreOptions();
+ ReadBuilder readBuilder =
table.newReadBuilder().withReadType(readType);
+ List<String> sortColumns = new ArrayList<>();
+ sortColumns.add(indexColumn);
+ int partitionFieldSize = table.partitionKeys().size();
+ BinaryRowSerializer binaryRowSerializer = new
BinaryRowSerializer(partitionFieldSize);
+ for (Map.Entry<BinaryRow, Map<Range, List<DataSplit>>>
partitionEntry :
+ partitionRangeSplits.entrySet()) {
+ BinaryRow partition = partitionEntry.getKey();
+ for (Map.Entry<Range, List<DataSplit>> entry :
+ partitionEntry.getValue().entrySet()) {
+ Range range = entry.getKey();
+ List<DataSplit> rangeSplits = entry.getValue();
+ if (rangeSplits.isEmpty()) {
+ continue;
+ }
+
+ DataStream<Committable> commitMessages =
+ executeForPartitionRange(
+ env,
+ range,
+ rangeSplits,
+ readBuilder,
+ indexBuilder,
+ partitionFieldSize,
+
binaryRowSerializer.serializeToBytes(partition),
+ indexFieldPos,
+ rowIdPos,
+ indexFieldType,
+ sortColumns,
+ coreOptions,
+ readType,
+ recordsPerRange,
+ maxParallelism);
+
+ allCommitMessages =
+ allCommitMessages == null
+ ? commitMessages
+ : allCommitMessages.union(commitMessages);
}
-
- DataStream<Committable> commitMessages =
- executeForPartitionRange(
- env,
- range,
- rangeSplits,
- readBuilder,
- indexBuilder,
- partitionFieldSize,
-
binaryRowSerializer.serializeToBytes(partition),
- indexFieldPos,
- rowIdPos,
- indexFieldType,
- sortColumns,
- coreOptions,
- readType,
- recordsPerRange,
- maxParallelism);
-
- allCommitMessages =
- allCommitMessages == null
- ? commitMessages
- : allCommitMessages.union(commitMessages);
}
}
if (allCommitMessages != null) {
@@ -158,7 +163,23 @@ public class BTreeIndexTopoBuilder {
env.execute("Create btree global index for table: " + table.name());
}
- private static DataStream<Committable> executeForPartitionRange(
+ public static void buildIndex(
+ StreamExecutionEnvironment env,
+ FileStoreTable table,
+ String indexColumn,
+ PartitionPredicate partitionPredicate,
+ Options userOptions)
+ throws Exception {
+ buildIndex(
+ env,
+ () -> new BTreeGlobalIndexBuilder(table),
+ table,
+ Collections.singletonList(indexColumn),
+ partitionPredicate,
+ userOptions);
+ }
+
+ protected static DataStream<Committable> executeForPartitionRange(
StreamExecutionEnvironment env,
Range range,
List<DataSplit> rangeSplits,
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/btree/BTreeIndexTopoBuilder.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/btree/BTreeIndexTopoBuilder.java
index 644fa81cdd..cf9402d0d5 100644
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/btree/BTreeIndexTopoBuilder.java
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/btree/BTreeIndexTopoBuilder.java
@@ -78,9 +78,7 @@ public class BTreeIndexTopoBuilder implements
GlobalIndexTopologyBuilder {
throws IOException {
// 1. read the whole dataset of target partitions
BTreeGlobalIndexBuilder indexBuilder =
- new BTreeGlobalIndexBuilder(table)
- .withIndexType(indexType)
- .withIndexField(indexField.name());
+ new
BTreeGlobalIndexBuilder(table).withIndexField(indexField.name());
if (partitionPredicate != null) {
indexBuilder =
indexBuilder.withPartitionPredicate(partitionPredicate);
}