This is an automated email from the ASF dual-hosted git repository.
timbrown pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new d29284788a6c feat: Ensure MOR table works with Lance base files and Avro log files (#17768)
d29284788a6c is described below
commit d29284788a6cae70b9fd49eee9e2affb31a82147
Author: Rahil C <[email protected]>
AuthorDate: Fri Jan 16 13:20:55 2026 -0800
    feat: Ensure MOR table works with Lance base files and Avro log files (#17768)
    * Ensure MOR table works with Lance base files and Avro log files
    * fix style
    * minor
    * version downgrade in lance-spark and lance-core due to an Arrow issue for MOR
* retrigger ci
* cleanup
* add compaction validation
    * refactor test to reduce code duplication, add clustering validation, fix writer bug
---------
Co-authored-by: Timothy Brown <[email protected]>
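
For context, a minimal sketch of the write path this change enables, using only options exercised in the updated tests below; the DataFrame, table name, and path here are hypothetical:

    // MOR table with Lance base files; with Spark-native records, updates now land in Avro log blocks
    df.write.format("hudi")
      .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE")
      .option(TABLE_TYPE.key(), "MERGE_ON_READ")
      .option(RECORDKEY_FIELD.key(), "id")
      .option(PRECOMBINE_FIELD.key(), "age")
      .option(HoodieWriteConfig.TBL_NAME.key(), "lance_mor_demo")
      .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName)
      .option(OPERATION.key(), "upsert")
      .mode(SaveMode.Append)
      .save("/tmp/lance_mor_demo")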
---
.../org/apache/hudi/util/CommonClientUtils.java | 1 +
.../hudi/common/model/HoodieSparkRecord.java | 11 +-
.../hudi/io/storage/HoodieSparkLanceWriter.java | 15 +-
.../org/apache/hudi/HoodieSparkSqlWriter.scala | 3 -
.../hudi/functional/TestLanceDataSource.scala | 498 +++++++++------------
5 files changed, 238 insertions(+), 290 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/util/CommonClientUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/util/CommonClientUtils.java
index 418d603c86e4..cd95b7ba5e13 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/util/CommonClientUtils.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/util/CommonClientUtils.java
@@ -108,6 +108,7 @@ public class CommonClientUtils {
}
     HoodieFileFormat baseFileFormat = getBaseFileFormat(writeConfig, tableConfig);
switch (getBaseFileFormat(writeConfig, tableConfig)) {
+ case LANCE:
case PARQUET:
case ORC:
return HoodieLogBlock.HoodieLogBlockType.AVRO_DATA_BLOCK;
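
The Java switch above falls through, so LANCE now joins PARQUET and ORC in defaulting log data to Avro blocks. A Scala rendering of just the mapping this hunk establishes (the real switch has further cases outside this hunk):

    def defaultLogBlockType(fmt: HoodieFileFormat): HoodieLogBlock.HoodieLogBlockType =
      fmt match {
        case HoodieFileFormat.LANCE | HoodieFileFormat.PARQUET | HoodieFileFormat.ORC =>
          // Lance, Parquet, and ORC base files all default to Avro log blocks
          HoodieLogBlock.HoodieLogBlockType.AVRO_DATA_BLOCK
        case other =>
          // covered by switch cases not shown in this sketch
          throw new UnsupportedOperationException(s"not covered in this sketch: $other")
      }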
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java
index edcefda21224..0f4a9a31924c 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java
@@ -22,6 +22,7 @@ import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.HoodieSchemaConversionUtils;
import org.apache.hudi.SparkAdapterSupport$;
import org.apache.hudi.SparkFileFormatInternalRecordContext;
+import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.client.model.HoodieInternalRow;
import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.schema.HoodieSchemaType;
@@ -327,7 +328,15 @@ public class HoodieSparkRecord extends HoodieRecord<InternalRow> {
   @Override
   public ByteArrayOutputStream getAvroBytes(HoodieSchema recordSchema, Properties props) throws IOException {
- throw new UnsupportedOperationException();
+ // Convert Spark InternalRow to Avro GenericRecord
+ if (data == null) {
+ throw new IOException("Cannot convert null data to Avro bytes");
+ }
+    StructType structType = HoodieInternalRowUtils.getCachedSchema(recordSchema);
+    GenericRecord avroRecord = AvroConversionUtils
+        .createInternalRowToAvroConverter(structType, recordSchema.toAvroSchema(), false)
+        .apply(data);
+ return HoodieAvroUtils.avroToBytesStream(avroRecord);
}
@Override
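
This is the piece that unblocks Avro log files for Spark-native records on MOR tables: when a delta commit serializes records into an AVRO_DATA_BLOCK, each HoodieRecord is asked for its Avro bytes, and HoodieSparkRecord previously threw UnsupportedOperationException here. A rough sketch of the kind of call site involved (the loop and stream names are hypothetical, not Hudi's actual internals):

    records.foreach { rec =>
      // each record now serializes InternalRow -> GenericRecord -> Avro bytes
      val bytes: java.io.ByteArrayOutputStream = rec.getAvroBytes(schema, props)
      blockOutput.write(bytes.toByteArray)
    }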
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceWriter.java
index 67e07d162ec1..99bd3916fe92 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceWriter.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkLanceWriter.java
@@ -18,8 +18,6 @@
package org.apache.hudi.io.storage;
-import com.lancedb.lance.spark.arrow.LanceArrowWriter;
-import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
@@ -27,6 +25,9 @@ import org.apache.hudi.io.lance.HoodieBaseLanceWriter;
import org.apache.hudi.io.storage.row.HoodieInternalRowFileWriter;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
+
+import com.lancedb.lance.spark.arrow.LanceArrowWriter;
+import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.LanceArrowUtils;
@@ -113,26 +114,26 @@ public class HoodieSparkLanceWriter extends HoodieBaseLanceWriter<InternalRow>
     if (populateMetaFields) {
       UTF8String recordKey = UTF8String.fromString(key.getRecordKey());
       updateRecordMetadata(row, recordKey, key.getPartitionPath(), getWrittenRecordCount());
- super.write(row);
+ super.write(row.copy());
} else {
- super.write(row);
+ super.write(row.copy());
}
}
@Override
public void writeRow(String recordKey, InternalRow row) throws IOException {
- super.write(row);
+ super.write(row.copy());
}
@Override
public void writeRow(UTF8String key, InternalRow row) throws IOException {
    // Key reserved for future bloom filter support (https://github.com/apache/hudi/issues/17664)
- super.write(row);
+ super.write(row.copy());
}
@Override
public void writeRow(InternalRow row) throws IOException {
- super.write(row);
+ super.write(row.copy());
}
@Override
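
The added copies guard against Spark's row reuse: Spark commonly recycles a single UnsafeRow instance across an iterator, and the Arrow-based Lance writer buffers rows before flushing, so buffering the shared instance would leave every buffered entry aliasing the most recent row. A minimal sketch of the failure mode this fixes, assuming any writer that buffers InternalRows:

    val buffered = scala.collection.mutable.ArrayBuffer.empty[InternalRow]
    iterator.foreach { row =>
      buffered += row          // bug: every entry points at the same reused row
      // buffered += row.copy() // fix: snapshot the row before buffering
    }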
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 38b22715f4c4..0612dae85b26 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -516,9 +516,6 @@ class HoodieSparkSqlWriterInternal {
// scalastyle:on
val writeConfig = client.getConfig
-    if (writeConfig.getRecordMerger.getRecordType == HoodieRecordType.SPARK && tableType == MERGE_ON_READ && writeConfig.getLogDataBlockFormat.orElse(HoodieLogBlockType.AVRO_DATA_BLOCK) != HoodieLogBlockType.PARQUET_DATA_BLOCK) {
-      throw new UnsupportedOperationException(s"${writeConfig.getRecordMerger.getClass.getName} only support parquet log.")
-    }
instantTime = client.startCommit(commitActionType)
// if table has undergone upgrade, we need to reload table config
tableMetaClient.reloadTableConfig()
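
With HoodieSparkRecord.getAvroBytes implemented above, this guard is obsolete: a Spark-record merger on a MERGE_ON_READ table no longer has to force PARQUET_DATA_BLOCK log files. For reference, the condition the removed code used to reject, restated as a standalone Scala predicate (a paraphrase of the deleted lines, not new Hudi API):

    val previouslyRejected =
      writeConfig.getRecordMerger.getRecordType == HoodieRecordType.SPARK &&
        tableType == MERGE_ON_READ &&
        writeConfig.getLogDataBlockFormat.orElse(HoodieLogBlockType.AVRO_DATA_BLOCK) != HoodieLogBlockType.PARQUET_DATA_BLOCK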
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala
index 9a634b8eec6d..c9e1083bb550 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala
@@ -19,15 +19,18 @@ package org.apache.hudi.functional
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.DefaultSparkRecordMerger
+import org.apache.hudi.common.model.HoodieTableType
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
import org.apache.hudi.common.testutils.HoodieTestUtils
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.testutils.HoodieSparkClientTestBase
-import org.apache.spark.sql.{SaveMode, SparkSession}
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
+import org.junit.jupiter.api.{AfterEach, BeforeEach}
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
import org.junit.jupiter.api.condition.DisabledIfSystemProperty
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.EnumSource
import scala.collection.JavaConverters._
@@ -51,9 +54,10 @@ class TestLanceDataSource extends HoodieSparkClientTestBase {
spark = null
}
- @Test
- def testBasicWriteAndRead(): Unit = {
- val tableName = "test_lance_table"
+ @ParameterizedTest
+ @EnumSource(value = classOf[HoodieTableType])
+ def testBasicWriteAndRead(tableType: HoodieTableType): Unit = {
+ val tableName = s"test_lance_table_${tableType.name().toLowerCase}"
val tablePath = s"$basePath/$tableName"
// Create test data
@@ -62,19 +66,10 @@ class TestLanceDataSource extends HoodieSparkClientTestBase {
      (2, "Bob", 25, 87.3),
      (3, "Charlie", 35, 92.1)
    )
-    val expectedDf = spark.createDataFrame(records).toDF("id", "name", "age", "score")
+    val expectedDf = createDataFrame(records)
    // Write to Hudi table with Lance base file format
-    expectedDf.write
-      .format("hudi")
-      .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE")
-      .option(RECORDKEY_FIELD.key(), "id")
-      .option(PRECOMBINE_FIELD.key(), "age")
-      .option(TABLE_NAME.key(), tableName)
-      .option(HoodieWriteConfig.TBL_NAME.key(), tableName)
-      .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName)
-      .mode(SaveMode.Overwrite)
-      .save(tablePath)
+    writeDataframe(tableType, tableName, tablePath, expectedDf, saveMode = SaveMode.Overwrite)
// Read back and verify
val readDf = spark.read
@@ -87,9 +82,10 @@ class TestLanceDataSource extends HoodieSparkClientTestBase {
assertTrue(actual.except(expectedDf).isEmpty)
}
- @Test
- def testSchemaProjection(): Unit = {
- val tableName = "test_lance_projection"
+ @ParameterizedTest
+ @EnumSource(value = classOf[HoodieTableType])
+ def testSchemaProjection(tableType: HoodieTableType): Unit = {
+ val tableName = s"test_lance_projection_${tableType.name().toLowerCase}"
val tablePath = s"$basePath/$tableName"
// Create test data with multiple columns
@@ -101,16 +97,7 @@ class TestLanceDataSource extends HoodieSparkClientTestBase {
    val inputDf = spark.createDataFrame(records).toDF("id", "name", "age", "score", "department")
    // Write to Hudi table with Lance format
-    inputDf.write
-      .format("hudi")
-      .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE")
-      .option(RECORDKEY_FIELD.key(), "id")
-      .option(PRECOMBINE_FIELD.key(), "age")
-      .option(TABLE_NAME.key(), tableName)
-      .option(HoodieWriteConfig.TBL_NAME.key(), tableName)
-      .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName)
-      .mode(SaveMode.Overwrite)
-      .save(tablePath)
+    writeDataframe(tableType, tableName, tablePath, inputDf, saveMode = SaveMode.Overwrite)
// Read with schema projection - only select subset of columns
val readDf = spark.read
@@ -133,9 +120,10 @@ class TestLanceDataSource extends HoodieSparkClientTestBase {
assertTrue(actual.except(expectedDf).isEmpty)
}
- @Test
- def testWhereClauseFiltering(): Unit = {
- val tableName = "test_lance_where"
+ @ParameterizedTest
+ @EnumSource(value = classOf[HoodieTableType])
+ def testWhereClauseFiltering(tableType: HoodieTableType): Unit = {
+ val tableName = s"test_lance_where_${tableType.name().toLowerCase}"
val tablePath = s"$basePath/$tableName"
// Create test data
@@ -146,19 +134,10 @@ class TestLanceDataSource extends HoodieSparkClientTestBase {
      (4, "David", 28, 88.9),
      (5, "Eve", 32, 91.4)
    )
-    val df = spark.createDataFrame(records).toDF("id", "name", "age", "score")
+    val df = createDataFrame(records)
    // Write to Hudi table with Lance format
-    df.write
-      .format("hudi")
-      .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE")
-      .option(RECORDKEY_FIELD.key(), "id")
-      .option(PRECOMBINE_FIELD.key(), "age")
-      .option(TABLE_NAME.key(), tableName)
-      .option(HoodieWriteConfig.TBL_NAME.key(), tableName)
-      .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName)
-      .mode(SaveMode.Overwrite)
-      .save(tablePath)
+    writeDataframe(tableType, tableName, tablePath, df, saveMode = SaveMode.Overwrite)
// Test 1: Simple WHERE clause on numeric column
val filteredByAge = spark.read
@@ -167,10 +146,10 @@ class TestLanceDataSource extends HoodieSparkClientTestBase {
.where("age > 30")
.select("id", "name", "age", "score")
- val expectedFilteredByAge = spark.createDataFrame(Seq(
+ val expectedFilteredByAge = createDataFrame(Seq(
(3, "Charlie", 35, 92.1),
(5, "Eve", 32, 91.4)
- )).toDF("id", "name", "age", "score")
+ ))
assertTrue(expectedFilteredByAge.except(filteredByAge).isEmpty)
assertTrue(filteredByAge.except(expectedFilteredByAge).isEmpty)
@@ -182,9 +161,7 @@ class TestLanceDataSource extends HoodieSparkClientTestBase {
.where("name = 'Bob'")
.select("id", "name", "age", "score")
- val expectedFilteredByName = spark.createDataFrame(Seq(
- (2, "Bob", 25, 87.3)
- )).toDF("id", "name", "age", "score")
+ val expectedFilteredByName = createDataFrame(Seq((2, "Bob", 25, 87.3)))
assertTrue(expectedFilteredByName.except(filteredByName).isEmpty)
assertTrue(filteredByName.except(expectedFilteredByName).isEmpty)
@@ -196,19 +173,20 @@ class TestLanceDataSource extends HoodieSparkClientTestBase {
.where("age >= 28 AND score > 90")
.select("id", "name", "age", "score")
- val expectedFilteredComplex = spark.createDataFrame(Seq(
+ val expectedFilteredComplex = createDataFrame(Seq(
(1, "Alice", 30, 95.5),
(3, "Charlie", 35, 92.1),
(5, "Eve", 32, 91.4)
- )).toDF("id", "name", "age", "score")
+ ))
assertTrue(expectedFilteredComplex.except(filteredComplex).isEmpty)
assertTrue(filteredComplex.except(expectedFilteredComplex).isEmpty)
}
- @Test
- def testMultipleBulkInsertsWithCommitValidation(): Unit = {
- val tableName = "test_lance_multiple_inserts"
+ @ParameterizedTest
+ @EnumSource(value = classOf[HoodieTableType])
+  def testMultipleBulkInsertsWithCommitValidation(tableType: HoodieTableType): Unit = {
+    val tableName = s"test_lance_multiple_inserts_${tableType.name().toLowerCase}"
val tablePath = s"$basePath/$tableName"
// First insert - records 1-3
@@ -217,19 +195,9 @@ class TestLanceDataSource extends HoodieSparkClientTestBase {
      (2, "Bob", 25, 87.3),
      (3, "Charlie", 35, 92.1)
    )
-    val df1 = spark.createDataFrame(records1).toDF("id", "name", "age", "score")
+    val df1 = createDataFrame(records1)
-    df1.write
-      .format("hudi")
-      .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE")
-      .option(RECORDKEY_FIELD.key(), "id")
-      .option(PRECOMBINE_FIELD.key(), "age")
-      .option(TABLE_NAME.key(), tableName)
-      .option(HoodieWriteConfig.TBL_NAME.key(), tableName)
-      .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName)
-      .option(OPERATION.key(), "bulk_insert")
-      .mode(SaveMode.Overwrite)
-      .save(tablePath)
+    writeDataframe(tableType, tableName, tablePath, df1, operation = Some("bulk_insert"))
// Second insert - records 4-6
val records2 = Seq(
@@ -237,19 +205,9 @@ class TestLanceDataSource extends HoodieSparkClientTestBase {
      (5, "Eve", 32, 91.4),
      (6, "Frank", 27, 85.7)
    )
-    val df2 = spark.createDataFrame(records2).toDF("id", "name", "age", "score")
+    val df2 = createDataFrame(records2)
-    df2.write
-      .format("hudi")
-      .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE")
-      .option(RECORDKEY_FIELD.key(), "id")
-      .option(PRECOMBINE_FIELD.key(), "age")
-      .option(TABLE_NAME.key(), tableName)
-      .option(HoodieWriteConfig.TBL_NAME.key(), tableName)
-      .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName)
-      .option(OPERATION.key(), "bulk_insert")
-      .mode(SaveMode.Append)
-      .save(tablePath)
+    writeDataframe(tableType, tableName, tablePath, df2, operation = Some("bulk_insert"))
// Third insert - records 7-9
val records3 = Seq(
@@ -257,19 +215,9 @@ class TestLanceDataSource extends HoodieSparkClientTestBase {
      (8, "Henry", 31, 89.6),
      (9, "Iris", 26, 94.8)
    )
-    val df3 = spark.createDataFrame(records3).toDF("id", "name", "age", "score")
+    val df3 = createDataFrame(records3)
-    df3.write
-      .format("hudi")
-      .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE")
-      .option(RECORDKEY_FIELD.key(), "id")
-      .option(PRECOMBINE_FIELD.key(), "age")
-      .option(TABLE_NAME.key(), tableName)
-      .option(HoodieWriteConfig.TBL_NAME.key(), tableName)
-      .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName)
-      .option(OPERATION.key(), "bulk_insert")
-      .mode(SaveMode.Append)
-      .save(tablePath)
+    writeDataframe(tableType, tableName, tablePath, df3, operation = Some("bulk_insert"))
// Validate number of commits matches number of inserts
val metaClient = HoodieTableMetaClient.builder()
@@ -284,11 +232,6 @@ class TestLanceDataSource extends HoodieSparkClientTestBase {
    val commits = metaClient.getCommitsTimeline.filterCompletedInstants().getInstants.asScala.toList
    assertEquals(3, commits.size, "Should have exactly 3 commits")
-    // Check that each commit is a COMMIT action (bulk_insert creates COMMIT actions)
-    commits.foreach { instant =>
-      assertEquals("commit", instant.getAction, s"Instant ${instant.requestedTime()} should be a commit action")
-    }
-
// Read back all data and verify total record count
val readDf = spark.read
.format("hudi")
@@ -296,7 +239,7 @@ class TestLanceDataSource extends HoodieSparkClientTestBase {
val actual = readDf.select("id", "name", "age", "score")
- val expectedDf = spark.createDataFrame(Seq(
+ val expectedDf = createDataFrame(Seq(
(1, "Alice", 30, 95.5),
(2, "Bob", 25, 87.3),
(3, "Charlie", 35, 92.1),
@@ -306,15 +249,16 @@ class TestLanceDataSource extends HoodieSparkClientTestBase {
(7, "Grace", 29, 93.2),
(8, "Henry", 31, 89.6),
(9, "Iris", 26, 94.8)
- )).toDF("id", "name", "age", "score")
+ ))
assertTrue(expectedDf.except(actual).isEmpty)
assertTrue(actual.except(expectedDf).isEmpty)
}
- @Test
- def testTimeTravel(): Unit = {
- val tableName = "test_lance_time_travel"
+ @ParameterizedTest
+ @EnumSource(value = classOf[HoodieTableType])
+ def testTimeTravel(tableType: HoodieTableType): Unit = {
+ val tableName = s"test_lance_time_travel_${tableType.name().toLowerCase}"
val tablePath = s"$basePath/$tableName"
// First insert - records 1-3
@@ -323,19 +267,9 @@ class TestLanceDataSource extends HoodieSparkClientTestBase {
      (2, "Bob", 25, 87.3),
      (3, "Charlie", 35, 92.1)
    )
-    val df1 = spark.createDataFrame(records1).toDF("id", "name", "age", "score")
+    val df1 = createDataFrame(records1)
-    df1.write
-      .format("hudi")
-      .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE")
-      .option(RECORDKEY_FIELD.key(), "id")
-      .option(PRECOMBINE_FIELD.key(), "age")
-      .option(TABLE_NAME.key(), tableName)
-      .option(HoodieWriteConfig.TBL_NAME.key(), tableName)
-      .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName)
-      .option(OPERATION.key(), "bulk_insert")
-      .mode(SaveMode.Overwrite)
-      .save(tablePath)
+    writeDataframe(tableType, tableName, tablePath, df1, saveMode = SaveMode.Overwrite)
// Second insert - records 4-6
val records2 = Seq(
@@ -343,19 +277,9 @@ class TestLanceDataSource extends HoodieSparkClientTestBase {
      (5, "Eve", 32, 91.4),
      (6, "Frank", 27, 85.7)
    )
-    val df2 = spark.createDataFrame(records2).toDF("id", "name", "age", "score")
+    val df2 = createDataFrame(records2)
-    df2.write
-      .format("hudi")
-      .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE")
-      .option(RECORDKEY_FIELD.key(), "id")
-      .option(PRECOMBINE_FIELD.key(), "age")
-      .option(TABLE_NAME.key(), tableName)
-      .option(HoodieWriteConfig.TBL_NAME.key(), tableName)
-      .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName)
-      .option(OPERATION.key(), "bulk_insert")
-      .mode(SaveMode.Append)
-      .save(tablePath)
+    writeDataframe(tableType, tableName, tablePath, df2)
// Get the commit timestamp after second insert
val metaClient = HoodieTableMetaClient.builder()
@@ -373,19 +297,9 @@ class TestLanceDataSource extends HoodieSparkClientTestBase {
      (8, "Henry", 31, 89.6),
      (9, "Iris", 26, 94.8)
    )
-    val df3 = spark.createDataFrame(records3).toDF("id", "name", "age", "score")
+    val df3 = createDataFrame(records3)
-    df3.write
-      .format("hudi")
-      .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE")
-      .option(RECORDKEY_FIELD.key(), "id")
-      .option(PRECOMBINE_FIELD.key(), "age")
-      .option(TABLE_NAME.key(), tableName)
-      .option(HoodieWriteConfig.TBL_NAME.key(), tableName)
-      .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName)
-      .option(OPERATION.key(), "bulk_insert")
-      .mode(SaveMode.Append)
-      .save(tablePath)
+    writeDataframe(tableType, tableName, tablePath, df3)
// Time travel query to second commit (should see data from c1 + c2 only)
val timeTravelDf = spark.read
@@ -395,22 +309,23 @@ class TestLanceDataSource extends HoodieSparkClientTestBase {
val actual = timeTravelDf.select("id", "name", "age", "score")
- val expectedDf = spark.createDataFrame(Seq(
+ val expectedDf = createDataFrame(Seq(
(1, "Alice", 30, 95.5),
(2, "Bob", 25, 87.3),
(3, "Charlie", 35, 92.1),
(4, "David", 28, 88.9),
(5, "Eve", 32, 91.4),
(6, "Frank", 27, 85.7)
- )).toDF("id", "name", "age", "score")
+ ))
assertTrue(expectedDf.except(actual).isEmpty)
assertTrue(actual.except(expectedDf).isEmpty)
}
- @Test
- def testMultipleRegularInsertsWithCommitValidation(): Unit = {
- val tableName = "test_lance_regular_inserts"
+ @ParameterizedTest
+ @EnumSource(value = classOf[HoodieTableType])
+  def testMultipleRegularInsertsWithCommitValidation(tableType: HoodieTableType): Unit = {
+    val tableName = s"test_lance_regular_inserts_${tableType.name().toLowerCase}"
val tablePath = s"$basePath/$tableName"
// First insert - records 1-3 using regular insert
@@ -419,19 +334,9 @@ class TestLanceDataSource extends HoodieSparkClientTestBase {
      (2, "Bob", 25, 87.3),
      (3, "Charlie", 35, 92.1)
    )
-    val df1 = spark.createDataFrame(records1).toDF("id", "name", "age", "score")
+    val df1 = createDataFrame(records1)
-    df1.write
-      .format("hudi")
-      .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE")
-      .option(RECORDKEY_FIELD.key(), "id")
-      .option(PRECOMBINE_FIELD.key(), "age")
-      .option(TABLE_NAME.key(), tableName)
-      .option(HoodieWriteConfig.TBL_NAME.key(), tableName)
-      .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName)
-      .option(OPERATION.key(), "insert")
-      .mode(SaveMode.Overwrite)
-      .save(tablePath)
+    writeDataframe(tableType, tableName, tablePath, df1, saveMode = SaveMode.Overwrite, operation = Some("insert"))
// Second insert - records 4-6 using regular insert
val records2 = Seq(
@@ -439,19 +344,9 @@ class TestLanceDataSource extends HoodieSparkClientTestBase {
      (5, "Eve", 32, 91.4),
      (6, "Frank", 27, 85.7)
    )
-    val df2 = spark.createDataFrame(records2).toDF("id", "name", "age", "score")
+    val df2 = createDataFrame(records2)
-    df2.write
-      .format("hudi")
-      .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE")
-      .option(RECORDKEY_FIELD.key(), "id")
-      .option(PRECOMBINE_FIELD.key(), "age")
-      .option(TABLE_NAME.key(), tableName)
-      .option(HoodieWriteConfig.TBL_NAME.key(), tableName)
-      .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName)
-      .option(OPERATION.key(), "insert")
-      .mode(SaveMode.Append)
-      .save(tablePath)
+    writeDataframe(tableType, tableName, tablePath, df2, operation = Some("insert"))
// Validate number of commits matches number of inserts
val metaClient = HoodieTableMetaClient.builder()
@@ -466,9 +361,11 @@ class TestLanceDataSource extends HoodieSparkClientTestBase {
    val commits = metaClient.getCommitsTimeline.filterCompletedInstants().getInstants.asScala.toList
    assertEquals(2, commits.size, "Should have exactly 2 commits")
-    // Check that each commit is a COMMIT action (insert creates COMMIT actions)
+    // Verify commit action types based on table type
+    val expectedAction = if (tableType == HoodieTableType.COPY_ON_WRITE) "commit" else "deltacommit"
    commits.foreach { instant =>
-      assertEquals("commit", instant.getAction, s"Instant ${instant.requestedTime()} should be a commit action")
+      assertEquals(expectedAction, instant.getAction,
+        s"Instant ${instant.requestedTime()} should be a $expectedAction action for $tableType table")
    }
// Read back all data and verify total record count
@@ -478,22 +375,23 @@ class TestLanceDataSource extends HoodieSparkClientTestBase {
val actual = readDf.select("id", "name", "age", "score")
- val expectedDf = spark.createDataFrame(Seq(
+ val expectedDf = createDataFrame(Seq(
(1, "Alice", 30, 95.5),
(2, "Bob", 25, 87.3),
(3, "Charlie", 35, 92.1),
(4, "David", 28, 88.9),
(5, "Eve", 32, 91.4),
(6, "Frank", 27, 85.7)
- )).toDF("id", "name", "age", "score")
+ ))
assertTrue(expectedDf.except(actual).isEmpty)
assertTrue(actual.except(expectedDf).isEmpty)
}
- @Test
- def testBasicUpsertModifyExistingRow(): Unit = {
- val tableName = "test_lance_upsert"
+ @ParameterizedTest
+ @EnumSource(value = classOf[HoodieTableType])
+ def testBasicUpsertModifyExistingRow(tableType: HoodieTableType): Unit = {
+ val tableName = s"test_lance_upsert_${tableType.name().toLowerCase}"
val tablePath = s"$basePath/$tableName"
// Initial insert - 3 records
@@ -502,56 +400,26 @@ class TestLanceDataSource extends HoodieSparkClientTestBase {
      (2, "Bob", 25, 87.3),
      (3, "Charlie", 35, 92.1)
    )
-    val df1 = spark.createDataFrame(records1).toDF("id", "name", "age", "score")
+    val df1 = createDataFrame(records1)
-    df1.write
-      .format("hudi")
-      .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE")
-      .option(RECORDKEY_FIELD.key(), "id")
-      .option(PRECOMBINE_FIELD.key(), "age")
-      .option(TABLE_NAME.key(), tableName)
-      .option(HoodieWriteConfig.TBL_NAME.key(), tableName)
-      .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName)
-      .option(OPERATION.key(), "insert")
-      .mode(SaveMode.Overwrite)
-      .save(tablePath)
+    writeDataframe(tableType, tableName, tablePath, df1, saveMode = SaveMode.Overwrite, operation = Some("insert"))
// Upsert - modify Bob's record (id=2)
val records2 = Seq(
(2, "Bob", 40, 95.0) // Update Bob: age 25->40, score 87.3->95.0
)
-    val df2 = spark.createDataFrame(records2).toDF("id", "name", "age", "score")
+    val df2 = createDataFrame(records2)
-    df2.write
-      .format("hudi")
-      .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE")
-      .option(RECORDKEY_FIELD.key(), "id")
-      .option(PRECOMBINE_FIELD.key(), "age")
-      .option(TABLE_NAME.key(), tableName)
-      .option(HoodieWriteConfig.TBL_NAME.key(), tableName)
-      .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName)
-      .option(OPERATION.key(), "upsert")
-      .mode(SaveMode.Append)
-      .save(tablePath)
+    writeDataframe(tableType, tableName, tablePath, df2, operation = Some("upsert"))
// Second upsert - modify Alice (id=1) and insert David (id=4)
val records3 = Seq(
(1, "Alice", 45, 98.5), // Update Alice: age 30->45, score 95.5->98.5
(4, "David", 28, 88.0) // Insert new record
)
-    val df3 = spark.createDataFrame(records3).toDF("id", "name", "age", "score")
+    val df3 = createDataFrame(records3)
-    df3.write
-      .format("hudi")
-      .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE")
-      .option(RECORDKEY_FIELD.key(), "id")
-      .option(PRECOMBINE_FIELD.key(), "age")
-      .option(TABLE_NAME.key(), tableName)
-      .option(HoodieWriteConfig.TBL_NAME.key(), tableName)
-      .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName)
-      .option(OPERATION.key(), "upsert")
-      .mode(SaveMode.Append)
-      .save(tablePath)
+    writeDataframe(tableType, tableName, tablePath, df3, operation = Some("upsert"))
// Validate commits
val metaClient = HoodieTableMetaClient.builder()
@@ -562,24 +430,59 @@ class TestLanceDataSource extends HoodieSparkClientTestBase {
    val commitCount = metaClient.getCommitsTimeline.filterCompletedInstants().countInstants()
    assertEquals(3, commitCount, "Should have 3 completed commits (insert + 2 upserts)")
+    // Verify commit action types based on table type
+    val expectedAction = if (tableType == HoodieTableType.COPY_ON_WRITE) "commit" else "deltacommit"
+    val commits = metaClient.getCommitsTimeline.filterCompletedInstants().getInstants.asScala
+    commits.foreach { instant =>
+      assertEquals(expectedAction, instant.getAction,
+        s"Instant ${instant.requestedTime()} should be a $expectedAction action for $tableType table")
+    }
+
// Read and verify data
val readDf = spark.read.format("hudi").load(tablePath)
val actual = readDf.select("id", "name", "age", "score")
- val expectedDf = spark.createDataFrame(Seq(
+ val expectedDf = createDataFrame(Seq(
(1, "Alice", 45, 98.5),
(2, "Bob", 40, 95.0),
(3, "Charlie", 35, 92.1),
(4, "David", 28, 88.0)
- )).toDF("id", "name", "age", "score")
+ ))
assertTrue(expectedDf.except(actual).isEmpty)
assertTrue(actual.except(expectedDf).isEmpty)
+
+ if (tableType == HoodieTableType.MERGE_ON_READ) {
+ // Write one more commit to trigger compaction
+ val records4 = Seq(
+ (1, "Alice", 50, 98.5), // Update Alice: age 45->50
+ (4, "David", 28, 90.0) // Update David: score 88.0->90.0
+ )
+ val df4 = createDataFrame(records4)
+      writeDataframe(tableType, tableName, tablePath, df4, operation = Some("upsert"),
+        extraOptions = Map("hoodie.compact.inline" -> "true", "hoodie.compact.inline.max.delta.commits" -> "1"))
+ val expectedDfAfterCompaction = createDataFrame(Seq(
+ (1, "Alice", 50, 98.5),
+ (2, "Bob", 40, 95.0),
+ (3, "Charlie", 35, 92.1),
+ (4, "David", 28, 90.0)
+ ))
+ // validate compaction commit is present
+      val compactionCommits = metaClient.reloadActiveTimeline().filterCompletedInstants().getInstants.asScala
+        .filter(instant => instant.getAction == "commit")
+      assertTrue(compactionCommits.nonEmpty, "Compaction commit should be present after upsert")
+      // Read and verify data after compaction
+      val readDfAfterCompaction = spark.read.format("hudi").load(tablePath)
+      val actualAfterCompaction = readDfAfterCompaction.select("id", "name", "age", "score")
+      assertTrue(expectedDfAfterCompaction.except(actualAfterCompaction).isEmpty)
+      assertTrue(actualAfterCompaction.except(expectedDfAfterCompaction).isEmpty)
+ }
}
- @Test
- def testBasicDeleteOperation(): Unit = {
- val tableName = "test_lance_delete"
+ @ParameterizedTest
+ @EnumSource(value = classOf[HoodieTableType])
+ def testBasicDeleteOperation(tableType: HoodieTableType): Unit = {
+ val tableName = s"test_lance_delete_${tableType.name().toLowerCase}"
val tablePath = s"$basePath/$tableName"
// Initial insert - 5 records
@@ -590,19 +493,9 @@ class TestLanceDataSource extends HoodieSparkClientTestBase {
      (4, "David", 28, 88.0),
      (5, "Eve", 32, 91.4)
    )
-    val df1 = spark.createDataFrame(records1).toDF("id", "name", "age", "score")
+    val df1 = createDataFrame(records1)
-    df1.write
-      .format("hudi")
-      .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE")
-      .option(RECORDKEY_FIELD.key(), "id")
-      .option(PRECOMBINE_FIELD.key(), "age")
-      .option(TABLE_NAME.key(), tableName)
-      .option(HoodieWriteConfig.TBL_NAME.key(), tableName)
-      .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName)
-      .option(OPERATION.key(), "insert")
-      .mode(SaveMode.Overwrite)
-      .save(tablePath)
+    writeDataframe(tableType, tableName, tablePath, df1, saveMode = SaveMode.Overwrite, operation = Some("insert"))
    // Delete operation - delete Bob (id=2), David (id=4), and a non-existent key (id=99)
    val recordsToDelete = Seq(
@@ -610,19 +503,9 @@ class TestLanceDataSource extends HoodieSparkClientTestBase {
      (4, "David", 28, 88.0), // Delete David (exists)
      (99, "NonExistent", 50, 0.0) // Delete non-existent record (should be no-op)
    )
-    val deleteDF = spark.createDataFrame(recordsToDelete).toDF("id", "name", "age", "score")
+    val deleteDF = createDataFrame(recordsToDelete)
-    deleteDF.write
-      .format("hudi")
-      .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE")
-      .option(RECORDKEY_FIELD.key(), "id")
-      .option(PRECOMBINE_FIELD.key(), "age")
-      .option(TABLE_NAME.key(), tableName)
-      .option(HoodieWriteConfig.TBL_NAME.key(), tableName)
-      .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName)
-      .option(OPERATION.key(), "delete")
-      .mode(SaveMode.Append)
-      .save(tablePath)
+    writeDataframe(tableType, tableName, tablePath, deleteDF, operation = Some("delete"))
// Validate commits
val metaClient = HoodieTableMetaClient.builder()
@@ -633,23 +516,32 @@ class TestLanceDataSource extends HoodieSparkClientTestBase {
    val commitCount = metaClient.getCommitsTimeline.filterCompletedInstants().countInstants()
    assertEquals(2, commitCount, "Should have 2 completed commits (insert + delete)")
+    // Verify commit action types based on table type
+    val expectedAction = if (tableType == HoodieTableType.COPY_ON_WRITE) "commit" else "deltacommit"
+    val commits = metaClient.getCommitsTimeline.filterCompletedInstants().getInstants.asScala
+    commits.foreach { instant =>
+      assertEquals(expectedAction, instant.getAction,
+        s"Instant ${instant.requestedTime()} should be a $expectedAction action for $tableType table")
+    }
+
// Read and verify data
val readDf = spark.read.format("hudi").load(tablePath)
val actual = readDf.select("id", "name", "age", "score")
- val expectedDf = spark.createDataFrame(Seq(
+ val expectedDf = createDataFrame(Seq(
(1, "Alice", 30, 95.5),
(3, "Charlie", 35, 92.1),
(5, "Eve", 32, 91.4)
- )).toDF("id", "name", "age", "score")
+ ))
assertTrue(expectedDf.except(actual).isEmpty)
assertTrue(actual.except(expectedDf).isEmpty)
}
- @Test
- def testIncrementalQuery(): Unit = {
- val tableName = "test_lance_incremental"
+ @ParameterizedTest
+ @EnumSource(value = classOf[HoodieTableType])
+ def testIncrementalQuery(tableType: HoodieTableType): Unit = {
+ val tableName = s"test_lance_incremental_${tableType.name().toLowerCase}"
val tablePath = s"$basePath/$tableName"
// First insert - records 1-3
@@ -658,19 +550,9 @@ class TestLanceDataSource extends HoodieSparkClientTestBase {
      (2, "Bob", 25, 87.3),
      (3, "Charlie", 35, 92.1)
    )
-    val df1 = spark.createDataFrame(records1).toDF("id", "name", "age", "score")
+    val df1 = createDataFrame(records1)
-    df1.write
-      .format("hudi")
-      .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE")
-      .option(RECORDKEY_FIELD.key(), "id")
-      .option(PRECOMBINE_FIELD.key(), "age")
-      .option(TABLE_NAME.key(), tableName)
-      .option(HoodieWriteConfig.TBL_NAME.key(), tableName)
-      .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName)
-      .option(OPERATION.key(), "bulk_insert")
-      .mode(SaveMode.Overwrite)
-      .save(tablePath)
+    writeDataframe(tableType, tableName, tablePath, df1, saveMode = SaveMode.Overwrite)
// Second insert - records 4-6
val records2 = Seq(
@@ -678,19 +560,9 @@ class TestLanceDataSource extends HoodieSparkClientTestBase {
      (5, "Eve", 32, 91.4),
      (6, "Frank", 27, 85.7)
    )
-    val df2 = spark.createDataFrame(records2).toDF("id", "name", "age", "score")
+    val df2 = createDataFrame(records2)
-    df2.write
-      .format("hudi")
-      .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE")
-      .option(RECORDKEY_FIELD.key(), "id")
-      .option(PRECOMBINE_FIELD.key(), "age")
-      .option(TABLE_NAME.key(), tableName)
-      .option(HoodieWriteConfig.TBL_NAME.key(), tableName)
-      .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName)
-      .option(OPERATION.key(), "bulk_insert")
-      .mode(SaveMode.Append)
-      .save(tablePath)
+    writeDataframe(tableType, tableName, tablePath, df2)
// Get commit timestamps
val metaClient = HoodieTableMetaClient.builder()
@@ -708,19 +580,9 @@ class TestLanceDataSource extends HoodieSparkClientTestBase {
      (8, "Henry", 31, 89.6),
      (9, "Iris", 26, 94.8)
    )
-    val df3 = spark.createDataFrame(records3).toDF("id", "name", "age", "score")
+    val df3 = createDataFrame(records3)
-    df3.write
-      .format("hudi")
-      .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE")
-      .option(RECORDKEY_FIELD.key(), "id")
-      .option(PRECOMBINE_FIELD.key(), "age")
-      .option(TABLE_NAME.key(), tableName)
-      .option(HoodieWriteConfig.TBL_NAME.key(), tableName)
-      .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName)
-      .option(OPERATION.key(), "bulk_insert")
-      .mode(SaveMode.Append)
-      .save(tablePath)
+    writeDataframe(tableType, tableName, tablePath, df3)
// Reload metaClient to get latest commits
metaClient.reloadActiveTimeline()
@@ -738,13 +600,91 @@ class TestLanceDataSource extends HoodieSparkClientTestBase {
val actual = incrementalDf.select("id", "name", "age", "score")
- val expectedDf = spark.createDataFrame(Seq(
+ val expectedDf = createDataFrame(Seq(
(7, "Grace", 29, 93.2),
(8, "Henry", 31, 89.6),
(9, "Iris", 26, 94.8)
- )).toDF("id", "name", "age", "score")
+ ))
assertTrue(expectedDf.except(actual).isEmpty)
assertTrue(actual.except(expectedDf).isEmpty)
}
+
+ @ParameterizedTest
+ @EnumSource(value = classOf[HoodieTableType])
+ def testClustering(tableType: HoodieTableType): Unit = {
+ val tableName = s"test_lance_clustering_${tableType.name().toLowerCase}"
+ val tablePath = s"$basePath/$tableName"
+
+ // Initial insert - 5 records
+ val records1 = Seq(
+ (1, "Alice", 30, 95.5),
+ (2, "Bob", 25, 87.3),
+ (3, "Charlie", 35, 92.1),
+ (4, "David", 28, 88.0),
+ (5, "Eve", 32, 91.4)
+ )
+ val df1 = createDataFrame(records1)
+
+    writeDataframe(tableType, tableName, tablePath, df1, saveMode = SaveMode.Overwrite, operation = Some("bulk_insert"))
+
+ // Second insert - 5 more records
+ val records2 = Seq(
+ (6, "Frank", 27, 85.7),
+ (7, "Grace", 29, 93.2),
+ (8, "Henry", 31, 89.6),
+ (9, "Iris", 26, 94.8),
+ (10, "Jack", 33, 90.5)
+ )
+ val df2 = createDataFrame(records2)
+    writeDataframe(tableType, tableName, tablePath, df2, operation = Some("bulk_insert"), extraOptions = Map(
+ "hoodie.clustering.inline" -> "true",
+ "hoodie.clustering.inline.max.commits" -> "1"
+ ))
+
+ // Validate that clustering commit is present
+ val metaClient = HoodieTableMetaClient.builder()
+ .setConf(HoodieTestUtils.getDefaultStorageConf)
+ .setBasePath(tablePath)
+ .build()
+    assertTrue(metaClient.getActiveTimeline.getLastClusteringInstant.isPresent, "Clustering commit should be present after inline clustering")
+
+ // Read and verify data
+ val readDf = spark.read.format("hudi").load(tablePath)
+ val actual = readDf.select("id", "name", "age", "score")
+
+ val expectedDf = createDataFrame(records1 ++ records2)
+
+ assertTrue(expectedDf.except(actual).isEmpty)
+ assertTrue(actual.except(expectedDf).isEmpty)
+ }
+
+ private def createDataFrame(records: Seq[(Int, String, Int, Double)]) = {
+    spark.createDataFrame(records).toDF("id", "name", "age", "score").coalesce(1)
+ }
+
+  private def writeDataframe(tableType: HoodieTableType, tableName: String, tablePath: String, df: DataFrame,
+                             saveMode: SaveMode = SaveMode.Append, operation: Option[String] = None,
+                             extraOptions: Map[String, String] = Map.empty): Unit = {
+ var writer = df.write
+ .format("hudi")
+ .option(HoodieTableConfig.BASE_FILE_FORMAT.key(), "LANCE")
+ .option(TABLE_TYPE.key(), tableType.name())
+ .option(RECORDKEY_FIELD.key(), "id")
+ .option(PRECOMBINE_FIELD.key(), "age")
+ .option(TABLE_NAME.key(), tableName)
+ .option(HoodieWriteConfig.TBL_NAME.key(), tableName)
+      .option(HoodieWriteConfig.RECORD_MERGE_IMPL_CLASSES.key(), classOf[DefaultSparkRecordMerger].getName)
+
+ // Add operation if specified
+ writer = operation match {
+ case Some(op) => writer.option(OPERATION.key(), op)
+ case None => writer
+ }
+
+ // Add any extra options
+    extraOptions.foreach { case (key, value) => writer = writer.option(key, value) }
+
+ writer.mode(saveMode).save(tablePath)
+ }
}