This is an automated email from the ASF dual-hosted git repository.
timbrown pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new c29309892a57 fix: Ensure Lance works when populateMetaFields is false with user defined keygen (#18042)
c29309892a57 is described below
commit c29309892a57074068c55effe54389f08e717999
Author: Rahil C <[email protected]>
AuthorDate: Mon Feb 2 08:05:26 2026 -0800
    fix: Ensure Lance works when populateMetaFields is false with user defined keygen (#18042)
    * fix: Ensure Lance works when populateMetaFields is false with a user defined keygen, as well as with the multi format reader (see the config sketch after this list)
    * style fix
    * address inline feedback, use keySchema changes
    * fix existing multi format tests failing on Spark 3.3 with Lance
    * fix file group reader Spark tests failing where Lance is unsupported
    * address feedback around partition path handling, added tests for partition path, encountered a bug in partition path handling and fixed it
    * refactor to use Option for lanceFileReader
    * fix partition path test
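    For orientation, a minimal writer-config sketch of the scenario this commit fixes (table and field names are illustrative; the option keys are standard Hudi writer configs):

        // Lance base files, no Hudi meta fields, user-defined key generator
        df.write.format("hudi")
          .option("hoodie.table.name", "lance_tbl")
          .option("hoodie.table.base.file.format", "LANCE")
          .option("hoodie.populate.meta.fields", "false")
          .option("hoodie.datasource.write.recordkey.field", "id")
          .option("hoodie.datasource.write.partitionpath.field", "department")
          .option("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.keygen.SimpleKeyGenerator")
          .mode(SaveMode.Append)
          .save(tablePath)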
---
.../client/common/SparkReaderContextFactory.java | 5 +-
.../hudi/io/storage/HoodieSparkLanceReader.java | 2 +-
.../hudi/MultipleColumnarFileFormatReader.scala | 12 ++-
.../org/apache/spark/sql/hudi/SparkAdapter.scala | 4 +-
.../org/apache/hudi/common/util/LanceUtils.java | 39 +++----
.../org/apache/hudi/HoodieMergeOnReadRDDV2.scala | 11 +-
.../datasources/lance/SparkLanceReaderBase.scala | 12 ++-
.../HoodieFileGroupReaderBasedFileFormat.scala | 9 +-
.../read/TestHoodieFileGroupReaderOnSpark.scala | 5 +-
.../hudi/functional/TestLanceDataSource.scala | 118 +++++++++++++++------
.../apache/spark/sql/adapter/Spark3_3Adapter.scala | 4 +-
.../apache/spark/sql/adapter/Spark3_4Adapter.scala | 4 +-
.../apache/spark/sql/adapter/Spark3_5Adapter.scala | 4 +-
.../apache/spark/sql/adapter/Spark4_0Adapter.scala | 4 +-
14 files changed, 158 insertions(+), 75 deletions(-)
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/SparkReaderContextFactory.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/SparkReaderContextFactory.java
index 4a2f52707121..7dd00a9e430c 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/SparkReaderContextFactory.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/SparkReaderContextFactory.java
@@ -96,13 +96,14 @@ public class SparkReaderContextFactory implements ReaderContextFactory<InternalR
     if (metaClient.getTableConfig().isMultipleBaseFileFormatsEnabled()) {
       SparkColumnarFileReader parquetFileReader = sparkAdapter.createParquetFileReader(false, sqlConf, options, configs);
       SparkColumnarFileReader orcFileReader = getOrcFileReader(resolver, sqlConf, options, configs, sparkAdapter);
-      baseFileReaderBroadcast = jsc.broadcast(new MultipleColumnarFileFormatReader(parquetFileReader, orcFileReader));
+      SparkColumnarFileReader lanceFileReader = sparkAdapter.createLanceFileReader(false, sqlConf, options, configs).getOrElse(null);
+      baseFileReaderBroadcast = jsc.broadcast(new MultipleColumnarFileFormatReader(parquetFileReader, orcFileReader, lanceFileReader));
     } else if (metaClient.getTableConfig().getBaseFileFormat() == HoodieFileFormat.ORC) {
       SparkColumnarFileReader orcFileReader = getOrcFileReader(resolver, sqlConf, options, configs, sparkAdapter);
       baseFileReaderBroadcast = jsc.broadcast(orcFileReader);
     } else if (metaClient.getTableConfig().getBaseFileFormat() == HoodieFileFormat.LANCE) {
       baseFileReaderBroadcast = jsc.broadcast(
-          sparkAdapter.createLanceFileReader(false, sqlConf, options, configs));
+          sparkAdapter.createLanceFileReader(false, sqlConf, options, configs).getOrElse(null));
     } else {
       baseFileReaderBroadcast = jsc.broadcast(
           sparkAdapter.createParquetFileReader(false, sqlConf, options, configs));
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceReader.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceReader.java
index 71140791b336..f99bf2ce9eac 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceReader.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceReader.java
@@ -164,7 +164,7 @@ public class HoodieSparkLanceReader implements HoodieSparkFileReader {
         LanceFileReader reader = LanceFileReader.open(path.toString(), allocator)) {
       Schema arrowSchema = reader.schema();
       StructType structType = LanceArrowUtils.fromArrowSchema(arrowSchema);
-      return HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(structType, "record", "", true);
+      return HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(structType, "record", "", false);
     } catch (Exception e) {
       throw new HoodieException("Failed to read schema from Lance file: " + path, e);
     }
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/MultipleColumnarFileFormatReader.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/MultipleColumnarFileFormatReader.scala
index afe0c36ad1fa..584747706aa2 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/MultipleColumnarFileFormatReader.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/MultipleColumnarFileFormatReader.scala
@@ -29,9 +29,14 @@ import org.apache.spark.sql.execution.datasources.{PartitionedFile, SparkColumna
 import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types.StructType
 
-class MultipleColumnarFileFormatReader(parquetReader: SparkColumnarFileReader, orcReader: SparkColumnarFileReader)
+class MultipleColumnarFileFormatReader(parquetReader: SparkColumnarFileReader, orcReader: SparkColumnarFileReader, lanceReader: SparkColumnarFileReader)
   extends SparkColumnarFileReader with SparkAdapterSupport {
 
+  def this(parquetReader: SparkColumnarFileReader, orcReader: SparkColumnarFileReader) = {
+    // constructor for Spark versions without Lance support
+    this(parquetReader, orcReader, null)
+  }
+
   /**
    * Read an individual file
    *
@@ -53,6 +58,11 @@ class MultipleColumnarFileFormatReader(parquetReader: SparkColumnarFileReader, o
         parquetReader.read(file, requiredSchema, partitionSchema, internalSchemaOpt, filters, storageConf, tableSchemaOpt)
       case HoodieFileFormat.ORC =>
         orcReader.read(file, requiredSchema, partitionSchema, internalSchemaOpt, filters, storageConf, tableSchemaOpt)
+      case HoodieFileFormat.LANCE =>
+        if (lanceReader == null) {
+          throw new UnsupportedOperationException("Lance format is only supported in Spark 3.4 and above")
+        }
+        lanceReader.read(file, requiredSchema, partitionSchema, internalSchemaOpt, filters, storageConf, tableSchemaOpt)
       case _ =>
         throw new IllegalArgumentException(s"Unsupported file format for file: $filePath")
     }
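Usage sketch for the widened constructor (reader instances assumed to come from the SparkAdapter, as in the call sites below):

    // Spark 3.4+: all three format slots populated
    val full = new MultipleColumnarFileFormatReader(parquetReader, orcReader, lanceReader)
    // Spark 3.3: the two-arg auxiliary constructor leaves the Lance slot null,
    // so reading a Lance file fails fast with UnsupportedOperationException
    val legacy = new MultipleColumnarFileFormatReader(parquetReader, orcReader)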
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
index 1afcc78df058..902f9035bebc 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
@@ -221,12 +221,12 @@ trait SparkAdapter extends Serializable {
    * @param sqlConf the [[SQLConf]] used for the read
    * @param options passed as a param to the file format
    * @param hadoopConf some configs will be set for the hadoopConf
-   * @return Lance file reader
+   * @return Lance file reader wrapped in an Option; None if Lance format is not supported in the current Spark version (i.e., Spark 3.3)
    */
  def createLanceFileReader(vectorized: Boolean,
                            sqlConf: SQLConf,
                            options: Map[String, String],
-                           hadoopConf: Configuration): SparkColumnarFileReader
+                           hadoopConf: Configuration): Option[SparkColumnarFileReader]
 
  /**
   * use new qe execute
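A caller-side sketch of the new Option-returning signature (the .orNull unwrap mirrors the call sites changed later in this commit):

    // None on Spark 3.3, Some(reader) on Spark 3.4+; unwrap once at the edge
    // and let MultipleColumnarFileFormatReader fail fast on an actual Lance read
    val lanceReaderOrNull: SparkColumnarFileReader =
      sparkAdapter.createLanceFileReader(vectorized = false, sqlConf, options, hadoopConf).orNull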
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/LanceUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/LanceUtils.java
index 53e7bdbf9500..94c6fde1ea37 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/LanceUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/LanceUtils.java
@@ -76,27 +76,30 @@ public class LanceUtils extends FileFormatUtils {
                                                          Option<BaseKeyGenerator> keyGeneratorOpt,
                                                          Option<String> partitionPath) {
     try {
+      HoodieSchema keySchema = getKeyIteratorSchema(storage, filePath, keyGeneratorOpt, partitionPath);
       HoodieFileReader reader = HoodieIOFactory.getIOFactory(storage)
           .getReaderFactory(HoodieRecord.HoodieRecordType.SPARK)
           .getFileReader(new HoodieReaderConfig(), filePath, HoodieFileFormat.LANCE);
-      ClosableIterator<String> keyIterator = reader.getRecordKeyIterator();
-      return new ClosableIterator<HoodieKey>() {
-        @Override
-        public void close() {
-          keyIterator.close();
-        }
-
-        @Override
-        public boolean hasNext() {
-          return keyIterator.hasNext();
-        }
-
-        @Override
-        public HoodieKey next() {
-          String key = keyIterator.next();
-          return new HoodieKey(key, partitionPath.orElse(null));
-        }
-      };
+      ClosableIterator<HoodieRecord> recordIterator = reader.getRecordIterator(keySchema);
+
+      return new CloseableMappingIterator<>(
+          recordIterator,
+          record -> {
+            String recordKey;
+            if (keyGeneratorOpt.isPresent()) {
+              // With a keyGenerator, extract the user-defined key fields by name
+              recordKey = record.getRecordKey(keySchema, keyGeneratorOpt);
+            } else {
+              // Without a keyGenerator, read the record key meta field by name
+              recordKey = record.getRecordKey(keySchema, HoodieRecord.RECORD_KEY_METADATA_FIELD);
+            }
+            // Extract the partition path from the record if not provided as a parameter
+            String partitionPathValue = partitionPath.orElseGet(() ->
+                (String) record.getColumnValueAsJava(keySchema, HoodieRecord.PARTITION_PATH_METADATA_FIELD,
+                    CollectionUtils.emptyProps()));
+            return new HoodieKey(recordKey, partitionPathValue);
+          }
+      );
     } catch (IOException e) {
       throw new HoodieIOException("Failed to read from Lance file: " + filePath, e);
     }
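For context, a simplified Scala stand-in for the mapping-iterator pattern used above (Hudi's CloseableMappingIterator is the real implementation; this sketch only illustrates the lazy-transform and close-propagation contract):

    class MappingIterator[I, O](inner: Iterator[I] with AutoCloseable, f: I => O)
      extends Iterator[O] with AutoCloseable {
      override def hasNext: Boolean = inner.hasNext
      override def next(): O = f(inner.next())   // transform lazily, element by element
      override def close(): Unit = inner.close() // closing the wrapper closes the source
    }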
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDDV2.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDDV2.scala
index 0c3897144506..3d9b908c4d31 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDDV2.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDDV2.scala
@@ -114,14 +114,17 @@ class HoodieMergeOnReadRDDV2(@transient sc: SparkContext,
     if (!metaClient.isMetadataTable) {
       val updatedOptions: Map[String, String] = options + (FileFormat.OPTION_RETURNING_BATCH -> "false") // disable vectorized reading for MOR
       if (metaClient.getTableConfig.isMultipleBaseFileFormatsEnabled) {
-        sc.broadcast(new MultipleColumnarFileFormatReader(
-          sparkAdapter.createParquetFileReader(vectorized = false, sqlConf, updatedOptions, config),
-          sparkAdapter.createOrcFileReader(vectorized = false, sqlConf, updatedOptions, config, tableSchema.structTypeSchema)
-        ))
+        val parquetReader = sparkAdapter.createParquetFileReader(vectorized = false, sqlConf, updatedOptions, config)
+        val orcReader = sparkAdapter.createOrcFileReader(vectorized = false, sqlConf, updatedOptions, config, tableSchema.structTypeSchema)
+        val lanceReader = sparkAdapter.createLanceFileReader(vectorized = false, sqlConf, updatedOptions, config).orNull
+        val multiReader = new MultipleColumnarFileFormatReader(parquetReader, orcReader, lanceReader)
+        sc.broadcast(multiReader)
       } else if (metaClient.getTableConfig.getBaseFileFormat == HoodieFileFormat.PARQUET) {
         sc.broadcast(sparkAdapter.createParquetFileReader(vectorized = false, sqlConf, updatedOptions, config))
       } else if (metaClient.getTableConfig.getBaseFileFormat == HoodieFileFormat.ORC) {
         sc.broadcast(sparkAdapter.createOrcFileReader(vectorized = false, sqlConf, updatedOptions, config, tableSchema.structTypeSchema))
+      } else if (metaClient.getTableConfig.getBaseFileFormat == HoodieFileFormat.LANCE) {
+        sc.broadcast(sparkAdapter.createLanceFileReader(vectorized = false, sqlConf, updatedOptions, config).orNull)
       } else {
         throw new IllegalArgumentException(s"Unsupported base file format: ${metaClient.getTableConfig.getBaseFileFormat}")
       }
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala
index 31eb5243e623..60461295e741 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/lance/SparkLanceReaderBase.scala
@@ -29,7 +29,8 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.parquet.schema.MessageType
 import org.apache.spark.TaskContext
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.JoinedRow
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, JoinedRow}
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.execution.datasources.{PartitionedFile, SparkColumnarFileReader}
 import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types.StructType
@@ -125,9 +126,14 @@ class SparkLanceReaderBase(enableVectorizedReader: Boolean) extends SparkColumna
           // No partition columns - return rows directly
           iter.asInstanceOf[Iterator[InternalRow]]
         } else {
-          // Append partition values to each row using JoinedRow
+          // Create UnsafeProjection to convert JoinedRow to UnsafeRow
+          val fullSchema = (requiredSchema.fields ++ partitionSchema.fields).map(f =>
+            AttributeReference(f.name, f.dataType, f.nullable, f.metadata)())
+          val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema, fullSchema)
+
+          // Append partition values to each row using JoinedRow, then convert to UnsafeRow
           val joinedRow = new JoinedRow()
-          iter.map(row => joinedRow(row, file.partitionValues))
+          iter.map(row => unsafeProjection(joinedRow(row, file.partitionValues)))
         }
       } catch {
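Why the projection matters: JoinedRow is only a view over two rows, while operators downstream of a scan generally expect materialized UnsafeRows. A standalone sketch of the conversion (schema and values illustrative):

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions.{AttributeReference, JoinedRow}
    import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
    import org.apache.spark.unsafe.types.UTF8String

    val schema = StructType(Seq(StructField("id", IntegerType), StructField("dept", StringType)))
    val attrs = schema.fields.map(f => AttributeReference(f.name, f.dataType, f.nullable, f.metadata)())
    val toUnsafe = GenerateUnsafeProjection.generate(attrs, attrs)
    val joined = new JoinedRow(InternalRow(1), InternalRow(UTF8String.fromString("engineering")))
    val unsafeRow = toUnsafe(joined) // backed by the projection's buffer; call .copy() to retain across rows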
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala
index a52ce3fd16bc..8d83c9f4223a 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala
@@ -313,15 +313,16 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath: String,
                                    dataSchema: StructType,
                                    enableVectorizedRead: Boolean): SparkColumnarFileReader = {
     if (isMultipleBaseFileFormatsEnabled) {
-      new MultipleColumnarFileFormatReader(
-        sparkAdapter.createParquetFileReader(enableVectorizedRead, spark.sessionState.conf, options, configuration),
-        sparkAdapter.createOrcFileReader(enableVectorizedRead, spark.sessionState.conf, options, configuration, dataSchema))
+      val parquetReader = sparkAdapter.createParquetFileReader(enableVectorizedRead, spark.sessionState.conf, options, configuration)
+      val orcReader = sparkAdapter.createOrcFileReader(enableVectorizedRead, spark.sessionState.conf, options, configuration, dataSchema)
+      val lanceReader = sparkAdapter.createLanceFileReader(enableVectorizedRead, spark.sessionState.conf, options, configuration).orNull
+      new MultipleColumnarFileFormatReader(parquetReader, orcReader, lanceReader)
     } else if (hoodieFileFormat == HoodieFileFormat.PARQUET) {
       sparkAdapter.createParquetFileReader(enableVectorizedRead, spark.sessionState.conf, options, configuration)
     } else if (hoodieFileFormat == HoodieFileFormat.ORC) {
       sparkAdapter.createOrcFileReader(enableVectorizedRead, spark.sessionState.conf, options, configuration, dataSchema)
     } else if (hoodieFileFormat == HoodieFileFormat.LANCE) {
-      sparkAdapter.createLanceFileReader(enableVectorizedRead, spark.sessionState.conf, options, configuration)
+      sparkAdapter.createLanceFileReader(enableVectorizedRead, spark.sessionState.conf, options, configuration).orNull
     } else {
       throw new HoodieNotSupportedException("Unsupported file format: " + hoodieFileFormat)
     }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
index 9a527b693c6e..22d23171be75 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
@@ -19,7 +19,7 @@
 
 package org.apache.hudi.common.table.read
 
-import org.apache.hudi.{AvroConversionUtils, DataSourceWriteOptions, HoodieSchemaConversionUtils, SparkAdapterSupport, SparkFileFormatInternalRowReaderContext}
+import org.apache.hudi.{AvroConversionUtils, DataSourceWriteOptions, HoodieSchemaConversionUtils, HoodieSparkUtils, SparkAdapterSupport, SparkFileFormatInternalRowReaderContext}
 import org.apache.hudi.DataSourceWriteOptions.{OPERATION, RECORDKEY_FIELD, TABLE_TYPE}
 import org.apache.hudi.common.config.{HoodieReaderConfig, RecordMergeMode, TypedProperties}
 import org.apache.hudi.common.engine.HoodieReaderContext
@@ -110,7 +110,8 @@ class TestHoodieFileGroupReaderOnSpark extends TestHoodieFileGroupReaderBase[Int
     val parquetReader = sparkAdapter.createParquetFileReader(vectorized = false, spark.sessionState.conf, Map.empty, storageConf.unwrapAs(classOf[Configuration]))
     val dataSchema = HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(schema)
     val orcReader = sparkAdapter.createOrcFileReader(vectorized = false, spark.sessionState.conf, Map.empty, storageConf.unwrapAs(classOf[Configuration]), dataSchema)
-    val multiFormatReader = new MultipleColumnarFileFormatReader(parquetReader, orcReader)
+    val lanceReader = sparkAdapter.createLanceFileReader(vectorized = false, spark.sessionState.conf, Map.empty, storageConf.unwrapAs(classOf[Configuration])).orNull
+    val multiFormatReader = new MultipleColumnarFileFormatReader(parquetReader, orcReader, lanceReader)
     new SparkFileFormatInternalRowReaderContext(multiFormatReader, Seq.empty, Seq.empty, getStorageConf, metaClient.getTableConfig)
   }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala
index c9e1083bb550..8dabe5bfdacf 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala
@@ -396,30 +396,33 @@ class TestLanceDataSource extends HoodieSparkClientTestBase {
     // Initial insert - 3 records
     val records1 = Seq(
-      (1, "Alice", 30, 95.5),
-      (2, "Bob", 25, 87.3),
-      (3, "Charlie", 35, 92.1)
+      (1, "Alice", 30, 95.5, "engineering"),
+      (2, "Bob", 25, 87.3, "sales"),
+      (3, "Charlie", 35, 92.1, "engineering")
     )
-    val df1 = createDataFrame(records1)
+    val df1 = spark.createDataFrame(records1).toDF("id", "name", "age", "score", "department")
-    writeDataframe(tableType, tableName, tablePath, df1, saveMode = SaveMode.Overwrite, operation = Some("insert"))
+    writeDataframe(tableType, tableName, tablePath, df1, saveMode = SaveMode.Overwrite, operation = Some("insert"),
+      extraOptions = Map(PARTITIONPATH_FIELD.key() -> "department"))
 
     // Upsert - modify Bob's record (id=2)
     val records2 = Seq(
-      (2, "Bob", 40, 95.0) // Update Bob: age 25->40, score 87.3->95.0
+      (2, "Bob", 40, 95.0, "sales") // Update Bob: age 25->40, score 87.3->95.0
     )
-    val df2 = createDataFrame(records2)
+    val df2 = spark.createDataFrame(records2).toDF("id", "name", "age", "score", "department")
-    writeDataframe(tableType, tableName, tablePath, df2, operation = Some("upsert"))
+    writeDataframe(tableType, tableName, tablePath, df2, operation = Some("upsert"),
+      extraOptions = Map(PARTITIONPATH_FIELD.key() -> "department"))
 
     // Second upsert - modify Alice (id=1) and insert David (id=4)
     val records3 = Seq(
-      (1, "Alice", 45, 98.5), // Update Alice: age 30->45, score 95.5->98.5
-      (4, "David", 28, 88.0) // Insert new record
+      (1, "Alice", 45, 98.5, "engineering"), // Update Alice: age 30->45, score 95.5->98.5
+      (4, "David", 28, 88.0, "marketing") // Insert new record
     )
-    val df3 = createDataFrame(records3)
+    val df3 = spark.createDataFrame(records3).toDF("id", "name", "age", "score", "department")
-    writeDataframe(tableType, tableName, tablePath, df3, operation = Some("upsert"))
+    writeDataframe(tableType, tableName, tablePath, df3, operation = Some("upsert"),
+      extraOptions = Map(PARTITIONPATH_FIELD.key() -> "department"))
 
     // Validate commits
     val metaClient = HoodieTableMetaClient.builder()
@@ -440,14 +443,14 @@ class TestLanceDataSource extends HoodieSparkClientTestBase {
     // Read and verify data
     val readDf = spark.read.format("hudi").load(tablePath)
-    val actual = readDf.select("id", "name", "age", "score")
+    val actual = readDf.select("id", "name", "age", "score", "department")
 
-    val expectedDf = createDataFrame(Seq(
-      (1, "Alice", 45, 98.5),
-      (2, "Bob", 40, 95.0),
-      (3, "Charlie", 35, 92.1),
-      (4, "David", 28, 88.0)
-    ))
+    val expectedDf = spark.createDataFrame(Seq(
+      (1, "Alice", 45, 98.5, "engineering"),
+      (2, "Bob", 40, 95.0, "sales"),
+      (3, "Charlie", 35, 92.1, "engineering"),
+      (4, "David", 28, 88.0, "marketing")
+    )).toDF("id", "name", "age", "score", "department")
 
     assertTrue(expectedDf.except(actual).isEmpty)
     assertTrue(actual.except(expectedDf).isEmpty)
@@ -455,25 +458,26 @@ class TestLanceDataSource extends HoodieSparkClientTestBase {
     if (tableType == HoodieTableType.MERGE_ON_READ) {
       // Write one more commit to trigger compaction
       val records4 = Seq(
-        (1, "Alice", 50, 98.5), // Update Alice: age 45->50
-        (4, "David", 28, 90.0) // Update David: score 88.0->90.0
+        (1, "Alice", 50, 98.5, "engineering"), // Update Alice: age 45->50
+        (4, "David", 28, 90.0, "marketing") // Update David: score 88.0->90.0
       )
-      val df4 = createDataFrame(records4)
+      val df4 = spark.createDataFrame(records4).toDF("id", "name", "age", "score", "department")
       writeDataframe(tableType, tableName, tablePath, df4, operation = Some("upsert"),
-        extraOptions = Map("hoodie.compact.inline" -> "true", "hoodie.compact.inline.max.delta.commits" -> "1"))
-      val expectedDfAfterCompaction = createDataFrame(Seq(
-        (1, "Alice", 50, 98.5),
-        (2, "Bob", 40, 95.0),
-        (3, "Charlie", 35, 92.1),
-        (4, "David", 28, 90.0)
-      ))
+        extraOptions = Map("hoodie.compact.inline" -> "true", "hoodie.compact.inline.max.delta.commits" -> "1",
+          PARTITIONPATH_FIELD.key() -> "department"))
+      val expectedDfAfterCompaction = spark.createDataFrame(Seq(
+        (1, "Alice", 50, 98.5, "engineering"),
+        (2, "Bob", 40, 95.0, "sales"),
+        (3, "Charlie", 35, 92.1, "engineering"),
+        (4, "David", 28, 90.0, "marketing")
+      )).toDF("id", "name", "age", "score", "department")
       // validate compaction commit is present
       val compactionCommits = metaClient.reloadActiveTimeline().filterCompletedInstants().getInstants.asScala
        .filter(instant => instant.getAction == "commit")
       assertTrue(compactionCommits.nonEmpty, "Compaction commit should be present after upsert")
       // Read and verify data after compaction
       val readDfAfterCompaction = spark.read.format("hudi").load(tablePath)
-      val actualAfterCompaction = readDfAfterCompaction.select("id", "name", "age", "score")
+      val actualAfterCompaction = readDfAfterCompaction.select("id", "name", "age", "score", "department")
       assertTrue(expectedDfAfterCompaction.except(actualAfterCompaction).isEmpty)
       assertTrue(actualAfterCompaction.except(expectedDfAfterCompaction).isEmpty)
     }
@@ -659,6 +663,60 @@ class TestLanceDataSource extends HoodieSparkClientTestBase {
     assertTrue(actual.except(expectedDf).isEmpty)
   }
 
+  @ParameterizedTest
+  @EnumSource(value = classOf[HoodieTableType])
+  def testUpsertWithPopulateMetaFieldsDisabled(tableType: HoodieTableType): Unit = {
+    val tableName = s"test_lance_no_meta_${tableType.name().toLowerCase}"
+    val tablePath = s"$basePath/$tableName"
+
+    val records1 = Seq(
+      (101, "Alice", 30, 95.5, "engineering"),
+      (102, "Bob", 25, 87.3, "sales"),
+      (103, "Charlie", 35, 92.1, "engineering")
+    )
+    val df1 = spark.createDataFrame(records1).toDF("id", "name", "age", "score", "department")
+
+    // Write with populateMetaFields=false
+    writeDataframe(tableType, tableName, tablePath, df1,
+      saveMode = SaveMode.Overwrite, operation = Some("insert"),
+      extraOptions = Map("hoodie.populate.meta.fields" -> "false", PARTITIONPATH_FIELD.key() -> "department"))
+
+    // Upsert - modify Bob's record
+    val records2 = Seq(
+      (102, "Bob", 40, 95.0, "sales")
+    )
+    val df2 = spark.createDataFrame(records2).toDF("id", "name", "age", "score", "department")
+
+    writeDataframe(tableType, tableName, tablePath, df2,
+      operation = Some("upsert"),
+      extraOptions = Map("hoodie.populate.meta.fields" -> "false", PARTITIONPATH_FIELD.key() -> "department"))
+
+    // Second upsert - modify Alice and insert David
+    val records3 = Seq(
+      (101, "Alice", 45, 98.5, "engineering"),
+      (104, "David", 28, 88.0, "marketing")
+    )
+    val df3 = spark.createDataFrame(records3).toDF("id", "name", "age", "score", "department")
+
+    writeDataframe(tableType, tableName, tablePath, df3,
+      operation = Some("upsert"),
+      extraOptions = Map("hoodie.populate.meta.fields" -> "false", PARTITIONPATH_FIELD.key() -> "department"))
+
+    // Verify data
+    val readDf = spark.read.format("hudi").load(tablePath)
+    val actual = readDf.select("id", "name", "age", "score", "department")
+
+    val expectedDf = spark.createDataFrame(Seq(
+      (101, "Alice", 45, 98.5, "engineering"),
+      (102, "Bob", 40, 95.0, "sales"),
+      (103, "Charlie", 35, 92.1, "engineering"),
+      (104, "David", 28, 88.0, "marketing")
+    )).toDF("id", "name", "age", "score", "department")
+
+    assertTrue(expectedDf.except(actual).isEmpty)
+    assertTrue(actual.except(expectedDf).isEmpty)
+  }
+
   private def createDataFrame(records: Seq[(Int, String, Int, Double)]) = {
     spark.createDataFrame(records).toDF("id", "name", "age", "score").coalesce(1)
   }
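The MERGE_ON_READ branch of the tests above relies on inline compaction; a hedged datasource-level equivalent of those knobs (other writer options omitted for brevity):

    df4.write.format("hudi")
      .option("hoodie.datasource.write.operation", "upsert")
      .option("hoodie.compact.inline", "true")                // compact synchronously on write
      .option("hoodie.compact.inline.max.delta.commits", "1") // after every delta commit
      .mode(SaveMode.Append)
      .save(tablePath)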
diff --git a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_3Adapter.scala b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_3Adapter.scala
index 4ac9f2875fd7..a180ab3483dc 100644
--- a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_3Adapter.scala
+++ b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_3Adapter.scala
@@ -157,8 +157,8 @@ class Spark3_3Adapter extends BaseSpark3Adapter {
   override def createLanceFileReader(vectorized: Boolean,
                                      sqlConf: SQLConf,
                                      options: Map[String, String],
-                                     hadoopConf: Configuration): SparkColumnarFileReader = {
-    throw new UnsupportedOperationException("Lance format is not supported in Spark 3.3")
+                                     hadoopConf: Configuration): Option[SparkColumnarFileReader] = {
+    None
   }
 
   override def stopSparkContext(jssc: JavaSparkContext, exitCode: Int): Unit = {
diff --git a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_4Adapter.scala b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_4Adapter.scala
index 22dfce461d67..32b21b936f09 100644
--- a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_4Adapter.scala
+++ b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_4Adapter.scala
@@ -158,8 +158,8 @@ class Spark3_4Adapter extends BaseSpark3Adapter {
   override def createLanceFileReader(vectorized: Boolean,
                                      sqlConf: SQLConf,
                                      options: Map[String, String],
-                                     hadoopConf: Configuration): SparkColumnarFileReader = {
-    new SparkLanceReaderBase(vectorized)
+                                     hadoopConf: Configuration): Option[SparkColumnarFileReader] = {
+    Some(new SparkLanceReaderBase(vectorized))
   }
 
   override def stopSparkContext(jssc: JavaSparkContext, exitCode: Int): Unit = {
diff --git a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_5Adapter.scala b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_5Adapter.scala
index 2db62f2fca8f..24492fdc358b 100644
--- a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_5Adapter.scala
+++ b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_5Adapter.scala
@@ -174,8 +174,8 @@ class Spark3_5Adapter extends BaseSpark3Adapter {
   override def createLanceFileReader(vectorized: Boolean,
                                      sqlConf: SQLConf,
                                      options: Map[String, String],
-                                     hadoopConf: Configuration): SparkColumnarFileReader = {
-    new SparkLanceReaderBase(vectorized)
+                                     hadoopConf: Configuration): Option[SparkColumnarFileReader] = {
+    Some(new SparkLanceReaderBase(vectorized))
   }
 
   override def stopSparkContext(jssc: JavaSparkContext, exitCode: Int): Unit = {
diff --git a/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/adapter/Spark4_0Adapter.scala b/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/adapter/Spark4_0Adapter.scala
index cbf0b2de8bb4..60584b6aba39 100644
--- a/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/adapter/Spark4_0Adapter.scala
+++ b/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/adapter/Spark4_0Adapter.scala
@@ -172,8 +172,8 @@ class Spark4_0Adapter extends BaseSpark4Adapter {
   override def createLanceFileReader(vectorized: Boolean,
                                      sqlConf: SQLConf,
                                      options: Map[String, String],
-                                     hadoopConf: Configuration): SparkColumnarFileReader = {
-    new SparkLanceReaderBase(vectorized)
+                                     hadoopConf: Configuration): Option[SparkColumnarFileReader] = {
+    Some(new SparkLanceReaderBase(vectorized))
   }
 
   override def stopSparkContext(jssc: JavaSparkContext, exitCode: Int): Unit = {