This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 52a6269831 [index] lumina index build support append mode (#7631)
52a6269831 is described below

commit 52a626983147ab11a3500bd2566a11dd560a1ec2
Author: jerry <[email protected]>
AuthorDate: Mon Apr 13 13:56:36 2026 +0800

    [index] lumina index build support append mode (#7631)
---
 .../flink/globalindex/GenericIndexTopoBuilder.java | 60 +++++++++++++++++++++-
 .../globalindex/GenericIndexTopoBuilderTest.java   | 59 +++++++++++++++++++++
 2 files changed, 118 insertions(+), 1 deletion(-)

diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilder.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilder.java
index afa68624ee..e37970723c 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilder.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilder.java
@@ -38,6 +38,7 @@ import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.partition.PartitionPredicate;
 import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.SpecialFields;
 import org.apache.paimon.table.sink.BatchWriteBuilder;
@@ -64,6 +65,7 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -219,6 +221,10 @@ public class GenericIndexTopoBuilder {
                 indexColumn,
                 maxIndexedRowId);
 
+        long minNonIndexableRowId =
+                findMinNonIndexableRowId(table.schemaManager(), entries, 
indexColumn);
+        entries = filterEntriesBefore(entries, minNonIndexableRowId);
+
         RowType rowType = table.rowType();
         DataField indexField = rowType.getField(indexColumn);
         // Project indexColumn + _ROW_ID so we can read the actual row ID from 
data
@@ -292,6 +298,49 @@ public class GenericIndexTopoBuilder {
         return true;
     }
 
+    /**
+     * Find the minimum firstRowId among files whose schema does not contain 
the index column. Files
+     * at or beyond this rowId cannot be indexed because the column was added 
later via ALTER TABLE.
+     *
+     * @return the boundary rowId, or {@link Long#MAX_VALUE} if all files 
contain the column
+     */
+    static long findMinNonIndexableRowId(
+            SchemaManager schemaManager, List<ManifestEntry> entries, String 
indexColumn) {
+        Map<Long, Boolean> schemaContainsColumn = new HashMap<>();
+        long minRowId = Long.MAX_VALUE;
+        for (ManifestEntry entry : entries) {
+            long sid = entry.file().schemaId();
+            boolean contains =
+                    schemaContainsColumn.computeIfAbsent(
+                            sid, id -> 
schemaManager.schema(id).fieldNames().contains(indexColumn));
+            if (!contains && entry.file().firstRowId() != null) {
+                minRowId = Math.min(minRowId, 
entry.file().nonNullFirstRowId());
+            }
+        }
+        return minRowId;
+    }
+
+    /** Keep only entries whose firstRowId is strictly less than the given 
boundary. */
+    static List<ManifestEntry> filterEntriesBefore(
+            List<ManifestEntry> entries, long boundaryRowId) {
+        if (boundaryRowId == Long.MAX_VALUE) {
+            return entries;
+        }
+        List<ManifestEntry> result = new ArrayList<>();
+        for (ManifestEntry entry : entries) {
+            if (entry.file().firstRowId() != null
+                    && entry.file().nonNullFirstRowId() < boundaryRowId) {
+                result.add(entry);
+            }
+        }
+        LOG.info(
+                "Filtered {} files at or beyond rowId {}, {} files remain.",
+                entries.size() - result.size(),
+                boundaryRowId,
+                result.size());
+        return result;
+    }
+
     /**
      * Compute shard tasks for a full build (no rows to skip).
      *
@@ -577,7 +626,16 @@ public class GenericIndexTopoBuilder {
                         }
                         // Only write rows within this shard's range
                         if (currentRowId >= task.shardRange.from) {
-                            
indexWriter.write(indexFieldGetter.getFieldOrNull(row));
+                            Object fieldData = 
indexFieldGetter.getFieldOrNull(row);
+                            if (fieldData == null) {
+                                LOG.info(
+                                        "Null vector at rowId={}, stopping 
shard [{}, {}].",
+                                        currentRowId,
+                                        task.shardRange.from,
+                                        task.shardRange.to);
+                                break;
+                            }
+                            indexWriter.write(fieldData);
                             rowsWritten++;
                         }
                     }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilderTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilderTest.java
index d8ea36e9d5..0de57077b2 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilderTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilderTest.java
@@ -26,6 +26,8 @@ import org.apache.paimon.fs.Path;
 import org.apache.paimon.io.PojoDataFileMeta;
 import org.apache.paimon.manifest.FileKind;
 import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.stats.SimpleStats;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.utils.FileStorePathFactory;
@@ -36,6 +38,7 @@ import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
@@ -450,8 +453,64 @@ class GenericIndexTopoBuilderTest {
         assertThat(tasks.get(1).shardRange).isEqualTo(new Range(400, 499));
     }
 
+    @Test
+    void testAppendFilterOldFilesBeforeNewFiles() {
+        // Typical append: write file0[0,99](schema1), file1[100,199](schema1),
+        // then file2[200,299](schema0) arrives (old schema).
+        // Boundary = 200, keep files with firstRowId < 200.
+        SchemaManager schemaManager = mock(SchemaManager.class);
+        TableSchema oldSchema = mock(TableSchema.class);
+        TableSchema newSchema = mock(TableSchema.class);
+        when(schemaManager.schema(0L)).thenReturn(oldSchema);
+        when(schemaManager.schema(1L)).thenReturn(newSchema);
+        when(oldSchema.fieldNames()).thenReturn(Arrays.asList("id", "name"));
+        when(newSchema.fieldNames()).thenReturn(Arrays.asList("id", "name", 
"vec"));
+
+        List<ManifestEntry> entries = new ArrayList<>();
+        entries.add(createEntryWithSchemaId(BinaryRow.EMPTY_ROW, 0L, 100, 1L));
+        entries.add(createEntryWithSchemaId(BinaryRow.EMPTY_ROW, 100L, 100, 
1L));
+        entries.add(createEntryWithSchemaId(BinaryRow.EMPTY_ROW, 200L, 100, 
0L));
+
+        List<ManifestEntry> result =
+                GenericIndexTopoBuilder.filterEntriesBefore(
+                        entries,
+                        GenericIndexTopoBuilder.findMinNonIndexableRowId(
+                                schemaManager, entries, "vec"));
+
+        assertThat(result).hasSize(2);
+        assertThat(result.get(0).file().nonNullFirstRowId()).isEqualTo(0L);
+        assertThat(result.get(1).file().nonNullFirstRowId()).isEqualTo(100L);
+    }
+
     // -- Helpers --
 
+    private static ManifestEntry createEntryWithSchemaId(
+            BinaryRow partition, Long firstRowId, long rowCount, long 
schemaId) {
+        PojoDataFileMeta file =
+                new PojoDataFileMeta(
+                        "test-file-" + UUID.randomUUID(),
+                        1024L,
+                        rowCount,
+                        BinaryRow.EMPTY_ROW,
+                        BinaryRow.EMPTY_ROW,
+                        SimpleStats.EMPTY_STATS,
+                        SimpleStats.EMPTY_STATS,
+                        0L,
+                        0L,
+                        schemaId,
+                        0,
+                        Collections.emptyList(),
+                        null,
+                        null,
+                        null,
+                        null,
+                        null,
+                        null,
+                        firstRowId,
+                        null);
+        return ManifestEntry.create(FileKind.ADD, partition, 0, 1, file);
+    }
+
     private static ManifestEntry createEntry(BinaryRow partition, Long 
firstRowId, long rowCount) {
         PojoDataFileMeta file =
                 new PojoDataFileMeta(

Reply via email to