This is an automated email from the ASF dual-hosted git repository.

jerryjing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 1e1c9f4ed8 [index] lumina index build in flink support increment 
(#7611)
1e1c9f4ed8 is described below

commit 1e1c9f4ed8c28420e8248862b15b03fce9b7edca
Author: jerry <[email protected]>
AuthorDate: Fri Apr 10 09:32:14 2026 +0800

    [index] lumina index build in flink support increment (#7611)
---
 .../flink/globalindex/GenericIndexTopoBuilder.java | 131 +++++++++++-
 .../globalindex/GenericIndexTopoBuilderTest.java   | 232 +++++++++++++++++++++
 2 files changed, 352 insertions(+), 11 deletions(-)

diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilder.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilder.java
index bcbabac793..afa68624ee 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilder.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilder.java
@@ -88,6 +88,7 @@ import static 
org.apache.paimon.utils.Preconditions.checkArgument;
 public class GenericIndexTopoBuilder {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(GenericIndexTopoBuilder.class);
+    public static final long NO_MAX_INDEXED_ROW_ID = -1L;
 
     public static void buildIndexAndExecute(
             StreamExecutionEnvironment env,
@@ -97,6 +98,25 @@ public class GenericIndexTopoBuilder {
             PartitionPredicate partitionPredicate,
             Options userOptions)
             throws Exception {
+        buildIndexAndExecute(
+                env,
+                table,
+                indexColumn,
+                indexType,
+                partitionPredicate,
+                userOptions,
+                NO_MAX_INDEXED_ROW_ID);
+    }
+
+    public static void buildIndexAndExecute(
+            StreamExecutionEnvironment env,
+            FileStoreTable table,
+            String indexColumn,
+            String indexType,
+            PartitionPredicate partitionPredicate,
+            Options userOptions,
+            long maxIndexedRowId)
+            throws Exception {
         boolean hasIndexToBuild =
                 buildIndex(
                         env,
@@ -105,7 +125,8 @@ public class GenericIndexTopoBuilder {
                         indexColumn,
                         indexType,
                         partitionPredicate,
-                        userOptions);
+                        userOptions,
+                        maxIndexedRowId);
         if (hasIndexToBuild) {
             env.execute("Create " + indexType + " global index for table: " + 
table.name());
         } else {
@@ -113,6 +134,26 @@ public class GenericIndexTopoBuilder {
         }
     }
 
+    public static boolean buildIndex(
+            StreamExecutionEnvironment env,
+            Supplier<GenericGlobalIndexBuilder> indexBuilderSupplier,
+            FileStoreTable table,
+            String indexColumn,
+            String indexType,
+            PartitionPredicate partitionPredicate,
+            Options userOptions)
+            throws Exception {
+        return buildIndex(
+                env,
+                indexBuilderSupplier,
+                table,
+                indexColumn,
+                indexType,
+                partitionPredicate,
+                userOptions,
+                NO_MAX_INDEXED_ROW_ID);
+    }
+
     /**
      * Builds a generic global index topology using a {@link 
GenericGlobalIndexBuilder} supplier.
      *
@@ -126,7 +167,8 @@ public class GenericIndexTopoBuilder {
             String indexColumn,
             String indexType,
             PartitionPredicate partitionPredicate,
-            Options userOptions)
+            Options userOptions,
+            long maxIndexedRowId)
             throws Exception {
         GenericGlobalIndexBuilder indexBuilder = indexBuilderSupplier.get();
         if (partitionPredicate != null) {
@@ -135,14 +177,47 @@ public class GenericIndexTopoBuilder {
 
         List<ManifestEntry> entries = indexBuilder.scan();
         List<IndexManifestEntry> deletedIndexEntries = 
indexBuilder.deletedIndexEntries();
+
+        return buildTopology(
+                env,
+                table,
+                indexColumn,
+                indexType,
+                userOptions,
+                entries,
+                deletedIndexEntries,
+                maxIndexedRowId);
+    }
+
+    /**
+     * Builds the Flink topology for global index creation from pre-scanned 
entries. Supports both
+     * full builds ({@code maxIndexedRowId = NO_MAX_INDEXED_ROW_ID}) and 
incremental builds where
+     * rows up to {@code maxIndexedRowId} are skipped.
+     *
+     * @param maxIndexedRowId the maximum row ID already indexed; use {@link 
#NO_MAX_INDEXED_ROW_ID}
+     *     for a full build
+     * @return {@code true} if a Flink topology was built, {@code false} if 
nothing to index
+     */
+    private static boolean buildTopology(
+            StreamExecutionEnvironment env,
+            FileStoreTable table,
+            String indexColumn,
+            String indexType,
+            Options userOptions,
+            List<ManifestEntry> entries,
+            List<IndexManifestEntry> deletedIndexEntries,
+            long maxIndexedRowId)
+            throws Exception {
         long totalRowCount = entries.stream().mapToLong(e -> 
e.file().rowCount()).sum();
         LOG.info(
-                "Scanned {} files ({} rows) across {} partitions for {} index 
on column '{}'.",
+                "Scanned {} files ({} rows) across {} partitions for {} index 
on column '{}'"
+                        + (maxIndexedRowId >= 0 ? ", maxIndexedRowId={}." : 
"."),
                 entries.size(),
                 totalRowCount,
                 
entries.stream().map(ManifestEntry::partition).distinct().count(),
                 indexType,
-                indexColumn);
+                indexColumn,
+                maxIndexedRowId);
 
         RowType rowType = table.rowType();
         DataField indexField = rowType.getField(indexColumn);
@@ -160,7 +235,8 @@ public class GenericIndexTopoBuilder {
                 "Option 'global-index.row-count-per-shard' must be greater 
than 0.");
 
         // Compute shard tasks at file level from the provided entries
-        List<ShardTask> shardTasks = computeShardTasks(table, entries, 
rowsPerShard);
+        List<ShardTask> shardTasks =
+                computeShardTasks(table, entries, rowsPerShard, 
maxIndexedRowId);
         if (shardTasks.isEmpty()) {
             LOG.info("No shard tasks generated, nothing to index.");
             return false;
@@ -216,13 +292,33 @@ public class GenericIndexTopoBuilder {
         return true;
     }
 
+    /**
+     * Compute shard tasks for a full build (no rows to skip).
+     *
+     * @see #computeShardTasks(FileStoreTable, List, long, long)
+     */
+    static List<ShardTask> computeShardTasks(
+            FileStoreTable table, List<ManifestEntry> entries, long 
rowsPerShard) {
+        return computeShardTasks(table, entries, rowsPerShard, 
NO_MAX_INDEXED_ROW_ID);
+    }
+
     /**
      * Compute shard tasks at file level from the given manifest entries. Each 
shard only contains
      * the files whose row ID ranges overlap with its shard range. A file 
spanning multiple shard
      * boundaries is included in each overlapping shard.
+     *
+     * <p>When {@code maxIndexedRowId >= 0}, each shard's effective start is 
advanced past {@code
+     * maxIndexedRowId}, skipping fully-indexed shards entirely. This enables 
incremental index
+     * building where only new (un-indexed) rows are processed.
+     *
+     * @param maxIndexedRowId the maximum row ID already indexed; use {@link 
#NO_MAX_INDEXED_ROW_ID}
+     *     for a full build
      */
     static List<ShardTask> computeShardTasks(
-            FileStoreTable table, List<ManifestEntry> entries, long 
rowsPerShard) {
+            FileStoreTable table,
+            List<ManifestEntry> entries,
+            long rowsPerShard,
+            long maxIndexedRowId) {
         // Group by partition (bucket is always 0 for unaware-bucket tables)
         Map<BinaryRow, List<ManifestEntry>> entriesByPartition =
                 
entries.stream().collect(Collectors.groupingBy(ManifestEntry::partition));
@@ -266,6 +362,15 @@ public class GenericIndexTopoBuilder {
                     continue;
                 }
 
+                // For incremental builds, advance past already-indexed rows
+                long effectiveStart =
+                        maxIndexedRowId >= 0
+                                ? Math.max(shardStart, maxIndexedRowId + 1)
+                                : shardStart;
+                if (effectiveStart > shardEnd) {
+                    continue; // entire shard already indexed
+                }
+
                 
shardFiles.sort(Comparator.comparingLong(DataFileMeta::nonNullFirstRowId));
 
                 // Group contiguous files; gaps produce separate tasks
@@ -286,7 +391,7 @@ public class GenericIndexTopoBuilder {
                         tasks.add(
                                 createShardTask(
                                         currentGroup,
-                                        shardStart,
+                                        effectiveStart,
                                         shardEnd,
                                         partition,
                                         partBucketPath));
@@ -298,7 +403,11 @@ public class GenericIndexTopoBuilder {
                 if (!currentGroup.isEmpty()) {
                     tasks.add(
                             createShardTask(
-                                    currentGroup, shardStart, shardEnd, 
partition, partBucketPath));
+                                    currentGroup,
+                                    effectiveStart,
+                                    shardEnd,
+                                    partition,
+                                    partBucketPath));
                 }
             }
         }
@@ -307,7 +416,7 @@ public class GenericIndexTopoBuilder {
 
     private static ShardTask createShardTask(
             List<DataFileMeta> files,
-            long shardStart,
+            long effectiveStart,
             long shardEnd,
             BinaryRow partition,
             String bucketPath) {
@@ -315,8 +424,8 @@ public class GenericIndexTopoBuilder {
         long groupMaxRowId =
                 files.stream().mapToLong(f -> 
f.nonNullRowIdRange().to).max().getAsLong();
 
-        // Clamp to shard boundaries
-        long rangeFrom = Math.max(groupMinRowId, shardStart);
+        // Clamp to effective boundaries
+        long rangeFrom = Math.max(groupMinRowId, effectiveStart);
         long rangeTo = Math.min(groupMaxRowId, shardEnd);
 
         DataSplit dataSplit =
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilderTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilderTest.java
index 8a01a8f7e5..d8ea36e9d5 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilderTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilderTest.java
@@ -218,6 +218,238 @@ class GenericIndexTopoBuilderTest {
         assertThat(tasks.get(1).shardRange).isEqualTo(new Range(100, 149));
     }
 
+    // ========== Incremental build scenarios (maxIndexedRowId) ==========
+
+    @Test
+    void testIncrementalFirstBuildNoIndex() {
+        // First build: no existing index, two files.
+        // maxIndexedRowId=-1 → all shards created normally.
+        List<ManifestEntry> entries = new ArrayList<>();
+        entries.add(createEntry(BinaryRow.EMPTY_ROW, 0L, 100));
+        entries.add(createEntry(BinaryRow.EMPTY_ROW, 100L, 100));
+
+        List<GenericIndexTopoBuilder.ShardTask> tasks =
+                GenericIndexTopoBuilder.computeShardTasks(table, entries, 200, 
-1);
+
+        assertThat(tasks).hasSize(1);
+        assertThat(tasks.get(0).shardRange).isEqualTo(new Range(0, 199));
+        assertThat(tasks.get(0).split.dataFiles()).hasSize(2);
+    }
+
+    @Test
+    void testIncrementalNormalNoCompaction() {
+        // Indexed [0,199], new file [200,399]. No compaction.
+        List<ManifestEntry> entries = new ArrayList<>();
+        entries.add(createEntry(BinaryRow.EMPTY_ROW, 200L, 200));
+
+        List<GenericIndexTopoBuilder.ShardTask> tasks =
+                GenericIndexTopoBuilder.computeShardTasks(table, entries, 200, 
199);
+
+        assertThat(tasks).hasSize(1);
+        assertThat(tasks.get(0).shardRange).isEqualTo(new Range(200, 399));
+    }
+
+    @Test
+    void testIncrementalNoNewDataAllIndexed() {
+        // All data [0,399] already indexed. All shards should be skipped.
+        List<ManifestEntry> entries = new ArrayList<>();
+        entries.add(createEntry(BinaryRow.EMPTY_ROW, 0L, 400));
+
+        List<GenericIndexTopoBuilder.ShardTask> tasks =
+                GenericIndexTopoBuilder.computeShardTasks(table, entries, 200, 
399);
+
+        assertThat(tasks).isEmpty();
+    }
+
+    @Test
+    void testIncrementalCompactMergesIndexedAndUnindexed() {
+        // Files A[0,99], B[100,199] indexed, new C[200,299], compact B+C → 
D[100,299]
+        // Shard 0 [0,199]: effectiveStart=200 > 199 → skip
+        // Shard 1 [200,299]: effectiveStart=200 → [200,299]
+        List<ManifestEntry> entries = new ArrayList<>();
+        entries.add(createEntry(BinaryRow.EMPTY_ROW, 100L, 200)); // D[100,299]
+
+        List<GenericIndexTopoBuilder.ShardTask> tasks =
+                GenericIndexTopoBuilder.computeShardTasks(table, entries, 200, 
199);
+
+        assertThat(tasks).hasSize(1);
+        assertThat(tasks.get(0).shardRange).isEqualTo(new Range(200, 299));
+    }
+
+    @Test
+    void testIncrementalCompactOnlyIndexedFiles() {
+        // Compact two indexed files → empty entries → no tasks.
+        List<GenericIndexTopoBuilder.ShardTask> tasks =
+                GenericIndexTopoBuilder.computeShardTasks(table, 
Collections.emptyList(), 200, 199);
+
+        assertThat(tasks).isEmpty();
+    }
+
+    @Test
+    void testIncrementalCompactPartialWithUntouchedFiles() {
+        // Indexed [0,399]. Compact [200,399]+[400,599] → D[200,599].
+        // Shard 1 [200,399]: effectiveStart=400 > 399 → skip
+        // Shard 2 [400,599]: effectiveStart=400 → [400,599]
+        List<ManifestEntry> entries = new ArrayList<>();
+        entries.add(createEntry(BinaryRow.EMPTY_ROW, 200L, 400)); // D[200,599]
+
+        List<GenericIndexTopoBuilder.ShardTask> tasks =
+                GenericIndexTopoBuilder.computeShardTasks(table, entries, 200, 
399);
+
+        assertThat(tasks).hasSize(1);
+        assertThat(tasks.get(0).shardRange).isEqualTo(new Range(400, 599));
+    }
+
+    @Test
+    void testIncrementalMultipleWritesThenCompact() {
+        // Write 200 (indexed), write 200 more, compact → big file [0,399].
+        // Shard 0 [0,199]: effectiveStart=200 > 199 → skip
+        // Shard 1 [200,399]: effectiveStart=200 → [200,399]
+        List<ManifestEntry> entries = new ArrayList<>();
+        entries.add(createEntry(BinaryRow.EMPTY_ROW, 0L, 400));
+
+        List<GenericIndexTopoBuilder.ShardTask> tasks =
+                GenericIndexTopoBuilder.computeShardTasks(table, entries, 200, 
199);
+
+        assertThat(tasks).hasSize(1);
+        assertThat(tasks.get(0).shardRange).isEqualTo(new Range(200, 399));
+    }
+
+    @Test
+    void testIncrementalMergeAllSmallShards() {
+        // All entries small and deleted by merge. maxIndexedRowId=-1 → full 
rebuild.
+        List<ManifestEntry> entries = new ArrayList<>();
+        entries.add(createEntry(BinaryRow.EMPTY_ROW, 0L, 250));
+
+        List<GenericIndexTopoBuilder.ShardTask> tasks =
+                GenericIndexTopoBuilder.computeShardTasks(table, entries, 200, 
-1);
+
+        assertThat(tasks).hasSize(2);
+        assertThat(tasks.get(0).shardRange).isEqualTo(new Range(0, 199));
+        assertThat(tasks.get(1).shardRange).isEqualTo(new Range(200, 249));
+    }
+
+    @Test
+    void testIncrementalMergePartialKeepLargeShard() {
+        // Entry [0,199] kept. Small shards [200-399] merged. 
maxIndexedRowId=199.
+        // Shard 0 [0,199]: effectiveStart=200 > 199 → skip
+        // Shard 1 [200,399]: effectiveStart=200 → [200,399]
+        // Shard 2 [400,599]: effectiveStart=400 → [400,599]
+        List<ManifestEntry> entries = new ArrayList<>();
+        entries.add(createEntry(BinaryRow.EMPTY_ROW, 0L, 600));
+
+        List<GenericIndexTopoBuilder.ShardTask> tasks =
+                GenericIndexTopoBuilder.computeShardTasks(table, entries, 200, 
199);
+
+        assertThat(tasks).hasSize(2);
+        assertThat(tasks.get(0).shardRange).isEqualTo(new Range(200, 399));
+        assertThat(tasks.get(1).shardRange).isEqualTo(new Range(400, 599));
+    }
+
+    @Test
+    void testIncrementalShardBoundaryExactAlign() {
+        // maxIndexedRowId=199, new file starts exactly at shard boundary.
+        List<ManifestEntry> entries = new ArrayList<>();
+        entries.add(createEntry(BinaryRow.EMPTY_ROW, 200L, 200));
+
+        List<GenericIndexTopoBuilder.ShardTask> tasks =
+                GenericIndexTopoBuilder.computeShardTasks(table, entries, 200, 
199);
+
+        assertThat(tasks).hasSize(1);
+        assertThat(tasks.get(0).shardRange).isEqualTo(new Range(200, 399));
+    }
+
+    @Test
+    void testIncrementalShardBoundaryNotAligned() {
+        // maxIndexedRowId=149. Compacted file [0,349].
+        // Shard 0 [0,199]: effectiveStart=150 → [150,199]
+        // Shard 1 [200,349]: → [200,349]
+        List<ManifestEntry> entries = new ArrayList<>();
+        entries.add(createEntry(BinaryRow.EMPTY_ROW, 0L, 350));
+
+        List<GenericIndexTopoBuilder.ShardTask> tasks =
+                GenericIndexTopoBuilder.computeShardTasks(table, entries, 200, 
149);
+
+        assertThat(tasks).hasSize(2);
+        assertThat(tasks.get(0).shardRange).isEqualTo(new Range(150, 199));
+        assertThat(tasks.get(1).shardRange).isEqualTo(new Range(200, 349));
+    }
+
+    @Test
+    void testIncrementalFileSpansMultipleShards() {
+        // One large file [0,599] spanning 3 shards, indexed [0,199].
+        // Shard 0: skip. Shard 1: [200,399]. Shard 2: [400,599].
+        List<ManifestEntry> entries = new ArrayList<>();
+        entries.add(createEntry(BinaryRow.EMPTY_ROW, 0L, 600));
+
+        List<GenericIndexTopoBuilder.ShardTask> tasks =
+                GenericIndexTopoBuilder.computeShardTasks(table, entries, 200, 
199);
+
+        assertThat(tasks).hasSize(2);
+        assertThat(tasks.get(0).shardRange).isEqualTo(new Range(200, 399));
+        assertThat(tasks.get(1).shardRange).isEqualTo(new Range(400, 599));
+    }
+
+    @Test
+    void testIncrementalNullFirstRowIdFileSkipped() {
+        List<ManifestEntry> entries = new ArrayList<>();
+        entries.add(createEntry(BinaryRow.EMPTY_ROW, null, 100));
+        entries.add(createEntry(BinaryRow.EMPTY_ROW, 200L, 100));
+
+        List<GenericIndexTopoBuilder.ShardTask> tasks =
+                GenericIndexTopoBuilder.computeShardTasks(table, entries, 200, 
199);
+
+        assertThat(tasks).hasSize(1);
+        assertThat(tasks.get(0).shardRange).isEqualTo(new Range(200, 299));
+        assertThat(tasks.get(0).split.dataFiles()).hasSize(1);
+    }
+
+    @Test
+    void testIncrementalMultipleFilesInOneShard() {
+        // Two contiguous new files in same shard.
+        List<ManifestEntry> entries = new ArrayList<>();
+        entries.add(createEntry(BinaryRow.EMPTY_ROW, 200L, 100));
+        entries.add(createEntry(BinaryRow.EMPTY_ROW, 300L, 100));
+
+        List<GenericIndexTopoBuilder.ShardTask> tasks =
+                GenericIndexTopoBuilder.computeShardTasks(table, entries, 400, 
199);
+
+        assertThat(tasks).hasSize(1);
+        assertThat(tasks.get(0).shardRange).isEqualTo(new Range(200, 399));
+        assertThat(tasks.get(0).split.dataFiles()).hasSize(2);
+    }
+
+    @Test
+    void testIncrementalGapBetweenFilesProducesSeparateTasks() {
+        // Two files with a gap, same shard. maxIndexedRowId=-1.
+        List<ManifestEntry> entries = new ArrayList<>();
+        entries.add(createEntry(BinaryRow.EMPTY_ROW, 0L, 50));
+        entries.add(createEntry(BinaryRow.EMPTY_ROW, 150L, 50));
+
+        List<GenericIndexTopoBuilder.ShardTask> tasks =
+                GenericIndexTopoBuilder.computeShardTasks(table, entries, 200, 
-1);
+
+        assertThat(tasks).hasSize(2);
+        assertThat(tasks.get(0).shardRange).isEqualTo(new Range(0, 49));
+        assertThat(tasks.get(1).shardRange).isEqualTo(new Range(150, 199));
+    }
+
+    @Test
+    void testIncrementalFileStartsAfterEffectiveStart() {
+        // maxIndexedRowId=250. New file [300,499].
+        // Shard 1 [200,399]: effectiveStart=251, file starts at 300 → 
[300,399]
+        // Shard 2 [400,499]: effectiveStart=400 → [400,499]
+        List<ManifestEntry> entries = new ArrayList<>();
+        entries.add(createEntry(BinaryRow.EMPTY_ROW, 300L, 200));
+
+        List<GenericIndexTopoBuilder.ShardTask> tasks =
+                GenericIndexTopoBuilder.computeShardTasks(table, entries, 200, 
250);
+
+        assertThat(tasks).hasSize(2);
+        assertThat(tasks.get(0).shardRange).isEqualTo(new Range(300, 399));
+        assertThat(tasks.get(1).shardRange).isEqualTo(new Range(400, 499));
+    }
+
     // -- Helpers --
 
     private static ManifestEntry createEntry(BinaryRow partition, Long 
firstRowId, long rowCount) {

Reply via email to