This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 71ceb959218c [HUDI-9794] Convert the avro into file bytes in
LegacyArchivedMetaEntryReader to keep forward compatibility (#13862)
71ceb959218c is described below
commit 71ceb959218c08944ebf44cd735ff698562945b6
Author: Vamshi Krishna Kyatham
<[email protected]>
AuthorDate: Tue Sep 9 00:02:56 2025 -0700
[HUDI-9794] Convert the avro into file bytes in
LegacyArchivedMetaEntryReader to keep forward compatibility (#13862)
---
.../utils/LegacyArchivedMetaEntryReader.java | 2 +-
.../java/org/apache/hudi/avro/HoodieAvroUtils.java | 14 +++
.../table/timeline/MetadataConversionUtils.java | 4 +-
.../hudi/functional/TestSevenToEightUpgrade.scala | 101 ++++++++++++++++++++-
4 files changed, 118 insertions(+), 3 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/LegacyArchivedMetaEntryReader.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/LegacyArchivedMetaEntryReader.java
index c75561b61d02..94a676ac5d0f 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/LegacyArchivedMetaEntryReader.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/LegacyArchivedMetaEntryReader.java
@@ -100,7 +100,7 @@ public class LegacyArchivedMetaEntryReader {
Object actionData = record.get(key);
if (actionData != null) {
if (actionData instanceof IndexedRecord) {
- return HoodieAvroUtils.avroToBytes((IndexedRecord) actionData);
+ return HoodieAvroUtils.avroToFileBytes((IndexedRecord) actionData);
} else {
// should be json bytes.
try {
diff --git
a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index ff35d70158c7..8658d8f3960b 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -40,6 +40,7 @@ import org.apache.avro.LogicalTypes;
import org.apache.avro.LogicalTypes.Decimal;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
+import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericData.Record;
import org.apache.avro.generic.GenericDatumReader;
@@ -161,6 +162,19 @@ public class HoodieAvroUtils {
}
}
+ public static byte[] avroToFileBytes(IndexedRecord record) { // Writes the single given record through an Avro DataFileWriter into an in-memory buffer and returns the buffered bytes.
+ try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+ try (DataFileWriter<IndexedRecord> writer = new DataFileWriter<>(new
GenericDatumWriter<>(record.getSchema()))) {
+ writer.create(record.getSchema(), baos); // the writer is opened on the in-memory stream using the record's own schema
+ writer.append(record);
+ writer.flush(); // flush explicitly: the return below snapshots the buffer while the writer is still open
+ return baos.toByteArray();
+ }
+ } catch (IOException e) { // an IOException is rethrown as HoodieIOException with the original exception attached as cause
+ throw new HoodieIOException("Failed to convert IndexedRecord to Avro
file format", e);
+ }
+ }
+
/**
* Convert a given avro record to json and return the string
*
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/MetadataConversionUtils.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/MetadataConversionUtils.java
index 06be0391f2cd..e28af75a1ef7 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/MetadataConversionUtils.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/MetadataConversionUtils.java
@@ -183,7 +183,9 @@ public class MetadataConversionUtils {
switch (actionType) {
case HoodieTimeline.CLEAN_ACTION: {
archivedMetaWrapper.setHoodieCleanMetadata(CleanerUtils.getCleanerMetadata(metaClient,
new ByteArrayInputStream(instantDetails.get())));
-
archivedMetaWrapper.setHoodieCleanerPlan(CleanerUtils.getCleanerPlan(metaClient,
new ByteArrayInputStream(planBytes.get())));
+ if (planBytes.isPresent()) {
+
archivedMetaWrapper.setHoodieCleanerPlan(CleanerUtils.getCleanerPlan(metaClient,
new ByteArrayInputStream(planBytes.get())));
+ }
archivedMetaWrapper.setActionType(ActionType.clean.name());
break;
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSevenToEightUpgrade.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSevenToEightUpgrade.scala
index 55f0a68bf2d6..2bdfaff7f880 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSevenToEightUpgrade.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSevenToEightUpgrade.scala
@@ -22,11 +22,13 @@ import org.apache.hudi.DataSourceUtils
import org.apache.hudi.DataSourceWriteOptions.{INSERT_OPERATION_OPT_VAL,
KEYGENERATOR_CLASS_NAME, OPERATION, ORDERING_FIELDS, PARTITIONPATH_FIELD,
PAYLOAD_CLASS_NAME, RECORD_MERGE_MODE, RECORDKEY_FIELD, TABLE_TYPE,
UPSERT_OPERATION_OPT_VAL}
import org.apache.hudi.client.SparkRDDWriteClient
import org.apache.hudi.common.config.{HoodieMetadataConfig, RecordMergeMode,
TypedProperties}
+import
org.apache.hudi.common.config.LockConfiguration.{LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY,
LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY,
LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY}
import org.apache.hudi.common.model.{DefaultHoodieRecordPayload,
HoodieRecordMerger, HoodieRecordPayload, HoodieTableType,
OverwriteWithLatestAvroPayload, TableServiceType}
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient,
HoodieTableVersion}
import
org.apache.hudi.common.table.timeline.InstantComparison.{compareTimestamps,
GREATER_THAN_OR_EQUALS}
+import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion
import org.apache.hudi.common.util.{Option, StringUtils}
-import org.apache.hudi.config.{HoodieCleanConfig, HoodieCompactionConfig,
HoodieLockConfig, HoodieWriteConfig}
+import org.apache.hudi.config.{HoodieArchivalConfig, HoodieCleanConfig,
HoodieCompactionConfig, HoodieLockConfig, HoodieWriteConfig}
import org.apache.hudi.keygen.NonpartitionedKeyGenerator
import org.apache.hudi.keygen.constant.KeyGeneratorType
import org.apache.hudi.table.upgrade.{SparkUpgradeDowngradeHelper,
UpgradeDowngrade}
@@ -285,6 +287,103 @@ class TestSevenToEightUpgrade extends
RecordLevelIndexTestBase {
assertEquals(1, df2.filter("price = 35").count())
}
+ @Test
+ def testV6TableUpgradeToV9ToV6(): Unit = { // Writes a table at version "6", upgrades it through a write at HoodieTableVersion.current(), then downgrades back to SIX, asserting the table version and partition-field config after each step.
+ val partitionFields = "partition:simple"
+
+ val hudiOptsV6 = commonOpts ++ Map(
+ TABLE_TYPE.key -> HoodieTableType.COPY_ON_WRITE.name(),
+ KEYGENERATOR_CLASS_NAME.key -> KeyGeneratorType.CUSTOM.getClassName,
+ PARTITIONPATH_FIELD.key -> partitionFields,
+ "hoodie.metadata.enable" -> "true",
+ PAYLOAD_CLASS_NAME.key ->
classOf[OverwriteWithLatestAvroPayload].getName,
+ RECORD_MERGE_MODE.key -> RecordMergeMode.COMMIT_TIME_ORDERING.name,
+ HoodieWriteConfig.WRITE_TABLE_VERSION.key -> "6",
+ HoodieWriteConfig.TIMELINE_LAYOUT_VERSION_NUM.key() ->
Integer.toString(TimelineLayoutVersion.VERSION_1),
+ HoodieWriteConfig.AUTO_UPGRADE_VERSION.key -> "false",
+ HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key -> "15",
+ HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key -> "3",
+ HoodieArchivalConfig.MIN_COMMITS_TO_KEEP.key -> "4",
+ HoodieArchivalConfig.MAX_COMMITS_TO_KEEP.key -> "5", // NOTE(review): small archival bounds, presumably so the 11 writes below produce archived instants - confirm
+ HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key ->
"org.apache.hudi.client.transaction.lock.InProcessLockProvider",
+ LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY -> "60000",
+ LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY -> "10",
+ LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY -> "1000",
+ HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key ->
"OPTIMISTIC_CONCURRENCY_CONTROL"
+ )
+
+ doWriteAndValidateDataAndRecordIndex(hudiOptsV6,
+ operation = INSERT_OPERATION_OPT_VAL,
+ saveMode = SaveMode.Overwrite,
+ validate = false)
+
+ for (i <- 1 to 10) { // ten more appends on top of the initial overwrite (11 writes in total); the loop index is unused
+ doWriteAndValidateDataAndRecordIndex(hudiOptsV6,
+ operation = INSERT_OPERATION_OPT_VAL,
+ saveMode = SaveMode.Append,
+ validate = false)
+ }
+ metaClient = getLatestMetaClient(true)
+
+ assertEquals(HoodieTableVersion.SIX,
metaClient.getTableConfig.getTableVersion)
+ assertEquals("partition",
HoodieTableConfig.getPartitionFieldPropForKeyGenerator(metaClient.getTableConfig).get())
+
+ val archivePath = new
org.apache.hudi.storage.StoragePath(metaClient.getArchivePath,
".commits_.archive*")
+ val archivedFiles = metaClient.getStorage.globEntries(archivePath)
+ println(s"Archived files found ${archivedFiles.size()}") // informational only: the pre-upgrade archive file count is printed, not asserted
+
+ metaClient = HoodieTableMetaClient.builder()
+ .setBasePath(basePath)
+ .setConf(storage.getConf())
+ .build()
+
+ val hudiOptsUpgrade = hudiOptsV6 ++ Map(
+ HoodieWriteConfig.WRITE_TABLE_VERSION.key ->
HoodieTableVersion.current().versionCode().toString,
+ HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key ->
"org.apache.hudi.client.transaction.lock.InProcessLockProvider",
+ HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key ->
"OPTIMISTIC_CONCURRENCY_CONTROL"
+ ) - HoodieWriteConfig.AUTO_UPGRADE_VERSION.key // drops the AUTO_UPGRADE_VERSION entry (set to "false" in hudiOptsV6) from the options used for the upgrade write
+
+ doWriteAndValidateDataAndRecordIndex(hudiOptsUpgrade,
+ operation = UPSERT_OPERATION_OPT_VAL,
+ saveMode = SaveMode.Append,
+ validate = false)
+
+ metaClient = HoodieTableMetaClient.builder()
+ .setBasePath(basePath)
+ .setConf(storage.getConf())
+ .build()
+
+ assertEquals(HoodieTableVersion.current(),
metaClient.getTableConfig.getTableVersion)
+ assertEquals(partitionFields, // after the upgrade the value is expected to be "partition:simple", versus "partition" while at version SIX
HoodieTableConfig.getPartitionFieldPropForKeyGenerator(metaClient.getTableConfig).get())
+
+ val archivedFilesAfterUpgrade =
metaClient.getStorage.globEntries(archivePath)
+
+ assertTrue(archivedFilesAfterUpgrade.size() > 0,
+ "Even after upgrade, fresh table with ~12 commits should have archived
files")
+
+ val hudiOptsDowngrade = hudiOptsV6 ++ Map(
+ HoodieWriteConfig.WRITE_TABLE_VERSION.key ->
HoodieTableVersion.SIX.versionCode().toString,
+ HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key ->
"org.apache.hudi.client.transaction.lock.InProcessLockProvider",
+ HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key ->
"OPTIMISTIC_CONCURRENCY_CONTROL"
+ )
+
+ new UpgradeDowngrade(metaClient, getWriteConfig(hudiOptsDowngrade,
basePath), context, SparkUpgradeDowngradeHelper.getInstance)
+ .run(HoodieTableVersion.SIX, null) // the downgrade is invoked directly on UpgradeDowngrade rather than through a write
+
+ metaClient = HoodieTableMetaClient.builder()
+ .setBasePath(basePath)
+ .setConf(storage.getConf())
+ .build()
+
+ assertEquals(HoodieTableVersion.SIX,
metaClient.getTableConfig.getTableVersion)
+ assertEquals("partition",
HoodieTableConfig.getPartitionFieldPropForKeyGenerator(metaClient.getTableConfig).get())
+
+ val v6ArchivePath = new
org.apache.hudi.storage.StoragePath(metaClient.getArchivePath,
".commits_.archive*")
+ val v6ArchivedFiles = metaClient.getStorage.globEntries(v6ArchivePath)
+
+ assertTrue(v6ArchivedFiles.size() > 0, "Downgrade should have archived
files in V6 format")
+ }
+
private def getWriteConfig(hudiOpts: Map[String, String], basePath: String):
HoodieWriteConfig = {
val props = TypedProperties.fromMap(hudiOpts.asJava)
HoodieWriteConfig.newBuilder()