wombatu-kun opened a new pull request, #18403:
URL: https://github.com/apache/hudi/pull/18403

   ### Describe the issue this Pull Request addresses
   
   Closes https://github.com/apache/hudi/issues/17736  
   
   ### Summary and Changelog
   
   Spark queries on COPY_ON_WRITE Lance tables now use columnar batch 
(vectorized) reading. Instead of decomposing Arrow batches into individual 
rows, Spark receives entire ColumnarBatch objects backed by zero-copy 
LanceArrowColumnVector wrappers. This eliminates per-row materialization 
overhead on the read path for COW base-file-only scans.
   MOR tables and incremental queries are unaffected: they continue to use the 
row-based path, an existing Hudi constraint for all file formats 
(Parquet, ORC, Lance).
   
   Limitations  
     1. COW only. Vectorized batch reading is enabled only for COPY_ON_WRITE 
tables in the base-file-only read path. MOR tables always use the row-based 
path; this is an existing Hudi-wide constraint (`supportReturningBatch = 
!isMOR`), not Lance-specific. Even MOR file groups with no log files go through 
`HoodieFileGroupReader`, which merges at the row level.
     2. Type-change fallback. When the file schema and the query schema have 
implicit type differences (e.g., INT→LONG, FLOAT→DOUBLE), the reader falls back 
to the row-based path for that file. Batch-level type casting is deferred to a 
follow-up.  
     3. No filter pushdown in batch mode. Lance filter pushdown is not yet 
implemented; the `filters` parameter is passed as `null` to 
`lanceReader.readAll()`, and Spark applies filters on top of the returned 
batches. This is unchanged from the row-based path.
     4. Multi-format tables. When `isMultipleBaseFileFormatsEnabled` is true 
(the table has mixed Parquet/ORC/Lance base files), Lance batch reading is 
disabled to avoid vector type conflicts between formats; `vectorTypes()` 
returns the Parquet/ORC vector types in that case.
     5. Incremental and bootstrap queries. These disable vectorized reading for 
all formats (`supportVectorizedRead = !isIncremental && !isBootstrap && 
supportBatch`). 
     6. Fixed batch size. The Lance batch size is hardcoded at 512 rows 
(`DEFAULT_BATCH_SIZE`) and is not configurable via Spark session settings.
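   The gating in limitations 1 and 5 can be sketched as two pure predicates. This is a hedged illustration only: `BatchGateSketch` and its bare boolean parameters are hypothetical stand-ins for the real Spark/Hudi state consulted in `HoodieFileGroupReaderBasedFileFormat`.

```java
// Hypothetical sketch of the batch-reading gates described in the limitations.
// The names mirror the PR text; the real checks take Spark session and table
// state, not bare flags.
class BatchGateSketch {
  static boolean supportVectorizedRead(boolean isIncremental, boolean isBootstrap,
                                       boolean supportBatch) {
    // Incremental and bootstrap queries disable vectorized reading for all formats.
    return !isIncremental && !isBootstrap && supportBatch;
  }

  static boolean supportReturningBatch(boolean isMOR, boolean vectorizedRead) {
    // MOR always takes the row-based path, even when vectorized reading is possible.
    return !isMOR && vectorizedRead;
  }
}
```

   Under these predicates, only a non-incremental, non-bootstrap COW scan whose format supports batching returns `ColumnarBatch`.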
   
   Detailed changelog
   `LanceBatchIterator` — NEW (hudi-client/hudi-spark-client)  
   - `Iterator<ColumnarBatch> + Closeable` that reads Arrow batches from a 
Lance file and wraps each batch's field vectors in `LanceArrowColumnVector[]` 
to produce a `ColumnarBatch`.
   - Column vector wrappers are created once and reused across batches (Arrow's 
ArrowReader reuses the same `VectorSchemaRoot`).
   - Owns and manages the lifecycle of `BufferAllocator`, `LanceFileReader`, 
and `ArrowReader`; closes them in order on `close()`.
   - Follows the same lifecycle pattern as the existing `LanceRecordIterator`.
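   The iterator-plus-`Closeable` lifecycle above can be sketched as follows. This is a minimal, hedged illustration: `BatchSource` stands in for Arrow's `ArrowReader` (which reuses one `VectorSchemaRoot` across batches), and `List<Integer>` stands in for `ColumnarBatch`; all names here are illustrative, not the actual Hudi code.

```java
import java.io.Closeable;
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch of the LanceBatchIterator lifecycle pattern: a look-ahead
// Iterator that also owns its underlying resources and releases them on close().
// The real class wraps each batch's FieldVectors in LanceArrowColumnVector
// (wrappers created once, reused across batches) and closes ArrowReader,
// LanceFileReader, and BufferAllocator in that order.
class BatchIteratorSketch implements Iterator<List<Integer>>, Closeable {
  interface BatchSource {
    boolean loadNextBatch();      // stands in for ArrowReader.loadNextBatch()
    List<Integer> currentBatch(); // stands in for the reused VectorSchemaRoot
    void close();                 // stands in for closing reader + allocator
  }

  private final BatchSource source;
  private Boolean hasNextCached; // look-ahead cache, since loading advances the source

  BatchIteratorSketch(BatchSource source) {
    this.source = source;
  }

  @Override
  public boolean hasNext() {
    if (hasNextCached == null) {
      hasNextCached = source.loadNextBatch();
    }
    return hasNextCached;
  }

  @Override
  public List<Integer> next() {
    if (!hasNext()) {
      throw new java.util.NoSuchElementException();
    }
    hasNextCached = null;
    // The real iterator returns a ColumnarBatch over column wrappers created
    // once in the constructor, because the underlying root is reused.
    return source.currentBatch();
  }

  @Override
  public void close() {
    source.close();
  }
}
```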
   
   `SparkLanceReaderBase` — MODIFIED (hudi-spark-datasource/hudi-spark-common)  
   - The single `read()` method now branches into `readBatch()` (columnar) and 
`readRows()` (row-based) based on `enableVectorizedReader` and whether implicit 
type changes exist.
   - `readBatch()`:
     - Creates a `LanceBatchIterator` for zero-copy batch iteration.
     - Computes a column mapping (requiredSchema → requestSchema) to reorder 
columns and identify columns missing from the file.
     - Schema evolution (column addition): missing columns are backed by 
all-null Arrow `FieldVector` instances (allocated via 
`LanceArrowUtils.toArrowField` → `Field.createVector`), wrapped in 
`LanceArrowColumnVector`. This satisfies Spark's `vectorTypes()` contract, 
which expects all data columns to be `LanceArrowColumnVector`. A dedicated 
child allocator (`nullAllocator`) manages these vectors and is closed before 
the data allocator.
     - Partition columns: pre-created `OnHeapColumnVector` arrays are filled 
with constant partition values via `populatePartitionVectors()`, which handles 
all Spark primitive types, strings, decimals, and binary. Vectors are reused 
across batches and re-populated only when the batch size changes.
     - A `mappedIterator` (implementing `Iterator[ColumnarBatch]` with 
`Closeable`) assembles the final batch per iteration and is registered with 
`TaskContext` for cleanup.
     - Type-change fallback: when `implicitTypeChangeInfo` is non-empty (e.g., 
file has FLOAT, query requires DOUBLE), falls back to `readRows()` which 
applies cast projections at the row level. Batch-level type casting is deferred 
to a follow-up.
   - `readRows()`: the original row-based logic, extracted into its own method. 
Behavior unchanged.
   - `populatePartitionVectors()`: new private helper, supports Boolean, Byte, 
Short, Int/Date, Long/Timestamp, Float, Double, String, Decimal (int/long/big), 
Binary. Unsupported types fall back to nulls.
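   The column mapping step above can be sketched as a pure function. This is a hedged illustration, not the actual `SparkLanceReaderBase` code: the method and parameter names are invented, and schemas are reduced to column-name lists.

```java
import java.util.List;

// Hypothetical sketch of the requiredSchema -> file-schema column mapping:
// for each required column, record its index in the file schema, or -1 when
// the column is absent. The -1 slots are the ones later backed by all-null
// vectors for schema evolution.
class ColumnMappingSketch {
  static int[] buildColumnMapping(List<String> requiredColumns, List<String> fileColumns) {
    int[] mapping = new int[requiredColumns.size()];
    for (int i = 0; i < mapping.length; i++) {
      mapping[i] = fileColumns.indexOf(requiredColumns.get(i)); // -1 => null-pad
    }
    return mapping;
  }
}
```

   For example, a required schema `[id, name, added_col]` against a file written before `added_col` existed maps to `[1, 0, -1]` when the file stores `[name, id]`, so columns are reordered and the new column is null-padded.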
   
   `HoodieFileGroupReaderBasedFileFormat` — MODIFIED 
(hudi-spark-datasource/hudi-spark-common) 
   - `supportBatch()`: changed `val lanceBatchSupported = false` to `val 
lanceBatchSupported = true`. The existing guards `supportVectorizedRead = 
!isIncremental && !isBootstrap && supportBatch` and `supportReturningBatch = 
!isMOR && supportVectorizedRead` remain unchanged and apply to all formats.
   - `vectorTypes()`: added a branch for the LANCE format (when not in 
multi-format mode) returning the `LanceArrowColumnVector` class name for all 
data columns and `OnHeapColumnVector` for partition columns. The existing 
Parquet/ORC logic is wrapped in the else branch, unchanged.
   - Two fallback `readBaseFile` call sites: changed `baseFileReader.value` to 
`fileGroupBaseFileReader.value`. This fixes a `ClassCastException` 
(`ColumnarBatch cannot be cast to InternalRow`) that occurred on MOR tables 
and other paths that go through `HoodieFileGroupReader`, which expects 
row-based input. `baseFileReader` is now the vectorized reader (returns 
`ColumnarBatch` for COW), while `fileGroupBaseFileReader` is always 
non-vectorized (returns `InternalRow`).
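   The `vectorTypes()` branching described above can be sketched as follows. This is a hedged stand-in, not the Hudi implementation: class and constant names are invented, vector classes are reduced to plain strings, and the real else branch selects on-heap vs. off-heap Parquet/ORC vectors from Spark configuration (simplified to on-heap here).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch of vectorTypes(): Lance data columns report the Lance
// Arrow wrapper class, partition columns report OnHeapColumnVector, and in
// multi-format mode the Lance branch is skipped so all formats agree on the
// Parquet/ORC vector types.
class VectorTypesSketch {
  static final String LANCE = "LanceArrowColumnVector";   // placeholder class name
  static final String ON_HEAP = "OnHeapColumnVector";     // placeholder class name

  static List<String> vectorTypes(int dataCols, int partitionCols,
                                  boolean isLance, boolean multiFormat) {
    String dataType = (isLance && !multiFormat) ? LANCE : ON_HEAP;
    List<String> types = new ArrayList<>(Collections.nCopies(dataCols, dataType));
    types.addAll(Collections.nCopies(partitionCols, ON_HEAP));
    return types;
  }
}
```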
   
   `TestLanceColumnarBatch` — NEW (hudi-spark-datasource/hudi-spark/src/test)  
   - 5 unit tests invoking `SparkLanceReaderBase` directly:
     - `testRowPathReturnsInternalRows` — verifies enableVectorizedReader=false 
returns InternalRow, never ColumnarBatch
     - `testColumnarPathReturnsBatches` — verifies enableVectorizedReader=true 
returns ColumnarBatch with correct data
     - `testColumnarPathNullPadsAbsentColumns` — schema evolution: missing 
column is null-padded in batch mode
     - `testColumnarPathAppendsPartitionVectors` — partition values appended as 
constant columns to each batch
     - `testTypeChangeFallsBackToRowPath` — implicit type change (FLOAT→DOUBLE) 
forces row path with correct cast values
   - 3 integration tests via `Spark DataFrame/SQL API`:
     - `testCOWTableDataFrameRead` — COW round-trip with vectorized reads active
     - `testCOWTableSchemaEvolutionNullPadding` — two bulk_inserts with schema 
widening; old files null-padded
     - `testCOWTableSparkSqlQuery` — SELECT ... WHERE predicate evaluation on 
columnar batches
   
   ### Impact
   
   Public API / user-facing changes

   - No new configuration options. Vectorized reading activates automatically 
for Lance COW tables; there is no feature flag to toggle it.
   - No API changes. All changes are internal to the Spark datasource read 
path. The SparkLanceReaderBase constructor already accepted 
enableVectorizedReader: Boolean; the new batch path is selected when this is 
true.
   - Behavioral change: queries on Lance COW tables that previously returned 
InternalRow one-by-one now return ColumnarBatch. From the user's perspective, 
query results are identical; only the internal execution changes.  
   
   Performance impact  
   
   - Read throughput improvement for COW Lance tables. Eliminates per-row 
UnsafeProjection + .copy() overhead. Arrow vectors are wrapped zero-copy in 
LanceArrowColumnVector and passed directly to Spark's columnar execution 
engine.                                   
   - No regression for MOR tables. MOR continues to use the row-based path 
through fileGroupBaseFileReader, which is explicitly non-vectorized.
   - Minimal memory overhead for schema evolution. Null-padding columns 
allocate a lightweight Arrow FieldVector (validity buffer only, no data buffer) 
via a dedicated child allocator. The allocator is released when the iterator is 
closed. 
   
   ### Risk Level
   
   low
   
   ### Documentation Update
   
   none
   
   ### Contributor's checklist
   
   - [ ] Read through [contributor's 
guide](https://hudi.apache.org/contribute/how-to-contribute)
   - [ ] Enough context is provided in the sections above
   - [ ] Adequate tests were added if applicable
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
