yihua commented on code in PR #18403:
URL: https://github.com/apache/hudi/pull/18403#discussion_r3036282204
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala:
##########
@@ -184,23 +183,31 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath:
String,
override def vectorTypes(requiredSchema: StructType,
partitionSchema: StructType,
sqlConf: SQLConf): Option[Seq[String]] = {
- val originalVectorTypes = super.vectorTypes(requiredSchema,
partitionSchema, sqlConf)
- if (mandatoryFields.isEmpty) {
- originalVectorTypes
+ if (hoodieFileFormat == HoodieFileFormat.LANCE &&
!isMultipleBaseFileFormatsEnabled) {
+ // Lance uses LanceArrowColumnVector for data columns and
OnHeapColumnVector for partition columns.
+ // Spark uses vectorTypes to determine if columnar batch reading is
supported.
+ val lanceVectorType =
"org.apache.spark.sql.vectorized.LanceArrowColumnVector"
+ val partitionVectorType = classOf[OnHeapColumnVector].getName
Review Comment:
🤖 nit: `val lanceVectorType =
"org.apache.spark.sql.vectorized.LanceArrowColumnVector"` is a hard-coded
string — if the class is ever moved it silently breaks. Could you use
`classOf[LanceArrowColumnVector].getName` here, consistent with how
`partitionVectorType` is defined just below?
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala:
##########
@@ -111,65 +113,284 @@ class SparkLanceReaderBase(enableVectorizedReader:
Boolean) extends SparkColumna
// Read data with column projection (filters not supported yet)
val arrowReader = lanceReader.readAll(columnNames, null,
DEFAULT_BATCH_SIZE)
- // Create iterator using shared LanceRecordIterator
- lanceIterator = new LanceRecordIterator(
- allocator,
- lanceReader,
- arrowReader,
- requestSchema,
- filePath
- )
-
- // Register cleanup listener
- Option(TaskContext.get()).foreach { ctx =>
- ctx.addTaskCompletionListener[Unit](_ => lanceIterator.close())
- }
-
- // Create the following projections for schema evolution:
- // 1. Padding projection: add NULL for missing columns
- // 2. Casting projection: handle type conversions
- val schemaUtils = sparkAdapter.getSchemaUtils
- val paddingProj =
SparkSchemaTransformUtils.generateNullPaddingProjection(requestSchema,
requiredSchema)
- val castProj = SparkSchemaTransformUtils.generateUnsafeProjection(
- schemaUtils.toAttributes(requiredSchema),
- Some(SQLConf.get.sessionLocalTimeZone),
- implicitTypeChangeInfo,
- requiredSchema,
- new StructType(),
- schemaUtils
- )
-
- // Unify projections by applying padding and then casting for each row
- val projection: UnsafeProjection = new UnsafeProjection {
- def apply(row: InternalRow): UnsafeRow =
- castProj(paddingProj(row))
- }
- val projectedIter = lanceIterator.asScala.map(projection.apply)
-
- // Handle partition columns
- if (partitionSchema.length == 0) {
- // No partition columns - return rows directly
- projectedIter
+ // Decide between batch mode and row mode.
+ // Fall back to row mode if type casting is needed (batch-level type
casting deferred to follow-up).
+ val hasTypeChanges = !implicitTypeChangeInfo.isEmpty
+ if (enableVectorizedReader && !hasTypeChanges) {
+ readBatch(file, allocator, lanceReader, arrowReader, filePath,
+ requestSchema, requiredSchema, partitionSchema)
} else {
- // Create UnsafeProjection to convert JoinedRow to UnsafeRow
- val fullSchema = (requiredSchema.fields ++
partitionSchema.fields).map(f =>
- AttributeReference(f.name, f.dataType, f.nullable, f.metadata)())
- val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema,
fullSchema)
-
- // Append partition values to each row using JoinedRow, then convert
to UnsafeRow
- val joinedRow = new JoinedRow()
- projectedIter.map(row => unsafeProjection(joinedRow(row,
file.partitionValues)))
+ readRows(file, allocator, lanceReader, arrowReader, filePath,
+ requestSchema, requiredSchema, partitionSchema,
implicitTypeChangeInfo)
}
} catch {
case e: Exception =>
- if (lanceIterator != null) {
- lanceIterator.close() // Close iterator which handles lifecycle
for all objects
+ allocator.close()
+ throw new IOException(s"Failed to read Lance file: $filePath", e)
+ }
+ }
+ }
+
+ /**
+ * Columnar batch reading path. Returns Iterator[ColumnarBatch] type-erased
as Iterator[InternalRow].
+ * Used when enableVectorizedReader=true and no type casting is needed.
+ */
+ private def readBatch(file: PartitionedFile,
+ allocator: org.apache.arrow.memory.BufferAllocator,
+ lanceReader: LanceFileReader,
+ arrowReader: ArrowReader,
+ filePath: String,
+ requestSchema: StructType,
+ requiredSchema: StructType,
+ partitionSchema: StructType): Iterator[InternalRow] = {
+
+ val batchIterator = new LanceBatchIterator(allocator, lanceReader,
arrowReader, filePath)
+
+ // Build column mapping: for each column in requiredSchema, find its index
in requestSchema (file columns)
+ // Returns -1 if the column is missing from the file (schema evolution:
column addition)
+ val columnMapping: Array[Int] = requiredSchema.fields.map { field =>
+ requestSchema.fieldNames.indexOf(field.name)
+ }
+
+ // Create Arrow-backed null vectors for columns missing from the file.
+ // Uses LanceArrowColumnVector so that Spark's vectorTypes() contract is
satisfied
+ // (FileSourceScanExec expects all data columns to be
LanceArrowColumnVector).
+ val nullAllocator = if (columnMapping.contains(-1)) {
+ HoodieArrowAllocator.newChildAllocator(
+ getClass.getSimpleName + "-null-" + filePath,
HoodieSparkLanceReader.LANCE_DATA_ALLOCATOR_SIZE)
+ } else null
+
+ val nullColumnVectors: Array[(Int, LanceArrowColumnVector,
org.apache.arrow.vector.FieldVector)] =
+ if (nullAllocator != null) {
+ columnMapping.zipWithIndex.filter(_._1 < 0).map { case (_, idx) =>
+ val field = LanceArrowUtils.toArrowField(
+ requiredSchema(idx).name, requiredSchema(idx).dataType,
requiredSchema(idx).nullable, "UTC")
+ val arrowVector = field.createVector(nullAllocator)
+ arrowVector.allocateNew()
+ arrowVector.setValueCount(DEFAULT_BATCH_SIZE)
+ (idx, new LanceArrowColumnVector(arrowVector), arrowVector)
+ }
+ } else {
+ Array.empty
+ }
+
+ // Pre-create partition column vectors (reused across batches, reset per
batch)
+ val hasPartitionColumns = partitionSchema.length > 0
+ val partitionVectors: Array[WritableColumnVector] = if
(hasPartitionColumns) {
+ partitionSchema.fields.map(f => new
OnHeapColumnVector(DEFAULT_BATCH_SIZE, f.dataType))
+ } else {
+ Array.empty
Review Comment:
🤖 nit: the three-element tuple `(Int, LanceArrowColumnVector,
org.apache.arrow.vector.FieldVector)` accessed via `_._1`, `_._2`, `_._3` is a
bit opaque. A small case class like `NullColumnEntry(colIndex: Int,
columnVector: LanceArrowColumnVector, arrowVector: FieldVector)` — or even
keying by column index in a `Map` — would make the access sites read more
clearly.
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala:
##########
@@ -111,65 +113,284 @@ class SparkLanceReaderBase(enableVectorizedReader:
Boolean) extends SparkColumna
// Read data with column projection (filters not supported yet)
val arrowReader = lanceReader.readAll(columnNames, null,
DEFAULT_BATCH_SIZE)
- // Create iterator using shared LanceRecordIterator
- lanceIterator = new LanceRecordIterator(
- allocator,
- lanceReader,
- arrowReader,
- requestSchema,
- filePath
- )
-
- // Register cleanup listener
- Option(TaskContext.get()).foreach { ctx =>
- ctx.addTaskCompletionListener[Unit](_ => lanceIterator.close())
- }
-
- // Create the following projections for schema evolution:
- // 1. Padding projection: add NULL for missing columns
- // 2. Casting projection: handle type conversions
- val schemaUtils = sparkAdapter.getSchemaUtils
- val paddingProj =
SparkSchemaTransformUtils.generateNullPaddingProjection(requestSchema,
requiredSchema)
- val castProj = SparkSchemaTransformUtils.generateUnsafeProjection(
- schemaUtils.toAttributes(requiredSchema),
- Some(SQLConf.get.sessionLocalTimeZone),
- implicitTypeChangeInfo,
- requiredSchema,
- new StructType(),
- schemaUtils
- )
-
- // Unify projections by applying padding and then casting for each row
- val projection: UnsafeProjection = new UnsafeProjection {
- def apply(row: InternalRow): UnsafeRow =
- castProj(paddingProj(row))
- }
- val projectedIter = lanceIterator.asScala.map(projection.apply)
-
- // Handle partition columns
- if (partitionSchema.length == 0) {
- // No partition columns - return rows directly
- projectedIter
+ // Decide between batch mode and row mode.
+ // Fall back to row mode if type casting is needed (batch-level type
casting deferred to follow-up).
+ val hasTypeChanges = !implicitTypeChangeInfo.isEmpty
+ if (enableVectorizedReader && !hasTypeChanges) {
+ readBatch(file, allocator, lanceReader, arrowReader, filePath,
+ requestSchema, requiredSchema, partitionSchema)
} else {
- // Create UnsafeProjection to convert JoinedRow to UnsafeRow
- val fullSchema = (requiredSchema.fields ++
partitionSchema.fields).map(f =>
- AttributeReference(f.name, f.dataType, f.nullable, f.metadata)())
- val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema,
fullSchema)
-
- // Append partition values to each row using JoinedRow, then convert
to UnsafeRow
- val joinedRow = new JoinedRow()
- projectedIter.map(row => unsafeProjection(joinedRow(row,
file.partitionValues)))
+ readRows(file, allocator, lanceReader, arrowReader, filePath,
+ requestSchema, requiredSchema, partitionSchema,
implicitTypeChangeInfo)
}
} catch {
case e: Exception =>
- if (lanceIterator != null) {
- lanceIterator.close() // Close iterator which handles lifecycle
for all objects
+ allocator.close()
+ throw new IOException(s"Failed to read Lance file: $filePath", e)
+ }
+ }
+ }
+
+ /**
+ * Columnar batch reading path. Returns Iterator[ColumnarBatch] type-erased
as Iterator[InternalRow].
+ * Used when enableVectorizedReader=true and no type casting is needed.
+ */
+ private def readBatch(file: PartitionedFile,
+ allocator: org.apache.arrow.memory.BufferAllocator,
+ lanceReader: LanceFileReader,
+ arrowReader: ArrowReader,
+ filePath: String,
+ requestSchema: StructType,
+ requiredSchema: StructType,
+ partitionSchema: StructType): Iterator[InternalRow] = {
+
+ val batchIterator = new LanceBatchIterator(allocator, lanceReader,
arrowReader, filePath)
+
+ // Build column mapping: for each column in requiredSchema, find its index
in requestSchema (file columns)
+ // Returns -1 if the column is missing from the file (schema evolution:
column addition)
+ val columnMapping: Array[Int] = requiredSchema.fields.map { field =>
+ requestSchema.fieldNames.indexOf(field.name)
+ }
+
+ // Create Arrow-backed null vectors for columns missing from the file.
+ // Uses LanceArrowColumnVector so that Spark's vectorTypes() contract is
satisfied
+ // (FileSourceScanExec expects all data columns to be
LanceArrowColumnVector).
+ val nullAllocator = if (columnMapping.contains(-1)) {
+ HoodieArrowAllocator.newChildAllocator(
+ getClass.getSimpleName + "-null-" + filePath,
HoodieSparkLanceReader.LANCE_DATA_ALLOCATOR_SIZE)
+ } else null
+
+ val nullColumnVectors: Array[(Int, LanceArrowColumnVector,
org.apache.arrow.vector.FieldVector)] =
+ if (nullAllocator != null) {
+ columnMapping.zipWithIndex.filter(_._1 < 0).map { case (_, idx) =>
+ val field = LanceArrowUtils.toArrowField(
+ requiredSchema(idx).name, requiredSchema(idx).dataType,
requiredSchema(idx).nullable, "UTC")
+ val arrowVector = field.createVector(nullAllocator)
+ arrowVector.allocateNew()
+ arrowVector.setValueCount(DEFAULT_BATCH_SIZE)
+ (idx, new LanceArrowColumnVector(arrowVector), arrowVector)
+ }
+ } else {
+ Array.empty
+ }
+
+ // Pre-create partition column vectors (reused across batches, reset per
batch)
+ val hasPartitionColumns = partitionSchema.length > 0
+ val partitionVectors: Array[WritableColumnVector] = if
(hasPartitionColumns) {
+ partitionSchema.fields.map(f => new
OnHeapColumnVector(DEFAULT_BATCH_SIZE, f.dataType))
+ } else {
+ Array.empty
+ }
+
+ // Populate partition vectors with constant values
+ var lastPopulatedNumRows = DEFAULT_BATCH_SIZE
+ if (hasPartitionColumns) {
+ populatePartitionVectors(partitionVectors, partitionSchema,
file.partitionValues, DEFAULT_BATCH_SIZE)
+ }
+
+ val totalColumns = requiredSchema.length + partitionSchema.length
+
+ // Map each source batch to a batch with the correct column layout.
+ val mappedIterator = new Iterator[ColumnarBatch] with Closeable {
+ override def hasNext: Boolean = batchIterator.hasNext()
+
+ override def next(): ColumnarBatch = {
+ val sourceBatch = batchIterator.next()
+ val numRows = sourceBatch.numRows()
+
+ val vectors = new Array[ColumnVector](totalColumns)
+
+ // Data columns: reorder from source batch or substitute null Arrow
vector
+ var i = 0
+ while (i < requiredSchema.length) {
+ if (columnMapping(i) >= 0) {
+ vectors(i) = sourceBatch.column(columnMapping(i))
} else {
- allocator.close() // Close allocator directly
+ // Find the pre-created null vector for this index
+ val entry = nullColumnVectors.find(_._1 == i).get
+ // Adjust valueCount if batch size differs from allocated size
+ if (numRows != entry._3.getValueCount) {
+ entry._3.setValueCount(numRows)
+ }
+ vectors(i) = entry._2
Review Comment:
🤖 nit: `nullColumnVectors.find(_._1 == i).get` will throw a bare
`NoSuchElementException` if the invariant is ever violated. Could you swap
`.get` for `.getOrElse(throw new IllegalStateException(s"No null vector
pre-created for column index $i"))` to give a meaningful diagnostic?
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala:
##########
@@ -111,65 +113,284 @@ class SparkLanceReaderBase(enableVectorizedReader:
Boolean) extends SparkColumna
// Read data with column projection (filters not supported yet)
val arrowReader = lanceReader.readAll(columnNames, null,
DEFAULT_BATCH_SIZE)
- // Create iterator using shared LanceRecordIterator
- lanceIterator = new LanceRecordIterator(
- allocator,
- lanceReader,
- arrowReader,
- requestSchema,
- filePath
- )
-
- // Register cleanup listener
- Option(TaskContext.get()).foreach { ctx =>
- ctx.addTaskCompletionListener[Unit](_ => lanceIterator.close())
- }
-
- // Create the following projections for schema evolution:
- // 1. Padding projection: add NULL for missing columns
- // 2. Casting projection: handle type conversions
- val schemaUtils = sparkAdapter.getSchemaUtils
- val paddingProj =
SparkSchemaTransformUtils.generateNullPaddingProjection(requestSchema,
requiredSchema)
- val castProj = SparkSchemaTransformUtils.generateUnsafeProjection(
- schemaUtils.toAttributes(requiredSchema),
- Some(SQLConf.get.sessionLocalTimeZone),
- implicitTypeChangeInfo,
- requiredSchema,
- new StructType(),
- schemaUtils
- )
-
- // Unify projections by applying padding and then casting for each row
- val projection: UnsafeProjection = new UnsafeProjection {
- def apply(row: InternalRow): UnsafeRow =
- castProj(paddingProj(row))
- }
- val projectedIter = lanceIterator.asScala.map(projection.apply)
-
- // Handle partition columns
- if (partitionSchema.length == 0) {
- // No partition columns - return rows directly
- projectedIter
+ // Decide between batch mode and row mode.
+ // Fall back to row mode if type casting is needed (batch-level type
casting deferred to follow-up).
+ val hasTypeChanges = !implicitTypeChangeInfo.isEmpty
+ if (enableVectorizedReader && !hasTypeChanges) {
+ readBatch(file, allocator, lanceReader, arrowReader, filePath,
+ requestSchema, requiredSchema, partitionSchema)
} else {
- // Create UnsafeProjection to convert JoinedRow to UnsafeRow
- val fullSchema = (requiredSchema.fields ++
partitionSchema.fields).map(f =>
- AttributeReference(f.name, f.dataType, f.nullable, f.metadata)())
- val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema,
fullSchema)
-
- // Append partition values to each row using JoinedRow, then convert
to UnsafeRow
- val joinedRow = new JoinedRow()
- projectedIter.map(row => unsafeProjection(joinedRow(row,
file.partitionValues)))
+ readRows(file, allocator, lanceReader, arrowReader, filePath,
+ requestSchema, requiredSchema, partitionSchema,
implicitTypeChangeInfo)
}
} catch {
case e: Exception =>
- if (lanceIterator != null) {
- lanceIterator.close() // Close iterator which handles lifecycle
for all objects
+ allocator.close()
+ throw new IOException(s"Failed to read Lance file: $filePath", e)
+ }
+ }
+ }
+
+ /**
+ * Columnar batch reading path. Returns Iterator[ColumnarBatch] type-erased
as Iterator[InternalRow].
+ * Used when enableVectorizedReader=true and no type casting is needed.
+ */
+ private def readBatch(file: PartitionedFile,
+ allocator: org.apache.arrow.memory.BufferAllocator,
+ lanceReader: LanceFileReader,
+ arrowReader: ArrowReader,
+ filePath: String,
+ requestSchema: StructType,
+ requiredSchema: StructType,
+ partitionSchema: StructType): Iterator[InternalRow] = {
+
+ val batchIterator = new LanceBatchIterator(allocator, lanceReader,
arrowReader, filePath)
+
+ // Build column mapping: for each column in requiredSchema, find its index
in requestSchema (file columns)
+ // Returns -1 if the column is missing from the file (schema evolution:
column addition)
+ val columnMapping: Array[Int] = requiredSchema.fields.map { field =>
+ requestSchema.fieldNames.indexOf(field.name)
+ }
+
+ // Create Arrow-backed null vectors for columns missing from the file.
+ // Uses LanceArrowColumnVector so that Spark's vectorTypes() contract is
satisfied
+ // (FileSourceScanExec expects all data columns to be
LanceArrowColumnVector).
+ val nullAllocator = if (columnMapping.contains(-1)) {
+ HoodieArrowAllocator.newChildAllocator(
+ getClass.getSimpleName + "-null-" + filePath,
HoodieSparkLanceReader.LANCE_DATA_ALLOCATOR_SIZE)
+ } else null
+
+ val nullColumnVectors: Array[(Int, LanceArrowColumnVector,
org.apache.arrow.vector.FieldVector)] =
+ if (nullAllocator != null) {
+ columnMapping.zipWithIndex.filter(_._1 < 0).map { case (_, idx) =>
+ val field = LanceArrowUtils.toArrowField(
+ requiredSchema(idx).name, requiredSchema(idx).dataType,
requiredSchema(idx).nullable, "UTC")
+ val arrowVector = field.createVector(nullAllocator)
+ arrowVector.allocateNew()
+ arrowVector.setValueCount(DEFAULT_BATCH_SIZE)
+ (idx, new LanceArrowColumnVector(arrowVector), arrowVector)
+ }
+ } else {
+ Array.empty
+ }
+
+ // Pre-create partition column vectors (reused across batches, reset per
batch)
Review Comment:
🤖 nit: assigning `null` to a `val` in Scala (`else null`) and then checking
`if (nullAllocator != null)` downstream reads as Java-style.
`Option[BufferAllocator]` would express "possibly absent allocator" more
naturally and let you use `.foreach(_.close())` instead of the null guards.
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/LanceBatchIterator.java:
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.ArrowReader;
+import org.apache.spark.sql.vectorized.ColumnVector;
+import org.apache.spark.sql.vectorized.ColumnarBatch;
+import org.apache.spark.sql.vectorized.LanceArrowColumnVector;
+import org.lance.file.LanceFileReader;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+/**
+ * Iterator that returns {@link ColumnarBatch} directly from Lance files
without
+ * decomposing to individual rows. Used for vectorized/columnar batch reading
+ * in Spark's COW base-file-only read path.
+ *
+ * <p>Unlike {@link LanceRecordIterator} which extracts rows one by one,
+ * this iterator preserves the columnar format for zero-copy batch processing.
+ *
+ * <p>Manages the lifecycle of:
+ * <ul>
+ * <li>BufferAllocator - Arrow memory management</li>
+ * <li>LanceFileReader - Lance file handle</li>
+ * <li>ArrowReader - Arrow batch reader</li>
+ * </ul>
+ */
+public class LanceBatchIterator implements Iterator<ColumnarBatch>, Closeable {
+ private final BufferAllocator allocator;
+ private final LanceFileReader lanceReader;
+ private final ArrowReader arrowReader;
+ private final String path;
+
+ private ColumnVector[] columnVectors;
+ private ColumnarBatch currentBatch;
+ private boolean nextBatchLoaded = false;
+ private boolean finished = false;
+ private boolean closed = false;
+
+ /**
+ * Creates a new Lance batch iterator.
+ *
+ * @param allocator Arrow buffer allocator for memory management
+ * @param lanceReader Lance file reader
+ * @param arrowReader Arrow reader for batch reading
+ * @param path File path (for error messages)
+ */
+ public LanceBatchIterator(BufferAllocator allocator,
+ LanceFileReader lanceReader,
+ ArrowReader arrowReader,
+ String path) {
+ this.allocator = allocator;
+ this.lanceReader = lanceReader;
+ this.arrowReader = arrowReader;
+ this.path = path;
+ }
+
+ @Override
+ public boolean hasNext() {
+ if (finished) {
+ return false;
+ }
+ if (nextBatchLoaded) {
+ return true;
+ }
+
+ try {
+ if (arrowReader.loadNextBatch()) {
+ VectorSchemaRoot root = arrowReader.getVectorSchemaRoot();
+
+ // Create column vector wrappers once and reuse across batches
+ // (ArrowReader reuses the same VectorSchemaRoot)
+ if (columnVectors == null) {
+ columnVectors = root.getFieldVectors().stream()
+ .map(LanceArrowColumnVector::new)
+ .toArray(ColumnVector[]::new);
+ }
+
+ currentBatch = new ColumnarBatch(columnVectors, root.getRowCount());
+ nextBatchLoaded = true;
+ return true;
+ }
+ } catch (IOException e) {
+ throw new HoodieException("Failed to read next batch from Lance file: " + path, e);
+ }
+
+ finished = true;
+ return false;
+ }
+
+ @Override
+ public ColumnarBatch next() {
Review Comment:
🤖 nit: `arrowReader`, `lanceReader`, and `allocator` are all `final` fields
set in the constructor, so they can never be `null` unless the caller
explicitly passes `null`. The `!= null` guards here add noise — could you
either drop them and trust the non-null contract, or enforce it upfront with
`Objects.requireNonNull` in the constructor?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]