This is an automated email from the ASF dual-hosted git repository.

timbrown pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new c29309892a57 fix: Ensure Lance works when populateMetaFields is false 
with user defined keygen (#18042)
c29309892a57 is described below

commit c29309892a57074068c55effe54389f08e717999
Author: Rahil C <[email protected]>
AuthorDate: Mon Feb 2 08:05:26 2026 -0800

    fix: Ensure Lance works when populateMetaFields is false with user defined 
keygen (#18042)
    
    * fix: Ensure Lance works when populateMetaFields is false with user 
defined key, as well as works with multi format reader
    
    * style fix
    
    * address inline feedback, use keySchema changes
    
    * fix existing multiformat tests failing on spark 3.3 with lance
    
    * fix fgreader spark failing tests on unsupported lance
    
    * address feedback around partition path handling, added tests for 
partition path, encountered a bug in partition path handling and fixed it
    
    * refactor to use option for lanceFileReader
    
    * fix partition path test
---
 .../client/common/SparkReaderContextFactory.java   |   5 +-
 .../hudi/io/storage/HoodieSparkLanceReader.java    |   2 +-
 .../hudi/MultipleColumnarFileFormatReader.scala    |  12 ++-
 .../org/apache/spark/sql/hudi/SparkAdapter.scala   |   4 +-
 .../org/apache/hudi/common/util/LanceUtils.java    |  39 +++----
 .../org/apache/hudi/HoodieMergeOnReadRDDV2.scala   |  11 +-
 .../datasources/lance/SparkLanceReaderBase.scala   |  12 ++-
 .../HoodieFileGroupReaderBasedFileFormat.scala     |   9 +-
 .../read/TestHoodieFileGroupReaderOnSpark.scala    |   5 +-
 .../hudi/functional/TestLanceDataSource.scala      | 118 +++++++++++++++------
 .../apache/spark/sql/adapter/Spark3_3Adapter.scala |   4 +-
 .../apache/spark/sql/adapter/Spark3_4Adapter.scala |   4 +-
 .../apache/spark/sql/adapter/Spark3_5Adapter.scala |   4 +-
 .../apache/spark/sql/adapter/Spark4_0Adapter.scala |   4 +-
 14 files changed, 158 insertions(+), 75 deletions(-)

diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/SparkReaderContextFactory.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/SparkReaderContextFactory.java
index 4a2f52707121..7dd00a9e430c 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/SparkReaderContextFactory.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/SparkReaderContextFactory.java
@@ -96,13 +96,14 @@ public class SparkReaderContextFactory implements 
ReaderContextFactory<InternalR
     if (metaClient.getTableConfig().isMultipleBaseFileFormatsEnabled()) {
       SparkColumnarFileReader parquetFileReader = 
sparkAdapter.createParquetFileReader(false, sqlConf, options, configs);
       SparkColumnarFileReader orcFileReader = getOrcFileReader(resolver, 
sqlConf, options, configs, sparkAdapter);
-      baseFileReaderBroadcast = jsc.broadcast(new 
MultipleColumnarFileFormatReader(parquetFileReader, orcFileReader));
+      SparkColumnarFileReader lanceFileReader = 
sparkAdapter.createLanceFileReader(false, sqlConf, options, 
configs).getOrElse(null);
+      baseFileReaderBroadcast = jsc.broadcast(new 
MultipleColumnarFileFormatReader(parquetFileReader, orcFileReader, 
lanceFileReader));
     } else if (metaClient.getTableConfig().getBaseFileFormat() == 
HoodieFileFormat.ORC) {
       SparkColumnarFileReader orcFileReader = getOrcFileReader(resolver, 
sqlConf, options, configs, sparkAdapter);
       baseFileReaderBroadcast = jsc.broadcast(orcFileReader);
     } else if (metaClient.getTableConfig().getBaseFileFormat() == 
HoodieFileFormat.LANCE) {
       baseFileReaderBroadcast = jsc.broadcast(
-          sparkAdapter.createLanceFileReader(false, sqlConf, options, 
configs));
+          sparkAdapter.createLanceFileReader(false, sqlConf, options, 
configs).getOrElse(null));
     } else {
       baseFileReaderBroadcast = jsc.broadcast(
           sparkAdapter.createParquetFileReader(false, sqlConf, options, 
configs));
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceReader.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceReader.java
index 71140791b336..f99bf2ce9eac 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceReader.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceReader.java
@@ -164,7 +164,7 @@ public class HoodieSparkLanceReader implements 
HoodieSparkFileReader {
          LanceFileReader reader = LanceFileReader.open(path.toString(), 
allocator)) {
       Schema arrowSchema = reader.schema();
       StructType structType = LanceArrowUtils.fromArrowSchema(arrowSchema);
-      return 
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(structType, 
"record", "", true);
+      return 
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(structType, 
"record", "", false);
     } catch (Exception e) {
       throw new HoodieException("Failed to read schema from Lance file: " + 
path, e);
     }
diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/MultipleColumnarFileFormatReader.scala
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/MultipleColumnarFileFormatReader.scala
index afe0c36ad1fa..584747706aa2 100644
--- 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/MultipleColumnarFileFormatReader.scala
+++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/MultipleColumnarFileFormatReader.scala
@@ -29,9 +29,14 @@ import 
org.apache.spark.sql.execution.datasources.{PartitionedFile, SparkColumna
 import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types.StructType
 
-class MultipleColumnarFileFormatReader(parquetReader: SparkColumnarFileReader, 
orcReader: SparkColumnarFileReader)
+class MultipleColumnarFileFormatReader(parquetReader: SparkColumnarFileReader, 
orcReader: SparkColumnarFileReader, lanceReader: SparkColumnarFileReader)
   extends SparkColumnarFileReader with SparkAdapterSupport {
 
+  def this(parquetReader: SparkColumnarFileReader, orcReader: 
SparkColumnarFileReader) = {
+    // constructor for Spark versions without Lance support
+    this(parquetReader, orcReader, null)
+  }
+
   /**
    * Read an individual file
    *
@@ -53,6 +58,11 @@ class MultipleColumnarFileFormatReader(parquetReader: 
SparkColumnarFileReader, o
         parquetReader.read(file, requiredSchema, partitionSchema, 
internalSchemaOpt, filters, storageConf, tableSchemaOpt)
       case HoodieFileFormat.ORC =>
         orcReader.read(file, requiredSchema, partitionSchema, 
internalSchemaOpt, filters, storageConf, tableSchemaOpt)
+      case HoodieFileFormat.LANCE =>
+        if (lanceReader == null) {
+          throw new UnsupportedOperationException("Lance format is only 
supported in Spark 3.4 and above")
+        }
+        lanceReader.read(file, requiredSchema, partitionSchema, 
internalSchemaOpt, filters, storageConf, tableSchemaOpt)
       case _ =>
         throw new IllegalArgumentException(s"Unsupported file format for file: 
$filePath")
     }
diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
index 1afcc78df058..902f9035bebc 100644
--- 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
+++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
@@ -221,12 +221,12 @@ trait SparkAdapter extends Serializable {
    * @param sqlConf    the [[SQLConf]] used for the read
    * @param options    passed as a param to the file format
    * @param hadoopConf some configs will be set for the hadoopConf
-   * @return Lance file reader
+   * @return Lance file reader wrapped in Option; None if Lance format is not 
supported in current Spark version (i.e 3.3)
    */
   def createLanceFileReader(vectorized: Boolean,
                             sqlConf: SQLConf,
                             options: Map[String, String],
-                            hadoopConf: Configuration): SparkColumnarFileReader
+                            hadoopConf: Configuration): 
Option[SparkColumnarFileReader]
 
   /**
    * use new qe execute
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/LanceUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/common/util/LanceUtils.java
index 53e7bdbf9500..94c6fde1ea37 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/LanceUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/LanceUtils.java
@@ -76,27 +76,30 @@ public class LanceUtils extends FileFormatUtils {
                                                            
Option<BaseKeyGenerator> keyGeneratorOpt,
                                                            Option<String> 
partitionPath) {
     try {
+      HoodieSchema keySchema = getKeyIteratorSchema(storage, filePath, 
keyGeneratorOpt, partitionPath);
       HoodieFileReader reader = HoodieIOFactory.getIOFactory(storage)
           .getReaderFactory(HoodieRecord.HoodieRecordType.SPARK)
           .getFileReader(new HoodieReaderConfig(), filePath, 
HoodieFileFormat.LANCE);
-      ClosableIterator<String> keyIterator = reader.getRecordKeyIterator();
-      return new ClosableIterator<HoodieKey>() {
-        @Override
-        public void close() {
-          keyIterator.close();
-        }
-
-        @Override
-        public boolean hasNext() {
-          return keyIterator.hasNext();
-        }
-
-        @Override
-        public HoodieKey next() {
-          String key = keyIterator.next();
-          return new HoodieKey(key, partitionPath.orElse(null));
-        }
-      };
+      ClosableIterator<HoodieRecord> recordIterator = 
reader.getRecordIterator(keySchema);
+
+      return new CloseableMappingIterator<>(
+          recordIterator,
+          record -> {
+            String recordKey;
+            if (keyGeneratorOpt.isPresent()) {
+              // With keyGenerator, extracts user-defined key fields by name
+              recordKey = record.getRecordKey(keySchema, keyGeneratorOpt);
+            } else {
+              // Without keyGenerator, read record key field by name
+              recordKey = record.getRecordKey(keySchema, 
HoodieRecord.RECORD_KEY_METADATA_FIELD);
+            }
+            // Extract partition path from record if not provided as parameter
+            String partitionPathValue = partitionPath.orElseGet(() ->
+                (String) record.getColumnValueAsJava(keySchema, 
HoodieRecord.PARTITION_PATH_METADATA_FIELD,
+                    CollectionUtils.emptyProps()));
+            return new HoodieKey(recordKey, partitionPathValue);
+          }
+      );
     } catch (IOException e) {
       throw new HoodieIOException("Failed to read from Lance file" + filePath, 
e);
     }
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDDV2.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDDV2.scala
index 0c3897144506..3d9b908c4d31 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDDV2.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDDV2.scala
@@ -114,14 +114,17 @@ class HoodieMergeOnReadRDDV2(@transient sc: SparkContext,
     if (!metaClient.isMetadataTable) {
       val updatedOptions: Map[String, String] = options + 
(FileFormat.OPTION_RETURNING_BATCH -> "false") // disable vectorized reading 
for MOR
       if (metaClient.getTableConfig.isMultipleBaseFileFormatsEnabled) {
-        sc.broadcast(new MultipleColumnarFileFormatReader(
-          sparkAdapter.createParquetFileReader(vectorized = false, sqlConf, 
updatedOptions, config),
-          sparkAdapter.createOrcFileReader(vectorized = false, sqlConf, 
updatedOptions, config, tableSchema.structTypeSchema)
-        ))
+        val parquetReader = sparkAdapter.createParquetFileReader(vectorized = 
false, sqlConf, updatedOptions, config)
+        val orcReader = sparkAdapter.createOrcFileReader(vectorized = false, 
sqlConf, updatedOptions, config, tableSchema.structTypeSchema)
+        val lanceReader = sparkAdapter.createLanceFileReader(vectorized = 
false, sqlConf, updatedOptions, config).orNull
+        val multiReader = new MultipleColumnarFileFormatReader(parquetReader, 
orcReader, lanceReader)
+        sc.broadcast(multiReader)
       } else if (metaClient.getTableConfig.getBaseFileFormat == 
HoodieFileFormat.PARQUET) {
         sc.broadcast(sparkAdapter.createParquetFileReader(vectorized = false, 
sqlConf, updatedOptions, config))
       } else if (metaClient.getTableConfig.getBaseFileFormat == 
HoodieFileFormat.ORC) {
         sc.broadcast(sparkAdapter.createOrcFileReader(vectorized = false, 
sqlConf, updatedOptions, config, tableSchema.structTypeSchema))
+      } else if (metaClient.getTableConfig.getBaseFileFormat == 
HoodieFileFormat.LANCE) {
+        sc.broadcast(sparkAdapter.createLanceFileReader(vectorized = false, 
sqlConf, updatedOptions, config).orNull)
       } else {
         throw new IllegalArgumentException(s"Unsupported base file format: 
${metaClient.getTableConfig.getBaseFileFormat}")
       }
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala
index 31eb5243e623..60461295e741 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala
@@ -29,7 +29,8 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.parquet.schema.MessageType
 import org.apache.spark.TaskContext
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.JoinedRow
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
JoinedRow}
+import 
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.execution.datasources.{PartitionedFile, 
SparkColumnarFileReader}
 import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types.StructType
@@ -125,9 +126,14 @@ class SparkLanceReaderBase(enableVectorizedReader: 
Boolean) extends SparkColumna
           // No partition columns - return rows directly
           iter.asInstanceOf[Iterator[InternalRow]]
         } else {
-          // Append partition values to each row using JoinedRow
+          // Create UnsafeProjection to convert JoinedRow to UnsafeRow
+          val fullSchema = (requiredSchema.fields ++ 
partitionSchema.fields).map(f =>
+            AttributeReference(f.name, f.dataType, f.nullable, f.metadata)())
+          val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema, 
fullSchema)
+
+          // Append partition values to each row using JoinedRow, then convert 
to UnsafeRow
           val joinedRow = new JoinedRow()
-          iter.map(row => joinedRow(row, file.partitionValues))
+          iter.map(row => unsafeProjection(joinedRow(row, 
file.partitionValues)))
         }
 
       } catch {
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala
index a52ce3fd16bc..8d83c9f4223a 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala
@@ -313,15 +313,16 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath: 
String,
                                   dataSchema: StructType,
                                   enableVectorizedRead: Boolean): 
SparkColumnarFileReader = {
     if (isMultipleBaseFileFormatsEnabled) {
-      new MultipleColumnarFileFormatReader(
-        sparkAdapter.createParquetFileReader(enableVectorizedRead, 
spark.sessionState.conf, options, configuration),
-        sparkAdapter.createOrcFileReader(enableVectorizedRead, 
spark.sessionState.conf, options, configuration, dataSchema))
+      val parquetReader = 
sparkAdapter.createParquetFileReader(enableVectorizedRead, 
spark.sessionState.conf, options, configuration)
+      val orcReader = sparkAdapter.createOrcFileReader(enableVectorizedRead, 
spark.sessionState.conf, options, configuration, dataSchema)
+      val lanceReader = 
sparkAdapter.createLanceFileReader(enableVectorizedRead, 
spark.sessionState.conf, options, configuration).orNull
+      new MultipleColumnarFileFormatReader(parquetReader, orcReader, 
lanceReader)
     } else if (hoodieFileFormat == HoodieFileFormat.PARQUET) {
       sparkAdapter.createParquetFileReader(enableVectorizedRead, 
spark.sessionState.conf, options, configuration)
     } else if (hoodieFileFormat == HoodieFileFormat.ORC) {
       sparkAdapter.createOrcFileReader(enableVectorizedRead, 
spark.sessionState.conf, options, configuration, dataSchema)
     } else if (hoodieFileFormat == HoodieFileFormat.LANCE) {
-      sparkAdapter.createLanceFileReader(enableVectorizedRead, 
spark.sessionState.conf, options, configuration)
+      sparkAdapter.createLanceFileReader(enableVectorizedRead, 
spark.sessionState.conf, options, configuration).orNull
     } else {
       throw new HoodieNotSupportedException("Unsupported file format: " + 
hoodieFileFormat)
     }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
index 9a527b693c6e..22d23171be75 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
@@ -19,7 +19,7 @@
 
 package org.apache.hudi.common.table.read
 
-import org.apache.hudi.{AvroConversionUtils, DataSourceWriteOptions, 
HoodieSchemaConversionUtils, SparkAdapterSupport, 
SparkFileFormatInternalRowReaderContext}
+import org.apache.hudi.{AvroConversionUtils, DataSourceWriteOptions, 
HoodieSchemaConversionUtils, HoodieSparkUtils, SparkAdapterSupport, 
SparkFileFormatInternalRowReaderContext}
 import org.apache.hudi.DataSourceWriteOptions.{OPERATION, RECORDKEY_FIELD, 
TABLE_TYPE}
 import org.apache.hudi.common.config.{HoodieReaderConfig, RecordMergeMode, 
TypedProperties}
 import org.apache.hudi.common.engine.HoodieReaderContext
@@ -110,7 +110,8 @@ class TestHoodieFileGroupReaderOnSpark extends 
TestHoodieFileGroupReaderBase[Int
     val parquetReader = sparkAdapter.createParquetFileReader(vectorized = 
false, spark.sessionState.conf, Map.empty, 
storageConf.unwrapAs(classOf[Configuration]))
     val dataSchema = 
HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(schema)
     val orcReader = sparkAdapter.createOrcFileReader(vectorized = false, 
spark.sessionState.conf, Map.empty, 
storageConf.unwrapAs(classOf[Configuration]), dataSchema)
-    val multiFormatReader = new 
MultipleColumnarFileFormatReader(parquetReader, orcReader)
+    val lanceReader = sparkAdapter.createLanceFileReader(vectorized = false, 
spark.sessionState.conf, Map.empty, 
storageConf.unwrapAs(classOf[Configuration])).orNull
+    val multiFormatReader = new 
MultipleColumnarFileFormatReader(parquetReader, orcReader, lanceReader)
     new SparkFileFormatInternalRowReaderContext(multiFormatReader, Seq.empty, 
Seq.empty, getStorageConf, metaClient.getTableConfig)
   }
 
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala
index c9e1083bb550..8dabe5bfdacf 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala
@@ -396,30 +396,33 @@ class TestLanceDataSource extends 
HoodieSparkClientTestBase {
 
     // Initial insert - 3 records
     val records1 = Seq(
-      (1, "Alice", 30, 95.5),
-      (2, "Bob", 25, 87.3),
-      (3, "Charlie", 35, 92.1)
+      (1, "Alice", 30, 95.5, "engineering"),
+      (2, "Bob", 25, 87.3, "sales"),
+      (3, "Charlie", 35, 92.1, "engineering")
     )
-    val df1 = createDataFrame(records1)
+    val df1 = spark.createDataFrame(records1).toDF("id", "name", "age", 
"score", "department")
 
-    writeDataframe(tableType, tableName, tablePath, df1, saveMode = 
SaveMode.Overwrite, operation = Some("insert"))
+    writeDataframe(tableType, tableName, tablePath, df1, saveMode = 
SaveMode.Overwrite, operation = Some("insert"),
+      extraOptions = Map(PARTITIONPATH_FIELD.key() -> "department"))
 
     // Upsert - modify Bob's record (id=2)
     val records2 = Seq(
-      (2, "Bob", 40, 95.0)  // Update Bob: age 25->40, score 87.3->95.0
+      (2, "Bob", 40, 95.0, "sales")  // Update Bob: age 25->40, score 
87.3->95.0
     )
-    val df2 = createDataFrame(records2)
+    val df2 = spark.createDataFrame(records2).toDF("id", "name", "age", 
"score", "department")
 
-    writeDataframe(tableType, tableName, tablePath, df2, operation = 
Some("upsert"))
+    writeDataframe(tableType, tableName, tablePath, df2, operation = 
Some("upsert"),
+      extraOptions = Map(PARTITIONPATH_FIELD.key() -> "department"))
 
     // Second upsert - modify Alice (id=1) and insert David (id=4)
     val records3 = Seq(
-      (1, "Alice", 45, 98.5),  // Update Alice: age 30->45, score 95.5->98.5
-      (4, "David", 28, 88.0)   // Insert new record
+      (1, "Alice", 45, 98.5, "engineering"),  // Update Alice: age 30->45, 
score 95.5->98.5
+      (4, "David", 28, 88.0, "marketing")   // Insert new record
     )
-    val df3 = createDataFrame(records3)
+    val df3 = spark.createDataFrame(records3).toDF("id", "name", "age", 
"score", "department")
 
-    writeDataframe(tableType, tableName, tablePath, df3, operation = 
Some("upsert"))
+    writeDataframe(tableType, tableName, tablePath, df3, operation = 
Some("upsert"),
+      extraOptions = Map(PARTITIONPATH_FIELD.key() -> "department"))
 
     // Validate commits
     val metaClient = HoodieTableMetaClient.builder()
@@ -440,14 +443,14 @@ class TestLanceDataSource extends 
HoodieSparkClientTestBase {
 
     // Read and verify data
     val readDf = spark.read.format("hudi").load(tablePath)
-    val actual = readDf.select("id", "name", "age", "score")
+    val actual = readDf.select("id", "name", "age", "score", "department")
 
-    val expectedDf = createDataFrame(Seq(
-      (1, "Alice", 45, 98.5),
-      (2, "Bob", 40, 95.0),
-      (3, "Charlie", 35, 92.1),
-      (4, "David", 28, 88.0)
-    ))
+    val expectedDf = spark.createDataFrame(Seq(
+      (1, "Alice", 45, 98.5, "engineering"),
+      (2, "Bob", 40, 95.0, "sales"),
+      (3, "Charlie", 35, 92.1, "engineering"),
+      (4, "David", 28, 88.0, "marketing")
+    )).toDF("id", "name", "age", "score", "department")
 
     assertTrue(expectedDf.except(actual).isEmpty)
     assertTrue(actual.except(expectedDf).isEmpty)
@@ -455,25 +458,26 @@ class TestLanceDataSource extends 
HoodieSparkClientTestBase {
     if (tableType == HoodieTableType.MERGE_ON_READ) {
       // Write one more commit to trigger compaction
       val records4 = Seq(
-        (1, "Alice", 50, 98.5),  // Update Alice: age 45->50
-        (4, "David", 28, 90.0)   // Update David: score 88.0->90.0
+        (1, "Alice", 50, 98.5, "engineering"),  // Update Alice: age 45->50
+        (4, "David", 28, 90.0, "marketing")   // Update David: score 88.0->90.0
       )
-      val df4 = createDataFrame(records4)
+      val df4 = spark.createDataFrame(records4).toDF("id", "name", "age", 
"score", "department")
       writeDataframe(tableType, tableName, tablePath, df4, operation = 
Some("upsert"),
-        extraOptions = Map("hoodie.compact.inline" -> "true", 
"hoodie.compact.inline.max.delta.commits" -> "1"))
-      val expectedDfAfterCompaction = createDataFrame(Seq(
-        (1, "Alice", 50, 98.5),
-        (2, "Bob", 40, 95.0),
-        (3, "Charlie", 35, 92.1),
-        (4, "David", 28, 90.0)
-      ))
+        extraOptions = Map("hoodie.compact.inline" -> "true", 
"hoodie.compact.inline.max.delta.commits" -> "1",
+          PARTITIONPATH_FIELD.key() -> "department"))
+      val expectedDfAfterCompaction = spark.createDataFrame(Seq(
+        (1, "Alice", 50, 98.5, "engineering"),
+        (2, "Bob", 40, 95.0, "sales"),
+        (3, "Charlie", 35, 92.1, "engineering"),
+        (4, "David", 28, 90.0, "marketing")
+      )).toDF("id", "name", "age", "score", "department")
       // validate compaction commit is present
       val compactionCommits = 
metaClient.reloadActiveTimeline().filterCompletedInstants().getInstants.asScala
         .filter(instant => instant.getAction == "commit")
       assertTrue(compactionCommits.nonEmpty, "Compaction commit should be 
present after upsert")
       // Read and verify data after compaction
       val readDfAfterCompaction = spark.read.format("hudi").load(tablePath)
-      val actualAfterCompaction = readDfAfterCompaction.select("id", "name", 
"age", "score")
+      val actualAfterCompaction = readDfAfterCompaction.select("id", "name", 
"age", "score", "department")
       
assertTrue(expectedDfAfterCompaction.except(actualAfterCompaction).isEmpty)
       
assertTrue(actualAfterCompaction.except(expectedDfAfterCompaction).isEmpty)
     }
@@ -659,6 +663,60 @@ class TestLanceDataSource extends 
HoodieSparkClientTestBase {
     assertTrue(actual.except(expectedDf).isEmpty)
   }
 
+  @ParameterizedTest
+  @EnumSource(value = classOf[HoodieTableType])
+  def testUpsertWithPopulateMetaFieldsDisabled(tableType: HoodieTableType): 
Unit = {
+    val tableName = s"test_lance_no_meta_${tableType.name().toLowerCase}"
+    val tablePath = s"$basePath/$tableName"
+
+    val records1 = Seq(
+      (101, "Alice", 30, 95.5, "engineering"),
+      (102, "Bob", 25, 87.3, "sales"),
+      (103, "Charlie", 35, 92.1, "engineering")
+    )
+    val df1 = spark.createDataFrame(records1).toDF("id", "name", "age", 
"score", "department")
+
+    // Write with populateMetaFields=false
+    writeDataframe(tableType, tableName, tablePath, df1,
+      saveMode = SaveMode.Overwrite, operation = Some("insert"),
+      extraOptions = Map("hoodie.populate.meta.fields" -> "false", 
PARTITIONPATH_FIELD.key() -> "department"))
+
+    // Upsert - modify Bob's record
+    val records2 = Seq(
+      (102, "Bob", 40, 95.0, "sales")
+    )
+    val df2 = spark.createDataFrame(records2).toDF("id", "name", "age", 
"score", "department")
+
+    writeDataframe(tableType, tableName, tablePath, df2,
+      operation = Some("upsert"),
+      extraOptions = Map("hoodie.populate.meta.fields" -> "false", 
PARTITIONPATH_FIELD.key() -> "department"))
+
+    // Second upsert - modify Alice and insert David
+    val records3 = Seq(
+      (101, "Alice", 45, 98.5, "engineering"),
+      (104, "David", 28, 88.0, "marketing")
+    )
+    val df3 = spark.createDataFrame(records3).toDF("id", "name", "age", 
"score", "department")
+
+    writeDataframe(tableType, tableName, tablePath, df3,
+      operation = Some("upsert"),
+      extraOptions = Map("hoodie.populate.meta.fields" -> "false", 
PARTITIONPATH_FIELD.key() -> "department"))
+
+    // Verify data
+    val readDf = spark.read.format("hudi").load(tablePath)
+    val actual = readDf.select("id", "name", "age", "score", "department")
+
+    val expectedDf = spark.createDataFrame(Seq(
+      (101, "Alice", 45, 98.5, "engineering"),
+      (102, "Bob", 40, 95.0, "sales"),
+      (103, "Charlie", 35, 92.1, "engineering"),
+      (104, "David", 28, 88.0, "marketing")
+    )).toDF("id", "name", "age", "score", "department")
+
+    assertTrue(expectedDf.except(actual).isEmpty)
+    assertTrue(actual.except(expectedDf).isEmpty)
+  }
+
   private def createDataFrame(records: Seq[(Int, String, Int, Double)]) = {
     spark.createDataFrame(records).toDF("id", "name", "age", 
"score").coalesce(1)
   }
diff --git 
a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_3Adapter.scala
 
b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_3Adapter.scala
index 4ac9f2875fd7..a180ab3483dc 100644
--- 
a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_3Adapter.scala
+++ 
b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_3Adapter.scala
@@ -157,8 +157,8 @@ class Spark3_3Adapter extends BaseSpark3Adapter {
   override def createLanceFileReader(vectorized: Boolean,
                                      sqlConf: SQLConf,
                                      options: Map[String, String],
-                                     hadoopConf: Configuration): 
SparkColumnarFileReader = {
-    throw new UnsupportedOperationException("Lance format is not supported in 
Spark 3.3")
+                                     hadoopConf: Configuration): 
Option[SparkColumnarFileReader] = {
+    None
   }
 
   override def stopSparkContext(jssc: JavaSparkContext, exitCode: Int): Unit = 
{
diff --git 
a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_4Adapter.scala
 
b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_4Adapter.scala
index 22dfce461d67..32b21b936f09 100644
--- 
a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_4Adapter.scala
+++ 
b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_4Adapter.scala
@@ -158,8 +158,8 @@ class Spark3_4Adapter extends BaseSpark3Adapter {
   override def createLanceFileReader(vectorized: Boolean,
                                      sqlConf: SQLConf,
                                      options: Map[String, String],
-                                     hadoopConf: Configuration): 
SparkColumnarFileReader = {
-    new SparkLanceReaderBase(vectorized)
+                                     hadoopConf: Configuration): 
Option[SparkColumnarFileReader] = {
+    Some(new SparkLanceReaderBase(vectorized))
   }
 
   override def stopSparkContext(jssc: JavaSparkContext, exitCode: Int): Unit = 
{
diff --git 
a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_5Adapter.scala
 
b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_5Adapter.scala
index 2db62f2fca8f..24492fdc358b 100644
--- 
a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_5Adapter.scala
+++ 
b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_5Adapter.scala
@@ -174,8 +174,8 @@ class Spark3_5Adapter extends BaseSpark3Adapter {
   override def createLanceFileReader(vectorized: Boolean,
                                      sqlConf: SQLConf,
                                      options: Map[String, String],
-                                     hadoopConf: Configuration): 
SparkColumnarFileReader = {
-    new SparkLanceReaderBase(vectorized)
+                                     hadoopConf: Configuration): 
Option[SparkColumnarFileReader] = {
+    Some(new SparkLanceReaderBase(vectorized))
   }
 
   override def stopSparkContext(jssc: JavaSparkContext, exitCode: Int): Unit = 
{
diff --git 
a/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/adapter/Spark4_0Adapter.scala
 
b/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/adapter/Spark4_0Adapter.scala
index cbf0b2de8bb4..60584b6aba39 100644
--- 
a/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/adapter/Spark4_0Adapter.scala
+++ 
b/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/adapter/Spark4_0Adapter.scala
@@ -172,8 +172,8 @@ class Spark4_0Adapter extends BaseSpark4Adapter {
   override def createLanceFileReader(vectorized: Boolean,
                                      sqlConf: SQLConf,
                                      options: Map[String, String],
-                                     hadoopConf: Configuration): 
SparkColumnarFileReader = {
-    new SparkLanceReaderBase(vectorized)
+                                     hadoopConf: Configuration): 
Option[SparkColumnarFileReader] = {
+    Some(new SparkLanceReaderBase(vectorized))
   }
 
   override def stopSparkContext(jssc: JavaSparkContext, exitCode: Int): Unit = 
{

Reply via email to