CrownChu commented on PR #7933:
URL: https://github.com/apache/paimon/pull/7933#issuecomment-4536226538
### Fixes and Additions in This Round
#### 1. [Bug] Flink BuildIndexOperator: Multi-column writer receives row with extra `_ROW_ID` field
**Fix:** The read projection is `indexColumns + _ROW_ID`; `_ROW_ID` is needed to locate shard boundaries. Before a row is passed to the writer, a secondary projection via `ProjectedRow.from(indexOnlyMapping)` strips the trailing `_ROW_ID`, so `GlobalIndexMultiColumnWriter.write()` receives a row containing only the index fields.
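The secondary projection can be sketched with plain arrays. This is only an illustration: `IndexOnlyProjection` and its methods are hypothetical stand-ins, and the actual change uses `ProjectedRow.from(indexOnlyMapping)` on Paimon rows rather than `Object[]`.

```java
// Hypothetical stand-in for the projection step; not the PR's code.
final class IndexOnlyProjection {

    // Identity mapping over the first indexColumnCount positions.
    // The read row is assumed to be laid out as [indexColumns..., _ROW_ID],
    // so omitting the last position drops _ROW_ID.
    static int[] indexOnlyMapping(int indexColumnCount) {
        int[] mapping = new int[indexColumnCount];
        for (int i = 0; i < indexColumnCount; i++) {
            mapping[i] = i;
        }
        return mapping;
    }

    // Builds the row handed to the writer: only the mapped positions survive.
    static Object[] project(Object[] row, int[] mapping) {
        Object[] out = new Object[mapping.length];
        for (int i = 0; i < mapping.length; i++) {
            out[i] = row[mapping[i]];
        }
        return out;
    }
}
```

With two index columns, a read row `{"a", 7, 42L}` (where `42L` plays the role of `_ROW_ID`) is projected to `{"a", 7}` before it reaches the writer.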
#### 2. [Correctness] No null-field handling in multi-column mode (Flink)
**Fix:** In multi-column mode, each index field is checked individually. If any field is null, the current shard stops writing immediately (`break`). Only rows where all indexed columns are non-null are written to the index.
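Read literally, the null check behaves like the loop below. This is a sketch under that reading, not the operator's code: `NullAwareShardWrite` is a hypothetical name and rows are modelled as `Object[]` holding only the index fields.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of the per-field null check with an early break.
final class NullAwareShardWrite {

    // Returns the rows that would be written for one shard.
    static List<Object[]> rowsWritten(List<Object[]> shardRows) {
        List<Object[]> written = new ArrayList<>();
        shard:
        for (Object[] row : shardRows) {
            for (Object field : row) {
                if (field == null) {
                    // Any null index field ends writing for this shard.
                    break shard;
                }
            }
            written.add(row);
        }
        return written;
    }
}
```

Under this reading, rows that follow the first null-bearing row in a shard are not written either, even if all of their fields are non-null.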
#### 3. [Robustness] Unsafe cast without instanceof check
**Fix:** Added `instanceof GlobalIndexMultiColumnWriter` check before
casting. If the check fails, an `UnsupportedOperationException` is thrown with
a clear message indicating the factory must override
`create(List<DataField>, Options)` and return a
`GlobalIndexMultiColumnWriter`.
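A minimal sketch of the guarded cast follows. The two interfaces are simplified stand-ins declared only so the example compiles on its own; `GlobalIndexWriter` as the base type, the `WriterCasts` helper, and the exact message text are assumptions.

```java
// Simplified stand-ins for the real writer types (assumed shapes).
interface GlobalIndexWriter {}

interface GlobalIndexMultiColumnWriter extends GlobalIndexWriter {
    void write(Object[] row);
}

final class WriterCasts {

    // Fails fast instead of casting blindly when the factory returned
    // a writer without multi-column support.
    static GlobalIndexMultiColumnWriter requireMultiColumn(GlobalIndexWriter writer) {
        if (!(writer instanceof GlobalIndexMultiColumnWriter)) {
            throw new UnsupportedOperationException(
                    "Factory must override create(List<DataField>, Options) "
                            + "and return a GlobalIndexMultiColumnWriter, but got "
                            + writer.getClass().getName());
        }
        return (GlobalIndexMultiColumnWriter) writer;
    }
}
```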
#### 4. [Design] Interface default method creates silent data-loss path
**Fix:** A runtime `instanceof` check plus an exception ensures that a factory that does not properly implement multi-column support fails fast, rather than silently falling through to `SingletonWriter` and losing data.
#### 5. [Minor] `resolveFields` assumes metadata consistency across range groups
**Fix:** For multi-column indexes (`indexFieldId == -1`), added validation
that iterates all entries across range groups and verifies `indexFieldId` and
`extraFieldIds` are consistent. Throws an exception on
mismatch. Single-column case retains the original logic unchanged.
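The validation can be pictured as comparing every entry against the first one. In this sketch `Meta` is a hypothetical two-field stand-in for the index metadata, and `IllegalStateException` is an assumption, since the comment does not name the exception type.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of the cross-range-group consistency check.
final class MetaConsistency {

    static final class Meta {
        final int indexFieldId;
        final int[] extraFieldIds;

        Meta(int indexFieldId, int[] extraFieldIds) {
            this.indexFieldId = indexFieldId;
            this.extraFieldIds = extraFieldIds;
        }
    }

    // Throws if any entry in any range group disagrees with the first entry.
    static void validate(List<List<Meta>> rangeGroups) {
        Meta first = null;
        for (List<Meta> group : rangeGroups) {
            for (Meta meta : group) {
                if (first == null) {
                    first = meta;
                    continue;
                }
                boolean same = meta.indexFieldId == first.indexFieldId
                        && Arrays.equals(meta.extraFieldIds, first.extraFieldIds);
                if (!same) {
                    throw new IllegalStateException(
                            "Inconsistent index metadata across range groups: expected extraFieldIds "
                                    + Arrays.toString(first.extraFieldIds)
                                    + " but found " + Arrays.toString(meta.extraFieldIds));
                }
            }
        }
    }
}
```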
#### 6. Multi-column minimum rowId for schema evolution
`findMinNonIndexableRowId` now accepts `List<String> indexColumns` and checks each file's schema with `containsAll(indexColumns)`. If any index column is missing from a file's schema, that file is treated as non-indexable and its `firstRowId` is used as the filter boundary.
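As a rough sketch of that check: `FileInfo` below is a hypothetical stand-in carrying only a file's `firstRowId` and the column names of its schema, and returning `OptionalLong` is an assumption about how "no non-indexable file" is represented.

```java
import java.util.List;
import java.util.OptionalLong;
import java.util.Set;

// Hypothetical sketch; the real method works on Paimon file and schema metadata.
final class SchemaEvolutionBoundary {

    static final class FileInfo {
        final long firstRowId;
        final Set<String> schemaColumns;

        FileInfo(long firstRowId, Set<String> schemaColumns) {
            this.firstRowId = firstRowId;
            this.schemaColumns = schemaColumns;
        }
    }

    // Smallest firstRowId among files whose schema lacks at least one index column.
    static OptionalLong findMinNonIndexableRowId(List<FileInfo> files, List<String> indexColumns) {
        return files.stream()
                .filter(file -> !file.schemaColumns.containsAll(indexColumns))
                .mapToLong(file -> file.firstRowId)
                .min();
    }
}
```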
#### 7. Multi-column GlobalIndexMeta storage convention
In multi-column mode, `indexFieldId = -1` (the `MULTI_COLUMN_INDEX_FIELD_ID` constant) and all actual field IDs are stored in `extraFieldIds`. The single-column case remains unchanged (`indexFieldId` = actual field ID, `extraFieldIds = null`). The scanner skips registering a reader under `-1` and routes predicates to the correct reader solely through `extraFieldIds`.
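The convention implies registration logic along these lines. This is a sketch only: `ScannerRegistration` is a hypothetical name, and readers are represented by string labels instead of real reader objects.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of how a scanner could populate its field-to-reader map.
final class ScannerRegistration {

    static final int MULTI_COLUMN_INDEX_FIELD_ID = -1;

    static void register(
            Map<Integer, List<String>> readersByField,
            int indexFieldId,
            int[] extraFieldIds,
            String reader) {
        // Single-column case: indexFieldId is a real field ID.
        if (indexFieldId != MULTI_COLUMN_INDEX_FIELD_ID) {
            readersByField.computeIfAbsent(indexFieldId, k -> new ArrayList<>()).add(reader);
        }
        // Multi-column case: the real field IDs live in extraFieldIds.
        if (extraFieldIds != null) {
            for (int fieldId : extraFieldIds) {
                readersByField.computeIfAbsent(fieldId, k -> new ArrayList<>()).add(reader);
            }
        }
    }
}
```

A predicate on field 3 or field 5 then finds a multi-column reader registered with `extraFieldIds = {3, 5}`, and nothing is ever stored under the sentinel key `-1`.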
#### 8. New ES Topology Builder (es-multi-index)
- Added `ESIndexTopoBuilder` (Flink): a standalone topology with shard-level parallelism, a schema evolution check, and transparent `indexType` pass-through
- Added `ESGlobalIndexTopoBuilder` (Spark): registered via SPI; `identifier()` returns `"es-multi-index"`
- Flink `CreateGlobalIndexProcedure` routing: `startsWith("es-multi-index")` → `ESIndexTopoBuilder`
- Spark `GlobalIndexTopologyBuilderUtils`: exact match first, then prefix match
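The "exact match first, then prefix match" lookup can be sketched as below. `BuilderLookup` is a hypothetical helper, and the prefix direction (the requested index type starts with a registered identifier) is inferred from the Flink `startsWith("es-multi-index")` rule rather than stated for Spark.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch of a two-stage builder lookup keyed by identifier.
final class BuilderLookup {

    static <T> Optional<T> resolve(Map<String, T> buildersByIdentifier, String indexType) {
        // Stage 1: exact identifier match wins.
        T exact = buildersByIdentifier.get(indexType);
        if (exact != null) {
            return Optional.of(exact);
        }
        // Stage 2: first registered identifier that prefixes the index type.
        for (Map.Entry<String, T> entry : buildersByIdentifier.entrySet()) {
            if (indexType.startsWith(entry.getKey())) {
                return Optional.of(entry.getValue());
            }
        }
        return Optional.empty();
    }
}
```

If several identifiers prefix the same index type, the result depends on the map's iteration order, so an ordered map (or a longest-prefix rule) would be needed to make that case deterministic.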
#### 9. Multi-condition reader predicate push-down
- `GlobalIndexEvaluator` refactored to asynchronous parallel execution (`CompletableFuture`); it no longer implements `PredicateVisitor`
- Added `flattenChildren()` to flatten nested `CompoundPredicate`s of the same type
- `UnionGlobalIndexReader` no longer owns an internal `ExecutorService`; parallelism is managed by the upper-level evaluator
- `GlobalIndexScanner` registers `extraFieldIds` into the index map, ensuring multi-column predicates hit the correct reader
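To illustrate the first two points, here is a self-contained sketch of flattening same-type compound predicates and evaluating the children of an AND in parallel with `CompletableFuture`. `PredicateSketch`, `Node`, `Kind`, and `evaluateAnd` are hypothetical, and row-ID sets stand in for the real index results.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical sketch; not the GlobalIndexEvaluator implementation.
final class PredicateSketch {

    enum Kind { AND, OR, LEAF }

    static final class Node {
        final Kind kind;
        final String name;
        final List<Node> children;

        Node(Kind kind, String name, List<Node> children) {
            this.kind = kind;
            this.name = name;
            this.children = children;
        }

        static Node leaf(String name) {
            return new Node(Kind.LEAF, name, Collections.<Node>emptyList());
        }

        static Node of(Kind kind, Node... children) {
            return new Node(kind, null, Arrays.asList(children));
        }
    }

    // AND(a, AND(b, c)) becomes [a, b, c]; children of a different kind are kept as-is.
    static List<Node> flattenChildren(Node compound) {
        List<Node> flat = new ArrayList<>();
        for (Node child : compound.children) {
            if (child.kind == compound.kind) {
                flat.addAll(flattenChildren(child));
            } else {
                flat.add(child);
            }
        }
        return flat;
    }

    // Starts every lookup on the common pool first, then intersects the results.
    static Set<Long> evaluateAnd(List<Supplier<Set<Long>>> lookups) {
        List<CompletableFuture<Set<Long>>> futures = new ArrayList<>();
        for (Supplier<Set<Long>> lookup : lookups) {
            futures.add(CompletableFuture.supplyAsync(lookup));
        }
        Set<Long> result = null;
        for (CompletableFuture<Set<Long>> future : futures) {
            Set<Long> rows = future.join();
            if (result == null) {
                result = new TreeSet<>(rows);
            } else {
                result.retainAll(rows);
            }
        }
        return result == null ? new TreeSet<Long>() : result;
    }
}
```

Flattening first means one level of futures covers all conjuncts, instead of nesting a parallel stage per compound predicate.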