the-other-tim-brown commented on code in PR #17660: URL: https://github.com/apache/hudi/pull/17660#discussion_r2649890213
########## hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieLanceRecordIterator.java: ########## @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.io.storage; + +import com.lancedb.lance.file.LanceFileReader; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.ipc.ArrowReader; +import org.apache.hudi.common.util.collection.ClosableIterator; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.catalyst.expressions.UnsafeProjection; +import org.apache.spark.sql.catalyst.expressions.UnsafeRow; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.sql.vectorized.ColumnVector; +import org.apache.spark.sql.vectorized.ColumnarBatch; +import org.apache.spark.sql.vectorized.LanceArrowColumnVector; + +import java.io.IOException; +import java.util.Iterator; + +/** + * Shared iterator implementation for reading Lance files and converting Arrow batches to Spark rows. 
+ * This iterator is used by both Hudi's internal Lance reader and Spark datasource integration. + * + * <p>The iterator manages the lifecycle of: + * <ul> + * <li>BufferAllocator - Arrow memory management</li> + * <li>LanceFileReader - Lance file handle</li> + * <li>ArrowReader - Arrow batch reader</li> + * <li>ColumnarBatch - Current batch being iterated</li> + * </ul> + * + * <p>Records are converted to {@link UnsafeRow} using {@link UnsafeProjection} for efficient + * serialization and memory management. + */ +public class HoodieLanceRecordIterator implements ClosableIterator<UnsafeRow> { + private final BufferAllocator allocator; + private final LanceFileReader lanceReader; + private final ArrowReader arrowReader; + private final UnsafeProjection projection; + private final String path; + + private ColumnarBatch currentBatch; + private Iterator<InternalRow> rowIterator; + private ColumnVector[] columnVectors; + private boolean closed = false; + + /** + * Creates a new Lance record iterator. 
+ * + * @param allocator Arrow buffer allocator for memory management + * @param lanceReader Lance file reader + * @param arrowReader Arrow reader for batch reading + * @param schema Spark schema for the records + * @param path File path (for error messages) + */ + public HoodieLanceRecordIterator(BufferAllocator allocator, + LanceFileReader lanceReader, + ArrowReader arrowReader, + StructType schema, + String path) { + this.allocator = allocator; + this.lanceReader = lanceReader; + this.arrowReader = arrowReader; + this.projection = UnsafeProjection.create(schema); + this.path = path; + } + + @Override + public boolean hasNext() { + // If we have records in current batch, return true + if (rowIterator != null && rowIterator.hasNext()) { + return true; + } + + // Close previous batch before loading next + if (currentBatch != null) { + currentBatch.close(); + currentBatch = null; + } + + // Try to load next batch + try { + if (arrowReader.loadNextBatch()) { + VectorSchemaRoot root = arrowReader.getVectorSchemaRoot(); + + // Wrap each Arrow FieldVector in LanceArrowColumnVector for type-safe access + // Cache the column wrappers on first batch and reuse for all subsequent batches + if (columnVectors == null) { + columnVectors = root.getFieldVectors().stream() + .map(LanceArrowColumnVector::new) + .toArray(ColumnVector[]::new); + } + + // Create ColumnarBatch and keep it alive while iterating + currentBatch = new ColumnarBatch(columnVectors, root.getRowCount()); + rowIterator = currentBatch.rowIterator(); + return rowIterator.hasNext(); + } + } catch (IOException e) { + throw new HoodieException("Failed to read next batch from Lance file: " + path, e); + } + + return false; + } + + @Override + public UnsafeRow next() { + if (!hasNext()) { + throw new IllegalStateException("No more records available"); + } + InternalRow row = rowIterator.next(); + // Convert to UnsafeRow immediately while batch is still open + return projection.apply(row); + } + + @Override + public 
void close() { + // Make close() idempotent - safe to call multiple times + if (closed) { + return; + } + closed = true; + + IOException arrowException = null; + Exception lanceException = null; + + // Close current batch if exists + if (currentBatch != null) { + currentBatch.close(); + currentBatch = null; + } + + // Close Arrow reader + if (arrowReader != null) { + try { + arrowReader.close(); + } catch (IOException e) { + arrowException = e; + } + } + + // Close Lance reader + if (lanceReader != null) { + try { + lanceReader.close(); + } catch (Exception e) { + lanceException = e; + } + } + + // Always close allocator + if (allocator != null) { + allocator.close(); + } + + // Throw any exceptions that occurred + if (arrowException != null) { + throw new HoodieIOException("Failed to close Arrow reader", arrowException); + } + if (lanceException != null) { + throw new HoodieException("Failed to close Lance reader", lanceException); + } + } +} Review Comment: Make sure to add a newline at the end of the file. I recommend setting this up with your editor so it does not get surfaced in each review. 
########## hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala: ########## @@ -133,13 +133,17 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath: String, val orcBatchSupported = conf.orcVectorizedReaderEnabled && schema.forall(s => OrcUtils.supportColumnarReads( s.dataType, sparkSession.sessionState.conf.orcVectorizedReaderNestedColumnEnabled)) + // TODO: Implement columnar batch reading for Lance - currently using row-based reading Review Comment: Make sure to reference the ticket in the comment for any TODOs ########## hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala: ########## @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.spark.sql.execution.datasources.lance + +import org.apache.hudi.common.util +import org.apache.hudi.internal.schema.InternalSchema +import org.apache.hudi.io.memory.HoodieArrowAllocator +import org.apache.hudi.io.storage.{HoodieLanceRecordIterator, HoodieSparkLanceReader} +import org.apache.hudi.storage.StorageConfiguration + +import com.lancedb.lance.file.LanceFileReader +import org.apache.hadoop.conf.Configuration +import org.apache.parquet.schema.MessageType +import org.apache.spark.TaskContext +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.JoinedRow +import org.apache.spark.sql.execution.datasources.{PartitionedFile, SparkColumnarFileReader} +import org.apache.spark.sql.sources.Filter +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.util.LanceArrowUtils + +import java.io.IOException + +import scala.collection.JavaConverters._ + +/** + * Reader for Lance files in Spark datasource. + * Implements vectorized reading using LanceArrowColumnVector. + * + * @param enableVectorizedReader whether to use vectorized reading (currently always true for Lance) + */ +class SparkLanceReaderBase(enableVectorizedReader: Boolean) extends SparkColumnarFileReader { + + // Batch size for reading Lance files (number of rows per batch) + private val DEFAULT_BATCH_SIZE = 512 + + /** + * Read a Lance file with schema projection and partition column support. + * + * @param file Lance file to read + * @param requiredSchema desired output schema of the data (columns to read) + * @param partitionSchema schema of the partition columns. Partition values will be appended to the end of every row + * @param internalSchemaOpt option of internal schema for schema.on.read (not currently used for Lance) + * @param filters filters for data skipping. Not guaranteed to be used; the spark plan will also apply the filters. 
+ * @param storageConf the hadoop conf + * @return iterator of rows read from the file output type says [[InternalRow]] but could be [[ColumnarBatch]] + */ + override def read(file: PartitionedFile, + requiredSchema: StructType, + partitionSchema: StructType, + internalSchemaOpt: util.Option[InternalSchema], + filters: Seq[Filter], + storageConf: StorageConfiguration[Configuration], + tableSchemaOpt: util.Option[MessageType] = util.Option.empty()): Iterator[InternalRow] = { + + val filePath = file.filePath.toString + + if (requiredSchema.isEmpty && partitionSchema.isEmpty) { + // No columns requested - return empty iterator + Iterator.empty + } else { + // Create child allocator for reading + val allocator = HoodieArrowAllocator.newChildAllocator(getClass.getSimpleName + "-data-" + filePath, + HoodieSparkLanceReader.LANCE_DATA_ALLOCATOR_SIZE); + + try { + // Open Lance file reader + val lanceReader = LanceFileReader.open(filePath, allocator) + + // Extract column names from required schema for projection + val columnNames: java.util.List[String] = if (requiredSchema.nonEmpty) { + requiredSchema.fields.map(_.name).toList.asJava + } else { + // If only partition columns requested, read minimal data + null + } + + // Get schema from Lance file for HoodieLanceRecordIterator + val arrowSchema = lanceReader.schema() + val sparkSchema = LanceArrowUtils.fromArrowSchema(arrowSchema) + + // Read data with column projection (filters not supported yet) + val arrowReader = lanceReader.readAll(columnNames, null, DEFAULT_BATCH_SIZE) + + // Create iterator using shared HoodieLanceRecordIterator + val lanceIterator = new HoodieLanceRecordIterator( Review Comment: Does the iterator apply any required field schema evolution? ########## hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileFormat.java: ########## @@ -49,7 +49,11 @@ public enum HoodieFileFormat { + "way to store Hive data. It was designed to overcome limitations of the other Hive file " + "formats. 
Using ORC files improves performance when Hive is reading, writing, and " + "processing data.") - ORC(".orc"); + ORC(".orc"), + + @EnumFieldDescription("Lance is a modern columnar data format optimized for ML and AI workloads. " + + "It provides efficient random access, versioning, and integration with Apache Arrow.") Review Comment: What does `versioning` mean in this context? ########## hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala: ########## @@ -424,8 +430,14 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath: String, override def inferSchema(sparkSession: SparkSession, options: Map[String, String], files: Seq[FileStatus]): Option[StructType] = { Review Comment: @nsivabalan or @yihua, should the `inferSchema` simply rely on the table schema instead of looking at the data files? ########## hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala: ########## @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.spark.sql.execution.datasources.lance + +import org.apache.hudi.common.util +import org.apache.hudi.internal.schema.InternalSchema +import org.apache.hudi.io.memory.HoodieArrowAllocator +import org.apache.hudi.io.storage.{HoodieLanceRecordIterator, HoodieSparkLanceReader} +import org.apache.hudi.storage.StorageConfiguration + +import com.lancedb.lance.file.LanceFileReader +import org.apache.hadoop.conf.Configuration +import org.apache.parquet.schema.MessageType +import org.apache.spark.TaskContext +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.JoinedRow +import org.apache.spark.sql.execution.datasources.{PartitionedFile, SparkColumnarFileReader} +import org.apache.spark.sql.sources.Filter +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.util.LanceArrowUtils + +import java.io.IOException + +import scala.collection.JavaConverters._ + +/** + * Reader for Lance files in Spark datasource. + * Implements vectorized reading using LanceArrowColumnVector. + * + * @param enableVectorizedReader whether to use vectorized reading (currently always true for Lance) + */ +class SparkLanceReaderBase(enableVectorizedReader: Boolean) extends SparkColumnarFileReader { + + // Batch size for reading Lance files (number of rows per batch) + private val DEFAULT_BATCH_SIZE = 512 + + /** + * Read a Lance file with schema projection and partition column support. + * + * @param file Lance file to read + * @param requiredSchema desired output schema of the data (columns to read) + * @param partitionSchema schema of the partition columns. Partition values will be appended to the end of every row + * @param internalSchemaOpt option of internal schema for schema.on.read (not currently used for Lance) + * @param filters filters for data skipping. Not guaranteed to be used; the spark plan will also apply the filters. 
+ * @param storageConf the hadoop conf + * @return iterator of rows read from the file output type says [[InternalRow]] but could be [[ColumnarBatch]] + */ + override def read(file: PartitionedFile, + requiredSchema: StructType, + partitionSchema: StructType, + internalSchemaOpt: util.Option[InternalSchema], + filters: Seq[Filter], + storageConf: StorageConfiguration[Configuration], + tableSchemaOpt: util.Option[MessageType] = util.Option.empty()): Iterator[InternalRow] = { + + val filePath = file.filePath.toString + + if (requiredSchema.isEmpty && partitionSchema.isEmpty) { + // No columns requested - return empty iterator + Iterator.empty + } else { + // Create child allocator for reading + val allocator = HoodieArrowAllocator.newChildAllocator(getClass.getSimpleName + "-data-" + filePath, + HoodieSparkLanceReader.LANCE_DATA_ALLOCATOR_SIZE); + + try { + // Open Lance file reader + val lanceReader = LanceFileReader.open(filePath, allocator) + + // Extract column names from required schema for projection + val columnNames: java.util.List[String] = if (requiredSchema.nonEmpty) { + requiredSchema.fields.map(_.name).toList.asJava + } else { + // If only partition columns requested, read minimal data + null + } + + // Get schema from Lance file for HoodieLanceRecordIterator + val arrowSchema = lanceReader.schema() + val sparkSchema = LanceArrowUtils.fromArrowSchema(arrowSchema) Review Comment: Should these be lazily computed? 
Seems like they are only required when requiredSchema is empty. ########## hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_4Adapter.scala: ########## @@ -154,6 +154,13 @@ class Spark3_4Adapter extends BaseSpark3Adapter { Spark34OrcReader.build(vectorized, sqlConf, options, hadoopConf, dataSchema) } + + override def createLanceFileReader(vectorized: Boolean, + sqlConf: SQLConf, + options: Map[String, String], + hadoopConf: Configuration): SparkColumnarFileReader = { + throw new UnsupportedOperationException("Lance format is not supported in Spark 3.4") Review Comment: Why can't we support Lance with these lower Spark versions? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
