This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new bec9ead373 [core] supports multiple partitions in BTreeGlobalIndexBuilder (#7191)
bec9ead373 is described below

commit bec9ead373252737db903256ccffec0e29677b4b
Author: Faiz <[email protected]>
AuthorDate: Tue Feb 3 15:36:59 2026 +0800

    [core] supports multiple partitions in BTreeGlobalIndexBuilder (#7191)
---
 .../globalindex/btree/BTreeGlobalIndexBuilder.java |  22 +-
 .../btree/BTreeGlobalIndexBuilderTest.java         | 241 +++++++++++++++++++++
 2 files changed, 256 insertions(+), 7 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexBuilder.java b/paimon-core/src/main/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexBuilder.java
index 9dc9da7e6e..a4b63c5918 100644
--- a/paimon-core/src/main/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexBuilder.java
+++ b/paimon-core/src/main/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexBuilder.java
@@ -46,6 +46,7 @@ import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.CloseableIterator;
 import org.apache.paimon.utils.MutableObjectIteratorAdapter;
+import org.apache.paimon.utils.Preconditions;
 import org.apache.paimon.utils.Range;
 
 import javax.annotation.Nullable;
@@ -57,6 +58,7 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Objects;
+import java.util.stream.IntStream;
 
 import static java.util.Collections.singletonList;
 import static org.apache.paimon.globalindex.GlobalIndexBuilderUtils.createIndexWriter;
@@ -77,6 +79,8 @@ public class BTreeGlobalIndexBuilder implements Serializable {
 
     private String indexType;
     private DataField indexField;
+
+    // readRowType is composed of the partition fields, the indexed field, and the _ROW_ID field
     private RowType readRowType;
     private RowIdIndexFieldsExtractor extractor;
 
@@ -92,6 +96,10 @@ public class BTreeGlobalIndexBuilder implements Serializable {
 
     public BTreeGlobalIndexBuilder withIndexType(String indexType) {
         this.indexType = indexType;
+        Preconditions.checkArgument(
+                BTreeGlobalIndexerFactory.IDENTIFIER.equals(indexType),
+                "BTreeGlobalInderBuilder only supports %s index type",
+                BTreeGlobalIndexerFactory.IDENTIFIER);
         return this;
     }
 
@@ -102,15 +110,14 @@ public class BTreeGlobalIndexBuilder implements Serializable {
                 indexField,
                 table.fullName());
         this.indexField = rowType.getField(indexField);
-        this.readRowType =
-                SpecialFields.rowTypeWithRowId(new RowType(singletonList(this.indexField)));
         List<String> readColumns = new ArrayList<>(table.partitionKeys());
-        readColumns.addAll(readRowType.getFieldNames());
+        readColumns.addAll(
+                SpecialFields.rowTypeWithRowId(new RowType(singletonList(this.indexField)))
+                        .getFieldNames());
+        this.readRowType = SpecialFields.rowTypeWithRowId(table.rowType()).project(readColumns);
         this.extractor =
                 new RowIdIndexFieldsExtractor(
-                        SpecialFields.rowTypeWithRowId(table.rowType()).project(readColumns),
-                        table.partitionKeys(),
-                        this.indexField.name());
+                        this.readRowType, table.partitionKeys(), this.indexField.name());
         return this;
     }
 
@@ -141,7 +148,8 @@ public class BTreeGlobalIndexBuilder implements Serializable {
                 BinaryExternalSortBuffer.create(
                         ioManager,
                         readRowType,
-                        new int[] {0},
+                        // sort by <partition, indexed_field>
+                        IntStream.range(0, readRowType.getFieldCount() - 1).toArray(),
                         options.writeBufferSize(),
                         options.pageSize(),
                         options.localSortMaxNumFileHandles(),
diff --git a/paimon-core/src/test/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexBuilderTest.java b/paimon-core/src/test/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexBuilderTest.java
new file mode 100644
index 0000000000..d37ad973da
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexBuilderTest.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.globalindex.btree;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.index.GlobalIndexMeta;
+import org.apache.paimon.index.IndexFileHandler;
+import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.manifest.IndexManifestEntry;
+import org.apache.paimon.memory.MemorySlice;
+import org.apache.paimon.partition.PartitionPredicate;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.TableTestBase;
+import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.Pair;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Test class for {@link BTreeGlobalIndexBuilder}. */
+public class BTreeGlobalIndexBuilderTest extends TableTestBase {
+
+    private static final long PART_ROW_NUM = 1000L;
+    private static final KeySerializer KEY_SERIALIZER = KeySerializer.create(DataTypes.INT());
+    private static final Comparator<Object> COMPARATOR = KEY_SERIALIZER.createComparator();
+
+    @Override
+    public Schema schemaDefault() {
+        Schema.Builder schemaBuilder = Schema.newBuilder();
+        schemaBuilder.column("dt", DataTypes.STRING());
+        schemaBuilder.column("f0", DataTypes.INT());
+        schemaBuilder.column("f1", DataTypes.STRING());
+        schemaBuilder.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
+        schemaBuilder.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true");
+        schemaBuilder.option(BTreeIndexOptions.BTREE_INDEX_RECORDS_PER_RANGE.key(), "100");
+        schemaBuilder.partitionKeys(Collections.singletonList("dt"));
+        return schemaBuilder.build();
+    }
+
+    private void write() throws Exception {
+        createTableDefault();
+
+        BatchWriteBuilder builder = getTableDefault().newBatchWriteBuilder();
+        try (BatchTableWrite write0 = builder.newWrite()) {
+            for (int i = 0; i < PART_ROW_NUM; i++) {
+                write0.write(
+                        GenericRow.of(
+                                BinaryString.fromString("p0"),
+                                i,
+                                BinaryString.fromString("f1_" + i)));
+            }
+            BatchTableCommit commit = builder.newCommit();
+            commit.commit(write0.prepareCommit());
+        }
+
+        try (BatchTableWrite write1 = builder.newWrite()) {
+            for (int i = 0; i < PART_ROW_NUM; i++) {
+                write1.write(
+                        GenericRow.of(
+                                BinaryString.fromString("p1"),
+                                i,
+                                BinaryString.fromString("f1_" + i)));
+            }
+            BatchTableCommit commit = builder.newCommit();
+            commit.commit(write1.prepareCommit());
+        }
+    }
+
+    private void createIndex(PartitionPredicate partitionPredicate) throws Exception {
+        FileStoreTable table = getTableDefault();
+
+        BTreeGlobalIndexBuilder builder = new BTreeGlobalIndexBuilder(table);
+        builder.withIndexField("f0");
+        builder.withIndexType("btree");
+        builder.withPartitionPredicate(partitionPredicate);
+        List<CommitMessage> commitMessages = builder.build(builder.scan(), ioManager);
+
+        try (BatchTableCommit commit = table.newBatchWriteBuilder().newCommit()) {
+            commit.commit(commitMessages);
+        }
+    }
+
+    @Test
+    public void testCreateIndexForSinglePartition() throws Exception {
+        write();
+
+        FileStoreTable table = getTableDefault();
+        RowType partType = table.rowType().project("dt");
+        Predicate predicate =
+                PartitionPredicate.createPartitionPredicate(
+                        partType, Collections.singletonMap("dt", BinaryString.fromString("p0")));
+
+        createIndex(PartitionPredicate.fromPredicate(partType, predicate));
+
+        Map<BinaryRow, List<Pair<String, FileStats>>> metasByParts = gatherIndexMetas(table);
+
+        Assertions.assertEquals(1, metasByParts.size());
+
+        metasByParts.forEach(this::assertFilesNonOverlapping);
+    }
+
+    @Test
+    public void testCreateIndexForMultiplePartitions() throws Exception {
+        write();
+
+        createIndex(null);
+
+        FileStoreTable table = getTableDefault();
+
+        Map<BinaryRow, List<Pair<String, FileStats>>> metasByParts = gatherIndexMetas(table);
+
+        Assertions.assertEquals(2, metasByParts.size());
+
+        metasByParts.forEach(this::assertFilesNonOverlapping);
+    }
+
+    private Map<BinaryRow, List<Pair<String, FileStats>>> gatherIndexMetas(FileStoreTable table) {
+        IndexFileHandler handler = table.store().newIndexFileHandler();
+
+        Snapshot snapshot = table.latestSnapshot().get();
+        List<IndexManifestEntry> entries = handler.scan(snapshot, "btree");
+
+        Map<BinaryRow, List<Pair<String, FileStats>>> metasByParts = new HashMap<>();
+        for (IndexManifestEntry entry : entries) {
+            IndexFileMeta indexFileMeta = entry.indexFile();
+            Assertions.assertNotNull(
+                    indexFileMeta.globalIndexMeta(), "Global index meta should not be null");
+
+            metasByParts
+                    .computeIfAbsent(entry.partition(), part -> new ArrayList<>())
+                    .add(
+                            Pair.of(
+                                    indexFileMeta.fileName(),
+                                    FileStats.fromIndexFileMeta(indexFileMeta)));
+        }
+
+        return metasByParts;
+    }
+
+    private void assertFilesNonOverlapping(
+            BinaryRow partition, List<Pair<String, FileStats>> metas) {
+        if (metas.isEmpty()) {
+            return;
+        }
+
+        metas.sort((m1, m2) -> COMPARATOR.compare(m1.getValue().firstKey, m2.getValue().firstKey));
+        String lastFileName = metas.get(0).getKey();
+        FileStats lastMeta = metas.get(0).getValue();
+        long rowCount = lastMeta.rowCount;
+        for (int i = 1; i < metas.size(); i++) {
+            String fileName = metas.get(i).getKey();
+            FileStats fileMeta = metas.get(i).getValue();
+            rowCount += fileMeta.rowCount;
+
+            Assertions.assertTrue(
+                    COMPARATOR.compare(lastMeta.lastKey, fileMeta.firstKey) <= 0,
+                    String.format(
+                            "In partition %s, key range [%s:%s] of file %s 
overlaps with adjacent file %s [%s:%s]",
+                            partition.getString(0),
+                            lastMeta.firstKey,
+                            lastMeta.lastKey,
+                            lastFileName,
+                            fileName,
+                            fileMeta.firstKey,
+                            fileMeta.lastKey));
+
+            lastFileName = fileName;
+            lastMeta = fileMeta;
+        }
+
+        Assertions.assertEquals(
+                PART_ROW_NUM,
+                rowCount,
+                String.format(
+                        "In partition %s, total row count of all btree index 
files not equals to original data row count.",
+                        partition.getString(0)));
+    }
+
+    static Object deserialize(byte[] bytes) {
+        return KEY_SERIALIZER.deserialize(MemorySlice.wrap(bytes));
+    }
+
+    /** File Stats class for assertion. */
+    private static class FileStats {
+
+        final long rowCount;
+        final Object firstKey;
+        final Object lastKey;
+
+        FileStats(long rowCount, Object firstKey, Object lastKey) {
+            this.rowCount = rowCount;
+            this.firstKey = firstKey;
+            this.lastKey = lastKey;
+        }
+
+        static FileStats fromIndexFileMeta(IndexFileMeta meta) {
+            Assertions.assertNotNull(meta.globalIndexMeta());
+            GlobalIndexMeta globalIndexMeta = meta.globalIndexMeta();
+            BTreeIndexMeta btreeMeta = BTreeIndexMeta.deserialize(globalIndexMeta.indexMeta());
+
+            return new FileStats(
+                    meta.rowCount(),
+                    deserialize(btreeMeta.getFirstKey()),
+                    deserialize(btreeMeta.getLastKey()));
+        }
+    }
+}
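
A note on the sort-key change in BTreeGlobalIndexBuilder above: readRowType now lays out the partition fields first, then the indexed field, with _ROW_ID as the last column, and the external sort buffer keys on every column except that trailing _ROW_ID. A minimal, self-contained sketch of how those sort-key indexes are computed (the field names and class below are illustrative, not Paimon API):

import java.util.Arrays;
import java.util.List;
import java.util.stream.IntStream;

public class SortKeyIndexesSketch {
    public static void main(String[] args) {
        // Hypothetical readRowType layout: partition field(s), indexed field, _ROW_ID last,
        // matching the comment added to BTreeGlobalIndexBuilder.
        List<String> readFields = Arrays.asList("dt", "f0", "_ROW_ID");

        // Sort by every column except the trailing _ROW_ID, i.e. <partition, indexed_field>.
        int[] sortKeyIndexes = IntStream.range(0, readFields.size() - 1).toArray();

        System.out.println(Arrays.toString(sortKeyIndexes)); // prints [0, 1]
    }
}

Keying on the partition columns before the indexed field groups rows of the same partition together in the sorted stream, which is what lets a single builder run emit per-partition, non-overlapping btree index files, as the new test asserts.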
