This is an automated email from the ASF dual-hosted git repository.
voonhous pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 34e9c7c5bbdc test(schema): Add MOR log-only compaction tests for custom types (#18583)
34e9c7c5bbdc is described below
commit 34e9c7c5bbdc30143a3f2dbb6931149f6350357f
Author: voonhous <[email protected]>
AuthorDate: Thu May 7 19:05:05 2026 +0800
test(schema): Add MOR log-only compaction tests for custom types (#18583)
* test(schema): Add MOR log-only compaction tests for custom types
Cover the invariant that the HoodieSchema.TYPE_METADATA_FIELD descriptor
and payload shape of a custom-typed column survive inline compaction of
a log-only MOR table into a base file.
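
In Spark terms the invariant reduces to a post-compaction check like the
following (a condensed sketch; HoodieSchema.TYPE_METADATA_FIELD and the
`embedding` column are taken from the diff below):

    // After the log-only slice is compacted into a base file, the logical
    // custom-type descriptor must still be attached to the column metadata.
    val field = spark.table(tableName).schema.find(_.name == "embedding").get
    assertTrue(field.metadata.contains(HoodieSchema.TYPE_METADATA_FIELD),
      s"custom-type metadata lost through compaction: ${field.metadata}")
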
- TestVectorDataSource: add testMorLogOnlyCompactionPreservesVectorMetadata
(5 commits via SQL + MERGE INTO to trigger default inline compaction).
- TestVariantDataType: equivalent VARIANT test, gated on Spark 4.0+,
asserting native VariantType round-trips through compaction.
- TestBlobDataType (new): BLOB INLINE and BLOB OUT_OF_LINE cases. Inline
uses named_struct with hex byte literals; out-of-line creates real files
via BlobTestHelpers.createTestFile and verifies bytes via read_blob().
* test(schema): Address review comments on MOR log-only compaction tests
- Pin hoodie.compact.inline.max.delta.commits = '5' on all 4 tables so
compaction triggers deterministically rather than via the implicit
default
- Rename path to externalPath in outOfLineBlobLiteral
- Fail with the missing id in embeddingOf instead of a bare .get
- Extract val tablePath in the variant test for consistency
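
The fail-fast lookup referenced above now reads (verbatim from
TestVectorDataSource below):

    def embeddingOf(id: Int, rows: Seq[Row]): Seq[Float] =
      rows.find(_.getInt(0) == id)
        .getOrElse(fail(s"No row with id=$id")) // surfaces the missing id instead of a bare .get
        .getSeq[Float](1)
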
---
.../hudi/functional/TestVectorDataSource.scala | 129 +++++++++-
.../sql/hudi/dml/schema/TestBlobDataType.scala | 285 +++++++++++++++++++++
.../sql/hudi/dml/schema/TestVariantDataType.scala | 117 +++++++++
3 files changed, 530 insertions(+), 1 deletion(-)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestVectorDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestVectorDataSource.scala
index 81eb776b5482..daf0c8a6c19b 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestVectorDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestVectorDataSource.scala
@@ -19,7 +19,8 @@ package org.apache.hudi.functional
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.common.schema.{HoodieSchema, HoodieSchemaType}
-import org.apache.hudi.testutils.HoodieSparkClientTestBase
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.testutils.{DataSourceTestUtils, HoodieSparkClientTestBase}
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetFileReader
@@ -674,6 +675,132 @@ class TestVectorDataSource extends HoodieSparkClientTestBase {
assertTrue(r7.getSeq[Double](1).forall(_ == 1.0), "key_7 should have original value 1.0")
}
+ @Test
+ def testMorLogOnlyCompactionPreservesVectorMetadata(): Unit = {
+ val path = basePath + "/mor_log_only_vec"
+ val tableName = "mor_log_only_vec_test"
+ try {
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | embedding VECTOR(3),
+ | ts long
+ |) using hudi
+ | location '$path'
+ | tblproperties (
+ | primaryKey = 'id',
+ | type = 'mor',
+ | preCombineField = 'ts',
+ | hoodie.index.type = 'INMEMORY',
+ | hoodie.compact.inline = 'true',
+ | hoodie.compact.inline.max.delta.commits = '5',
+ | hoodie.clean.commits.retained = '1'
+ | )
+ """.stripMargin)
+
+ def readOrdered(): Seq[Row] =
+ spark.sql(s"select id, embedding, ts from $tableName order by
id").collect().toSeq
+
+ def embeddingOf(id: Int, rows: Seq[Row]): Seq[Float] =
+ rows.find(_.getInt(0) == id)
+ .getOrElse(fail(s"No row with id=$id"))
+ .getSeq[Float](1)
+
+ spark.sql(
+ s"insert into $tableName values " +
+ "(1, array(cast(0.1 as float), cast(0.2 as float), cast(0.3 as float)), 1000)")
+ spark.sql(
+ s"insert into $tableName values " +
+ "(2, array(cast(0.4 as float), cast(0.5 as float), cast(0.6 as float)), 1000)")
+ spark.sql(
+ s"insert into $tableName values " +
+ "(3, array(cast(0.7 as float), cast(0.8 as float), cast(0.9 as float)), 1000)")
+ // 3 commits will not trigger compaction, so it should be log only.
+ assertTrue(DataSourceTestUtils.isLogFileOnly(path))
+ val afterInserts = readOrdered()
+ assertEquals(3, afterInserts.size)
+ assertEquals(Seq(0.1f, 0.2f, 0.3f), embeddingOf(1, afterInserts))
+ assertEquals(Seq(0.4f, 0.5f, 0.6f), embeddingOf(2, afterInserts))
+ assertEquals(Seq(0.7f, 0.8f, 0.9f), embeddingOf(3, afterInserts))
+
+ spark.sql(
+ s"""
+ |merge into $tableName h0
+ |using (
+ | select 1 as id,
+ | array(cast(0.11 as float), cast(0.22 as float), cast(0.33 as float)) as embedding,
+ | 1001L as ts
+ |) s0
+ | on h0.id = s0.id
+ | when matched then update set *
+ |""".stripMargin)
+ // 4 commits will not trigger compaction, so it should be log only.
+ assertTrue(DataSourceTestUtils.isLogFileOnly(path))
+ val afterUpdate = readOrdered()
+ assertEquals(Seq(0.11f, 0.22f, 0.33f), embeddingOf(1, afterUpdate))
+ assertEquals(Seq(0.4f, 0.5f, 0.6f), embeddingOf(2, afterUpdate))
+ assertEquals(Seq(0.7f, 0.8f, 0.9f), embeddingOf(3, afterUpdate))
+
+ spark.sql(
+ s"""
+ |merge into $tableName h0
+ |using (
+ | select 4 as id,
+ | array(cast(0.44 as float), cast(0.55 as float), cast(0.66 as float)) as embedding,
+ | 1000L as ts
+ |) s0
+ | on h0.id = s0.id
+ | when not matched then insert *
+ |""".stripMargin)
+
+ // 5 commits will trigger compaction.
+ assertFalse(DataSourceTestUtils.isLogFileOnly(path))
+ val afterCompaction = readOrdered()
+ assertEquals(4, afterCompaction.size)
+ assertEquals(Seq(0.11f, 0.22f, 0.33f), embeddingOf(1, afterCompaction))
+ assertEquals(Seq(0.4f, 0.5f, 0.6f), embeddingOf(2, afterCompaction))
+ assertEquals(Seq(0.7f, 0.8f, 0.9f), embeddingOf(3, afterCompaction))
+ assertEquals(Seq(0.44f, 0.55f, 0.66f), embeddingOf(4, afterCompaction))
+
+ // VECTOR custom-type descriptor must survive the compacted base-file read path.
+ val embeddingField = spark.table(tableName).schema.find(_.name == "embedding").get
+ assertTrue(embeddingField.metadata.contains(HoodieSchema.TYPE_METADATA_FIELD),
+ s"Expected VECTOR type metadata on embedding field after compaction, " +
+ s"got: ${embeddingField.metadata}")
+
+ // 6th commit drives an auto-clean that retires the now-superseded log-only slice.
+ // Inline compaction on commit 5 ran AFTER its own postCommit clean, so the prior
+ // slice was not yet superseded when that clean fired and no .clean instant was
+ // written. This deltacommit's postCommit clean sees the post-compaction base
+ // file and writes the .clean instant.
+ spark.sql(
+ s"""
+ |merge into $tableName h0
+ |using (
+ | select 2 as id,
+ | array(cast(0.222 as float), cast(0.555 as float), cast(0.888 as float)) as embedding,
+ | 1002L as ts
+ |) s0
+ | on h0.id = s0.id
+ | when matched then update set *
+ |""".stripMargin)
+ val afterCleanup = readOrdered()
+ assertEquals(Seq(0.11f, 0.22f, 0.33f), embeddingOf(1, afterCleanup))
+ assertEquals(Seq(0.222f, 0.555f, 0.888f), embeddingOf(2, afterCleanup))
+ assertEquals(Seq(0.7f, 0.8f, 0.9f), embeddingOf(3, afterCleanup))
+ assertEquals(Seq(0.44f, 0.55f, 0.66f), embeddingOf(4, afterCleanup))
+
+ val metaClient = HoodieTableMetaClient.builder()
+ .setBasePath(path).setConf(storageConf).build()
+ metaClient.reloadActiveTimeline()
+ assertTrue(metaClient.getActiveTimeline.getCleanerTimeline.countInstants() > 0,
+ "Expected at least one .clean instant on the timeline after compaction")
+ } finally {
+ spark.sql(s"drop table if exists $tableName")
+ }
+ }
+
@Test
def testDimensionMismatchOnWrite(): Unit = {
// Schema declares VECTOR(8) but data has arrays of length 4
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/schema/TestBlobDataType.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/schema/TestBlobDataType.scala
new file mode 100644
index 000000000000..a1197fd7a89b
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/schema/TestBlobDataType.scala
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.hudi.dml.schema
+
+import org.apache.hudi.blob.BlobTestHelpers
+import org.apache.hudi.common.schema.{HoodieSchema, HoodieSchemaType}
+import org.apache.hudi.testutils.DataSourceTestUtils
+import org.apache.hudi.testutils.HoodieClientTestUtils.createMetaClient
+
+import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
+
+import java.io.File
+
+class TestBlobDataType extends HoodieSparkSqlTestBase {
+
+ private val referenceStructType =
+ "struct<external_path:string, offset:bigint, length:bigint,
managed:boolean>"
+
+ private def inlineBlobLiteral(hex: String): String =
+ s"""named_struct(
+ | 'type', 'INLINE',
+ | 'data', cast(X'$hex' as binary),
+ | 'reference', cast(null as $referenceStructType)
+ |)""".stripMargin
+
+ private def outOfLineBlobLiteral(externalPath: String, offset: Long, length: Long): String =
+ s"""named_struct(
+ | 'type', 'OUT_OF_LINE',
+ | 'data', cast(null as binary),
+ | 'reference', named_struct(
+ | 'external_path', '$externalPath',
+ | 'offset', cast($offset as bigint),
+ | 'length', cast($length as bigint),
+ | 'managed', false
+ | )
+ |)""".stripMargin
+
+ test("Test Query Log Only MOR Table With BLOB INLINE column triggers
compaction") {
+ withRecordType()(withTempDir { tmp =>
+ val tablePath = new File(tmp, "hudi").getCanonicalPath
+ val tableName = generateTableName
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | data blob,
+ | ts long
+ |) using hudi
+ | location '$tablePath'
+ | tblproperties (
+ | primaryKey = 'id',
+ | type = 'mor',
+ | preCombineField = 'ts',
+ | hoodie.index.type = 'INMEMORY',
+ | hoodie.compact.inline = 'true',
+ | hoodie.compact.inline.max.delta.commits = '5',
+ | hoodie.clean.commits.retained = '1'
+ | )
+ """.stripMargin)
+
+ spark.sql(s"insert into $tableName values (1,
${inlineBlobLiteral("01")}, 1000)")
+ spark.sql(s"insert into $tableName values (2,
${inlineBlobLiteral("02")}, 1000)")
+ spark.sql(s"insert into $tableName values (3,
${inlineBlobLiteral("03")}, 1000)")
+ // 3 commits will not trigger compaction, so it should be log only.
+ assertResult(true)(DataSourceTestUtils.isLogFileOnly(tablePath))
+
+ spark.sql(
+ s"""
+ |merge into $tableName h0
+ |using (
+ | select 1 as id, ${inlineBlobLiteral("11")} as data, 1001L as ts
+ |) s0
+ | on h0.id = s0.id
+ | when matched then update set *
+ |""".stripMargin)
+ // 4 commits will not trigger compaction, so it should be log only.
+ assertResult(true)(DataSourceTestUtils.isLogFileOnly(tablePath))
+
+ spark.sql(
+ s"""
+ |merge into $tableName h0
+ |using (
+ | select 4 as id, ${inlineBlobLiteral("04")} as data, 1000L as ts
+ |) s0
+ | on h0.id = s0.id
+ | when not matched then insert *
+ |""".stripMargin)
+
+ // 5 commits will trigger compaction.
+ assertResult(false)(DataSourceTestUtils.isLogFileOnly(tablePath))
+
+ // read_blob() on an INLINE column returns the inline bytes directly; verify
+ // that the post-compaction bytes match what was written.
+ val bytesById = spark.sql(
+ s"select id, read_blob(data) as bytes from $tableName order by id"
+ ).collect().map(r => r.getInt(0) -> r.getAs[Array[Byte]]("bytes")).toMap
+ assertResult(4)(bytesById.size)
+ assert(bytesById(1).sameElements(Array(0x11.toByte)))
+ assert(bytesById(2).sameElements(Array(0x02.toByte)))
+ assert(bytesById(3).sameElements(Array(0x03.toByte)))
+ assert(bytesById(4).sameElements(Array(0x04.toByte)))
+
+ // Verify inline shape: type='INLINE', data non-null, reference null.
+ spark.sql(s"select id, data from $tableName order by
id").collect().foreach { row =>
+ val blob = row.getStruct(1)
+ assertResult("INLINE")(blob.getString(blob.fieldIndex(HoodieSchema.Blob.TYPE)))
+ assert(!blob.isNullAt(blob.fieldIndex(HoodieSchema.Blob.INLINE_DATA_FIELD)))
+ assert(blob.isNullAt(blob.fieldIndex(HoodieSchema.Blob.EXTERNAL_REFERENCE)))
+ }
+
+ // BLOB custom-type descriptor must survive the compacted base-file read path.
+ val blobField = spark.table(tableName).schema.find(_.name == "data").get
+ assert(blobField.metadata.contains(HoodieSchema.TYPE_METADATA_FIELD),
+ s"Expected BLOB type metadata on data field after compaction, " +
+ s"got: ${blobField.metadata}")
+ assertResult(HoodieSchemaType.BLOB.name())(
+ blobField.metadata.getString(HoodieSchema.TYPE_METADATA_FIELD))
+
+ // 6th commit drives an auto-clean that retires the now-superseded log-only slice.
+ // Inline compaction on commit 5 ran AFTER its own postCommit clean, so the prior
+ // slice was not yet superseded when that clean fired and no .clean instant was
+ // written. This deltacommit's postCommit clean writes the .clean instant.
+ spark.sql(
+ s"""
+ |merge into $tableName h0
+ |using (
+ | select 2 as id, ${inlineBlobLiteral("22")} as data, 1002L as ts
+ |) s0
+ | on h0.id = s0.id
+ | when matched then update set *
+ |""".stripMargin)
+ val updatedBytesById = spark.sql(
+ s"select id, read_blob(data) as bytes from $tableName order by id"
+ ).collect().map(r => r.getInt(0) -> r.getAs[Array[Byte]]("bytes")).toMap
+ assert(updatedBytesById(2).sameElements(Array(0x22.toByte)))
+
+ val metaClient = createMetaClient(spark, tablePath)
+ metaClient.reloadActiveTimeline()
+ assert(metaClient.getActiveTimeline.getCleanerTimeline.countInstants() > 0,
+ "Expected at least one .clean instant on the timeline after compaction")
+ })
+ }
+
+ test("Test Query Log Only MOR Table With BLOB OUT_OF_LINE column triggers
compaction") {
+ withRecordType()(withTempDir { tmp =>
+ val tablePath = new File(tmp, "hudi").getCanonicalPath
+ val blobDir = new File(tmp, "blobs")
+ blobDir.mkdirs()
+ // createTestFile writes bytes where byte[i] = i % 256; assertBytesContent
+ // checks round-trip against that pattern.
+ val file1 = BlobTestHelpers.createTestFile(blobDir.toPath, "blob1.bin", 100)
+ val file2 = BlobTestHelpers.createTestFile(blobDir.toPath, "blob2.bin", 100)
+ val file3 = BlobTestHelpers.createTestFile(blobDir.toPath, "blob3.bin", 100)
+ val file4 = BlobTestHelpers.createTestFile(blobDir.toPath, "blob4.bin", 100)
+ val file1Updated = BlobTestHelpers.createTestFile(blobDir.toPath, "blob1_updated.bin", 80)
+ val file2Updated = BlobTestHelpers.createTestFile(blobDir.toPath, "blob2_updated.bin", 60)
+
+ val tableName = generateTableName
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | data blob,
+ | ts long
+ |) using hudi
+ | location '$tablePath'
+ | tblproperties (
+ | primaryKey = 'id',
+ | type = 'mor',
+ | preCombineField = 'ts',
+ | hoodie.index.type = 'INMEMORY',
+ | hoodie.compact.inline = 'true',
+ | hoodie.compact.inline.max.delta.commits = '5',
+ | hoodie.clean.commits.retained = '1'
+ | )
+ """.stripMargin)
+
+ spark.sql(
+ s"insert into $tableName values (1, ${outOfLineBlobLiteral(file1, 0L, 100L)}, 1000)")
+ spark.sql(
+ s"insert into $tableName values (2, ${outOfLineBlobLiteral(file2, 0L, 100L)}, 1000)")
+ spark.sql(
+ s"insert into $tableName values (3, ${outOfLineBlobLiteral(file3, 0L, 100L)}, 1000)")
+ // 3 commits will not trigger compaction, so it should be log only.
+ assertResult(true)(DataSourceTestUtils.isLogFileOnly(tablePath))
+
+ spark.sql(
+ s"""
+ |merge into $tableName h0
+ |using (
+ | select 1 as id, ${outOfLineBlobLiteral(file1Updated, 0L, 80L)} as data, 1001L as ts
+ |) s0
+ | on h0.id = s0.id
+ | when matched then update set *
+ |""".stripMargin)
+ // 4 commits will not trigger compaction, so it should be log only.
+ assertResult(true)(DataSourceTestUtils.isLogFileOnly(tablePath))
+
+ spark.sql(
+ s"""
+ |merge into $tableName h0
+ |using (
+ | select 4 as id, ${outOfLineBlobLiteral(file4, 0L, 100L)} as data, 1000L as ts
+ |) s0
+ | on h0.id = s0.id
+ | when not matched then insert *
+ |""".stripMargin)
+
+ // 5 commits will trigger compaction.
+ assertResult(false)(DataSourceTestUtils.isLogFileOnly(tablePath))
+
+ // read_blob() on an OUT_OF_LINE column must dereference external_path and read
+ // the referenced byte range; verify bytes from the compacted base-file plan.
+ val bytesById = spark.sql(
+ s"select id, read_blob(data) as bytes from $tableName order by id"
+ ).collect().map(r => r.getInt(0) -> r.getAs[Array[Byte]]("bytes")).toMap
+ assertResult(4)(bytesById.size)
+ assertResult(80)(bytesById(1).length)
+ BlobTestHelpers.assertBytesContent(bytesById(1))
+ assertResult(100)(bytesById(2).length)
+ BlobTestHelpers.assertBytesContent(bytesById(2))
+ assertResult(100)(bytesById(3).length)
+ BlobTestHelpers.assertBytesContent(bytesById(3))
+ assertResult(100)(bytesById(4).length)
+ BlobTestHelpers.assertBytesContent(bytesById(4))
+
+ // Verify out-of-line shape: type='OUT_OF_LINE', data null, reference non-null.
+ spark.sql(s"select id, data from $tableName order by
id").collect().foreach { row =>
+ val blob = row.getStruct(1)
+ assertResult("OUT_OF_LINE")(blob.getString(blob.fieldIndex(HoodieSchema.Blob.TYPE)))
+ assert(blob.isNullAt(blob.fieldIndex(HoodieSchema.Blob.INLINE_DATA_FIELD)))
+ assert(!blob.isNullAt(blob.fieldIndex(HoodieSchema.Blob.EXTERNAL_REFERENCE)))
+ }
+
+ // BLOB custom-type descriptor must survive the compacted base-file read path.
+ val blobField = spark.table(tableName).schema.find(_.name == "data").get
+ assert(blobField.metadata.contains(HoodieSchema.TYPE_METADATA_FIELD),
+ s"Expected BLOB type metadata on data field after compaction, " +
+ s"got: ${blobField.metadata}")
+ assertResult(HoodieSchemaType.BLOB.name())(
+ blobField.metadata.getString(HoodieSchema.TYPE_METADATA_FIELD))
+
+ // 6th commit drives an auto-clean that retires the now-superseded log-only slice.
+ // Inline compaction on commit 5 ran AFTER its own postCommit clean, so the prior
+ // slice was not yet superseded when that clean fired and no .clean instant was
+ // written. This deltacommit's postCommit clean writes the .clean instant.
+ spark.sql(
+ s"""
+ |merge into $tableName h0
+ |using (
+ | select 2 as id, ${outOfLineBlobLiteral(file2Updated, 0L, 60L)} as data, 1002L as ts
+ |) s0
+ | on h0.id = s0.id
+ | when matched then update set *
+ |""".stripMargin)
+ val updatedBytesById = spark.sql(
+ s"select id, read_blob(data) as bytes from $tableName order by id"
+ ).collect().map(r => r.getInt(0) -> r.getAs[Array[Byte]]("bytes")).toMap
+ assertResult(60)(updatedBytesById(2).length)
+ BlobTestHelpers.assertBytesContent(updatedBytesById(2))
+
+ val metaClient = createMetaClient(spark, tablePath)
+ metaClient.reloadActiveTimeline()
+ assert(metaClient.getActiveTimeline.getCleanerTimeline.countInstants() > 0,
+ "Expected at least one .clean instant on the timeline after compaction")
+ })
+ }
+}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/schema/TestVariantDataType.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/schema/TestVariantDataType.scala
index 443a316abc8c..56bb3270b5a1 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/schema/TestVariantDataType.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/schema/TestVariantDataType.scala
@@ -24,6 +24,8 @@ import org.apache.hudi.common.schema.HoodieSchema
import org.apache.hudi.common.testutils.HoodieTestUtils
import org.apache.hudi.common.util.StringUtils
import org.apache.hudi.internal.schema.HoodieSchemaException
+import org.apache.hudi.testutils.DataSourceTestUtils
+import org.apache.hudi.testutils.HoodieClientTestUtils.createMetaClient
import org.apache.spark.sql.{Row, SaveMode}
import org.apache.spark.sql.catalyst.TableIdentifier
@@ -128,6 +130,121 @@ class TestVariantDataType extends HoodieSparkSqlTestBase {
}
}
+ test("Test Query Log Only MOR Table With VARIANT column triggers
compaction") {
+ assume(HoodieSparkUtils.gteqSpark4_0, "Variant type requires Spark 4.0 or higher")
+
+ withRecordType()(withTempDir { tmp =>
+ val tableName = generateTableName
+ val tablePath = tmp.getCanonicalPath
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | v variant,
+ | ts long
+ |) using hudi
+ | location '$tablePath'
+ | tblproperties (
+ | primaryKey = 'id',
+ | type = 'mor',
+ | preCombineField = 'ts',
+ | hoodie.index.type = 'INMEMORY',
+ | hoodie.compact.inline = 'true',
+ | hoodie.compact.inline.max.delta.commits = '5',
+ | hoodie.clean.commits.retained = '1'
+ | )
+ """.stripMargin)
+
+ spark.sql(
+ s"insert into $tableName values " +
+ "(1, parse_json('{\"key\":\"value1\"}'), 1000)")
+ spark.sql(
+ s"insert into $tableName values " +
+ "(2, parse_json('{\"key\":\"value2\"}'), 1000)")
+ spark.sql(
+ s"insert into $tableName values " +
+ "(3, parse_json('{\"key\":\"value3\"}'), 1000)")
+ // 3 commits will not trigger compaction, so it should be log only.
+ assertResult(true)(DataSourceTestUtils.isLogFileOnly(tablePath))
+ checkAnswer(s"select id, cast(v as string), ts from $tableName order by
id")(
+ Seq(1, "{\"key\":\"value1\"}", 1000),
+ Seq(2, "{\"key\":\"value2\"}", 1000),
+ Seq(3, "{\"key\":\"value3\"}", 1000)
+ )
+
+ spark.sql(
+ s"""
+ |merge into $tableName h0
+ |using (
+ | select 1 as id,
+ | parse_json('{"key":"v1-merged"}') as v,
+ | 1001L as ts
+ |) s0
+ | on h0.id = s0.id
+ | when matched then update set *
+ |""".stripMargin)
+ // 4 commits will not trigger compaction, so it should be log only.
+ assertResult(true)(DataSourceTestUtils.isLogFileOnly(tablePath))
+ checkAnswer(s"select id, cast(v as string), ts from $tableName order by
id")(
+ Seq(1, "{\"key\":\"v1-merged\"}", 1001),
+ Seq(2, "{\"key\":\"value2\"}", 1000),
+ Seq(3, "{\"key\":\"value3\"}", 1000)
+ )
+
+ spark.sql(
+ s"""
+ |merge into $tableName h0
+ |using (
+ | select 4 as id,
+ | parse_json('{"key":"value4"}') as v,
+ | 1000L as ts
+ |) s0
+ | on h0.id = s0.id
+ | when not matched then insert *
+ |""".stripMargin)
+
+ // 5 commits will trigger compaction.
+ assertResult(false)(DataSourceTestUtils.isLogFileOnly(tablePath))
+ checkAnswer(s"select id, cast(v as string), ts from $tableName order by
id")(
+ Seq(1, "{\"key\":\"v1-merged\"}", 1001),
+ Seq(2, "{\"key\":\"value2\"}", 1000),
+ Seq(3, "{\"key\":\"value3\"}", 1000),
+ Seq(4, "{\"key\":\"value4\"}", 1000)
+ )
+
+ // VARIANT must round-trip as native VariantType through the compacted base-file read path.
+ val variantField = spark.table(tableName).schema.find(_.name == "v").get
+ assertResult("variant")(variantField.dataType.typeName)
+
+ // 6th commit drives an auto-clean that retires the now-superseded log-only slice.
+ // Inline compaction on commit 5 ran AFTER its own postCommit clean, so the prior
+ // slice was not yet superseded when that clean fired and no .clean instant was
+ // written. This deltacommit's postCommit clean writes the .clean instant.
+ spark.sql(
+ s"""
+ |merge into $tableName h0
+ |using (
+ | select 2 as id,
+ | parse_json('{"key":"v2-merged"}') as v,
+ | 1002L as ts
+ |) s0
+ | on h0.id = s0.id
+ | when matched then update set *
+ |""".stripMargin)
+ checkAnswer(s"select id, cast(v as string), ts from $tableName order by
id")(
+ Seq(1, "{\"key\":\"v1-merged\"}", 1001),
+ Seq(2, "{\"key\":\"v2-merged\"}", 1002),
+ Seq(3, "{\"key\":\"value3\"}", 1000),
+ Seq(4, "{\"key\":\"value4\"}", 1000)
+ )
+
+ val metaClient = createMetaClient(spark, tablePath)
+ metaClient.reloadActiveTimeline()
+ assert(metaClient.getActiveTimeline.getCleanerTimeline.countInstants() > 0,
+ "Expected at least one .clean instant on the timeline after compaction")
+ })
+ }
+
test("Test toHiveCompatibleSchema converts VariantType to physical struct") {
assume(HoodieSparkUtils.gteqSpark4_0, "Variant type requires Spark 4.0 or higher")