This is an automated email from the ASF dual-hosted git repository.

timbrown pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new d29284788a6c feat: Ensure MOR table works with lance base files and avro log files (#17768)
d29284788a6c is described below

commit d29284788a6cae70b9fd49eee9e2affb31a82147
Author: Rahil C <[email protected]>
AuthorDate: Fri Jan 16 13:20:55 2026 -0800

    feat: Ensure MOR table works with lance base files and avro log files (#17768)
    
    * Ensure MOR table works with lance base files and avro log files
    
    * fix style
    
    * minor
    
    * version downgrade in lance spark and lance core due to arrow issue for MOR
    
    * retrigger ci
    
    * cleanup
    
    * add compaction validation
    
    * refactor test to reduce code duplication, add clustering validation, fix writer bug
    
    ---------
    
    Co-authored-by: Timothy Brown <[email protected]>
---
 .../org/apache/hudi/util/CommonClientUtils.java    |   1 +
 .../hudi/common/model/HoodieSparkRecord.java       |  11 +-
 .../hudi/io/storage/HoodieSparkLanceWriter.java    |  15 +-
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     |   3 -
 .../hudi/functional/TestLanceDataSource.scala      | 498 +++++++++------------
 5 files changed, 238 insertions(+), 290 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/util/CommonClientUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/util/CommonClientUtils.java
index 418d603c86e4..cd95b7ba5e13 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/util/CommonClientUtils.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/util/CommonClientUtils.java
@@ -108,6 +108,7 @@ public class CommonClientUtils {
     }
    HoodieFileFormat baseFileFormat = getBaseFileFormat(writeConfig, tableConfig);
     switch (getBaseFileFormat(writeConfig, tableConfig)) {
+      case LANCE:
       case PARQUET:
       case ORC:
         return HoodieLogBlock.HoodieLogBlockType.AVRO_DATA_BLOCK;
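
Note: with LANCE joining this fall-through, a MERGE_ON_READ table backed by Lance base files now defaults its log appends to Avro data blocks, the same behavior Parquet and ORC already had. A hedged usage sketch in Scala, mirroring the options exercised by TestLanceDataSource later in this commit (the DataFrame, table name, and path are assumed):

    // Sketch: MOR table with Lance base files; log files default to Avro data blocks.
    df.write.format("hudi")
      .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE")
      .option(TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name())
      .option(RECORDKEY_FIELD.key(), "id")
      .option(PRECOMBINE_FIELD.key(), "age")
      .option(HoodieWriteConfig.TBL_NAME.key(), tableName)
      .mode(SaveMode.Append)
      .save(tablePath)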
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java
index edcefda21224..0f4a9a31924c 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java
@@ -22,6 +22,7 @@ import org.apache.hudi.AvroConversionUtils;
 import org.apache.hudi.HoodieSchemaConversionUtils;
 import org.apache.hudi.SparkAdapterSupport$;
 import org.apache.hudi.SparkFileFormatInternalRecordContext;
+import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.client.model.HoodieInternalRow;
 import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.schema.HoodieSchemaType;
@@ -327,7 +328,15 @@ public class HoodieSparkRecord extends HoodieRecord<InternalRow> {
 
   @Override
  public ByteArrayOutputStream getAvroBytes(HoodieSchema recordSchema, Properties props) throws IOException {
-    throw new UnsupportedOperationException();
+    // Convert Spark InternalRow to Avro GenericRecord
+    if (data == null) {
+      throw new IOException("Cannot convert null data to Avro bytes");
+    }
+    StructType structType = HoodieInternalRowUtils.getCachedSchema(recordSchema);
+    GenericRecord avroRecord = AvroConversionUtils
+        .createInternalRowToAvroConverter(structType, recordSchema.toAvroSchema(), false)
+        .apply(data);
+    return HoodieAvroUtils.avroToBytesStream(avroRecord);
   }
 
   @Override
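
Note: the new getAvroBytes implementation is what lets a Spark-native HoodieSparkRecord be serialized into an Avro log block. It resolves the cached Spark StructType for the record schema, converts the InternalRow to an Avro GenericRecord, and serializes the result. A self-contained Scala sketch of that pipeline, using only the calls visible in the hunk above (imports and the recordSchema/row inputs are assumed):

    import java.io.ByteArrayOutputStream
    import org.apache.avro.generic.GenericRecord
    import org.apache.hudi.{AvroConversionUtils, HoodieInternalRowUtils}
    import org.apache.hudi.avro.HoodieAvroUtils
    import org.apache.hudi.common.schema.HoodieSchema
    import org.apache.spark.sql.catalyst.InternalRow

    // Hedged sketch of the conversion getAvroBytes performs.
    def rowToAvroBytes(recordSchema: HoodieSchema, row: InternalRow): ByteArrayOutputStream = {
      val structType = HoodieInternalRowUtils.getCachedSchema(recordSchema) // Spark view of the schema
      val avroRecord: GenericRecord = AvroConversionUtils
        .createInternalRowToAvroConverter(structType, recordSchema.toAvroSchema(), false) // nullable = false
        .apply(row)
      HoodieAvroUtils.avroToBytesStream(avroRecord) // Avro-serialized payload for the log block
    }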
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceWriter.java
index 67e07d162ec1..99bd3916fe92 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceWriter.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceWriter.java
@@ -18,8 +18,6 @@
 
 package org.apache.hudi.io.storage;
 
-import com.lancedb.lance.spark.arrow.LanceArrowWriter;
-import org.apache.arrow.vector.types.pojo.Schema;
 import org.apache.hudi.common.engine.TaskContextSupplier;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -27,6 +25,9 @@ import org.apache.hudi.io.lance.HoodieBaseLanceWriter;
 import org.apache.hudi.io.storage.row.HoodieInternalRowFileWriter;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StoragePath;
+
+import com.lancedb.lance.spark.arrow.LanceArrowWriter;
+import org.apache.arrow.vector.types.pojo.Schema;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.sql.util.LanceArrowUtils;
@@ -113,26 +114,26 @@ public class HoodieSparkLanceWriter extends HoodieBaseLanceWriter<InternalRow>
     if (populateMetaFields) {
       UTF8String recordKey = UTF8String.fromString(key.getRecordKey());
      updateRecordMetadata(row, recordKey, key.getPartitionPath(), getWrittenRecordCount());
-      super.write(row);
+      super.write(row.copy());
     } else {
-      super.write(row);
+      super.write(row.copy());
     }
   }
 
   @Override
   public void writeRow(String recordKey, InternalRow row) throws IOException {
-    super.write(row);
+    super.write(row.copy());
   }
   
   @Override
   public void writeRow(UTF8String key, InternalRow row) throws IOException {
    // Key reserved for future bloom filter support (https://github.com/apache/hudi/issues/17664)
-    super.write(row);
+    super.write(row.copy());
   }
   
   @Override
   public void writeRow(InternalRow row) throws IOException {
-    super.write(row);
+    super.write(row.copy());
   }
 
   @Override
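
Note: the writer bug fix here is the switch from write(row) to write(row.copy()). Spark commonly reuses one mutable InternalRow instance across iterator steps, so a writer that buffers rows before flushing (as a batching Arrow-based Lance writer does) must snapshot each row it keeps. A hypothetical, self-contained Scala demonstration of the hazard (not Hudi code):

    import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
    import org.apache.spark.unsafe.types.UTF8String

    // The engine reuses a single mutable row; buffering the live reference is unsafe.
    val reused = new GenericInternalRow(Array[Any](UTF8String.fromString("first")))
    val buffered = reused       // what write(row) effectively kept
    val copied = reused.copy()  // what write(row.copy()) keeps

    reused.update(0, UTF8String.fromString("second")) // the next record arrives
    buffered.getUTF8String(0)   // "second" -- silently corrupted
    copied.getUTF8String(0)     // "first"  -- correct snapshot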
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 38b22715f4c4..0612dae85b26 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -516,9 +516,6 @@ class HoodieSparkSqlWriterInternal {
             // scalastyle:on
 
             val writeConfig = client.getConfig
-            if (writeConfig.getRecordMerger.getRecordType == HoodieRecordType.SPARK && tableType == MERGE_ON_READ && writeConfig.getLogDataBlockFormat.orElse(HoodieLogBlockType.AVRO_DATA_BLOCK) != HoodieLogBlockType.PARQUET_DATA_BLOCK) {
-              throw new UnsupportedOperationException(s"${writeConfig.getRecordMerger.getClass.getName} only support parquet log.")
-            }
             instantTime = client.startCommit(commitActionType)
             // if table has undergone upgrade, we need to reload table config
             tableMetaClient.reloadTableConfig()
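
Note: removing this guard is what actually opens MOR up for Lance. Previously, a MERGE_ON_READ write using a SPARK-typed record merger would throw unless the log data block format was overridden to Parquet; with HoodieSparkRecord#getAvroBytes implemented above, the default Avro data blocks now work. A hedged Scala sketch of a write that used to hit this exception and now succeeds (options taken from the tests below; DataFrame and path are assumed):

    // Before this commit: UnsupportedOperationException("... only support parquet log.")
    // After: the Avro log-block default works with the Spark-native merger.
    df.write.format("hudi")
      .option(TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name())
      .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName)
      .option(RECORDKEY_FIELD.key(), "id")
      .option(PRECOMBINE_FIELD.key(), "age")
      .mode(SaveMode.Append)
      .save(tablePath)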
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala
index 9a634b8eec6d..c9e1083bb550 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala
@@ -19,15 +19,18 @@ package org.apache.hudi.functional
 
 import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.DefaultSparkRecordMerger
+import org.apache.hudi.common.model.HoodieTableType
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
 import org.apache.hudi.common.testutils.HoodieTestUtils
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.testutils.HoodieSparkClientTestBase
 
-import org.apache.spark.sql.{SaveMode, SparkSession}
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
+import org.junit.jupiter.api.{AfterEach, BeforeEach}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
 import org.junit.jupiter.api.condition.DisabledIfSystemProperty
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.EnumSource
 
 import scala.collection.JavaConverters._
 
@@ -51,9 +54,10 @@ class TestLanceDataSource extends HoodieSparkClientTestBase {
     spark = null
   }
 
-  @Test
-  def testBasicWriteAndRead(): Unit = {
-    val tableName = "test_lance_table"
+  @ParameterizedTest
+  @EnumSource(value = classOf[HoodieTableType])
+  def testBasicWriteAndRead(tableType: HoodieTableType): Unit = {
+    val tableName = s"test_lance_table_${tableType.name().toLowerCase}"
     val tablePath = s"$basePath/$tableName"
 
     // Create test data
@@ -62,19 +66,10 @@ class TestLanceDataSource extends HoodieSparkClientTestBase {
       (2, "Bob", 25, 87.3),
       (3, "Charlie", 35, 92.1)
     )
-    val expectedDf = spark.createDataFrame(records).toDF("id", "name", "age", "score")
+    val expectedDf = createDataFrame(records)
 
     // Write to Hudi table with Lance base file format
-    expectedDf.write
-      .format("hudi")
-      .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE")
-      .option(RECORDKEY_FIELD.key(), "id")
-      .option(PRECOMBINE_FIELD.key(), "age")
-      .option(TABLE_NAME.key(), tableName)
-      .option(HoodieWriteConfig.TBL_NAME.key(), tableName)
-      .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName)
-      .mode(SaveMode.Overwrite)
-      .save(tablePath)
+    writeDataframe(tableType, tableName, tablePath, expectedDf, saveMode = SaveMode.Overwrite)
 
     // Read back and verify
     val readDf = spark.read
@@ -87,9 +82,10 @@ class TestLanceDataSource extends HoodieSparkClientTestBase {
     assertTrue(actual.except(expectedDf).isEmpty)
   }
 
-  @Test
-  def testSchemaProjection(): Unit = {
-    val tableName = "test_lance_projection"
+  @ParameterizedTest
+  @EnumSource(value = classOf[HoodieTableType])
+  def testSchemaProjection(tableType: HoodieTableType): Unit = {
+    val tableName = s"test_lance_projection_${tableType.name().toLowerCase}"
     val tablePath = s"$basePath/$tableName"
 
     // Create test data with multiple columns
@@ -101,16 +97,7 @@ class TestLanceDataSource extends HoodieSparkClientTestBase {
    val inputDf = spark.createDataFrame(records).toDF("id", "name", "age", "score", "department")
 
     // Write to Hudi table with Lance format
-    inputDf.write
-      .format("hudi")
-      .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE")
-      .option(RECORDKEY_FIELD.key(), "id")
-      .option(PRECOMBINE_FIELD.key(), "age")
-      .option(TABLE_NAME.key(), tableName)
-      .option(HoodieWriteConfig.TBL_NAME.key(), tableName)
-      .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName)
-      .mode(SaveMode.Overwrite)
-      .save(tablePath)
+    writeDataframe(tableType, tableName, tablePath, inputDf, saveMode = SaveMode.Overwrite)
 
     // Read with schema projection - only select subset of columns
     val readDf = spark.read
@@ -133,9 +120,10 @@ class TestLanceDataSource extends HoodieSparkClientTestBase {
     assertTrue(actual.except(expectedDf).isEmpty)
   }
 
-  @Test
-  def testWhereClauseFiltering(): Unit = {
-    val tableName = "test_lance_where"
+  @ParameterizedTest
+  @EnumSource(value = classOf[HoodieTableType])
+  def testWhereClauseFiltering(tableType: HoodieTableType): Unit = {
+    val tableName = s"test_lance_where_${tableType.name().toLowerCase}"
     val tablePath = s"$basePath/$tableName"
 
     // Create test data
@@ -146,19 +134,10 @@ class TestLanceDataSource extends HoodieSparkClientTestBase {
       (4, "David", 28, 88.9),
       (5, "Eve", 32, 91.4)
     )
-    val df = spark.createDataFrame(records).toDF("id", "name", "age", "score")
+    val df = createDataFrame(records)
 
     // Write to Hudi table with Lance format
-    df.write
-      .format("hudi")
-      .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE")
-      .option(RECORDKEY_FIELD.key(), "id")
-      .option(PRECOMBINE_FIELD.key(), "age")
-      .option(TABLE_NAME.key(), tableName)
-      .option(HoodieWriteConfig.TBL_NAME.key(), tableName)
-      .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName)
-      .mode(SaveMode.Overwrite)
-      .save(tablePath)
+    writeDataframe(tableType, tableName, tablePath, df, saveMode = SaveMode.Overwrite)
 
     // Test 1: Simple WHERE clause on numeric column
     val filteredByAge = spark.read
@@ -167,10 +146,10 @@ class TestLanceDataSource extends HoodieSparkClientTestBase {
       .where("age > 30")
       .select("id", "name", "age", "score")
 
-    val expectedFilteredByAge = spark.createDataFrame(Seq(
+    val expectedFilteredByAge = createDataFrame(Seq(
       (3, "Charlie", 35, 92.1),
       (5, "Eve", 32, 91.4)
-    )).toDF("id", "name", "age", "score")
+    ))
 
     assertTrue(expectedFilteredByAge.except(filteredByAge).isEmpty)
     assertTrue(filteredByAge.except(expectedFilteredByAge).isEmpty)
@@ -182,9 +161,7 @@ class TestLanceDataSource extends HoodieSparkClientTestBase {
       .where("name = 'Bob'")
       .select("id", "name", "age", "score")
 
-    val expectedFilteredByName = spark.createDataFrame(Seq(
-      (2, "Bob", 25, 87.3)
-    )).toDF("id", "name", "age", "score")
+    val expectedFilteredByName = createDataFrame(Seq((2, "Bob", 25, 87.3)))
 
     assertTrue(expectedFilteredByName.except(filteredByName).isEmpty)
     assertTrue(filteredByName.except(expectedFilteredByName).isEmpty)
@@ -196,19 +173,20 @@ class TestLanceDataSource extends HoodieSparkClientTestBase {
       .where("age >= 28 AND score > 90")
       .select("id", "name", "age", "score")
 
-    val expectedFilteredComplex = spark.createDataFrame(Seq(
+    val expectedFilteredComplex = createDataFrame(Seq(
       (1, "Alice", 30, 95.5),
       (3, "Charlie", 35, 92.1),
       (5, "Eve", 32, 91.4)
-    )).toDF("id", "name", "age", "score")
+    ))
 
     assertTrue(expectedFilteredComplex.except(filteredComplex).isEmpty)
     assertTrue(filteredComplex.except(expectedFilteredComplex).isEmpty)
   }
 
-  @Test
-  def testMultipleBulkInsertsWithCommitValidation(): Unit = {
-    val tableName = "test_lance_multiple_inserts"
+  @ParameterizedTest
+  @EnumSource(value = classOf[HoodieTableType])
+  def testMultipleBulkInsertsWithCommitValidation(tableType: HoodieTableType): Unit = {
+    val tableName = s"test_lance_multiple_inserts_${tableType.name().toLowerCase}"
     val tablePath = s"$basePath/$tableName"
 
     // First insert - records 1-3
@@ -217,19 +195,9 @@ class TestLanceDataSource extends HoodieSparkClientTestBase {
       (2, "Bob", 25, 87.3),
       (3, "Charlie", 35, 92.1)
     )
-    val df1 = spark.createDataFrame(records1).toDF("id", "name", "age", "score")
+    val df1 = createDataFrame(records1)
 
-    df1.write
-      .format("hudi")
-      .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE")
-      .option(RECORDKEY_FIELD.key(), "id")
-      .option(PRECOMBINE_FIELD.key(), "age")
-      .option(TABLE_NAME.key(), tableName)
-      .option(HoodieWriteConfig.TBL_NAME.key(), tableName)
-      .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName)
-      .option(OPERATION.key(), "bulk_insert")
-      .mode(SaveMode.Overwrite)
-      .save(tablePath)
+    writeDataframe(tableType, tableName, tablePath, df1, operation = Some("bulk_insert"))
 
     // Second insert - records 4-6
     val records2 = Seq(
@@ -237,19 +205,9 @@ class TestLanceDataSource extends HoodieSparkClientTestBase {
       (5, "Eve", 32, 91.4),
       (6, "Frank", 27, 85.7)
     )
-    val df2 = spark.createDataFrame(records2).toDF("id", "name", "age", "score")
+    val df2 = createDataFrame(records2)
 
-    df2.write
-      .format("hudi")
-      .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE")
-      .option(RECORDKEY_FIELD.key(), "id")
-      .option(PRECOMBINE_FIELD.key(), "age")
-      .option(TABLE_NAME.key(), tableName)
-      .option(HoodieWriteConfig.TBL_NAME.key(), tableName)
-      .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName)
-      .option(OPERATION.key(), "bulk_insert")
-      .mode(SaveMode.Append)
-      .save(tablePath)
+    writeDataframe(tableType, tableName, tablePath, df2, operation = Some("bulk_insert"))
 
     // Third insert - records 7-9
     val records3 = Seq(
@@ -257,19 +215,9 @@ class TestLanceDataSource extends HoodieSparkClientTestBase {
       (8, "Henry", 31, 89.6),
       (9, "Iris", 26, 94.8)
     )
-    val df3 = spark.createDataFrame(records3).toDF("id", "name", "age", "score")
+    val df3 = createDataFrame(records3)
 
-    df3.write
-      .format("hudi")
-      .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE")
-      .option(RECORDKEY_FIELD.key(), "id")
-      .option(PRECOMBINE_FIELD.key(), "age")
-      .option(TABLE_NAME.key(), tableName)
-      .option(HoodieWriteConfig.TBL_NAME.key(), tableName)
-      .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName)
-      .option(OPERATION.key(), "bulk_insert")
-      .mode(SaveMode.Append)
-      .save(tablePath)
+    writeDataframe(tableType, tableName, tablePath, df3, operation = Some("bulk_insert"))
 
     // Validate number of commits matches number of inserts
     val metaClient = HoodieTableMetaClient.builder()
@@ -284,11 +232,6 @@ class TestLanceDataSource extends HoodieSparkClientTestBase {
    val commits = metaClient.getCommitsTimeline.filterCompletedInstants().getInstants.asScala.toList
     assertEquals(3, commits.size, "Should have exactly 3 commits")
 
-    // Check that each commit is a COMMIT action (bulk_insert creates COMMIT actions)
-    commits.foreach { instant =>
-      assertEquals("commit", instant.getAction, s"Instant ${instant.requestedTime()} should be a commit action")
-    }
-
     // Read back all data and verify total record count
     val readDf = spark.read
       .format("hudi")
@@ -296,7 +239,7 @@ class TestLanceDataSource extends HoodieSparkClientTestBase {
 
     val actual = readDf.select("id", "name", "age", "score")
 
-    val expectedDf = spark.createDataFrame(Seq(
+    val expectedDf = createDataFrame(Seq(
       (1, "Alice", 30, 95.5),
       (2, "Bob", 25, 87.3),
       (3, "Charlie", 35, 92.1),
@@ -306,15 +249,16 @@ class TestLanceDataSource extends HoodieSparkClientTestBase {
       (7, "Grace", 29, 93.2),
       (8, "Henry", 31, 89.6),
       (9, "Iris", 26, 94.8)
-    )).toDF("id", "name", "age", "score")
+    ))
 
     assertTrue(expectedDf.except(actual).isEmpty)
     assertTrue(actual.except(expectedDf).isEmpty)
   }
 
-  @Test
-  def testTimeTravel(): Unit = {
-    val tableName = "test_lance_time_travel"
+  @ParameterizedTest
+  @EnumSource(value = classOf[HoodieTableType])
+  def testTimeTravel(tableType: HoodieTableType): Unit = {
+    val tableName = s"test_lance_time_travel_${tableType.name().toLowerCase}"
     val tablePath = s"$basePath/$tableName"
 
     // First insert - records 1-3
@@ -323,19 +267,9 @@ class TestLanceDataSource extends HoodieSparkClientTestBase {
       (2, "Bob", 25, 87.3),
       (3, "Charlie", 35, 92.1)
     )
-    val df1 = spark.createDataFrame(records1).toDF("id", "name", "age", "score")
+    val df1 = createDataFrame(records1)
 
-    df1.write
-      .format("hudi")
-      .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE")
-      .option(RECORDKEY_FIELD.key(), "id")
-      .option(PRECOMBINE_FIELD.key(), "age")
-      .option(TABLE_NAME.key(), tableName)
-      .option(HoodieWriteConfig.TBL_NAME.key(), tableName)
-      .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName)
-      .option(OPERATION.key(), "bulk_insert")
-      .mode(SaveMode.Overwrite)
-      .save(tablePath)
+    writeDataframe(tableType, tableName, tablePath, df1, saveMode = SaveMode.Overwrite)
 
     // Second insert - records 4-6
     val records2 = Seq(
@@ -343,19 +277,9 @@ class TestLanceDataSource extends HoodieSparkClientTestBase {
       (5, "Eve", 32, 91.4),
       (6, "Frank", 27, 85.7)
     )
-    val df2 = spark.createDataFrame(records2).toDF("id", "name", "age", "score")
+    val df2 = createDataFrame(records2)
 
-    df2.write
-      .format("hudi")
-      .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE")
-      .option(RECORDKEY_FIELD.key(), "id")
-      .option(PRECOMBINE_FIELD.key(), "age")
-      .option(TABLE_NAME.key(), tableName)
-      .option(HoodieWriteConfig.TBL_NAME.key(), tableName)
-      .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName)
-      .option(OPERATION.key(), "bulk_insert")
-      .mode(SaveMode.Append)
-      .save(tablePath)
+    writeDataframe(tableType, tableName, tablePath, df2)
 
     // Get the commit timestamp after second insert
     val metaClient = HoodieTableMetaClient.builder()
@@ -373,19 +297,9 @@ class TestLanceDataSource extends HoodieSparkClientTestBase {
       (8, "Henry", 31, 89.6),
       (9, "Iris", 26, 94.8)
     )
-    val df3 = spark.createDataFrame(records3).toDF("id", "name", "age", "score")
+    val df3 = createDataFrame(records3)
 
-    df3.write
-      .format("hudi")
-      .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE")
-      .option(RECORDKEY_FIELD.key(), "id")
-      .option(PRECOMBINE_FIELD.key(), "age")
-      .option(TABLE_NAME.key(), tableName)
-      .option(HoodieWriteConfig.TBL_NAME.key(), tableName)
-      .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName)
-      .option(OPERATION.key(), "bulk_insert")
-      .mode(SaveMode.Append)
-      .save(tablePath)
+    writeDataframe(tableType, tableName, tablePath, df3)
 
     // Time travel query to second commit (should see data from c1 + c2 only)
     val timeTravelDf = spark.read
@@ -395,22 +309,23 @@ class TestLanceDataSource extends HoodieSparkClientTestBase {
 
     val actual = timeTravelDf.select("id", "name", "age", "score")
 
-    val expectedDf = spark.createDataFrame(Seq(
+    val expectedDf = createDataFrame(Seq(
       (1, "Alice", 30, 95.5),
       (2, "Bob", 25, 87.3),
       (3, "Charlie", 35, 92.1),
       (4, "David", 28, 88.9),
       (5, "Eve", 32, 91.4),
       (6, "Frank", 27, 85.7)
-    )).toDF("id", "name", "age", "score")
+    ))
 
     assertTrue(expectedDf.except(actual).isEmpty)
     assertTrue(actual.except(expectedDf).isEmpty)
   }
 
-  @Test
-  def testMultipleRegularInsertsWithCommitValidation(): Unit = {
-    val tableName = "test_lance_regular_inserts"
+  @ParameterizedTest
+  @EnumSource(value = classOf[HoodieTableType])
+  def testMultipleRegularInsertsWithCommitValidation(tableType: HoodieTableType): Unit = {
+    val tableName = s"test_lance_regular_inserts_${tableType.name().toLowerCase}"
     val tablePath = s"$basePath/$tableName"
 
     // First insert - records 1-3 using regular insert
@@ -419,19 +334,9 @@ class TestLanceDataSource extends HoodieSparkClientTestBase {
       (2, "Bob", 25, 87.3),
       (3, "Charlie", 35, 92.1)
     )
-    val df1 = spark.createDataFrame(records1).toDF("id", "name", "age", "score")
+    val df1 = createDataFrame(records1)
 
-    df1.write
-      .format("hudi")
-      .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE")
-      .option(RECORDKEY_FIELD.key(), "id")
-      .option(PRECOMBINE_FIELD.key(), "age")
-      .option(TABLE_NAME.key(), tableName)
-      .option(HoodieWriteConfig.TBL_NAME.key(), tableName)
-      .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName)
-      .option(OPERATION.key(), "insert")
-      .mode(SaveMode.Overwrite)
-      .save(tablePath)
+    writeDataframe(tableType, tableName, tablePath, df1, saveMode = SaveMode.Overwrite, operation = Some("insert"))
 
     // Second insert - records 4-6 using regular insert
     val records2 = Seq(
@@ -439,19 +344,9 @@ class TestLanceDataSource extends HoodieSparkClientTestBase {
       (5, "Eve", 32, 91.4),
       (6, "Frank", 27, 85.7)
     )
-    val df2 = spark.createDataFrame(records2).toDF("id", "name", "age", "score")
+    val df2 = createDataFrame(records2)
 
-    df2.write
-      .format("hudi")
-      .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE")
-      .option(RECORDKEY_FIELD.key(), "id")
-      .option(PRECOMBINE_FIELD.key(), "age")
-      .option(TABLE_NAME.key(), tableName)
-      .option(HoodieWriteConfig.TBL_NAME.key(), tableName)
-      .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName)
-      .option(OPERATION.key(), "insert")
-      .mode(SaveMode.Append)
-      .save(tablePath)
+    writeDataframe(tableType, tableName, tablePath, df2, operation = Some("insert"))
 
     // Validate number of commits matches number of inserts
     val metaClient = HoodieTableMetaClient.builder()
@@ -466,9 +361,11 @@ class TestLanceDataSource extends HoodieSparkClientTestBase {
    val commits = metaClient.getCommitsTimeline.filterCompletedInstants().getInstants.asScala.toList
     assertEquals(2, commits.size, "Should have exactly 2 commits")
 
-    // Check that each commit is a COMMIT action (insert creates COMMIT actions)
+    // Verify commit action types based on table type
+    val expectedAction = if (tableType == HoodieTableType.COPY_ON_WRITE) "commit" else "deltacommit"
     commits.foreach { instant =>
-      assertEquals("commit", instant.getAction, s"Instant ${instant.requestedTime()} should be a commit action")
+      assertEquals(expectedAction, instant.getAction,
+        s"Instant ${instant.requestedTime()} should be a $expectedAction action for $tableType table")
     }
 
     // Read back all data and verify total record count
@@ -478,22 +375,23 @@ class TestLanceDataSource extends HoodieSparkClientTestBase {
 
     val actual = readDf.select("id", "name", "age", "score")
 
-    val expectedDf = spark.createDataFrame(Seq(
+    val expectedDf = createDataFrame(Seq(
       (1, "Alice", 30, 95.5),
       (2, "Bob", 25, 87.3),
       (3, "Charlie", 35, 92.1),
       (4, "David", 28, 88.9),
       (5, "Eve", 32, 91.4),
       (6, "Frank", 27, 85.7)
-    )).toDF("id", "name", "age", "score")
+    ))
 
     assertTrue(expectedDf.except(actual).isEmpty)
     assertTrue(actual.except(expectedDf).isEmpty)
   }
 
-  @Test
-  def testBasicUpsertModifyExistingRow(): Unit = {
-    val tableName = "test_lance_upsert"
+  @ParameterizedTest
+  @EnumSource(value = classOf[HoodieTableType])
+  def testBasicUpsertModifyExistingRow(tableType: HoodieTableType): Unit = {
+    val tableName = s"test_lance_upsert_${tableType.name().toLowerCase}"
     val tablePath = s"$basePath/$tableName"
 
     // Initial insert - 3 records
@@ -502,56 +400,26 @@ class TestLanceDataSource extends HoodieSparkClientTestBase {
       (2, "Bob", 25, 87.3),
       (3, "Charlie", 35, 92.1)
     )
-    val df1 = spark.createDataFrame(records1).toDF("id", "name", "age", "score")
+    val df1 = createDataFrame(records1)
 
-    df1.write
-      .format("hudi")
-      .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE")
-      .option(RECORDKEY_FIELD.key(), "id")
-      .option(PRECOMBINE_FIELD.key(), "age")
-      .option(TABLE_NAME.key(), tableName)
-      .option(HoodieWriteConfig.TBL_NAME.key(), tableName)
-      .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName)
-      .option(OPERATION.key(), "insert")
-      .mode(SaveMode.Overwrite)
-      .save(tablePath)
+    writeDataframe(tableType, tableName, tablePath, df1, saveMode = SaveMode.Overwrite, operation = Some("insert"))
 
     // Upsert - modify Bob's record (id=2)
     val records2 = Seq(
       (2, "Bob", 40, 95.0)  // Update Bob: age 25->40, score 87.3->95.0
     )
-    val df2 = spark.createDataFrame(records2).toDF("id", "name", "age", "score")
+    val df2 = createDataFrame(records2)
 
-    df2.write
-      .format("hudi")
-      .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE")
-      .option(RECORDKEY_FIELD.key(), "id")
-      .option(PRECOMBINE_FIELD.key(), "age")
-      .option(TABLE_NAME.key(), tableName)
-      .option(HoodieWriteConfig.TBL_NAME.key(), tableName)
-      .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName)
-      .option(OPERATION.key(), "upsert")
-      .mode(SaveMode.Append)
-      .save(tablePath)
+    writeDataframe(tableType, tableName, tablePath, df2, operation = Some("upsert"))
 
     // Second upsert - modify Alice (id=1) and insert David (id=4)
     val records3 = Seq(
       (1, "Alice", 45, 98.5),  // Update Alice: age 30->45, score 95.5->98.5
       (4, "David", 28, 88.0)   // Insert new record
     )
-    val df3 = spark.createDataFrame(records3).toDF("id", "name", "age", "score")
+    val df3 = createDataFrame(records3)
 
-    df3.write
-      .format("hudi")
-      .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE")
-      .option(RECORDKEY_FIELD.key(), "id")
-      .option(PRECOMBINE_FIELD.key(), "age")
-      .option(TABLE_NAME.key(), tableName)
-      .option(HoodieWriteConfig.TBL_NAME.key(), tableName)
-      .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName)
-      .option(OPERATION.key(), "upsert")
-      .mode(SaveMode.Append)
-      .save(tablePath)
+    writeDataframe(tableType, tableName, tablePath, df3, operation = Some("upsert"))
 
     // Validate commits
     val metaClient = HoodieTableMetaClient.builder()
@@ -562,24 +430,59 @@ class TestLanceDataSource extends HoodieSparkClientTestBase {
    val commitCount = metaClient.getCommitsTimeline.filterCompletedInstants().countInstants()
    assertEquals(3, commitCount, "Should have 3 completed commits (insert + 2 upserts)")
 
+    // Verify commit action types based on table type
+    val expectedAction = if (tableType == HoodieTableType.COPY_ON_WRITE) "commit" else "deltacommit"
+    val commits = metaClient.getCommitsTimeline.filterCompletedInstants().getInstants.asScala
+    commits.foreach { instant =>
+      assertEquals(expectedAction, instant.getAction,
+        s"Instant ${instant.requestedTime()} should be a $expectedAction action for $tableType table")
+    }
+
     // Read and verify data
     val readDf = spark.read.format("hudi").load(tablePath)
     val actual = readDf.select("id", "name", "age", "score")
 
-    val expectedDf = spark.createDataFrame(Seq(
+    val expectedDf = createDataFrame(Seq(
       (1, "Alice", 45, 98.5),
       (2, "Bob", 40, 95.0),
       (3, "Charlie", 35, 92.1),
       (4, "David", 28, 88.0)
-    )).toDF("id", "name", "age", "score")
+    ))
 
     assertTrue(expectedDf.except(actual).isEmpty)
     assertTrue(actual.except(expectedDf).isEmpty)
+
+    if (tableType == HoodieTableType.MERGE_ON_READ) {
+      // Write one more commit to trigger compaction
+      val records4 = Seq(
+        (1, "Alice", 50, 98.5),  // Update Alice: age 45->50
+        (4, "David", 28, 90.0)   // Update David: score 88.0->90.0
+      )
+      val df4 = createDataFrame(records4)
+      writeDataframe(tableType, tableName, tablePath, df4, operation = Some("upsert"),
+        extraOptions = Map("hoodie.compact.inline" -> "true", "hoodie.compact.inline.max.delta.commits" -> "1"))
+      val expectedDfAfterCompaction = createDataFrame(Seq(
+        (1, "Alice", 50, 98.5),
+        (2, "Bob", 40, 95.0),
+        (3, "Charlie", 35, 92.1),
+        (4, "David", 28, 90.0)
+      ))
+      // validate compaction commit is present
+      val compactionCommits = metaClient.reloadActiveTimeline().filterCompletedInstants().getInstants.asScala
+        .filter(instant => instant.getAction == "commit")
+      assertTrue(compactionCommits.nonEmpty, "Compaction commit should be present after upsert")
+      // Read and verify data after compaction
+      val readDfAfterCompaction = spark.read.format("hudi").load(tablePath)
+      val actualAfterCompaction = readDfAfterCompaction.select("id", "name", "age", "score")
+      assertTrue(expectedDfAfterCompaction.except(actualAfterCompaction).isEmpty)
+      assertTrue(actualAfterCompaction.except(expectedDfAfterCompaction).isEmpty)
+    }
   }
 
-  @Test
-  def testBasicDeleteOperation(): Unit = {
-    val tableName = "test_lance_delete"
+  @ParameterizedTest
+  @EnumSource(value = classOf[HoodieTableType])
+  def testBasicDeleteOperation(tableType: HoodieTableType): Unit = {
+    val tableName = s"test_lance_delete_${tableType.name().toLowerCase}"
     val tablePath = s"$basePath/$tableName"
 
     // Initial insert - 5 records
@@ -590,19 +493,9 @@ class TestLanceDataSource extends HoodieSparkClientTestBase {
       (4, "David", 28, 88.0),
       (5, "Eve", 32, 91.4)
     )
-    val df1 = spark.createDataFrame(records1).toDF("id", "name", "age", "score")
+    val df1 = createDataFrame(records1)
 
-    df1.write
-      .format("hudi")
-      .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE")
-      .option(RECORDKEY_FIELD.key(), "id")
-      .option(PRECOMBINE_FIELD.key(), "age")
-      .option(TABLE_NAME.key(), tableName)
-      .option(HoodieWriteConfig.TBL_NAME.key(), tableName)
-      .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName)
-      .option(OPERATION.key(), "insert")
-      .mode(SaveMode.Overwrite)
-      .save(tablePath)
+    writeDataframe(tableType, tableName, tablePath, df1, saveMode = SaveMode.Overwrite, operation = Some("insert"))
 
    // Delete operation - delete Bob (id=2), David (id=4), and a non-existent key (id=99)
     val recordsToDelete = Seq(
@@ -610,19 +503,9 @@ class TestLanceDataSource extends HoodieSparkClientTestBase {
      (4, "David", 28, 88.0),      // Delete David (exists)
      (99, "NonExistent", 50, 0.0) // Delete non-existent record (should be no-op)
     )
-    val deleteDF = spark.createDataFrame(recordsToDelete).toDF("id", "name", "age", "score")
+    val deleteDF = createDataFrame(recordsToDelete)
 
-    deleteDF.write
-      .format("hudi")
-      .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE")
-      .option(RECORDKEY_FIELD.key(), "id")
-      .option(PRECOMBINE_FIELD.key(), "age")
-      .option(TABLE_NAME.key(), tableName)
-      .option(HoodieWriteConfig.TBL_NAME.key(), tableName)
-      .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName)
-      .option(OPERATION.key(), "delete")
-      .mode(SaveMode.Append)
-      .save(tablePath)
+    writeDataframe(tableType, tableName, tablePath, deleteDF, operation = Some("delete"))
 
     // Validate commits
     val metaClient = HoodieTableMetaClient.builder()
@@ -633,23 +516,32 @@ class TestLanceDataSource extends HoodieSparkClientTestBase {
    val commitCount = metaClient.getCommitsTimeline.filterCompletedInstants().countInstants()
    assertEquals(2, commitCount, "Should have 2 completed commits (insert + delete)")
 
+    // Verify commit action types based on table type
+    val expectedAction = if (tableType == HoodieTableType.COPY_ON_WRITE) "commit" else "deltacommit"
+    val commits = metaClient.getCommitsTimeline.filterCompletedInstants().getInstants.asScala
+    commits.foreach { instant =>
+      assertEquals(expectedAction, instant.getAction,
+        s"Instant ${instant.requestedTime()} should be a $expectedAction action for $tableType table")
+    }
+
     // Read and verify data
     val readDf = spark.read.format("hudi").load(tablePath)
     val actual = readDf.select("id", "name", "age", "score")
 
-    val expectedDf = spark.createDataFrame(Seq(
+    val expectedDf = createDataFrame(Seq(
       (1, "Alice", 30, 95.5),
       (3, "Charlie", 35, 92.1),
       (5, "Eve", 32, 91.4)
-    )).toDF("id", "name", "age", "score")
+    ))
 
     assertTrue(expectedDf.except(actual).isEmpty)
     assertTrue(actual.except(expectedDf).isEmpty)
   }
 
-  @Test
-  def testIncrementalQuery(): Unit = {
-    val tableName = "test_lance_incremental"
+  @ParameterizedTest
+  @EnumSource(value = classOf[HoodieTableType])
+  def testIncrementalQuery(tableType: HoodieTableType): Unit = {
+    val tableName = s"test_lance_incremental_${tableType.name().toLowerCase}"
     val tablePath = s"$basePath/$tableName"
 
     // First insert - records 1-3
@@ -658,19 +550,9 @@ class TestLanceDataSource extends HoodieSparkClientTestBase {
       (2, "Bob", 25, 87.3),
       (3, "Charlie", 35, 92.1)
     )
-    val df1 = spark.createDataFrame(records1).toDF("id", "name", "age", "score")
+    val df1 = createDataFrame(records1)
 
-    df1.write
-      .format("hudi")
-      .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE")
-      .option(RECORDKEY_FIELD.key(), "id")
-      .option(PRECOMBINE_FIELD.key(), "age")
-      .option(TABLE_NAME.key(), tableName)
-      .option(HoodieWriteConfig.TBL_NAME.key(), tableName)
-      .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName)
-      .option(OPERATION.key(), "bulk_insert")
-      .mode(SaveMode.Overwrite)
-      .save(tablePath)
+    writeDataframe(tableType, tableName, tablePath, df1, saveMode = SaveMode.Overwrite)
 
     // Second insert - records 4-6
     val records2 = Seq(
@@ -678,19 +560,9 @@ class TestLanceDataSource extends HoodieSparkClientTestBase {
       (5, "Eve", 32, 91.4),
       (6, "Frank", 27, 85.7)
     )
-    val df2 = spark.createDataFrame(records2).toDF("id", "name", "age", "score")
+    val df2 = createDataFrame(records2)
 
-    df2.write
-      .format("hudi")
-      .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE")
-      .option(RECORDKEY_FIELD.key(), "id")
-      .option(PRECOMBINE_FIELD.key(), "age")
-      .option(TABLE_NAME.key(), tableName)
-      .option(HoodieWriteConfig.TBL_NAME.key(), tableName)
-      .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName)
-      .option(OPERATION.key(), "bulk_insert")
-      .mode(SaveMode.Append)
-      .save(tablePath)
+    writeDataframe(tableType, tableName, tablePath, df2)
 
     // Get commit timestamps
     val metaClient = HoodieTableMetaClient.builder()
@@ -708,19 +580,9 @@ class TestLanceDataSource extends HoodieSparkClientTestBase {
       (8, "Henry", 31, 89.6),
       (9, "Iris", 26, 94.8)
     )
-    val df3 = spark.createDataFrame(records3).toDF("id", "name", "age", "score")
+    val df3 = createDataFrame(records3)
 
-    df3.write
-      .format("hudi")
-      .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE")
-      .option(RECORDKEY_FIELD.key(), "id")
-      .option(PRECOMBINE_FIELD.key(), "age")
-      .option(TABLE_NAME.key(), tableName)
-      .option(HoodieWriteConfig.TBL_NAME.key(), tableName)
-      .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName)
-      .option(OPERATION.key(), "bulk_insert")
-      .mode(SaveMode.Append)
-      .save(tablePath)
+    writeDataframe(tableType, tableName, tablePath, df3)
 
     // Reload metaClient to get latest commits
     metaClient.reloadActiveTimeline()
@@ -738,13 +600,91 @@ class TestLanceDataSource extends HoodieSparkClientTestBase {
 
     val actual = incrementalDf.select("id", "name", "age", "score")
 
-    val expectedDf = spark.createDataFrame(Seq(
+    val expectedDf = createDataFrame(Seq(
       (7, "Grace", 29, 93.2),
       (8, "Henry", 31, 89.6),
       (9, "Iris", 26, 94.8)
-    )).toDF("id", "name", "age", "score")
+    ))
 
     assertTrue(expectedDf.except(actual).isEmpty)
     assertTrue(actual.except(expectedDf).isEmpty)
   }
+
+  @ParameterizedTest
+  @EnumSource(value = classOf[HoodieTableType])
+  def testClustering(tableType: HoodieTableType): Unit = {
+    val tableName = s"test_lance_clustering_${tableType.name().toLowerCase}"
+    val tablePath = s"$basePath/$tableName"
+
+    // Initial insert - 5 records
+    val records1 = Seq(
+      (1, "Alice", 30, 95.5),
+      (2, "Bob", 25, 87.3),
+      (3, "Charlie", 35, 92.1),
+      (4, "David", 28, 88.0),
+      (5, "Eve", 32, 91.4)
+    )
+    val df1 = createDataFrame(records1)
+
+    writeDataframe(tableType, tableName, tablePath, df1, saveMode = SaveMode.Overwrite, operation = Some("bulk_insert"))
+
+    // Second insert - 5 more records
+    val records2 = Seq(
+      (6, "Frank", 27, 85.7),
+      (7, "Grace", 29, 93.2),
+      (8, "Henry", 31, 89.6),
+      (9, "Iris", 26, 94.8),
+      (10, "Jack", 33, 90.5)
+    )
+    val df2 = createDataFrame(records2)
+    writeDataframe(tableType, tableName, tablePath, df2, operation = Some("bulk_insert"), extraOptions = Map(
+      "hoodie.clustering.inline" -> "true",
+      "hoodie.clustering.inline.max.commits" -> "1"
+    ))
+
+    // Validate that clustering commit is present
+    val metaClient = HoodieTableMetaClient.builder()
+      .setConf(HoodieTestUtils.getDefaultStorageConf)
+      .setBasePath(tablePath)
+      .build()
+    assertTrue(metaClient.getActiveTimeline.getLastClusteringInstant.isPresent, "Clustering commit should be present after inline clustering")
+
+    // Read and verify data
+    val readDf = spark.read.format("hudi").load(tablePath)
+    val actual = readDf.select("id", "name", "age", "score")
+
+    val expectedDf = createDataFrame(records1 ++ records2)
+
+    assertTrue(expectedDf.except(actual).isEmpty)
+    assertTrue(actual.except(expectedDf).isEmpty)
+  }
+
+  private def createDataFrame(records: Seq[(Int, String, Int, Double)]) = {
+    spark.createDataFrame(records).toDF("id", "name", "age", "score").coalesce(1)
+  }
+
+  private def writeDataframe(tableType: HoodieTableType, tableName: String, tablePath: String, df: DataFrame,
+                             saveMode: SaveMode = SaveMode.Append, operation: Option[String] = None,
+                             extraOptions: Map[String, String] = Map.empty): Unit = {
+    var writer = df.write
+      .format("hudi")
+      .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE")
+      .option(TABLE_TYPE.key(), tableType.name())
+      .option(RECORDKEY_FIELD.key(), "id")
+      .option(PRECOMBINE_FIELD.key(), "age")
+      .option(TABLE_NAME.key(), tableName)
+      .option(HoodieWriteConfig.TBL_NAME.key(), tableName)
+      .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName)
+
+    // Add operation if specified
+    writer = operation match {
+      case Some(op) => writer.option(OPERATION.key(), op)
+      case None => writer
+    }
+
+    // Add any extra options
+    extraOptions.foreach { case (key, value) => writer = writer.option(key, value) }
+
+    writer.mode(saveMode).save(tablePath)
+  }
 }
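
Note: beyond parameterizing every test over both table types, the refactor adds end-to-end table-service coverage for Lance. A condensed sketch of how the new tests drive compaction and clustering through the shared writeDataframe helper (config keys copied verbatim from the test; tables, paths, and DataFrames are assumed):

    // Inline compaction after a single delta commit (MERGE_ON_READ only).
    writeDataframe(HoodieTableType.MERGE_ON_READ, tableName, tablePath, df,
      operation = Some("upsert"),
      extraOptions = Map(
        "hoodie.compact.inline" -> "true",
        "hoodie.compact.inline.max.delta.commits" -> "1"))

    // Inline clustering after every commit (both table types).
    writeDataframe(tableType, tableName, tablePath, df,
      operation = Some("bulk_insert"),
      extraOptions = Map(
        "hoodie.clustering.inline" -> "true",
        "hoodie.clustering.inline.max.commits" -> "1"))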

