This is an automated email from the ASF dual-hosted git repository.
timbrown pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new a74d00a1b673 feat: Implement SparkColumnarFileReader for Datasource
integration with Lance (#17660)
a74d00a1b673 is described below
commit a74d00a1b6734683461a8768701eaf5dc4a817b0
Author: Rahil C <[email protected]>
AuthorDate: Tue Dec 30 09:35:10 2025 -0500
feat: Implement SparkColumnarFileReader for Datasource integration with
Lance (#17660)
* feat: Add HoodieBaseLanceFileWriter and implementation for SparkFileWriter
* feat: Add HoodieSparkLanceReader for reading lance files to internal row
* migrate to hoodie schema and address tim prev comments
* Implement SparkColumnarFileReader for Datasource integration with Lance
* fix usages to hoodie schema
* fix iterator for reuse
* minor fixes
* add DisabledIfSystemProperty
* try spark 4.0
* scala style plugin property change
* initial minor comments
* add spark 3.4
* address comments
* address ethan tim comments
* scalastyle
---
.../io/storage/HoodieSparkFileWriterFactory.java | 18 +-
.../hudi/io/storage/HoodieSparkLanceReader.java | 119 +-------------
.../hudi/io/storage/LanceRecordIterator.java | 183 +++++++++++++++++++++
.../hudi/BaseSparkInternalRecordContext.java | 2 +-
.../apache/spark/sql/HoodieInternalRowUtils.scala | 9 +-
.../org/apache/spark/sql/hudi/SparkAdapter.scala | 14 ++
.../apache/hudi/common/model/HoodieFileFormat.java | 6 +-
.../hudi/io/storage/HoodieFileWriterFactory.java | 10 ++
.../datasources/lance/SparkLanceReaderBase.scala | 140 ++++++++++++++++
.../HoodieFileGroupReaderBasedFileFormat.scala | 6 +
.../hudi/functional/TestLanceDataSource.scala | 132 +++++++++++++++
.../apache/spark/sql/adapter/Spark3_3Adapter.scala | 7 +
.../apache/spark/sql/adapter/Spark3_4Adapter.scala | 8 +
.../apache/spark/sql/adapter/Spark3_5Adapter.scala | 8 +
.../apache/spark/sql/adapter/Spark4_0Adapter.scala | 8 +
pom.xml | 4 +-
16 files changed, 544 insertions(+), 130 deletions(-)
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileWriterFactory.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileWriterFactory.java
index 9ba8e711946d..4b5264ab759f 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileWriterFactory.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileWriterFactory.java
@@ -32,7 +32,6 @@ import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.StoragePath;
-import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
@@ -59,7 +58,7 @@ public class HoodieSparkFileWriterFactory extends
HoodieFileWriterFactory {
compressionCodecName = null;
}
//TODO boundary to revisit in follow up to use HoodieSchema directly
- HoodieRowParquetWriteSupport writeSupport =
getHoodieRowParquetWriteSupport(storage.getConf(), schema.getAvroSchema(),
+ HoodieRowParquetWriteSupport writeSupport =
getHoodieRowParquetWriteSupport(storage.getConf(), schema,
config, enableBloomFilter(populateMetaFields, config));
HoodieRowParquetConfig parquetConfig = new
HoodieRowParquetConfig(writeSupport,
CompressionCodecName.fromConf(compressionCodecName),
@@ -77,7 +76,7 @@ public class HoodieSparkFileWriterFactory extends
HoodieFileWriterFactory {
protected HoodieFileWriter newParquetFileWriter(OutputStream outputStream,
HoodieConfig config,
HoodieSchema schema) throws
IOException {
boolean enableBloomFilter = false;
- HoodieRowParquetWriteSupport writeSupport =
getHoodieRowParquetWriteSupport(storage.getConf(), schema.getAvroSchema(),
config, enableBloomFilter);
+ HoodieRowParquetWriteSupport writeSupport =
getHoodieRowParquetWriteSupport(storage.getConf(), schema, config,
enableBloomFilter);
String compressionCodecName =
config.getStringOrDefault(HoodieStorageConfig.PARQUET_COMPRESSION_CODEC_NAME);
// Support PARQUET_COMPRESSION_CODEC_NAME is ""
if (compressionCodecName.isEmpty()) {
@@ -106,10 +105,19 @@ public class HoodieSparkFileWriterFactory extends
HoodieFileWriterFactory {
throw new HoodieIOException("Not support write to Orc file");
}
- private static HoodieRowParquetWriteSupport
getHoodieRowParquetWriteSupport(StorageConfiguration<?> conf, Schema schema,
+ @Override
+ protected HoodieFileWriter newLanceFileWriter(String instantTime,
StoragePath path, HoodieConfig config, HoodieSchema schema,
+ TaskContextSupplier
taskContextSupplier) throws IOException {
+ boolean populateMetaFields =
config.getBooleanOrDefault(HoodieTableConfig.POPULATE_META_FIELDS);
+ StructType structType =
HoodieInternalRowUtils.getCachedSchema(schema.getAvroSchema());
+
+ return new HoodieSparkLanceWriter(path, structType, instantTime,
taskContextSupplier, storage, populateMetaFields);
+ }
+
+ private static HoodieRowParquetWriteSupport
getHoodieRowParquetWriteSupport(StorageConfiguration<?> conf, HoodieSchema
schema,
HoodieConfig config, boolean enableBloomFilter) {
Option<BloomFilter> filter = enableBloomFilter ?
Option.of(createBloomFilter(config)) : Option.empty();
- StructType structType = HoodieInternalRowUtils.getCachedSchema(schema);
+ StructType structType =
HoodieInternalRowUtils.getCachedSchema(schema.getAvroSchema());
return
HoodieRowParquetWriteSupport.getHoodieRowParquetWriteSupport(conf.unwrapAs(Configuration.class),
structType, filter, config);
}
}
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceReader.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceReader.java
index ff35bac92f91..90e4996e44a4 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceReader.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceReader.java
@@ -20,7 +20,6 @@ package org.apache.hudi.io.storage;
import com.lancedb.lance.file.LanceFileReader;
import org.apache.arrow.memory.BufferAllocator;
-import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowReader;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.hudi.HoodieSchemaConversionUtils;
@@ -37,19 +36,14 @@ import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.storage.StoragePath;
import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.LanceArrowUtils;
-import org.apache.spark.sql.vectorized.ColumnVector;
-import org.apache.spark.sql.vectorized.ColumnarBatch;
-import org.apache.spark.sql.vectorized.LanceArrowColumnVector;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
import java.util.Set;
@@ -155,7 +149,7 @@ public class HoodieSparkLanceReader implements
HoodieSparkFileReader {
// Read only the requested columns from Lance file for efficiency
ArrowReader arrowReader = lanceReader.readAll(columnNames, null,
DEFAULT_BATCH_SIZE);
- return new LanceRecordIterator(allocator, lanceReader, arrowReader,
requestedSparkSchema);
+ return new LanceRecordIterator(allocator, lanceReader, arrowReader,
requestedSparkSchema, path.toString());
} catch (Exception e) {
allocator.close();
throw new HoodieException("Failed to create Lance reader for: " + path,
e);
@@ -195,113 +189,4 @@ public class HoodieSparkLanceReader implements
HoodieSparkFileReader {
* Iterator implementation that reads Lance file batches and converts to
UnsafeRows.
* Keeps ColumnarBatch alive while iterating to avoid unnecessary data
copying.
*/
- private class LanceRecordIterator implements ClosableIterator<UnsafeRow> {
- private final BufferAllocator allocator;
- private final LanceFileReader lanceReader;
- private final ArrowReader arrowReader;
- private final UnsafeProjection projection;
- private ColumnarBatch currentBatch;
- private Iterator<InternalRow> rowIterator;
- private ColumnVector[] columnVectors;
-
- public LanceRecordIterator(BufferAllocator allocator,
- LanceFileReader lanceReader,
- ArrowReader arrowReader,
- StructType schema) {
- this.allocator = allocator;
- this.lanceReader = lanceReader;
- this.arrowReader = arrowReader;
- this.projection = UnsafeProjection.create(schema);
- }
-
- @Override
- public boolean hasNext() {
- // If we have records in current batch, return true
- if (rowIterator != null && rowIterator.hasNext()) {
- return true;
- }
-
- // Close previous batch before loading next
- if (currentBatch != null) {
- currentBatch.close();
- currentBatch = null;
- }
-
- // Try to load next batch
- try {
- if (arrowReader.loadNextBatch()) {
- VectorSchemaRoot root = arrowReader.getVectorSchemaRoot();
-
- // Wrap each Arrow FieldVector in LanceArrowColumnVector for
type-safe access
- // Cache the column wrappers on first batch and reuse for all
subsequent batches
- if (columnVectors == null) {
- columnVectors = root.getFieldVectors().stream()
- .map(LanceArrowColumnVector::new)
- .toArray(ColumnVector[]::new);
- }
-
- // Create ColumnarBatch and keep it alive while iterating
- currentBatch = new ColumnarBatch(columnVectors, root.getRowCount());
- rowIterator = currentBatch.rowIterator();
- return rowIterator.hasNext();
- }
- } catch (IOException e) {
- throw new HoodieException("Failed to read next batch from Lance file:
" + path, e);
- }
-
- return false;
- }
-
- @Override
- public UnsafeRow next() {
- if (!hasNext()) {
- throw new IllegalStateException("No more records available");
- }
- InternalRow row = rowIterator.next();
- // Convert to UnsafeRow immediately while batch is still open
- return projection.apply(row);
- }
-
- @Override
- public void close() {
- IOException arrowException = null;
- Exception lanceException = null;
-
- // Close current batch if exists
- if (currentBatch != null) {
- currentBatch.close();
- }
-
- // Close Arrow reader
- if (arrowReader != null) {
- try {
- arrowReader.close();
- } catch (IOException e) {
- arrowException = e;
- }
- }
-
- // Close Lance reader
- if (lanceReader != null) {
- try {
- lanceReader.close();
- } catch (Exception e) {
- lanceException = e;
- }
- }
-
- // Always close allocator
- if (allocator != null) {
- allocator.close();
- }
-
- // Throw any exceptions that occurred
- if (arrowException != null) {
- throw new HoodieIOException("Failed to close Arrow reader",
arrowException);
- }
- if (lanceException != null) {
- throw new HoodieException("Failed to close Lance reader",
lanceException);
- }
- }
- }
-}
\ No newline at end of file
+}
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/LanceRecordIterator.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/LanceRecordIterator.java
new file mode 100644
index 000000000000..88d5645879a6
--- /dev/null
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/LanceRecordIterator.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import com.lancedb.lance.file.LanceFileReader;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.ArrowReader;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.vectorized.ColumnVector;
+import org.apache.spark.sql.vectorized.ColumnarBatch;
+import org.apache.spark.sql.vectorized.LanceArrowColumnVector;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * Shared iterator implementation for reading Lance files and converting Arrow
batches to Spark rows.
+ * This iterator is used by both Hudi's internal Lance reader and Spark
datasource integration.
+ *
+ * <p>The iterator manages the lifecycle of:
+ * <ul>
+ * <li>BufferAllocator - Arrow memory management</li>
+ * <li>LanceFileReader - Lance file handle</li>
+ * <li>ArrowReader - Arrow batch reader</li>
+ * <li>ColumnarBatch - Current batch being iterated</li>
+ * </ul>
+ *
+ * <p>Records are converted to {@link UnsafeRow} using {@link
UnsafeProjection} for efficient
+ * serialization and memory management.
+ */
+public class LanceRecordIterator implements ClosableIterator<UnsafeRow> {
+ private final BufferAllocator allocator;
+ private final LanceFileReader lanceReader;
+ private final ArrowReader arrowReader;
+ private final UnsafeProjection projection;
+ private final String path;
+
+ private ColumnarBatch currentBatch;
+ private Iterator<InternalRow> rowIterator;
+ private ColumnVector[] columnVectors;
+ private boolean closed = false;
+
+ /**
+ * Creates a new Lance record iterator.
+ *
+ * @param allocator Arrow buffer allocator for memory management
+ * @param lanceReader Lance file reader
+ * @param arrowReader Arrow reader for batch reading
+ * @param schema Spark schema for the records
+ * @param path File path (for error messages)
+ */
+ public LanceRecordIterator(BufferAllocator allocator,
+ LanceFileReader lanceReader,
+ ArrowReader arrowReader,
+ StructType schema,
+ String path) {
+ this.allocator = allocator;
+ this.lanceReader = lanceReader;
+ this.arrowReader = arrowReader;
+ this.projection = UnsafeProjection.create(schema);
+ this.path = path;
+ }
+
+ @Override
+ public boolean hasNext() {
+ // If we have records in current batch, return true
+ if (rowIterator != null && rowIterator.hasNext()) {
+ return true;
+ }
+
+ // Close previous batch before loading next
+ if (currentBatch != null) {
+ currentBatch.close();
+ currentBatch = null;
+ }
+
+ // Try to load next batch
+ try {
+ if (arrowReader.loadNextBatch()) {
+ VectorSchemaRoot root = arrowReader.getVectorSchemaRoot();
+
+ // Wrap each Arrow FieldVector in LanceArrowColumnVector for type-safe
access
+ // Cache the column wrappers on first batch and reuse for all
subsequent batches
+ if (columnVectors == null) {
+ columnVectors = root.getFieldVectors().stream()
+ .map(LanceArrowColumnVector::new)
+ .toArray(ColumnVector[]::new);
+ }
+
+ // Create ColumnarBatch and keep it alive while iterating
+ currentBatch = new ColumnarBatch(columnVectors, root.getRowCount());
+ rowIterator = currentBatch.rowIterator();
+ return rowIterator.hasNext();
+ }
+ } catch (IOException e) {
+ throw new HoodieException("Failed to read next batch from Lance file: "
+ path, e);
+ }
+
+ return false;
+ }
+
+ @Override
+ public UnsafeRow next() {
+ if (!hasNext()) {
+ throw new IllegalStateException("No more records available");
+ }
+ InternalRow row = rowIterator.next();
+ // Convert to UnsafeRow immediately while batch is still open
+ return projection.apply(row);
+ }
+
+ @Override
+ public void close() {
+ // Make close() idempotent - safe to call multiple times
+ if (closed) {
+ return;
+ }
+ closed = true;
+
+ IOException arrowException = null;
+ Exception lanceException = null;
+
+ // Close current batch if exists
+ if (currentBatch != null) {
+ currentBatch.close();
+ currentBatch = null;
+ }
+
+ // Close Arrow reader
+ if (arrowReader != null) {
+ try {
+ arrowReader.close();
+ } catch (IOException e) {
+ arrowException = e;
+ }
+ }
+
+ // Close Lance reader
+ if (lanceReader != null) {
+ try {
+ lanceReader.close();
+ } catch (Exception e) {
+ lanceException = e;
+ }
+ }
+
+ // Always close allocator
+ if (allocator != null) {
+ allocator.close();
+ }
+
+ // Throw any exceptions that occurred
+ if (arrowException != null) {
+ throw new HoodieIOException("Failed to close Arrow reader",
arrowException);
+ }
+ if (lanceException != null) {
+ throw new HoodieException("Failed to close Lance reader",
lanceException);
+ }
+ }
+}
diff --git
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRecordContext.java
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRecordContext.java
index 449e8f289a01..1af96a2bf335 100644
---
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRecordContext.java
+++
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRecordContext.java
@@ -216,7 +216,7 @@ public abstract class BaseSparkInternalRecordContext
extends RecordContext<Inter
if (internalRow instanceof UnsafeRow) {
return internalRow;
}
- final UnsafeProjection unsafeProjection =
HoodieInternalRowUtils.getCachedUnsafeProjection(schema.toAvroSchema());
+ final UnsafeProjection unsafeProjection =
HoodieInternalRowUtils.getCachedUnsafeProjection(schema);
return unsafeProjection.apply(internalRow);
}
diff --git
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieInternalRowUtils.scala
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieInternalRowUtils.scala
index c458fece591b..e2b782a424a8 100644
---
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieInternalRowUtils.scala
+++
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieInternalRowUtils.scala
@@ -18,11 +18,11 @@
package org.apache.spark.sql
+import org.apache.avro.Schema
import org.apache.hudi.AvroConversionUtils.convertAvroSchemaToStructType
import org.apache.hudi.avro.HoodieAvroUtils.{createFullName, toJavaDate}
import org.apache.hudi.exception.HoodieException
-
-import org.apache.avro.Schema
+import org.apache.hudi.common.schema.HoodieSchema
import
org.apache.spark.sql.HoodieCatalystExpressionUtils.generateUnsafeProjection
import org.apache.spark.sql.HoodieUnsafeRowUtils.{NestedFieldPath,
composeNestedFieldPath}
import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow,
UnsafeArrayData, UnsafeProjection, UnsafeRow}
@@ -35,7 +35,6 @@ import org.apache.spark.unsafe.types.UTF8String
import java.util.concurrent.ConcurrentHashMap
import java.util.function.{Supplier, Function => JFunction}
import java.util.{ArrayDeque => JArrayDeque, Collections => JCollections,
Deque => JDeque, Map => JMap}
-
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
@@ -84,9 +83,9 @@ object HoodieInternalRowUtils {
/**
* Provides cached instance of [[UnsafeProjection]] to project Java object
based [[InternalRow]] to [[UnsafeRow]].
*/
- def getCachedUnsafeProjection(schema: Schema): UnsafeProjection = {
+ def getCachedUnsafeProjection(schema: HoodieSchema): UnsafeProjection = {
identicalUnsafeProjectionThreadLocal.get()
- .getOrElseUpdate(schema,
UnsafeProjection.create(getCachedSchema(schema)))
+ .getOrElseUpdate(schema.getAvroSchema,
UnsafeProjection.create(getCachedSchema(schema.getAvroSchema)))
}
/**
diff --git
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
index 5d47c6b08dae..af45ebc53444 100644
---
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
+++
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
@@ -215,6 +215,20 @@ trait SparkAdapter extends Serializable {
hadoopConf: Configuration,
dataSchema: StructType): SparkColumnarFileReader
+ /**
+ * Get Lance file reader
+ *
+ * @param vectorized true if vectorized reading is not prohibited due to
schema, reading mode, etc
+ * @param sqlConf the [[SQLConf]] used for the read
+ * @param options passed as a param to the file format
+ * @param hadoopConf some configs will be set for the hadoopConf
+ * @return Lance file reader
+ */
+ def createLanceFileReader(vectorized: Boolean,
+ sqlConf: SQLConf,
+ options: Map[String, String],
+ hadoopConf: Configuration): SparkColumnarFileReader
+
/**
* use new qe execute
*/
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileFormat.java
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileFormat.java
index 6e20c2dfeaf9..a4913ae55c5d 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileFormat.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileFormat.java
@@ -49,7 +49,11 @@ public enum HoodieFileFormat {
+ "way to store Hive data. It was designed to overcome limitations of
the other Hive file "
+ "formats. Using ORC files improves performance when Hive is reading,
writing, and "
+ "processing data.")
- ORC(".orc");
+ ORC(".orc"),
+
+ @EnumFieldDescription("Lance is a modern columnar data format optimized for
random access patterns "
+ + "and designed for ML and AI workloads")
+ LANCE(".lance");
public static final Set<String> BASE_FILE_EXTENSIONS =
Arrays.stream(HoodieFileFormat.values())
.map(HoodieFileFormat::getFileExtension)
diff --git
a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java
b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java
index c5465b2e0d49..66e647fadddf 100644
---
a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java
+++
b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java
@@ -34,6 +34,7 @@ import java.io.IOException;
import java.io.OutputStream;
import static org.apache.hudi.common.model.HoodieFileFormat.HFILE;
+import static org.apache.hudi.common.model.HoodieFileFormat.LANCE;
import static org.apache.hudi.common.model.HoodieFileFormat.ORC;
import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET;
@@ -71,6 +72,9 @@ public class HoodieFileWriterFactory {
if (ORC.getFileExtension().equals(extension)) {
return newOrcFileWriter(instantTime, path, config, schema,
taskContextSupplier);
}
+ if (LANCE.getFileExtension().equals(extension)) {
+ return newLanceFileWriter(instantTime, path, config, schema,
taskContextSupplier);
+ }
throw new UnsupportedOperationException(extension + " format not supported
yet.");
}
@@ -107,6 +111,12 @@ public class HoodieFileWriterFactory {
throw new UnsupportedOperationException();
}
+ protected HoodieFileWriter newLanceFileWriter(
+ String instantTime, StoragePath path, HoodieConfig config, HoodieSchema
schema,
+ TaskContextSupplier taskContextSupplier) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
public static BloomFilter createBloomFilter(HoodieConfig config) {
return BloomFilterFactory.createBloomFilter(
config.getIntOrDefault(HoodieStorageConfig.BLOOM_FILTER_NUM_ENTRIES_VALUE),
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala
new file mode 100644
index 000000000000..c5109e0637f7
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.lance
+
+import org.apache.hudi.common.util
+import org.apache.hudi.internal.schema.InternalSchema
+import org.apache.hudi.io.memory.HoodieArrowAllocator
+import org.apache.hudi.io.storage.{HoodieSparkLanceReader, LanceRecordIterator}
+import org.apache.hudi.storage.StorageConfiguration
+
+import com.lancedb.lance.file.LanceFileReader
+import org.apache.hadoop.conf.Configuration
+import org.apache.parquet.schema.MessageType
+import org.apache.spark.TaskContext
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.JoinedRow
+import org.apache.spark.sql.execution.datasources.{PartitionedFile,
SparkColumnarFileReader}
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.LanceArrowUtils
+
+import java.io.IOException
+
+import scala.collection.JavaConverters._
+
+/**
+ * Reader for Lance files in Spark datasource.
+ * Implements vectorized reading using LanceArrowColumnVector.
+ *
+ * @param enableVectorizedReader whether to use vectorized reading (currently
always true for Lance)
+ */
+class SparkLanceReaderBase(enableVectorizedReader: Boolean) extends
SparkColumnarFileReader {
+
+ // Batch size for reading Lance files (number of rows per batch)
+ private val DEFAULT_BATCH_SIZE = 512
+
+ /**
+ * Read a Lance file with schema projection and partition column support.
+ *
+ * @param file Lance file to read
+ * @param requiredSchema desired output schema of the data (columns to
read)
+ * @param partitionSchema schema of the partition columns. Partition
values will be appended to the end of every row
+ * @param internalSchemaOpt option of internal schema for schema.on.read
(not currently used for Lance)
+ * @param filters filters for data skipping. Not guaranteed to be
used; the spark plan will also apply the filters.
+ * @param storageConf the hadoop conf
+ * @return iterator of rows read from the file output type says
[[InternalRow]] but could be [[ColumnarBatch]]
+ */
+ override def read(file: PartitionedFile,
+ requiredSchema: StructType,
+ partitionSchema: StructType,
+ internalSchemaOpt: util.Option[InternalSchema],
+ filters: Seq[Filter],
+ storageConf: StorageConfiguration[Configuration],
+ tableSchemaOpt: util.Option[MessageType] =
util.Option.empty()): Iterator[InternalRow] = {
+
+ val filePath = file.filePath.toString
+
+ if (requiredSchema.isEmpty && partitionSchema.isEmpty) {
+ // No columns requested - return empty iterator
+ Iterator.empty
+ } else {
+ // Create child allocator for reading
+ val allocator =
HoodieArrowAllocator.newChildAllocator(getClass.getSimpleName + "-data-" +
filePath,
+ HoodieSparkLanceReader.LANCE_DATA_ALLOCATOR_SIZE);
+
+ try {
+ // Open Lance file reader
+ val lanceReader = LanceFileReader.open(filePath, allocator)
+
+ // Extract column names from required schema for projection
+ val columnNames: java.util.List[String] = if (requiredSchema.nonEmpty)
{
+ requiredSchema.fields.map(_.name).toList.asJava
+ } else {
+ // If only partition columns requested, read minimal data
+ null
+ }
+
+ // Read data with column projection (filters not supported yet)
+ val arrowReader = lanceReader.readAll(columnNames, null,
DEFAULT_BATCH_SIZE)
+
+ val schemaForIterator = if (requiredSchema.nonEmpty) {
+ requiredSchema
+ } else {
+ // Only compute schema from Lance file when requiredSchema is empty
+ val arrowSchema = lanceReader.schema()
+ LanceArrowUtils.fromArrowSchema(arrowSchema)
+ }
+
+ // Create iterator using shared LanceRecordIterator
+ val lanceIterator = new LanceRecordIterator(
+ allocator,
+ lanceReader,
+ arrowReader,
+ schemaForIterator,
+ filePath
+ )
+
+ // Register cleanup listener with Spark task context
+ Option(TaskContext.get()).foreach(
+ _.addTaskCompletionListener[Unit](_ => lanceIterator.close())
+ )
+
+ // Need to convert to scala iterator for proper reading
+ val iter = lanceIterator.asScala
+
+ // Handle partition columns
+ if (partitionSchema.length == 0) {
+ // No partition columns - return rows directly
+ iter.asInstanceOf[Iterator[InternalRow]]
+ } else {
+ // Append partition values to each row using JoinedRow
+ val joinedRow = new JoinedRow()
+ iter.map(row => joinedRow(row, file.partitionValues))
+ }
+
+ } catch {
+ case e: Exception =>
+ allocator.close()
+ throw new IOException(s"Failed to read Lance file: $filePath", e)
+ }
+ }
+ }
+}
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala
index 4f1fb566d809..4dd3f66551de 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala
@@ -133,6 +133,8 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath:
String,
val orcBatchSupported = conf.orcVectorizedReaderEnabled &&
schema.forall(s => OrcUtils.supportColumnarReads(
s.dataType,
sparkSession.sessionState.conf.orcVectorizedReaderNestedColumnEnabled))
+ // TODO: Implement columnar batch reading
https://github.com/apache/hudi/issues/17736
+ val lanceBatchSupported = false
val supportBatch = if (isMultipleBaseFileFormatsEnabled) {
parquetBatchSupported && orcBatchSupported
@@ -140,6 +142,8 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath:
String,
parquetBatchSupported
} else if (hoodieFileFormat == HoodieFileFormat.ORC) {
orcBatchSupported
+ } else if (hoodieFileFormat == HoodieFileFormat.LANCE) {
+ lanceBatchSupported
} else {
throw new HoodieNotSupportedException("Unsupported file format: " +
hoodieFileFormat)
}
@@ -306,6 +310,8 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath:
String,
sparkAdapter.createParquetFileReader(enableVectorizedRead,
spark.sessionState.conf, options, configuration)
} else if (hoodieFileFormat == HoodieFileFormat.ORC) {
sparkAdapter.createOrcFileReader(enableVectorizedRead,
spark.sessionState.conf, options, configuration, dataSchema)
+ } else if (hoodieFileFormat == HoodieFileFormat.LANCE) {
+ sparkAdapter.createLanceFileReader(enableVectorizedRead,
spark.sessionState.conf, options, configuration)
} else {
throw new HoodieNotSupportedException("Unsupported file format: " +
hoodieFileFormat)
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala
new file mode 100644
index 000000000000..32e7d869c6e4
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.functional
+
+import org.apache.hudi.DataSourceWriteOptions._
+import org.apache.hudi.DefaultSparkRecordMerger
+import org.apache.hudi.common.table.HoodieTableConfig
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.testutils.HoodieSparkClientTestBase
+
+import org.apache.spark.sql.{SaveMode, SparkSession}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
+import org.junit.jupiter.api.condition.DisabledIfSystemProperty
+
+/**
+ * Basic functional tests for Lance file format with Hudi Spark datasource.
+ */
+@DisabledIfSystemProperty(named = "lance.skip.tests", matches = "true")
+class TestLanceDataSource extends HoodieSparkClientTestBase {
+
+  var spark: SparkSession = _
+
+  @BeforeEach
+  override def setUp(): Unit = {
+    super.setUp()
+    spark = sqlContext.sparkSession
+  }
+
+  @AfterEach
+  override def tearDown(): Unit = {
+    super.tearDown()
+    spark = null
+  }
+
+  @Test
+  def testBasicWriteAndRead(): Unit = {
+    val tableName = "test_lance_table"
+    val tablePath = s"$basePath/$tableName"
+
+    // Create test data
+    val records = Seq(
+      (1, "Alice", 30, 95.5),
+      (2, "Bob", 25, 87.3),
+      (3, "Charlie", 35, 92.1)
+    )
+    val expectedDf = spark.createDataFrame(records).toDF("id", "name", "age", "score")
+
+    // Write to Hudi table with Lance base file format
+    expectedDf.write
+      .format("hudi")
+      .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE")
+      .option(RECORDKEY_FIELD.key(), "id")
+      .option(PRECOMBINE_FIELD.key(), "age")
+      .option(TABLE_NAME.key(), tableName)
+      .option(HoodieWriteConfig.TBL_NAME.key(), tableName)
+      .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName)
+      .mode(SaveMode.Overwrite)
+      .save(tablePath)
+
+    // Read back and verify
+    val readDf = spark.read
+      .format("hudi")
+      .load(tablePath)
+
+    val actual = readDf.select("id", "name", "age", "score")
+
+    assertTrue(expectedDf.except(actual).isEmpty)
+    assertTrue(actual.except(expectedDf).isEmpty)
+  }
+
+  @Test
+  def testSchemaProjection(): Unit = {
+    val tableName = "test_lance_projection"
+    val tablePath = s"$basePath/$tableName"
+
+    // Create test data with multiple columns
+    val records = Seq(
+      (1, "Alice", 30, 95.5, "Engineering"),
+      (2, "Bob", 25, 87.3, "Sales"),
+      (3, "Charlie", 35, 92.1, "Marketing")
+    )
+    val inputDf = spark.createDataFrame(records).toDF("id", "name", "age", "score", "department")
+
+    // Write to Hudi table with Lance format
+    inputDf.write
+      .format("hudi")
+      .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE")
+      .option(RECORDKEY_FIELD.key(), "id")
+      .option(PRECOMBINE_FIELD.key(), "age")
+      .option(TABLE_NAME.key(), tableName)
+      .option(HoodieWriteConfig.TBL_NAME.key(), tableName)
+      .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName)
+      .mode(SaveMode.Overwrite)
+      .save(tablePath)
+
+    // Read with schema projection - only select subset of columns
+    val readDf = spark.read
+      .format("hudi")
+      .load(tablePath)
+
+    val expectedDf = spark.createDataFrame(Seq(
+      (1, "Alice"),
+      (2, "Bob"),
+      (3, "Charlie")
+    )).toDF("id", "name")
+
+    val actual = readDf.select("id", "name")
+
+    // Verify schema projection - should only have 2 columns
+    assertEquals(2, actual.schema.fields.length, "Should only have 2 columns")
+
+    // Verify data equality
+    assertTrue(expectedDf.except(actual).isEmpty)
+    assertTrue(actual.except(expectedDf).isEmpty)
+  }
+}
diff --git
a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_3Adapter.scala
b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_3Adapter.scala
index 8e645b133624..4ac9f2875fd7 100644
---
a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_3Adapter.scala
+++
b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_3Adapter.scala
@@ -154,6 +154,13 @@ class Spark3_3Adapter extends BaseSpark3Adapter {
Spark33OrcReader.build(vectorized, sqlConf, options, hadoopConf,
dataSchema)
}
+  override def createLanceFileReader(vectorized: Boolean,
+                                     sqlConf: SQLConf,
+                                     options: Map[String, String],
+                                     hadoopConf: Configuration): SparkColumnarFileReader = {
+    throw new UnsupportedOperationException("Lance format is not supported in Spark 3.3")
+  }
+
override def stopSparkContext(jssc: JavaSparkContext, exitCode: Int): Unit =
{
jssc.stop()
}
diff --git
a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_4Adapter.scala
b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_4Adapter.scala
index 21aa4aefeda6..22dfce461d67 100644
---
a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_4Adapter.scala
+++
b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_4Adapter.scala
@@ -37,6 +37,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.util.{METADATA_COL_ATTR_KEY,
RebaseDateTime}
import org.apache.spark.sql.connector.catalog.{V1Table, V2TableWithV1Fallback}
import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.datasources.lance.SparkLanceReaderBase
import org.apache.spark.sql.execution.datasources.orc.Spark34OrcReader
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat,
ParquetFilters, Spark34LegacyHoodieParquetFileFormat, Spark34ParquetReader}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
@@ -154,6 +155,13 @@ class Spark3_4Adapter extends BaseSpark3Adapter {
Spark34OrcReader.build(vectorized, sqlConf, options, hadoopConf,
dataSchema)
}
+  override def createLanceFileReader(vectorized: Boolean,
+                                     sqlConf: SQLConf,
+                                     options: Map[String, String],
+                                     hadoopConf: Configuration): SparkColumnarFileReader = {
+    new SparkLanceReaderBase(vectorized)
+  }
+
override def stopSparkContext(jssc: JavaSparkContext, exitCode: Int): Unit =
{
jssc.sc.stop(exitCode)
}
diff --git
a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_5Adapter.scala
b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_5Adapter.scala
index f8f3baf11af1..2db62f2fca8f 100644
---
a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_5Adapter.scala
+++
b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_5Adapter.scala
@@ -37,6 +37,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.util.{METADATA_COL_ATTR_KEY,
RebaseDateTime}
import org.apache.spark.sql.connector.catalog.{V1Table, V2TableWithV1Fallback}
import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.datasources.lance.SparkLanceReaderBase
import org.apache.spark.sql.execution.datasources.orc.Spark35OrcReader
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat,
ParquetFilters, Spark35LegacyHoodieParquetFileFormat, Spark35ParquetReader}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
@@ -170,6 +171,13 @@ class Spark3_5Adapter extends BaseSpark3Adapter {
Spark35OrcReader.build(vectorized, sqlConf, options, hadoopConf,
dataSchema)
}
+  override def createLanceFileReader(vectorized: Boolean,
+                                     sqlConf: SQLConf,
+                                     options: Map[String, String],
+                                     hadoopConf: Configuration): SparkColumnarFileReader = {
+    new SparkLanceReaderBase(vectorized)
+  }
+
override def stopSparkContext(jssc: JavaSparkContext, exitCode: Int): Unit =
{
jssc.sc.stop(exitCode)
}
diff --git
a/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/adapter/Spark4_0Adapter.scala
b/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/adapter/Spark4_0Adapter.scala
index eaa4736a7fdc..cbf0b2de8bb4 100644
---
a/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/adapter/Spark4_0Adapter.scala
+++
b/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/adapter/Spark4_0Adapter.scala
@@ -35,6 +35,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.util.{METADATA_COL_ATTR_KEY,
RebaseDateTime}
import org.apache.spark.sql.connector.catalog.{V1Table, V2TableWithV1Fallback}
import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.datasources.lance.SparkLanceReaderBase
import org.apache.spark.sql.execution.datasources.orc.Spark40OrcReader
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat,
Spark40LegacyHoodieParquetFileFormat, Spark40ParquetReader}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
@@ -168,6 +169,13 @@ class Spark4_0Adapter extends BaseSpark4Adapter {
Spark40OrcReader.build(vectorized, sqlConf, options, hadoopConf,
dataSchema)
}
+  override def createLanceFileReader(vectorized: Boolean,
+                                     sqlConf: SQLConf,
+                                     options: Map[String, String],
+                                     hadoopConf: Configuration): SparkColumnarFileReader = {
+    new SparkLanceReaderBase(vectorized)
+  }
+
override def stopSparkContext(jssc: JavaSparkContext, exitCode: Int): Unit =
{
jssc.sc.stop(exitCode)
}
diff --git a/pom.xml b/pom.xml
index 39e65acb2e9b..0e2b49707551 100644
--- a/pom.xml
+++ b/pom.xml
@@ -512,6 +512,7 @@
<trimStackTrace>false</trimStackTrace>
<systemPropertyVariables>
<log4j.configurationFile>${surefire-log4j.file}</log4j.configurationFile>
+ <lance.skip.tests>${lance.skip.tests}</lance.skip.tests>
</systemPropertyVariables>
<useSystemClassLoader>false</useSystemClassLoader>
<forkedProcessExitTimeoutInSeconds>30</forkedProcessExitTimeoutInSeconds>
@@ -555,6 +556,7 @@
<filereports>TestSuite.txt</filereports>
<systemProperties>
<log4j.configurationFile>${surefire-log4j.file}</log4j.configurationFile>
+ <lance.skip.tests>${lance.skip.tests}</lance.skip.tests>
</systemProperties>
</configuration>
<executions>
@@ -2591,7 +2593,7 @@
<kafka.version>3.3.2</kafka.version>
<!-- Lance: Use Spark 3.4-specific artifact -->
<lance.spark.artifact>lance-spark-3.4_${scala.binary.version}</lance.spark.artifact>
- <lance.skip.tests>false</lance.skip.tests>
+ <lance.skip.tests>true</lance.skip.tests>
<!-- NOTE: Some Hudi modules require standalone Parquet/Orc/etc
file-format dependency (hudi-hive-sync,
hudi-hadoop-mr, for ex). Since these Hudi modules might be
used from w/in the execution engine(s)
bringing these file-formats as dependencies as well, we
need to make sure that versions are