jerry-024 commented on code in PR #7933:
URL: https://github.com/apache/paimon/pull/7933#discussion_r3303450426
##########
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilderTest.java:
##########
@@ -475,7 +475,7 @@ void testAppendFilterOldFilesBeforeNewFiles() {
GenericIndexTopoBuilder.filterEntriesBefore(
entries,
GenericIndexTopoBuilder.findMinNonIndexableRowId(
Review Comment:
**Compile error**: `findMinNonIndexableRowId` and `filterEntriesBefore` were
moved from `GenericIndexTopoBuilder` to `GlobalIndexBuilderUtils` in commit
0cfc7ef, but this test still references them via
`GenericIndexTopoBuilder.findMinNonIndexableRowId(...)` and
`GenericIndexTopoBuilder.filterEntriesBefore(...)`. This will fail to compile.
Should be:
```java
GlobalIndexBuilderUtils.filterEntriesBefore(
entries,
GlobalIndexBuilderUtils.findMinNonIndexableRowId(
schemaManager, entries, Collections.singletonList("vec")));
```
##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilder.java:
##########
@@ -626,16 +648,35 @@ public void processElement(StreamRecord<ShardTask> element) throws Exception {
}
// Only write rows within this shard's range
if (currentRowId >= task.shardRange.from) {
- Object fieldData = indexFieldGetter.getFieldOrNull(row);
- if (fieldData == null) {
- LOG.info(
- "Null vector at rowId={}, stopping shard [{}, {}].",
- currentRowId,
- task.shardRange.from,
- task.shardRange.to);
- break;
+ if (multiColumn) {
+ boolean hasNull = false;
+ for (InternalRow.FieldGetter getter : indexFieldGetters) {
+ if (getter.getFieldOrNull(row) == null) {
+ hasNull = true;
+ break;
+ }
+ }
+ if (hasNull) {
+ LOG.info(
+ "Null value in indexed columns at rowId={}, stopping shard [{}, {}].",
+ currentRowId,
+ task.shardRange.from,
+ task.shardRange.to);
+ break;
+ }
+ ((GlobalIndexMultiColumnWriter) indexWriter).write(row);
+ } else {
+ Object fieldData = indexFieldGetters[0].getFieldOrNull(row);
+ if (fieldData == null) {
+ LOG.info(
Review Comment:
**Multi-column writer receives extra `_ROW_ID` column**: In the multi-column
path, `row` is passed directly to `GlobalIndexMultiColumnWriter.write(row)`,
but this row comes from `projectedRowType` which is `indexColumns + _ROW_ID`.
The writer's contract (javadoc on `GlobalIndexMultiColumnWriter.write`) says
the row layout should match the fields passed to
`GlobalIndexerFactory.create(List<DataField>, Options)` — which doesn't include
`_ROW_ID`.
The previous ES-specific code handled this with a `ProjectedRow` that
stripped `_ROW_ID` before writing. Consider adding a similar projection here,
or clarifying the writer contract.
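One way such a projection could look, sketched with self-contained stand-in types rather than Paimon's real classes. `Row`, `ArrayRow`, `ProjectedView`, and `indexOnlyMapping` are hypothetical names used only for illustration, and the sketch assumes `_ROW_ID` is the last field of the projected row (as `indexColumns + _ROW_ID` suggests):

```java
/** Illustration only: a reusable view that hides the trailing _ROW_ID field from the writer. */
public class RowIdStrippingSketch {

    /** Hypothetical minimal row abstraction; NOT Paimon's InternalRow. */
    public interface Row {
        int fieldCount();

        Object getField(int pos);
    }

    /** Hypothetical array-backed row, standing in for the row produced by the reader. */
    public static final class ArrayRow implements Row {
        private final Object[] fields;

        public ArrayRow(Object... fields) {
            this.fields = fields;
        }

        @Override
        public int fieldCount() {
            return fields.length;
        }

        @Override
        public Object getField(int pos) {
            return fields[pos];
        }
    }

    /** Exposes only the mapped positions of the wrapped row; no data is copied. */
    public static final class ProjectedView implements Row {
        private final int[] mapping;
        private Row delegate;

        public ProjectedView(int[] mapping) {
            this.mapping = mapping;
        }

        /** Points the view at the next row so one instance can be reused per record. */
        public ProjectedView replaceRow(Row row) {
            this.delegate = row;
            return this;
        }

        @Override
        public int fieldCount() {
            return mapping.length;
        }

        @Override
        public Object getField(int pos) {
            return delegate.getField(mapping[pos]);
        }
    }

    /** Keeps positions [0, indexColumnCount), dropping the assumed trailing _ROW_ID. */
    public static int[] indexOnlyMapping(int indexColumnCount) {
        int[] mapping = new int[indexColumnCount];
        for (int i = 0; i < indexColumnCount; i++) {
            mapping[i] = i;
        }
        return mapping;
    }
}
```

In the operator, the view would be created once (with the number of index columns) and pointed at each incoming row right before the multi-column `write` call, so the writer only ever sees the index columns.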
##########
paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/DefaultGlobalIndexBuilder.java:
##########
@@ -99,27 +120,38 @@ public CommitMessage build(CloseableIterator<InternalRow> data) throws IOExcepti
private List<ResultEntry> writePaimonRows(
CloseableIterator<InternalRow> rows, LongCounter rowCounter)
throws IOException {
- GlobalIndexSingletonWriter indexWriter =
- (GlobalIndexSingletonWriter)
- createIndexWriter(table, indexType, indexField, options);
+ GlobalIndexWriter indexWriter = createIndexWriter(table, indexType, indexFields, options);
+ boolean multiColumn = indexFields.size() > 1;
try {
- InternalRow.FieldGetter getter =
- InternalRow.createFieldGetter(
- indexField.type(), readType.getFieldIndex(indexField.name()));
- rows.forEachRemaining(
- row -> {
- Object indexO = getter.getFieldOrNull(row);
- indexWriter.write(indexO);
- rowCounter.add(1);
- });
+ if (multiColumn) {
+ GlobalIndexMultiColumnWriter multiWriter =
+ (GlobalIndexMultiColumnWriter) indexWriter;
+ rows.forEachRemaining(
+ row -> {
Review Comment:
Same `_ROW_ID` issue as the Flink side: `rows` come from a reader using
`readType = indexColumns + _ROW_ID`, but the multi-column writer expects only
index columns. The row passed to `multiWriter.write(row)` includes the extra
`_ROW_ID` field.
##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateGlobalIndexProcedure.java:
##########
@@ -97,7 +106,7 @@ public String[] call(
BTreeIndexTopoBuilder.buildIndexAndExecute(
procedureContext.getExecutionEnvironment(),
table,
Review Comment:
BTree silently drops extra columns: when a user passes `"col1,col2"` with
index type `btree`, only `indexColumns.get(0)` is used — no error, no warning.
Consider adding a validation:
```java
if ("btree".equalsIgnoreCase(indexType)) {
checkArgument(indexColumns.size() == 1,
"BTree index only supports single column, got: %s", indexColumns);
}
```
##########
paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexBuilderUtils.java:
##########
@@ -78,6 +130,53 @@ public static GlobalIndexWriter createIndexWriter(
return globalIndexer.createWriter(createGlobalIndexFileReadWrite(table));
}
+ public static GlobalIndexWriter createIndexWriter(
+ FileStoreTable table, String indexType, List<DataField> fields, Options options)
+ throws IOException {
+ GlobalIndexer globalIndexer = GlobalIndexer.create(indexType, fields, options);
+ return globalIndexer.createWriter(createGlobalIndexFileReadWrite(table));
+ }
+
+ /**
+ * Find the minimum firstRowId among files whose schema does not contain all index columns.
+ * Files at or beyond this rowId cannot be indexed because the column was added later via ALTER
+ * TABLE.
+ *
+ * @return the boundary rowId, or {@link Long#MAX_VALUE} if all files contain the columns
+ */
+ public static long findMinNonIndexableRowId(
+ SchemaManager schemaManager, List<ManifestEntry> entries, List<String> indexColumns) {
+ Map<Long, Boolean> schemaContainsColumns = new HashMap<>();
+ long minRowId = Long.MAX_VALUE;
+ for (ManifestEntry entry : entries) {
+ long sid = entry.file().schemaId();
+ boolean contains =
+ schemaContainsColumns.computeIfAbsent(
+ sid,
+ id -> schemaManager.schema(id).fieldNames().containsAll(indexColumns));
+ if (!contains && entry.file().firstRowId() != null) {
+ minRowId = Math.min(minRowId, entry.file().nonNullFirstRowId());
+ }
+ }
+ return minRowId;
+ }
+
Review Comment:
Minor: the old `filterEntriesBefore` in `GenericIndexTopoBuilder` had a
`LOG.info("Filtered {} files ...")` line for observability. This was lost
during extraction since `GlobalIndexBuilderUtils` has no logger. Consider
adding one — this log is useful for debugging index build issues in production.
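A minimal sketch of what restoring the log could look like. `Entry` is a hypothetical stand-in for `ManifestEntry`, `java.util.logging` is used only to keep the example self-contained (the real class would presumably use the project's usual logger), and the filter semantics shown here (keep entries whose `firstRowId` is unset or below the boundary) are an assumption, not the actual implementation:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Logger;

/** Illustration only: a filter helper that reports how many files it dropped. */
public class FilterLoggingSketch {

    // Assumption: java.util.logging stands in for the project's real logging facade.
    private static final Logger LOG = Logger.getLogger(FilterLoggingSketch.class.getName());

    /** Hypothetical stand-in for ManifestEntry: a file name plus a nullable first row ID. */
    public static final class Entry {
        public final String fileName;
        public final Long firstRowId;

        public Entry(String fileName, Long firstRowId) {
            this.fileName = fileName;
            this.firstRowId = firstRowId;
        }
    }

    /**
     * Assumed semantics: keep entries whose firstRowId is unset or strictly below the boundary,
     * and log the number of entries that were dropped.
     */
    public static List<Entry> filterEntriesBefore(List<Entry> entries, long boundaryRowId) {
        List<Entry> kept = new ArrayList<>();
        for (Entry entry : entries) {
            if (entry.firstRowId == null || entry.firstRowId < boundaryRowId) {
                kept.add(entry);
            }
        }
        int filtered = entries.size() - kept.size();
        if (filtered > 0) {
            LOG.info(
                    String.format(
                            "Filtered %d files at or beyond rowId=%d; %d files remain.",
                            filtered, boundaryRowId, kept.size()));
        }
        return kept;
    }
}
```

Logging only when something was actually filtered keeps the common case (boundary equal to `Long.MAX_VALUE`) quiet.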