cshuo opened a new issue, #19053:
URL: https://github.com/apache/hudi/issues/19053

   ---
   
   ## Feature Description
   
   **What the feature achieves:**
   
   Add VECTOR column read/write support to the Flink **Lance** base-file path.
   
   Hudi already supports VECTOR columns end-to-end on **Spark + Lance** and on 
**Flink + Parquet**. However, **Flink + Lance** currently supports neither 
writing nor reading VECTOR columns (in fact, not even plain `ARRAY` columns). 
The gap is entirely inside `HoodieFlinkLanceArrowUtils`
   
(`hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieFlinkLanceArrowUtils.java`):
   
   - On write, `toArrowType` / `writeValue` only handle scalar types and throw 
`HoodieNotSupportedException` for `ARRAY`.
   - On read, `toLogicalType` / `readValue` likewise throw for non-scalar Arrow 
types (e.g. `FixedSizeList`), so the reader fails while converting the Arrow 
schema.
   
   This feature implements the missing encode/decode so that a VECTOR column 
can be written to and read from a Flink Lance base file, and—critically—keeps 
the on-disk encoding **strictly aligned with the Spark Lance path** so files 
are cross-engine readable.
   
   **Why this feature is needed:**
   
   - Vector-search / embedding workloads increasingly land through Flink 
streaming pipelines; Lance is the columnar format optimized for vector data, 
but Flink users currently cannot use VECTOR columns with Lance.
   - It closes a parity gap: the same table written by Spark+Lance (or 
Flink+Parquet) cannot be written/read by Flink+Lance today.
   - Cross-engine interoperability: a Lance file with a VECTOR column written 
by Flink should be readable by Spark and vice-versa.
   
   ## User Experience
   
   **How users will use this feature:**
   
   - **DDL / config:** declare the vector column as `ARRAY<FLOAT>` or 
`ARRAY<DOUBLE>`, mark it via the existing vector-columns option 
(`FlinkOptions.VECTOR_COLUMNS`, e.g. `embedding:128,features:64`), and select 
the Lance base file format:
     ```sql
     CREATE TABLE t (
       id STRING,
       embedding ARRAY<FLOAT>,   -- VECTOR(128)
       ts BIGINT,
       PRIMARY KEY (id) NOT ENFORCED
     ) WITH (
       'connector' = 'hudi',
       'path' = '...',
       'vector.columns' = 'embedding:128',
       'hoodie.table.base.file.format' = 'LANCE'
     );
     ```
   - **Behavior:** insert / read / column projection / MOR upsert behave the 
same as on the Parquet base file format (matching the existing 
`ITTestVectorDataSource` coverage).
   - **Element types:** `FLOAT` and `DOUBLE` are supported. `INT8` 
(`ARRAY<TINYINT>` vector) is **rejected with a clear error**, matching the 
current Spark Lance writer limitation (the lance-spark connector only treats 
FLOAT/DOUBLE as fixed-size-list, and the Spark reader only restores 
FLOAT/DOUBLE). INT8 over Lance is a coordinated cross-engine follow-up.
   - **API changes:** none. Existing options are reused.
   
   **On-disk encoding (aligned with Spark Lance):**
   
   - A `VECTOR(dim, FLOAT|DOUBLE)` column is stored as an Arrow 
`FixedSizeList<Float32|Float64, dim>` with a non-nullable child.
   - The Lance file footer carries `hoodie.vector.columns` 
(`HoodieSchema.VECTOR_COLUMNS_METADATA_KEY`), identical to the Spark writer, so 
any engine can identify and reconstruct VECTOR columns.
   
   ## Hudi RFC Requirements
   
   **RFC PR link:** N/A
   
   **Why RFC isn't needed:**
   - Does this change public interfaces/APIs? **No** — reuses existing 
`vector.columns` option and the existing VECTOR data model.
   - Does this change storage format? **No new format** — it reuses the exact 
Lance `FixedSizeList` encoding and footer metadata already produced by the 
Spark Lance writer; the change only adds the Flink-side encode/decode to match.
   - Justification: scoped, additive engine parity with an already-shipped 
on-disk contract; no new public surface or format.
   
   ## Implementation notes (for reference)
   
   - Write: thread the vector-column map 
(`HoodieVectorUtils.detectVectorColumns`) from 
`HoodieRowDataFileWriterFactory.newLanceFileWriter` into 
`HoodieRowDataLanceWriter`; emit `FixedSizeList` fields in a vector-aware 
`toArrowSchema`; write the footer via the existing 
`HoodieBaseLanceWriter.additionalSchemaMetadata()` hook.
   - Read: use the already-passed `requestedSchema` in 
`HoodieRowDataLanceReader.getRowDataIterator` to detect vectors; decode 
`FixedSizeListVector` → `ArrayData`; map `FixedSizeList` → `ARRAY` in 
`toLogicalType`; restore VECTOR metadata from the footer in `getSchema()`.
   - Reuse: `HoodieSchema.Vector` model + footer helpers, 
`HoodieVectorUtils.detectVectorColumns`, Arrow `FixedSizeListVector` / 
`Float4Vector` / `Float8Vector`.
   
   ## Out of scope
   - INT8 vectors on Lance (blocked by lance-spark; revisit cross-engine).
   - Schema evolution on Flink Lance (already explicitly unsupported).
   - Nested VECTOR (inside struct/array/map) — stays rejected, consistent with 
Spark.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to