This is an automated email from the ASF dual-hosted git repository.

junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 1d5c4ebd4c [flink] Enable BTreeIndexTopoBuilder to build multiple columns index in one job (#7413)
1d5c4ebd4c is described below

commit 1d5c4ebd4cae28f821c476d6d7cd9933833c779a
Author: YeJunHao <[email protected]>
AuthorDate: Thu Mar 12 16:25:54 2026 +0800

    [flink] Enable BTreeIndexTopoBuilder to build multiple columns index in one job (#7413)
---
 .../globalindex/btree/BTreeGlobalIndexBuilder.java |  17 +--
 .../test/java/org/apache/paimon/JavaPyE2ETest.java |   9 +-
 .../btree/BTreeGlobalIndexBuilderTest.java         |   1 -
 .../paimon/table/BtreeGlobalIndexTableTest.java    |   5 +-
 .../paimon/flink/btree/BTreeIndexTopoBuilder.java  | 163 ++++++++++++---------
 .../globalindex/btree/BTreeIndexTopoBuilder.java   |   4 +-
 6 files changed, 100 insertions(+), 99 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexBuilder.java b/paimon-core/src/main/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexBuilder.java
index 4c051741b2..e9ca904bc8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexBuilder.java
+++ b/paimon-core/src/main/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexBuilder.java
@@ -48,7 +48,6 @@ import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.CloseableIterator;
 import org.apache.paimon.utils.MutableObjectIteratorAdapter;
 import org.apache.paimon.utils.Pair;
-import org.apache.paimon.utils.Preconditions;
 import org.apache.paimon.utils.Range;
 import org.apache.paimon.utils.RangeHelper;
 
@@ -76,15 +75,14 @@ import static org.apache.paimon.utils.Preconditions.checkArgument;
 public class BTreeGlobalIndexBuilder implements Serializable {
 
     private static final long serialVersionUID = 1L;
-
     private static final double FLOATING = 1.2;
+    private static final String INDEX_TYPE = "btree";
 
     private final FileStoreTable table;
     private final RowType rowType;
     private final Options options;
     private final long recordsPerRange;
 
-    private String indexType;
     private DataField indexField;
 
     // readRowType is composed by partition fields, indexed field and _ROW_ID 
field
@@ -100,15 +98,6 @@ public class BTreeGlobalIndexBuilder implements Serializable {
                 (long) 
(options.get(BTreeIndexOptions.BTREE_INDEX_RECORDS_PER_RANGE) * FLOATING);
     }
 
-    public BTreeGlobalIndexBuilder withIndexType(String indexType) {
-        this.indexType = indexType;
-        Preconditions.checkArgument(
-                BTreeGlobalIndexerFactory.IDENTIFIER.equals(indexType),
-                "BTreeGlobalInderBuilder only supports %s index type",
-                BTreeGlobalIndexerFactory.IDENTIFIER);
-        return this;
-    }
-
     public BTreeGlobalIndexBuilder withIndexField(String indexField) {
         checkArgument(
                 rowType.containsField(indexField),
@@ -216,7 +205,7 @@ public class BTreeGlobalIndexBuilder implements Serializable {
 
     public GlobalIndexParallelWriter createWriter() throws IOException {
         GlobalIndexParallelWriter currentWriter;
-        GlobalIndexWriter indexWriter = createIndexWriter(table, indexType, 
indexField, options);
+        GlobalIndexWriter indexWriter = createIndexWriter(table, INDEX_TYPE, 
indexField, options);
         if (!(indexWriter instanceof GlobalIndexParallelWriter)) {
             throw new RuntimeException(
                     "Unexpected implementation, the index writer of BTree 
should be an instance of GlobalIndexParallelWriter, but found: "
@@ -236,7 +225,7 @@ public class BTreeGlobalIndexBuilder implements Serializable {
                         table.coreOptions(),
                         rowRange,
                         indexField.id(),
-                        indexType,
+                        INDEX_TYPE,
                         resultEntries);
         DataIncrement dataIncrement = 
DataIncrement.indexIncrement(indexFileMetas);
         return new CommitMessageImpl(
diff --git a/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java b/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java
index a9bcfc9bbf..6e07a1d9f8 100644
--- a/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java
@@ -512,8 +512,7 @@ public class JavaPyE2ETest {
         }
 
         // build index
-        BTreeGlobalIndexBuilder builder =
-                new 
BTreeGlobalIndexBuilder(table).withIndexType("btree").withIndexField("k");
+        BTreeGlobalIndexBuilder builder = new 
BTreeGlobalIndexBuilder(table).withIndexField("k");
         try (BatchTableCommit commit = writeBuilder.newCommit()) {
             commit.commit(
                     builder.build(builder.scan().get(0), 
IOManager.create(warehouse.toString())));
@@ -581,8 +580,7 @@ public class JavaPyE2ETest {
         }
 
         // build index
-        BTreeGlobalIndexBuilder builder =
-                new 
BTreeGlobalIndexBuilder(table).withIndexType("btree").withIndexField("k");
+        BTreeGlobalIndexBuilder builder = new 
BTreeGlobalIndexBuilder(table).withIndexField("k");
         try (BatchTableCommit commit = writeBuilder.newCommit()) {
             commit.commit(
                     builder.build(builder.scan().get(0), 
IOManager.create(warehouse.toString())));
@@ -652,8 +650,7 @@ public class JavaPyE2ETest {
         }
 
         // build index
-        BTreeGlobalIndexBuilder builder =
-                new 
BTreeGlobalIndexBuilder(table).withIndexType("btree").withIndexField("k");
+        BTreeGlobalIndexBuilder builder = new 
BTreeGlobalIndexBuilder(table).withIndexField("k");
         try (BatchTableCommit commit = writeBuilder.newCommit()) {
             commit.commit(
                     builder.build(builder.scan().get(0), 
IOManager.create(warehouse.toString())));
diff --git a/paimon-core/src/test/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexBuilderTest.java b/paimon-core/src/test/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexBuilderTest.java
index bd748edb53..fb1fbcb4ff 100644
--- a/paimon-core/src/test/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexBuilderTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexBuilderTest.java
@@ -106,7 +106,6 @@ public class BTreeGlobalIndexBuilderTest extends TableTestBase {
 
         BTreeGlobalIndexBuilder builder = new BTreeGlobalIndexBuilder(table);
         builder.withIndexField("f0");
-        builder.withIndexType("btree");
         builder.withPartitionPredicate(partitionPredicate);
         List<DataSplit> dataSplits = builder.scan();
         List<CommitMessage> commitMessages = new ArrayList<>();
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/BtreeGlobalIndexTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/BtreeGlobalIndexTableTest.java
index 9b1ace20a0..145997206a 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/BtreeGlobalIndexTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/BtreeGlobalIndexTableTest.java
@@ -24,7 +24,6 @@ import org.apache.paimon.globalindex.GlobalIndexResult;
 import org.apache.paimon.globalindex.GlobalIndexScanBuilder;
 import org.apache.paimon.globalindex.RowRangeGlobalIndexScanner;
 import org.apache.paimon.globalindex.btree.BTreeGlobalIndexBuilder;
-import org.apache.paimon.globalindex.btree.BTreeGlobalIndexerFactory;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.table.sink.BatchTableCommit;
@@ -167,9 +166,7 @@ public class BtreeGlobalIndexTableTest extends DataEvolutionTestBase {
     private void createIndex(String fieldName) throws Exception {
         FileStoreTable table = (FileStoreTable) catalog.getTable(identifier());
         BTreeGlobalIndexBuilder builder =
-                new BTreeGlobalIndexBuilder(table)
-                        .withIndexType(BTreeGlobalIndexerFactory.IDENTIFIER)
-                        .withIndexField(fieldName);
+                new BTreeGlobalIndexBuilder(table).withIndexField(fieldName);
         List<DataSplit> dataSplits = builder.scan();
         List<CommitMessage> commitMessages = new ArrayList<>();
         for (DataSplit dataSplit : dataSplits) {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilder.java
index f69b0d704c..0a8cda3baf 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilder.java
@@ -60,9 +60,11 @@ import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.function.Supplier;
 
 import static 
org.apache.paimon.globalindex.btree.BTreeGlobalIndexBuilder.groupSplitsByRange;
 import static 
org.apache.paimon.globalindex.btree.BTreeGlobalIndexBuilder.splitByContiguousRowRange;
@@ -72,83 +74,86 @@ public class BTreeIndexTopoBuilder {
 
     public static void buildIndex(
             StreamExecutionEnvironment env,
+            Supplier<BTreeGlobalIndexBuilder> indexBuilderSupplier,
             FileStoreTable table,
-            String indexColumn,
+            List<String> indexColumns,
             PartitionPredicate partitionPredicate,
             Options userOptions)
             throws Exception {
-        // 1. Create BTree index builder and scan splits
-        BTreeGlobalIndexBuilder indexBuilder =
-                new BTreeGlobalIndexBuilder(table)
-                        .withIndexType("btree")
-                        .withIndexField(indexColumn);
-        if (partitionPredicate != null) {
-            indexBuilder = 
indexBuilder.withPartitionPredicate(partitionPredicate);
-        }
-
-        List<DataSplit> splits = 
splitByContiguousRowRange(indexBuilder.scan());
-        if (splits.isEmpty()) {
-            return;
-        }
-        Map<BinaryRow, Map<Range, List<DataSplit>>> partitionRangeSplits =
-                groupSplitsByRange(splits);
-        if (partitionRangeSplits.isEmpty()) {
-            return;
-        }
-
-        // 2. Select necessary columns (index field + ROW_ID)
-        List<String> selectedColumns = new ArrayList<>();
-        selectedColumns.add(indexColumn);
-
-        RowType readType = 
SpecialFields.rowTypeWithRowId(table.rowType().project(selectedColumns));
-        int indexFieldPos = readType.getFieldIndex(indexColumn);
-        int rowIdPos = readType.getFieldIndex(SpecialFields.ROW_ID.name());
-        DataType indexFieldType = readType.getTypeAt(indexFieldPos);
+        DataStream<Committable> allCommitMessages = null;
+        for (String indexColumn : indexColumns) {
+            BTreeGlobalIndexBuilder indexBuilder =
+                    indexBuilderSupplier.get().withIndexField(indexColumn);
+            if (partitionPredicate != null) {
+                indexBuilder = 
indexBuilder.withPartitionPredicate(partitionPredicate);
+            }
 
-        // 3. Calculate maximum parallelism bound
-        long recordsPerRange = 
userOptions.get(BTreeIndexOptions.BTREE_INDEX_RECORDS_PER_RANGE);
-        int maxParallelism = 
userOptions.get(BTreeIndexOptions.BTREE_INDEX_BUILD_MAX_PARALLELISM);
+            List<DataSplit> splits = 
splitByContiguousRowRange(indexBuilder.scan());
+            if (splits.isEmpty()) {
+                return;
+            }
+            Map<BinaryRow, Map<Range, List<DataSplit>>> partitionRangeSplits =
+                    groupSplitsByRange(splits);
+            if (partitionRangeSplits.isEmpty()) {
+                return;
+            }
 
-        // 4. Build one topology per contiguous row range
-        CoreOptions coreOptions = table.coreOptions();
-        ReadBuilder readBuilder = 
table.newReadBuilder().withReadType(readType);
-        List<String> sortColumns = new ArrayList<>();
-        sortColumns.add(indexColumn);
-        DataStream<Committable> allCommitMessages = null;
-        int partitionFieldSize = table.partitionKeys().size();
-        BinaryRowSerializer binaryRowSerializer = new 
BinaryRowSerializer(partitionFieldSize);
-        for (Map.Entry<BinaryRow, Map<Range, List<DataSplit>>> partitionEntry :
-                partitionRangeSplits.entrySet()) {
-            BinaryRow partition = partitionEntry.getKey();
-            for (Map.Entry<Range, List<DataSplit>> entry : 
partitionEntry.getValue().entrySet()) {
-                Range range = entry.getKey();
-                List<DataSplit> rangeSplits = entry.getValue();
-                if (rangeSplits.isEmpty()) {
-                    continue;
+            // 2. Select necessary columns (index field + ROW_ID)
+            List<String> selectedColumns = new ArrayList<>();
+            selectedColumns.add(indexColumn);
+
+            RowType readType =
+                    
SpecialFields.rowTypeWithRowId(table.rowType().project(selectedColumns));
+            int indexFieldPos = readType.getFieldIndex(indexColumn);
+            int rowIdPos = readType.getFieldIndex(SpecialFields.ROW_ID.name());
+            DataType indexFieldType = readType.getTypeAt(indexFieldPos);
+
+            // 3. Calculate maximum parallelism bound
+            long recordsPerRange = 
userOptions.get(BTreeIndexOptions.BTREE_INDEX_RECORDS_PER_RANGE);
+            int maxParallelism =
+                    
userOptions.get(BTreeIndexOptions.BTREE_INDEX_BUILD_MAX_PARALLELISM);
+
+            // 4. Build one topology per contiguous row range
+            CoreOptions coreOptions = table.coreOptions();
+            ReadBuilder readBuilder = 
table.newReadBuilder().withReadType(readType);
+            List<String> sortColumns = new ArrayList<>();
+            sortColumns.add(indexColumn);
+            int partitionFieldSize = table.partitionKeys().size();
+            BinaryRowSerializer binaryRowSerializer = new 
BinaryRowSerializer(partitionFieldSize);
+            for (Map.Entry<BinaryRow, Map<Range, List<DataSplit>>> 
partitionEntry :
+                    partitionRangeSplits.entrySet()) {
+                BinaryRow partition = partitionEntry.getKey();
+                for (Map.Entry<Range, List<DataSplit>> entry :
+                        partitionEntry.getValue().entrySet()) {
+                    Range range = entry.getKey();
+                    List<DataSplit> rangeSplits = entry.getValue();
+                    if (rangeSplits.isEmpty()) {
+                        continue;
+                    }
+
+                    DataStream<Committable> commitMessages =
+                            executeForPartitionRange(
+                                    env,
+                                    range,
+                                    rangeSplits,
+                                    readBuilder,
+                                    indexBuilder,
+                                    partitionFieldSize,
+                                    
binaryRowSerializer.serializeToBytes(partition),
+                                    indexFieldPos,
+                                    rowIdPos,
+                                    indexFieldType,
+                                    sortColumns,
+                                    coreOptions,
+                                    readType,
+                                    recordsPerRange,
+                                    maxParallelism);
+
+                    allCommitMessages =
+                            allCommitMessages == null
+                                    ? commitMessages
+                                    : allCommitMessages.union(commitMessages);
                 }
-
-                DataStream<Committable> commitMessages =
-                        executeForPartitionRange(
-                                env,
-                                range,
-                                rangeSplits,
-                                readBuilder,
-                                indexBuilder,
-                                partitionFieldSize,
-                                
binaryRowSerializer.serializeToBytes(partition),
-                                indexFieldPos,
-                                rowIdPos,
-                                indexFieldType,
-                                sortColumns,
-                                coreOptions,
-                                readType,
-                                recordsPerRange,
-                                maxParallelism);
-
-                allCommitMessages =
-                        allCommitMessages == null
-                                ? commitMessages
-                                : allCommitMessages.union(commitMessages);
             }
         }
         if (allCommitMessages != null) {
@@ -158,7 +163,23 @@ public class BTreeIndexTopoBuilder {
         env.execute("Create btree global index for table: " + table.name());
     }
 
-    private static DataStream<Committable> executeForPartitionRange(
+    public static void buildIndex(
+            StreamExecutionEnvironment env,
+            FileStoreTable table,
+            String indexColumn,
+            PartitionPredicate partitionPredicate,
+            Options userOptions)
+            throws Exception {
+        buildIndex(
+                env,
+                () -> new BTreeGlobalIndexBuilder(table),
+                table,
+                Collections.singletonList(indexColumn),
+                partitionPredicate,
+                userOptions);
+    }
+
+    protected static DataStream<Committable> executeForPartitionRange(
             StreamExecutionEnvironment env,
             Range range,
             List<DataSplit> rangeSplits,
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/btree/BTreeIndexTopoBuilder.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/btree/BTreeIndexTopoBuilder.java
index 644fa81cdd..cf9402d0d5 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/btree/BTreeIndexTopoBuilder.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/btree/BTreeIndexTopoBuilder.java
@@ -78,9 +78,7 @@ public class BTreeIndexTopoBuilder implements GlobalIndexTopologyBuilder {
             throws IOException {
         // 1. read the whole dataset of target partitions
         BTreeGlobalIndexBuilder indexBuilder =
-                new BTreeGlobalIndexBuilder(table)
-                        .withIndexType(indexType)
-                        .withIndexField(indexField.name());
+                new 
BTreeGlobalIndexBuilder(table).withIndexField(indexField.name());
         if (partitionPredicate != null) {
             indexBuilder = 
indexBuilder.withPartitionPredicate(partitionPredicate);
         }

Reply via email to