This is an automated email from the ASF dual-hosted git repository.

timbrown pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new a74d00a1b673 feat: Implement SparkColumnarFileReader for Datasource 
integration with Lance (#17660)
a74d00a1b673 is described below

commit a74d00a1b6734683461a8768701eaf5dc4a817b0
Author: Rahil C <[email protected]>
AuthorDate: Tue Dec 30 09:35:10 2025 -0500

    feat: Implement SparkColumnarFileReader for Datasource integration with 
Lance (#17660)
    
    * feat: Add HoodieBaseLanceFileWriter and implementation for SparkFileWriter
    
    * feat: Add HoodieSparkLanceReader for reading lance files to internal row
    
    * migrate to hoodie schema and address tim prev comments
    
    * Implement SparkColumnarFileReader for Datasource integration with Lance
    
    * fix usages to hoodie schema
    
    * fix iterator for reuse
    
    * minor fixes
    
    * add DisabledIfSystemProperty
    
    * try spark 4.0
    
    * scala style plugin property change
    
    * intial minor comments
    
    * add spark 3.4
    
    * address comments
    
    * address ethan tim comments
    
    * scalastyle
---
 .../io/storage/HoodieSparkFileWriterFactory.java   |  18 +-
 .../hudi/io/storage/HoodieSparkLanceReader.java    | 119 +-------------
 .../hudi/io/storage/LanceRecordIterator.java       | 183 +++++++++++++++++++++
 .../hudi/BaseSparkInternalRecordContext.java       |   2 +-
 .../apache/spark/sql/HoodieInternalRowUtils.scala  |   9 +-
 .../org/apache/spark/sql/hudi/SparkAdapter.scala   |  14 ++
 .../apache/hudi/common/model/HoodieFileFormat.java |   6 +-
 .../hudi/io/storage/HoodieFileWriterFactory.java   |  10 ++
 .../datasources/lance/SparkLanceReaderBase.scala   | 140 ++++++++++++++++
 .../HoodieFileGroupReaderBasedFileFormat.scala     |   6 +
 .../hudi/functional/TestLanceDataSource.scala      | 132 +++++++++++++++
 .../apache/spark/sql/adapter/Spark3_3Adapter.scala |   7 +
 .../apache/spark/sql/adapter/Spark3_4Adapter.scala |   8 +
 .../apache/spark/sql/adapter/Spark3_5Adapter.scala |   8 +
 .../apache/spark/sql/adapter/Spark4_0Adapter.scala |   8 +
 pom.xml                                            |   4 +-
 16 files changed, 544 insertions(+), 130 deletions(-)

diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileWriterFactory.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileWriterFactory.java
index 9ba8e711946d..4b5264ab759f 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileWriterFactory.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileWriterFactory.java
@@ -32,7 +32,6 @@ import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.storage.StoragePath;
 
-import org.apache.avro.Schema;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
@@ -59,7 +58,7 @@ public class HoodieSparkFileWriterFactory extends 
HoodieFileWriterFactory {
       compressionCodecName = null;
     }
     //TODO boundary to revisit in follow up to use HoodieSchema directly
-    HoodieRowParquetWriteSupport writeSupport = 
getHoodieRowParquetWriteSupport(storage.getConf(), schema.getAvroSchema(),
+    HoodieRowParquetWriteSupport writeSupport = 
getHoodieRowParquetWriteSupport(storage.getConf(), schema,
         config, enableBloomFilter(populateMetaFields, config));
     HoodieRowParquetConfig parquetConfig = new 
HoodieRowParquetConfig(writeSupport,
         CompressionCodecName.fromConf(compressionCodecName),
@@ -77,7 +76,7 @@ public class HoodieSparkFileWriterFactory extends 
HoodieFileWriterFactory {
   protected HoodieFileWriter newParquetFileWriter(OutputStream outputStream, 
HoodieConfig config,
                                                   HoodieSchema schema) throws 
IOException {
     boolean enableBloomFilter = false;
-    HoodieRowParquetWriteSupport writeSupport = 
getHoodieRowParquetWriteSupport(storage.getConf(), schema.getAvroSchema(), 
config, enableBloomFilter);
+    HoodieRowParquetWriteSupport writeSupport = 
getHoodieRowParquetWriteSupport(storage.getConf(), schema, config, 
enableBloomFilter);
     String compressionCodecName = 
config.getStringOrDefault(HoodieStorageConfig.PARQUET_COMPRESSION_CODEC_NAME);
     // Support PARQUET_COMPRESSION_CODEC_NAME is ""
     if (compressionCodecName.isEmpty()) {
@@ -106,10 +105,19 @@ public class HoodieSparkFileWriterFactory extends 
HoodieFileWriterFactory {
     throw new HoodieIOException("Not support write to Orc file");
   }
 
-  private static HoodieRowParquetWriteSupport 
getHoodieRowParquetWriteSupport(StorageConfiguration<?> conf, Schema schema,
+  @Override
+  protected HoodieFileWriter newLanceFileWriter(String instantTime, 
StoragePath path, HoodieConfig config, HoodieSchema schema,
+                                                TaskContextSupplier 
taskContextSupplier) throws IOException {
+    boolean populateMetaFields = 
config.getBooleanOrDefault(HoodieTableConfig.POPULATE_META_FIELDS);
+    StructType structType = 
HoodieInternalRowUtils.getCachedSchema(schema.getAvroSchema());
+
+    return new HoodieSparkLanceWriter(path, structType, instantTime, 
taskContextSupplier, storage, populateMetaFields);
+  }
+
+  private static HoodieRowParquetWriteSupport 
getHoodieRowParquetWriteSupport(StorageConfiguration<?> conf, HoodieSchema 
schema,
                                                                               
HoodieConfig config, boolean enableBloomFilter) {
     Option<BloomFilter> filter = enableBloomFilter ? 
Option.of(createBloomFilter(config)) : Option.empty();
-    StructType structType = HoodieInternalRowUtils.getCachedSchema(schema);
+    StructType structType = 
HoodieInternalRowUtils.getCachedSchema(schema.getAvroSchema());
     return 
HoodieRowParquetWriteSupport.getHoodieRowParquetWriteSupport(conf.unwrapAs(Configuration.class),
 structType, filter, config);
   }
 }
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceReader.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceReader.java
index ff35bac92f91..90e4996e44a4 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceReader.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceReader.java
@@ -20,7 +20,6 @@ package org.apache.hudi.io.storage;
 
 import com.lancedb.lance.file.LanceFileReader;
 import org.apache.arrow.memory.BufferAllocator;
-import org.apache.arrow.vector.VectorSchemaRoot;
 import org.apache.arrow.vector.ipc.ArrowReader;
 import org.apache.arrow.vector.types.pojo.Schema;
 import org.apache.hudi.HoodieSchemaConversionUtils;
@@ -37,19 +36,14 @@ import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
 import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.sql.util.LanceArrowUtils;
-import org.apache.spark.sql.vectorized.ColumnVector;
-import org.apache.spark.sql.vectorized.ColumnarBatch;
-import org.apache.spark.sql.vectorized.LanceArrowColumnVector;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 
@@ -155,7 +149,7 @@ public class HoodieSparkLanceReader implements 
HoodieSparkFileReader {
       // Read only the requested columns from Lance file for efficiency
       ArrowReader arrowReader = lanceReader.readAll(columnNames, null, 
DEFAULT_BATCH_SIZE);
 
-      return new LanceRecordIterator(allocator, lanceReader, arrowReader, 
requestedSparkSchema);
+      return new LanceRecordIterator(allocator, lanceReader, arrowReader, 
requestedSparkSchema, path.toString());
     } catch (Exception e) {
       allocator.close();
       throw new HoodieException("Failed to create Lance reader for: " + path, 
e);
@@ -195,113 +189,4 @@ public class HoodieSparkLanceReader implements 
HoodieSparkFileReader {
    * Iterator implementation that reads Lance file batches and converts to 
UnsafeRows.
    * Keeps ColumnarBatch alive while iterating to avoid unnecessary data 
copying.
    */
-  private class LanceRecordIterator implements ClosableIterator<UnsafeRow> {
-    private final BufferAllocator allocator;
-    private final LanceFileReader lanceReader;
-    private final ArrowReader arrowReader;
-    private final UnsafeProjection projection;
-    private ColumnarBatch currentBatch;
-    private Iterator<InternalRow> rowIterator;
-    private ColumnVector[] columnVectors;
-
-    public LanceRecordIterator(BufferAllocator allocator,
-                               LanceFileReader lanceReader,
-                               ArrowReader arrowReader,
-                               StructType schema) {
-      this.allocator = allocator;
-      this.lanceReader = lanceReader;
-      this.arrowReader = arrowReader;
-      this.projection = UnsafeProjection.create(schema);
-    }
-
-    @Override
-    public boolean hasNext() {
-      // If we have records in current batch, return true
-      if (rowIterator != null && rowIterator.hasNext()) {
-        return true;
-      }
-
-      // Close previous batch before loading next
-      if (currentBatch != null) {
-        currentBatch.close();
-        currentBatch = null;
-      }
-
-      // Try to load next batch
-      try {
-        if (arrowReader.loadNextBatch()) {
-          VectorSchemaRoot root = arrowReader.getVectorSchemaRoot();
-
-          // Wrap each Arrow FieldVector in LanceArrowColumnVector for 
type-safe access
-          // Cache the column wrappers on first batch and reuse for all 
subsequent batches
-          if (columnVectors == null) {
-            columnVectors = root.getFieldVectors().stream()
-                    .map(LanceArrowColumnVector::new)
-                    .toArray(ColumnVector[]::new);
-          }
-
-          // Create ColumnarBatch and keep it alive while iterating
-          currentBatch = new ColumnarBatch(columnVectors, root.getRowCount());
-          rowIterator = currentBatch.rowIterator();
-          return rowIterator.hasNext();
-        }
-      } catch (IOException e) {
-        throw new HoodieException("Failed to read next batch from Lance file: 
" + path, e);
-      }
-
-      return false;
-    }
-
-    @Override
-    public UnsafeRow next() {
-      if (!hasNext()) {
-        throw new IllegalStateException("No more records available");
-      }
-      InternalRow row = rowIterator.next();
-      // Convert to UnsafeRow immediately while batch is still open
-      return projection.apply(row);
-    }
-
-    @Override
-    public void close() {
-      IOException arrowException = null;
-      Exception lanceException = null;
-
-      // Close current batch if exists
-      if (currentBatch != null) {
-        currentBatch.close();
-      }
-
-      // Close Arrow reader
-      if (arrowReader != null) {
-        try {
-          arrowReader.close();
-        } catch (IOException e) {
-          arrowException = e;
-        }
-      }
-
-      // Close Lance reader
-      if (lanceReader != null) {
-        try {
-          lanceReader.close();
-        } catch (Exception e) {
-          lanceException = e;
-        }
-      }
-
-      // Always close allocator
-      if (allocator != null) {
-        allocator.close();
-      }
-
-      // Throw any exceptions that occurred
-      if (arrowException != null) {
-        throw new HoodieIOException("Failed to close Arrow reader", 
arrowException);
-      }
-      if (lanceException != null) {
-        throw new HoodieException("Failed to close Lance reader", 
lanceException);
-      }
-    }
-  }
-}
\ No newline at end of file
+}
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/LanceRecordIterator.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/LanceRecordIterator.java
new file mode 100644
index 000000000000..88d5645879a6
--- /dev/null
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/LanceRecordIterator.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import com.lancedb.lance.file.LanceFileReader;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.ArrowReader;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.vectorized.ColumnVector;
+import org.apache.spark.sql.vectorized.ColumnarBatch;
+import org.apache.spark.sql.vectorized.LanceArrowColumnVector;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * Shared iterator implementation for reading Lance files and converting Arrow 
batches to Spark rows.
+ * This iterator is used by both Hudi's internal Lance reader and Spark 
datasource integration.
+ *
+ * <p>The iterator manages the lifecycle of:
+ * <ul>
+ *   <li>BufferAllocator - Arrow memory management</li>
+ *   <li>LanceFileReader - Lance file handle</li>
+ *   <li>ArrowReader - Arrow batch reader</li>
+ *   <li>ColumnarBatch - Current batch being iterated</li>
+ * </ul>
+ *
+ * <p>Records are converted to {@link UnsafeRow} using {@link 
UnsafeProjection} for efficient
+ * serialization and memory management.
+ */
+public class LanceRecordIterator implements ClosableIterator<UnsafeRow> {
+  private final BufferAllocator allocator;
+  private final LanceFileReader lanceReader;
+  private final ArrowReader arrowReader;
+  private final UnsafeProjection projection;
+  private final String path;
+
+  private ColumnarBatch currentBatch;
+  private Iterator<InternalRow> rowIterator;
+  private ColumnVector[] columnVectors;
+  private boolean closed = false;
+
+  /**
+   * Creates a new Lance record iterator.
+   *
+   * @param allocator Arrow buffer allocator for memory management
+   * @param lanceReader Lance file reader
+   * @param arrowReader Arrow reader for batch reading
+   * @param schema Spark schema for the records
+   * @param path File path (for error messages)
+   */
+  public LanceRecordIterator(BufferAllocator allocator,
+                             LanceFileReader lanceReader,
+                             ArrowReader arrowReader,
+                             StructType schema,
+                             String path) {
+    this.allocator = allocator;
+    this.lanceReader = lanceReader;
+    this.arrowReader = arrowReader;
+    this.projection = UnsafeProjection.create(schema);
+    this.path = path;
+  }
+
+  @Override
+  public boolean hasNext() {
+    // If we have records in current batch, return true
+    if (rowIterator != null && rowIterator.hasNext()) {
+      return true;
+    }
+
+    // Close previous batch before loading next
+    if (currentBatch != null) {
+      currentBatch.close();
+      currentBatch = null;
+    }
+
+    // Try to load next batch
+    try {
+      if (arrowReader.loadNextBatch()) {
+        VectorSchemaRoot root = arrowReader.getVectorSchemaRoot();
+
+        // Wrap each Arrow FieldVector in LanceArrowColumnVector for type-safe 
access
+        // Cache the column wrappers on first batch and reuse for all 
subsequent batches
+        if (columnVectors == null) {
+          columnVectors = root.getFieldVectors().stream()
+                  .map(LanceArrowColumnVector::new)
+                  .toArray(ColumnVector[]::new);
+        }
+
+        // Create ColumnarBatch and keep it alive while iterating
+        currentBatch = new ColumnarBatch(columnVectors, root.getRowCount());
+        rowIterator = currentBatch.rowIterator();
+        return rowIterator.hasNext();
+      }
+    } catch (IOException e) {
+      throw new HoodieException("Failed to read next batch from Lance file: " 
+ path, e);
+    }
+
+    return false;
+  }
+
+  @Override
+  public UnsafeRow next() {
+    if (!hasNext()) {
+      throw new IllegalStateException("No more records available");
+    }
+    InternalRow row = rowIterator.next();
+    // Convert to UnsafeRow immediately while batch is still open
+    return projection.apply(row);
+  }
+
+  @Override
+  public void close() {
+    // Make close() idempotent - safe to call multiple times
+    if (closed) {
+      return;
+    }
+    closed = true;
+
+    IOException arrowException = null;
+    Exception lanceException = null;
+
+    // Close current batch if exists
+    if (currentBatch != null) {
+      currentBatch.close();
+      currentBatch = null;
+    }
+
+    // Close Arrow reader
+    if (arrowReader != null) {
+      try {
+        arrowReader.close();
+      } catch (IOException e) {
+        arrowException = e;
+      }
+    }
+
+    // Close Lance reader
+    if (lanceReader != null) {
+      try {
+        lanceReader.close();
+      } catch (Exception e) {
+        lanceException = e;
+      }
+    }
+
+    // Always close allocator
+    if (allocator != null) {
+      allocator.close();
+    }
+
+    // Throw any exceptions that occurred
+    if (arrowException != null) {
+      throw new HoodieIOException("Failed to close Arrow reader", 
arrowException);
+    }
+    if (lanceException != null) {
+      throw new HoodieException("Failed to close Lance reader", 
lanceException);
+    }
+  }
+}
diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRecordContext.java
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRecordContext.java
index 449e8f289a01..1af96a2bf335 100644
--- 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRecordContext.java
+++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRecordContext.java
@@ -216,7 +216,7 @@ public abstract class BaseSparkInternalRecordContext 
extends RecordContext<Inter
     if (internalRow instanceof UnsafeRow) {
       return internalRow;
     }
-    final UnsafeProjection unsafeProjection = 
HoodieInternalRowUtils.getCachedUnsafeProjection(schema.toAvroSchema());
+    final UnsafeProjection unsafeProjection = 
HoodieInternalRowUtils.getCachedUnsafeProjection(schema);
     return unsafeProjection.apply(internalRow);
   }
 
diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieInternalRowUtils.scala
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieInternalRowUtils.scala
index c458fece591b..e2b782a424a8 100644
--- 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieInternalRowUtils.scala
+++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieInternalRowUtils.scala
@@ -18,11 +18,11 @@
 
 package org.apache.spark.sql
 
+import org.apache.avro.Schema
 import org.apache.hudi.AvroConversionUtils.convertAvroSchemaToStructType
 import org.apache.hudi.avro.HoodieAvroUtils.{createFullName, toJavaDate}
 import org.apache.hudi.exception.HoodieException
-
-import org.apache.avro.Schema
+import org.apache.hudi.common.schema.HoodieSchema
 import 
org.apache.spark.sql.HoodieCatalystExpressionUtils.generateUnsafeProjection
 import org.apache.spark.sql.HoodieUnsafeRowUtils.{NestedFieldPath, 
composeNestedFieldPath}
 import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, 
UnsafeArrayData, UnsafeProjection, UnsafeRow}
@@ -35,7 +35,6 @@ import org.apache.spark.unsafe.types.UTF8String
 import java.util.concurrent.ConcurrentHashMap
 import java.util.function.{Supplier, Function => JFunction}
 import java.util.{ArrayDeque => JArrayDeque, Collections => JCollections, 
Deque => JDeque, Map => JMap}
-
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
@@ -84,9 +83,9 @@ object HoodieInternalRowUtils {
   /**
    * Provides cached instance of [[UnsafeProjection]] to project Java object 
based [[InternalRow]] to [[UnsafeRow]].
    */
-  def getCachedUnsafeProjection(schema: Schema): UnsafeProjection = {
+  def getCachedUnsafeProjection(schema: HoodieSchema): UnsafeProjection = {
     identicalUnsafeProjectionThreadLocal.get()
-      .getOrElseUpdate(schema, 
UnsafeProjection.create(getCachedSchema(schema)))
+      .getOrElseUpdate(schema.getAvroSchema, 
UnsafeProjection.create(getCachedSchema(schema.getAvroSchema)))
   }
 
   /**
diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
index 5d47c6b08dae..af45ebc53444 100644
--- 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
+++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
@@ -215,6 +215,20 @@ trait SparkAdapter extends Serializable {
                           hadoopConf: Configuration,
                           dataSchema: StructType): SparkColumnarFileReader
 
+  /**
+   * Get Lance file reader
+   *
+   * @param vectorized true if vectorized reading is not prohibited due to 
schema, reading mode, etc
+   * @param sqlConf    the [[SQLConf]] used for the read
+   * @param options    passed as a param to the file format
+   * @param hadoopConf some configs will be set for the hadoopConf
+   * @return Lance file reader
+   */
+  def createLanceFileReader(vectorized: Boolean,
+                            sqlConf: SQLConf,
+                            options: Map[String, String],
+                            hadoopConf: Configuration): SparkColumnarFileReader
+
   /**
    * use new qe execute
    */
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileFormat.java 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileFormat.java
index 6e20c2dfeaf9..a4913ae55c5d 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileFormat.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileFormat.java
@@ -49,7 +49,11 @@ public enum HoodieFileFormat {
       + "way to store Hive data. It was designed to overcome limitations of 
the other Hive file "
       + "formats. Using ORC files improves performance when Hive is reading, 
writing, and "
       + "processing data.")
-  ORC(".orc");
+  ORC(".orc"),
+
+  @EnumFieldDescription("Lance is a modern columnar data format optimized for 
random access patterns "
+          + "and designed for ML and AI workloads")
+  LANCE(".lance");
 
   public static final Set<String> BASE_FILE_EXTENSIONS = 
Arrays.stream(HoodieFileFormat.values())
       .map(HoodieFileFormat::getFileExtension)
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java
 
b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java
index c5465b2e0d49..66e647fadddf 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java
@@ -34,6 +34,7 @@ import java.io.IOException;
 import java.io.OutputStream;
 
 import static org.apache.hudi.common.model.HoodieFileFormat.HFILE;
+import static org.apache.hudi.common.model.HoodieFileFormat.LANCE;
 import static org.apache.hudi.common.model.HoodieFileFormat.ORC;
 import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET;
 
@@ -71,6 +72,9 @@ public class HoodieFileWriterFactory {
     if (ORC.getFileExtension().equals(extension)) {
       return newOrcFileWriter(instantTime, path, config, schema, 
taskContextSupplier);
     }
+    if (LANCE.getFileExtension().equals(extension)) {
+      return newLanceFileWriter(instantTime, path, config, schema, 
taskContextSupplier);
+    }
     throw new UnsupportedOperationException(extension + " format not supported 
yet.");
   }
 
@@ -107,6 +111,12 @@ public class HoodieFileWriterFactory {
     throw new UnsupportedOperationException();
   }
 
+  protected HoodieFileWriter newLanceFileWriter(
+      String instantTime, StoragePath path, HoodieConfig config, HoodieSchema 
schema,
+      TaskContextSupplier taskContextSupplier) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
   public static BloomFilter createBloomFilter(HoodieConfig config) {
     return BloomFilterFactory.createBloomFilter(
         
config.getIntOrDefault(HoodieStorageConfig.BLOOM_FILTER_NUM_ENTRIES_VALUE),
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala
new file mode 100644
index 000000000000..c5109e0637f7
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.lance
+
+import org.apache.hudi.common.util
+import org.apache.hudi.internal.schema.InternalSchema
+import org.apache.hudi.io.memory.HoodieArrowAllocator
+import org.apache.hudi.io.storage.{HoodieSparkLanceReader, LanceRecordIterator}
+import org.apache.hudi.storage.StorageConfiguration
+
+import com.lancedb.lance.file.LanceFileReader
+import org.apache.hadoop.conf.Configuration
+import org.apache.parquet.schema.MessageType
+import org.apache.spark.TaskContext
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.JoinedRow
+import org.apache.spark.sql.execution.datasources.{PartitionedFile, 
SparkColumnarFileReader}
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.LanceArrowUtils
+
+import java.io.IOException
+
+import scala.collection.JavaConverters._
+
+/**
+ * Reader for Lance files in Spark datasource.
+ * Implements vectorized reading using LanceArrowColumnVector.
+ *
+ * @param enableVectorizedReader whether to use vectorized reading (currently 
always true for Lance)
+ */
+class SparkLanceReaderBase(enableVectorizedReader: Boolean) extends 
SparkColumnarFileReader {
+
+  // Batch size for reading Lance files (number of rows per batch)
+  private val DEFAULT_BATCH_SIZE = 512
+
+  /**
+   * Read a Lance file with schema projection and partition column support.
+   *
+   * @param file              Lance file to read
+   * @param requiredSchema    desired output schema of the data (columns to 
read)
+   * @param partitionSchema   schema of the partition columns. Partition 
values will be appended to the end of every row
+   * @param internalSchemaOpt option of internal schema for schema.on.read 
(not currently used for Lance)
+   * @param filters           filters for data skipping. Not guaranteed to be 
used; the spark plan will also apply the filters.
+   * @param storageConf       the hadoop conf
+   * @return iterator of rows read from the file output type says 
[[InternalRow]] but could be [[ColumnarBatch]]
+   */
+  override def read(file: PartitionedFile,
+                    requiredSchema: StructType,
+                    partitionSchema: StructType,
+                    internalSchemaOpt: util.Option[InternalSchema],
+                    filters: Seq[Filter],
+                    storageConf: StorageConfiguration[Configuration],
+                    tableSchemaOpt: util.Option[MessageType] = 
util.Option.empty()): Iterator[InternalRow] = {
+
+    val filePath = file.filePath.toString
+
+    if (requiredSchema.isEmpty && partitionSchema.isEmpty) {
+      // No columns requested - return empty iterator
+      Iterator.empty
+    } else {
+      // Create child allocator for reading
+      val allocator = 
HoodieArrowAllocator.newChildAllocator(getClass.getSimpleName + "-data-" + 
filePath,
+        HoodieSparkLanceReader.LANCE_DATA_ALLOCATOR_SIZE);
+
+      try {
+        // Open Lance file reader
+        val lanceReader = LanceFileReader.open(filePath, allocator)
+
+        // Extract column names from required schema for projection
+        val columnNames: java.util.List[String] = if (requiredSchema.nonEmpty) 
{
+          requiredSchema.fields.map(_.name).toList.asJava
+        } else {
+          // If only partition columns requested, read minimal data
+          null
+        }
+
+        // Read data with column projection (filters not supported yet)
+        val arrowReader = lanceReader.readAll(columnNames, null, 
DEFAULT_BATCH_SIZE)
+
+        val schemaForIterator = if (requiredSchema.nonEmpty) {
+          requiredSchema
+        } else {
+          // Only compute schema from Lance file when requiredSchema is empty
+          val arrowSchema = lanceReader.schema()
+          LanceArrowUtils.fromArrowSchema(arrowSchema)
+        }
+
+        // Create iterator using shared LanceRecordIterator
+        val lanceIterator = new LanceRecordIterator(
+          allocator,
+          lanceReader,
+          arrowReader,
+          schemaForIterator,
+          filePath
+        )
+
+        // Register cleanup listener with Spark task context
+        Option(TaskContext.get()).foreach(
+          _.addTaskCompletionListener[Unit](_ => lanceIterator.close())
+        )
+
+        // Need to convert to scala iterator for proper reading
+        val iter = lanceIterator.asScala
+
+        // Handle partition columns
+        if (partitionSchema.length == 0) {
+          // No partition columns - return rows directly
+          iter.asInstanceOf[Iterator[InternalRow]]
+        } else {
+          // Append partition values to each row using JoinedRow
+          val joinedRow = new JoinedRow()
+          iter.map(row => joinedRow(row, file.partitionValues))
+        }
+
+      } catch {
+        case e: Exception =>
+          allocator.close()
+          throw new IOException(s"Failed to read Lance file: $filePath", e)
+      }
+    }
+  }
+}
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala
index 4f1fb566d809..4dd3f66551de 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala
@@ -133,6 +133,8 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath: 
String,
     val orcBatchSupported = conf.orcVectorizedReaderEnabled &&
       schema.forall(s => OrcUtils.supportColumnarReads(
         s.dataType, 
sparkSession.sessionState.conf.orcVectorizedReaderNestedColumnEnabled))
+    // TODO: Implement columnar batch reading 
https://github.com/apache/hudi/issues/17736
+    val lanceBatchSupported = false
 
     val supportBatch = if (isMultipleBaseFileFormatsEnabled) {
       parquetBatchSupported && orcBatchSupported
@@ -140,6 +142,8 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath: 
String,
       parquetBatchSupported
     } else if (hoodieFileFormat == HoodieFileFormat.ORC) {
       orcBatchSupported
+    } else if (hoodieFileFormat == HoodieFileFormat.LANCE) {
+      lanceBatchSupported
     } else {
       throw new HoodieNotSupportedException("Unsupported file format: " + 
hoodieFileFormat)
     }
@@ -306,6 +310,8 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath: 
String,
       sparkAdapter.createParquetFileReader(enableVectorizedRead, 
spark.sessionState.conf, options, configuration)
     } else if (hoodieFileFormat == HoodieFileFormat.ORC) {
       sparkAdapter.createOrcFileReader(enableVectorizedRead, 
spark.sessionState.conf, options, configuration, dataSchema)
+    } else if (hoodieFileFormat == HoodieFileFormat.LANCE) {
+      sparkAdapter.createLanceFileReader(enableVectorizedRead, 
spark.sessionState.conf, options, configuration)
     } else {
       throw new HoodieNotSupportedException("Unsupported file format: " + 
hoodieFileFormat)
     }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala
new file mode 100644
index 000000000000..32e7d869c6e4
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.functional
+
+import org.apache.hudi.DataSourceWriteOptions._
+import org.apache.hudi.DefaultSparkRecordMerger
+import org.apache.hudi.common.table.HoodieTableConfig
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.testutils.HoodieSparkClientTestBase
+
+import org.apache.spark.sql.{SaveMode, SparkSession}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
+import org.junit.jupiter.api.condition.DisabledIfSystemProperty
+
+/**
+ * Basic functional tests for Lance file format with Hudi Spark datasource.
+ */
+@DisabledIfSystemProperty(named = "lance.skip.tests", matches = "true")
+class TestLanceDataSource extends HoodieSparkClientTestBase {
+
+  var spark: SparkSession = _
+
+  @BeforeEach
+  override def setUp(): Unit = {
+    super.setUp()
+    spark = sqlContext.sparkSession
+  }
+
+  @AfterEach
+  override def tearDown(): Unit = {
+    super.tearDown()
+    spark = null
+  }
+
+  @Test
+  def testBasicWriteAndRead(): Unit = {
+    val tableName = "test_lance_table"
+    val tablePath = s"$basePath/$tableName"
+
+    // Create test data
+    val records = Seq(
+      (1, "Alice", 30, 95.5),
+      (2, "Bob", 25, 87.3),
+      (3, "Charlie", 35, 92.1)
+    )
+    val expectedDf = spark.createDataFrame(records).toDF("id", "name", "age", 
"score")
+
+    // Write to Hudi table with Lance base file format
+    expectedDf.write
+      .format("hudi")
+      .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE")
+      .option(RECORDKEY_FIELD.key(), "id")
+      .option(PRECOMBINE_FIELD.key(), "age")
+      .option(TABLE_NAME.key(), tableName)
+      .option(HoodieWriteConfig.TBL_NAME.key(), tableName)
+      .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), 
classOf[DefaultSparkRecordMerger].getName)
+      .mode(SaveMode.Overwrite)
+      .save(tablePath)
+
+    // Read back and verify
+    val readDf = spark.read
+      .format("hudi")
+      .load(tablePath)
+
+    val actual = readDf.select("id", "name", "age", "score")
+
+    assertTrue(expectedDf.except(actual).isEmpty)
+    assertTrue(actual.except(expectedDf).isEmpty)
+  }
+
+  @Test
+  def testSchemaProjection(): Unit = {
+    val tableName = "test_lance_projection"
+    val tablePath = s"$basePath/$tableName"
+
+    // Create test data with multiple columns
+    val records = Seq(
+      (1, "Alice", 30, 95.5, "Engineering"),
+      (2, "Bob", 25, 87.3, "Sales"),
+      (3, "Charlie", 35, 92.1, "Marketing")
+    )
+    val inputDf = spark.createDataFrame(records).toDF("id", "name", "age", 
"score", "department")
+
+    // Write to Hudi table with Lance format
+    inputDf.write
+      .format("hudi")
+      .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE")
+      .option(RECORDKEY_FIELD.key(), "id")
+      .option(PRECOMBINE_FIELD.key(), "age")
+      .option(TABLE_NAME.key(), tableName)
+      .option(HoodieWriteConfig.TBL_NAME.key(), tableName)
+      .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), 
classOf[DefaultSparkRecordMerger].getName)
+      .mode(SaveMode.Overwrite)
+      .save(tablePath)
+
+    // Read with schema projection - only select subset of columns
+    val readDf = spark.read
+      .format("hudi")
+      .load(tablePath)
+
+    val expectedDf = spark.createDataFrame(Seq(
+      (1, "Alice"),
+      (2, "Bob"),
+      (3, "Charlie")
+    )).toDF("id", "name")
+
+    val actual = readDf.select("id", "name")
+
+    // Verify schema projection - should only have 2 columns
+    assertEquals(2, actual.schema.fields.length, "Should only have 2 columns")
+
+    // Verify data equality
+    assertTrue(expectedDf.except(actual).isEmpty)
+    assertTrue(actual.except(expectedDf).isEmpty)
+  }
+}
diff --git 
a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_3Adapter.scala
 
b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_3Adapter.scala
index 8e645b133624..4ac9f2875fd7 100644
--- 
a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_3Adapter.scala
+++ 
b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_3Adapter.scala
@@ -154,6 +154,13 @@ class Spark3_3Adapter extends BaseSpark3Adapter {
     Spark33OrcReader.build(vectorized, sqlConf, options, hadoopConf, 
dataSchema)
   }
 
+  override def createLanceFileReader(vectorized: Boolean,
+                                     sqlConf: SQLConf,
+                                     options: Map[String, String],
+                                     hadoopConf: Configuration): 
SparkColumnarFileReader = {
+    throw new UnsupportedOperationException("Lance format is not supported in 
Spark 3.3")
+  }
+
   override def stopSparkContext(jssc: JavaSparkContext, exitCode: Int): Unit = 
{
     jssc.stop()
   }
diff --git 
a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_4Adapter.scala
 
b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_4Adapter.scala
index 21aa4aefeda6..22dfce461d67 100644
--- 
a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_4Adapter.scala
+++ 
b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_4Adapter.scala
@@ -37,6 +37,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.util.{METADATA_COL_ATTR_KEY, 
RebaseDateTime}
 import org.apache.spark.sql.connector.catalog.{V1Table, V2TableWithV1Fallback}
 import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.datasources.lance.SparkLanceReaderBase
 import org.apache.spark.sql.execution.datasources.orc.Spark34OrcReader
 import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, 
ParquetFilters, Spark34LegacyHoodieParquetFileFormat, Spark34ParquetReader}
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
@@ -154,6 +155,13 @@ class Spark3_4Adapter extends BaseSpark3Adapter {
     Spark34OrcReader.build(vectorized, sqlConf, options, hadoopConf, 
dataSchema)
   }
 
+  override def createLanceFileReader(vectorized: Boolean,
+                                     sqlConf: SQLConf,
+                                     options: Map[String, String],
+                                     hadoopConf: Configuration): 
SparkColumnarFileReader = {
+    new SparkLanceReaderBase(vectorized)
+  }
+
   override def stopSparkContext(jssc: JavaSparkContext, exitCode: Int): Unit = 
{
     jssc.sc.stop(exitCode)
   }
diff --git 
a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_5Adapter.scala
 
b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_5Adapter.scala
index f8f3baf11af1..2db62f2fca8f 100644
--- 
a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_5Adapter.scala
+++ 
b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_5Adapter.scala
@@ -37,6 +37,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.util.{METADATA_COL_ATTR_KEY, 
RebaseDateTime}
 import org.apache.spark.sql.connector.catalog.{V1Table, V2TableWithV1Fallback}
 import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.datasources.lance.SparkLanceReaderBase
 import org.apache.spark.sql.execution.datasources.orc.Spark35OrcReader
 import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, 
ParquetFilters, Spark35LegacyHoodieParquetFileFormat, Spark35ParquetReader}
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
@@ -170,6 +171,13 @@ class Spark3_5Adapter extends BaseSpark3Adapter {
     Spark35OrcReader.build(vectorized, sqlConf, options, hadoopConf, 
dataSchema)
   }
 
+  override def createLanceFileReader(vectorized: Boolean,
+                                     sqlConf: SQLConf,
+                                     options: Map[String, String],
+                                     hadoopConf: Configuration): 
SparkColumnarFileReader = {
+    new SparkLanceReaderBase(vectorized)
+  }
+
   override def stopSparkContext(jssc: JavaSparkContext, exitCode: Int): Unit = 
{
     jssc.sc.stop(exitCode)
   }
diff --git 
a/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/adapter/Spark4_0Adapter.scala
 
b/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/adapter/Spark4_0Adapter.scala
index eaa4736a7fdc..cbf0b2de8bb4 100644
--- 
a/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/adapter/Spark4_0Adapter.scala
+++ 
b/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/adapter/Spark4_0Adapter.scala
@@ -35,6 +35,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.util.{METADATA_COL_ATTR_KEY, 
RebaseDateTime}
 import org.apache.spark.sql.connector.catalog.{V1Table, V2TableWithV1Fallback}
 import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.datasources.lance.SparkLanceReaderBase
 import org.apache.spark.sql.execution.datasources.orc.Spark40OrcReader
 import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, 
Spark40LegacyHoodieParquetFileFormat, Spark40ParquetReader}
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
@@ -168,6 +169,13 @@ class Spark4_0Adapter extends BaseSpark4Adapter {
     Spark40OrcReader.build(vectorized, sqlConf, options, hadoopConf, 
dataSchema)
   }
 
+  override def createLanceFileReader(vectorized: Boolean,
+                                     sqlConf: SQLConf,
+                                     options: Map[String, String],
+                                     hadoopConf: Configuration): 
SparkColumnarFileReader = {
+    new SparkLanceReaderBase(vectorized)
+  }
+
   override def stopSparkContext(jssc: JavaSparkContext, exitCode: Int): Unit = 
{
     jssc.sc.stop(exitCode)
   }
diff --git a/pom.xml b/pom.xml
index 39e65acb2e9b..0e2b49707551 100644
--- a/pom.xml
+++ b/pom.xml
@@ -512,6 +512,7 @@
             <trimStackTrace>false</trimStackTrace>
             <systemPropertyVariables>
               
<log4j.configurationFile>${surefire-log4j.file}</log4j.configurationFile>
+              <lance.skip.tests>${lance.skip.tests}</lance.skip.tests>
             </systemPropertyVariables>
             <useSystemClassLoader>false</useSystemClassLoader>
             
<forkedProcessExitTimeoutInSeconds>30</forkedProcessExitTimeoutInSeconds>
@@ -555,6 +556,7 @@
             <filereports>TestSuite.txt</filereports>
             <systemProperties>
               
<log4j.configurationFile>${surefire-log4j.file}</log4j.configurationFile>
+              <lance.skip.tests>${lance.skip.tests}</lance.skip.tests>
             </systemProperties>
           </configuration>
           <executions>
@@ -2591,7 +2593,7 @@
         <kafka.version>3.3.2</kafka.version>
         <!-- Lance: Use Spark 3.4-specific artifact -->
         
<lance.spark.artifact>lance-spark-3.4_${scala.binary.version}</lance.spark.artifact>
-        <lance.skip.tests>false</lance.skip.tests>
+        <lance.skip.tests>true</lance.skip.tests>
         <!-- NOTE: Some Hudi modules require standalone Parquet/Orc/etc 
file-format dependency (hudi-hive-sync,
                    hudi-hadoop-mr, for ex). Since these Hudi modules might be 
used from w/in the execution engine(s)
                    bringing these file-formats as dependencies as well, we 
need to make sure that versions are


Reply via email to