wombatu-kun opened a new pull request, #18403: URL: https://github.com/apache/hudi/pull/18403
### Describe the issue this Pull Request addresses

Closes https://github.com/apache/hudi/issues/17736

### Summary and Changelog

Spark queries on COPY_ON_WRITE Lance tables now use columnar batch (vectorized) reading. Instead of decomposing Arrow batches into individual rows, Spark receives entire `ColumnarBatch` objects backed by zero-copy `LanceArrowColumnVector` wrappers. This eliminates per-row materialization overhead on the read path for COW base-file-only scans. MOR tables and incremental queries are unaffected: they continue to use the row-based path, which is an existing Hudi constraint for all file formats (Parquet, ORC, Lance).

**Limitations**

1. **COW only.** Vectorized batch reading is enabled only for COPY_ON_WRITE tables in the base-file-only read path. MOR tables always use the row-based path; this is an existing Hudi-wide constraint (`supportReturningBatch = !isMOR`), not Lance-specific. Even MOR file groups with no log files go through `HoodieFileGroupReader`, which merges at the row level.
2. **Type-change fallback.** When the file schema and the query schema have implicit type differences (e.g., INT→LONG, FLOAT→DOUBLE), the reader falls back to the row-based path for that file. Batch-level type casting is deferred to a follow-up.
3. **No filter pushdown in batch mode.** Lance filter pushdown is not yet implemented; the `filters` parameter is passed as `null` to `lanceReader.readAll()`. Spark applies filters on top of the returned batches. This is unchanged from the row-based path.
4. **Multi-format tables.** When `isMultipleBaseFileFormatsEnabled` is true (the table has mixed Parquet/ORC/Lance base files), Lance batch reading is disabled to avoid vector type conflicts between formats. The `vectorTypes()` method returns the Parquet/ORC vector types in that case.
5. **Incremental and bootstrap queries.** These disable vectorized reading for all formats (`supportVectorizedRead = !isIncremental && !isBootstrap && supportBatch`).
6. **Fixed batch size.**
The Lance batch size is hardcoded at 512 rows (`DEFAULT_BATCH_SIZE`). It is not configurable via Spark session settings.

**Detailed changelog**

**`LanceBatchIterator`** (NEW, hudi-client/hudi-spark-client)
- An `Iterator<ColumnarBatch>` + `Closeable` that reads Arrow batches from a Lance file and wraps each batch's field vectors in `LanceArrowColumnVector[]` to produce a `ColumnarBatch`.
- Column vector wrappers are created once and reused across batches (Arrow's `ArrowReader` reuses the same `VectorSchemaRoot`).
- Owns and manages the lifecycle of `BufferAllocator`, `LanceFileReader`, and `ArrowReader`; closes them in order on `close()`.
- Follows the same lifecycle pattern as the existing `LanceRecordIterator`.

**`SparkLanceReaderBase`** (MODIFIED, hudi-spark-datasource/hudi-spark-common)
- The single `read()` method now branches into `readBatch()` (columnar) and `readRows()` (row-based) based on `enableVectorizedReader` and whether implicit type changes exist.
- `readBatch()`:
  - Creates a `LanceBatchIterator` for zero-copy batch iteration.
  - Computes a column mapping (requiredSchema → requestSchema) to reorder columns and identify columns missing from the file.
  - Schema evolution (column addition): missing columns are backed by all-null Arrow `FieldVector` instances (allocated via `LanceArrowUtils.toArrowField` → `Field.createVector`), wrapped in `LanceArrowColumnVector`. This satisfies Spark's `vectorTypes()` contract, which expects all data columns to be `LanceArrowColumnVector`. A dedicated child allocator (`nullAllocator`) manages these vectors and is closed before the data allocator.
  - Partition columns: pre-created `OnHeapColumnVector` arrays are filled with constant partition values via `populatePartitionVectors()`, which handles all Spark primitive types, strings, decimals, and binary. Vectors are reused across batches and re-populated only when the batch size changes.
  - A `mappedIterator` (implementing `Iterator[ColumnarBatch]` with `Closeable`) assembles the final batch per iteration and is registered with `TaskContext` for cleanup.
  - Type-change fallback: when `implicitTypeChangeInfo` is non-empty (e.g., the file has FLOAT but the query requires DOUBLE), the reader falls back to `readRows()`, which applies cast projections at the row level. Batch-level type casting is deferred to a follow-up.
- `readRows()`: the original row-based logic, extracted into its own method. Behavior unchanged.
- `populatePartitionVectors()`: new private helper; supports Boolean, Byte, Short, Int/Date, Long/Timestamp, Float, Double, String, Decimal (int/long/big), and Binary. Unsupported types fall back to nulls.

**`HoodieFileGroupReaderBasedFileFormat`** (MODIFIED, hudi-spark-datasource/hudi-spark-common)
- `supportBatch()`: changed `val lanceBatchSupported = false` to `val lanceBatchSupported = true`. The existing guards `supportVectorizedRead = !isIncremental && !isBootstrap && supportBatch` and `supportReturningBatch = !isMOR && supportVectorizedRead` remain unchanged and apply to all formats.
- `vectorTypes()`: added a branch for the LANCE format (when not in multi-format mode) returning the `LanceArrowColumnVector` class name for all data columns and `OnHeapColumnVector` for partition columns. The existing Parquet/ORC logic is wrapped in the else branch, unchanged.
- Two fallback `readBaseFile` call sites: changed `baseFileReader.value` to `fileGroupBaseFileReader.value`. This fixes a `ClassCastException` (`ColumnarBatch` cannot be cast to `InternalRow`) that occurred on MOR tables and other paths that go through `HoodieFileGroupReader`, which expects row-based input. `baseFileReader` is now the vectorized reader (returns `ColumnarBatch` for COW), while `fileGroupBaseFileReader` is always non-vectorized (returns `InternalRow`).
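The batch-path guards and the `readBatch()` column mapping described above can be sketched in isolation. This is a minimal illustration, not Hudi's actual code: the guard expressions are quoted from this PR, but the class name `LanceBatchReadSketch`, the helper `buildColumnMapping`, and the plain-`String` schema model are hypothetical simplifications (the real code operates on Spark `StructType` schemas inside `HoodieFileGroupReaderBasedFileFormat` and `SparkLanceReaderBase`).

```java
import java.util.Arrays;
import java.util.List;

class LanceBatchReadSketch {

    // Guards quoted from the PR: vectorized reading is disabled for
    // incremental and bootstrap queries, and batches are only returned
    // to Spark for non-MOR (i.e., COW) tables.
    static boolean supportVectorizedRead(boolean isIncremental, boolean isBootstrap,
                                         boolean supportBatch) {
        return !isIncremental && !isBootstrap && supportBatch;
    }

    static boolean supportReturningBatch(boolean isMOR, boolean supportVectorizedRead) {
        return !isMOR && supportVectorizedRead;
    }

    // Simplified column mapping: for each required column, find its position
    // in the file schema; -1 marks a column absent from the file, which the
    // batch path backs with an all-null vector (schema evolution / addition).
    static int[] buildColumnMapping(List<String> requiredSchema, List<String> fileSchema) {
        int[] mapping = new int[requiredSchema.size()];
        for (int i = 0; i < requiredSchema.size(); i++) {
            mapping[i] = fileSchema.indexOf(requiredSchema.get(i));
        }
        return mapping;
    }

    public static void main(String[] args) {
        // COW snapshot query with lanceBatchSupported = true: batch path taken.
        boolean vec = supportVectorizedRead(false, false, true);
        System.out.println(supportReturningBatch(false, vec)); // true  (COW)
        System.out.println(supportReturningBatch(true, vec));  // false (MOR stays row-based)

        // A file written before the column "city" was added to the table.
        int[] m = buildColumnMapping(
            Arrays.asList("id", "name", "city"),
            Arrays.asList("name", "id"));
        System.out.println(Arrays.toString(m)); // [1, 0, -1]: "city" is null-padded
    }
}
```

The `false` result for MOR mirrors why the `readBaseFile` call sites had to use the non-vectorized `fileGroupBaseFileReader`: a MOR merge path must never receive a `ColumnarBatch` where it expects `InternalRow`.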
**`TestLanceColumnarBatch`** (NEW, hudi-spark-datasource/hudi-spark/src/test)
- 5 unit tests invoking `SparkLanceReaderBase` directly:
  - `testRowPathReturnsInternalRows`: verifies `enableVectorizedReader=false` returns `InternalRow`, never `ColumnarBatch`
  - `testColumnarPathReturnsBatches`: verifies `enableVectorizedReader=true` returns `ColumnarBatch` with correct data
  - `testColumnarPathNullPadsAbsentColumns`: schema evolution; a missing column is null-padded in batch mode
  - `testColumnarPathAppendsPartitionVectors`: partition values appended as constant columns to each batch
  - `testTypeChangeFallsBackToRowPath`: an implicit type change (FLOAT→DOUBLE) forces the row path with correct cast values
- 3 integration tests via the Spark DataFrame/SQL API:
  - `testCOWTableDataFrameRead`: COW round-trip with vectorized reads active
  - `testCOWTableSchemaEvolutionNullPadding`: two bulk_inserts with schema widening; old files null-padded
  - `testCOWTableSparkSqlQuery`: `SELECT ... WHERE` predicate evaluation on columnar batches

### Impact

**Public API / user-facing changes**
- No new configuration options. Vectorized reading activates automatically for Lance COW tables. There is no feature flag to toggle it.
- No API changes. All changes are internal to the Spark datasource read path. The `SparkLanceReaderBase` constructor already accepted `enableVectorizedReader: Boolean`; the new batch path is selected when this is true.
- Behavioral change: queries on Lance COW tables that previously returned `InternalRow` one-by-one now return `ColumnarBatch`. From the user's perspective, query results are identical; only the internal execution changes.

**Performance impact**
- Read throughput improvement for COW Lance tables. Eliminates per-row `UnsafeProjection` + `.copy()` overhead. Arrow vectors are wrapped zero-copy in `LanceArrowColumnVector` and passed directly to Spark's columnar execution engine.
- No regression for MOR tables. MOR continues to use the row-based path through `fileGroupBaseFileReader`, which is explicitly non-vectorized.
- Minimal memory overhead for schema evolution. Null-padding columns allocate a lightweight Arrow `FieldVector` (validity buffer only, no data buffer) via a dedicated child allocator. The allocator is released when the iterator is closed.

### Risk Level

low

### Documentation Update

none

### Contributor's checklist

- [ ] Read through [contributor's guide](https://hudi.apache.org/contribute/how-to-contribute)
- [ ] Enough context is provided in the sections above
- [ ] Adequate tests were added if applicable

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at: [email protected]
