This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 71ceb959218c [HUDI-9794] Convert the avro into file bytes in LegacyArchivedMetaEntryReader to keep forward compatibility (#13862)
71ceb959218c is described below

commit 71ceb959218c08944ebf44cd735ff698562945b6
Author: Vamshi Krishna Kyatham <[email protected]>
AuthorDate: Tue Sep 9 00:02:56 2025 -0700

    [HUDI-9794] Convert the avro into file bytes in LegacyArchivedMetaEntryReader to keep forward compatibility (#13862)
---
 .../utils/LegacyArchivedMetaEntryReader.java       |   2 +-
 .../java/org/apache/hudi/avro/HoodieAvroUtils.java |  14 +++
 .../table/timeline/MetadataConversionUtils.java    |   4 +-
 .../hudi/functional/TestSevenToEightUpgrade.scala  | 101 ++++++++++++++++++++-
 4 files changed, 118 insertions(+), 3 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/LegacyArchivedMetaEntryReader.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/LegacyArchivedMetaEntryReader.java
index c75561b61d02..94a676ac5d0f 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/LegacyArchivedMetaEntryReader.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/LegacyArchivedMetaEntryReader.java
@@ -100,7 +100,7 @@ public class LegacyArchivedMetaEntryReader {
       Object actionData = record.get(key);
       if (actionData != null) {
         if (actionData instanceof IndexedRecord) {
-          return HoodieAvroUtils.avroToBytes((IndexedRecord) actionData);
+          return HoodieAvroUtils.avroToFileBytes((IndexedRecord) actionData);
         } else {
           // should be json bytes.
           try {
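
For context on this one-line change: avroToBytes emits the raw Avro datum encoding, which a reader can decode only if it already has the exact writer schema out of band, while avroToFileBytes (added below) writes the record in Avro's object container file format, whose header embeds the writer schema. That self-describing header is what keeps archived entries forward compatible across Hudi versions, per the commit title.
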
diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index ff35d70158c7..8658d8f3960b 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -40,6 +40,7 @@ import org.apache.avro.LogicalTypes;
 import org.apache.avro.LogicalTypes.Decimal;
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
+import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericData.Record;
 import org.apache.avro.generic.GenericDatumReader;
@@ -161,6 +162,19 @@ public class HoodieAvroUtils {
     }
   }
 
+  public static byte[] avroToFileBytes(IndexedRecord record) {
+    try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+      try (DataFileWriter<IndexedRecord> writer = new DataFileWriter<>(new GenericDatumWriter<>(record.getSchema()))) {
+        writer.create(record.getSchema(), baos);
+        writer.append(record);
+        writer.flush();
+        return baos.toByteArray();
+      }
+    } catch (IOException e) {
+      throw new HoodieIOException("Failed to convert IndexedRecord to Avro file format", e);
+    }
+  }
+
   /**
    * Convert a given avro record to json and return the string
    *
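
As a minimal, self-contained illustration of why the container-file form is self-describing (a sketch against Avro's public API only, not Hudi code; the schema and values are invented for the example):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;

    import org.apache.avro.Schema;
    import org.apache.avro.file.DataFileStream;
    import org.apache.avro.file.DataFileWriter;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericDatumWriter;
    import org.apache.avro.generic.GenericRecord;

    public class AvroFileBytesRoundTrip {
      public static void main(String[] args) throws IOException {
        Schema schema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"Action\","
                + "\"fields\":[{\"name\":\"instant\",\"type\":\"string\"}]}");
        GenericRecord record = new GenericData.Record(schema);
        record.put("instant", "001");

        // Write in Avro's object container file format: the file header
        // carries the writer schema alongside the data blocks.
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (DataFileWriter<GenericRecord> writer =
                 new DataFileWriter<>(new GenericDatumWriter<>(schema))) {
          writer.create(schema, baos);
          writer.append(record);
        }
        byte[] fileBytes = baos.toByteArray();

        // Read back without supplying the writer schema out of band;
        // DataFileStream recovers it from the header. The raw datum bytes
        // produced by avroToBytes cannot be decoded this way.
        try (DataFileStream<GenericRecord> reader = new DataFileStream<>(
            new ByteArrayInputStream(fileBytes), new GenericDatumReader<>())) {
          System.out.println(reader.getSchema());
          System.out.println(reader.next());
        }
      }
    }
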
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/MetadataConversionUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/MetadataConversionUtils.java
index 06be0391f2cd..e28af75a1ef7 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/MetadataConversionUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/MetadataConversionUtils.java
@@ -183,7 +183,9 @@ public class MetadataConversionUtils {
     switch (actionType) {
       case HoodieTimeline.CLEAN_ACTION: {
         archivedMetaWrapper.setHoodieCleanMetadata(CleanerUtils.getCleanerMetadata(metaClient, new ByteArrayInputStream(instantDetails.get())));
-        archivedMetaWrapper.setHoodieCleanerPlan(CleanerUtils.getCleanerPlan(metaClient, new ByteArrayInputStream(planBytes.get())));
+        if (planBytes.isPresent()) {
+          archivedMetaWrapper.setHoodieCleanerPlan(CleanerUtils.getCleanerPlan(metaClient, new ByteArrayInputStream(planBytes.get())));
+        }
         archivedMetaWrapper.setActionType(ActionType.clean.name());
         break;
       }
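
A note on the MetadataConversionUtils change above: the cleaner plan is now deserialized only when planBytes is actually present, presumably because archived clean entries are not guaranteed to carry a requested-plan payload; without the guard, planBytes.get() would throw on such entries, while the clean metadata itself is still read unconditionally.
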
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSevenToEightUpgrade.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSevenToEightUpgrade.scala
index 55f0a68bf2d6..2bdfaff7f880 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSevenToEightUpgrade.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSevenToEightUpgrade.scala
@@ -22,11 +22,13 @@ import org.apache.hudi.DataSourceUtils
 import org.apache.hudi.DataSourceWriteOptions.{INSERT_OPERATION_OPT_VAL, KEYGENERATOR_CLASS_NAME, OPERATION, ORDERING_FIELDS, PARTITIONPATH_FIELD, PAYLOAD_CLASS_NAME, RECORD_MERGE_MODE, RECORDKEY_FIELD, TABLE_TYPE, UPSERT_OPERATION_OPT_VAL}
 import org.apache.hudi.client.SparkRDDWriteClient
 import org.apache.hudi.common.config.{HoodieMetadataConfig, RecordMergeMode, TypedProperties}
+import org.apache.hudi.common.config.LockConfiguration.{LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY, LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY}
 import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieRecordMerger, HoodieRecordPayload, HoodieTableType, OverwriteWithLatestAvroPayload, TableServiceType}
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, HoodieTableVersion}
 import org.apache.hudi.common.table.timeline.InstantComparison.{compareTimestamps, GREATER_THAN_OR_EQUALS}
+import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion
 import org.apache.hudi.common.util.{Option, StringUtils}
-import org.apache.hudi.config.{HoodieCleanConfig, HoodieCompactionConfig, HoodieLockConfig, HoodieWriteConfig}
+import org.apache.hudi.config.{HoodieArchivalConfig, HoodieCleanConfig, HoodieCompactionConfig, HoodieLockConfig, HoodieWriteConfig}
 import org.apache.hudi.keygen.NonpartitionedKeyGenerator
 import org.apache.hudi.keygen.constant.KeyGeneratorType
 import org.apache.hudi.table.upgrade.{SparkUpgradeDowngradeHelper, UpgradeDowngrade}
@@ -285,6 +287,103 @@ class TestSevenToEightUpgrade extends RecordLevelIndexTestBase {
     assertEquals(1, df2.filter("price = 35").count())
   }
 
+  @Test
+  def testV6TableUpgradeToV9ToV6(): Unit = {
+    val partitionFields = "partition:simple"
+
+    val hudiOptsV6 = commonOpts ++ Map(
+      TABLE_TYPE.key -> HoodieTableType.COPY_ON_WRITE.name(),
+      KEYGENERATOR_CLASS_NAME.key -> KeyGeneratorType.CUSTOM.getClassName,
+      PARTITIONPATH_FIELD.key -> partitionFields,
+      "hoodie.metadata.enable" -> "true",
+      PAYLOAD_CLASS_NAME.key -> classOf[OverwriteWithLatestAvroPayload].getName,
+      RECORD_MERGE_MODE.key -> RecordMergeMode.COMMIT_TIME_ORDERING.name,
+      HoodieWriteConfig.WRITE_TABLE_VERSION.key -> "6",
+      HoodieWriteConfig.TIMELINE_LAYOUT_VERSION_NUM.key() -> Integer.toString(TimelineLayoutVersion.VERSION_1),
+      HoodieWriteConfig.AUTO_UPGRADE_VERSION.key -> "false",
+      HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key -> "15",
+      HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key -> "3",
+      HoodieArchivalConfig.MIN_COMMITS_TO_KEEP.key -> "4",
+      HoodieArchivalConfig.MAX_COMMITS_TO_KEEP.key -> "5",
+      HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key -> "org.apache.hudi.client.transaction.lock.InProcessLockProvider",
+      LOCK_ACQUIRE_WAIT_TIMEOUT_MS_PROP_KEY -> "60000",
+      LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY -> "10",
+      LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY -> "1000",
+      HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key -> "OPTIMISTIC_CONCURRENCY_CONTROL"
+    )
+
+    doWriteAndValidateDataAndRecordIndex(hudiOptsV6,
+      operation = INSERT_OPERATION_OPT_VAL,
+      saveMode = SaveMode.Overwrite,
+      validate = false)
+
+    for (i <- 1 to 10) {
+      doWriteAndValidateDataAndRecordIndex(hudiOptsV6,
+        operation = INSERT_OPERATION_OPT_VAL,
+        saveMode = SaveMode.Append,
+        validate = false)
+    }
+    metaClient = getLatestMetaClient(true)
+
+    assertEquals(HoodieTableVersion.SIX, metaClient.getTableConfig.getTableVersion)
+    assertEquals("partition", HoodieTableConfig.getPartitionFieldPropForKeyGenerator(metaClient.getTableConfig).get())
+
+    val archivePath = new org.apache.hudi.storage.StoragePath(metaClient.getArchivePath, ".commits_.archive*")
+    val archivedFiles = metaClient.getStorage.globEntries(archivePath)
+    println(s"Archived files found ${archivedFiles.size()}")
+
+    metaClient = HoodieTableMetaClient.builder()
+      .setBasePath(basePath)
+      .setConf(storage.getConf())
+      .build()
+
+    val hudiOptsUpgrade = hudiOptsV6 ++ Map(
+      HoodieWriteConfig.WRITE_TABLE_VERSION.key -> HoodieTableVersion.current().versionCode().toString,
+      HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key -> "org.apache.hudi.client.transaction.lock.InProcessLockProvider",
+      HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key -> "OPTIMISTIC_CONCURRENCY_CONTROL"
+    ) - HoodieWriteConfig.AUTO_UPGRADE_VERSION.key
+
+    doWriteAndValidateDataAndRecordIndex(hudiOptsUpgrade,
+      operation = UPSERT_OPERATION_OPT_VAL,
+      saveMode = SaveMode.Append,
+      validate = false)
+
+    metaClient = HoodieTableMetaClient.builder()
+      .setBasePath(basePath)
+      .setConf(storage.getConf())
+      .build()
+
+    assertEquals(HoodieTableVersion.current(), metaClient.getTableConfig.getTableVersion)
+    assertEquals(partitionFields, HoodieTableConfig.getPartitionFieldPropForKeyGenerator(metaClient.getTableConfig).get())
+
+    val archivedFilesAfterUpgrade = metaClient.getStorage.globEntries(archivePath)
+
+    assertTrue(archivedFilesAfterUpgrade.size() > 0,
+      "Even after upgrade, fresh table with ~12 commits should have archived files")
+
+    val hudiOptsDowngrade = hudiOptsV6 ++ Map(
+      HoodieWriteConfig.WRITE_TABLE_VERSION.key -> HoodieTableVersion.SIX.versionCode().toString,
+      HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key -> "org.apache.hudi.client.transaction.lock.InProcessLockProvider",
+      HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key -> "OPTIMISTIC_CONCURRENCY_CONTROL"
+    )
+
+    new UpgradeDowngrade(metaClient, getWriteConfig(hudiOptsDowngrade, basePath), context, SparkUpgradeDowngradeHelper.getInstance)
+      .run(HoodieTableVersion.SIX, null)
+
+    metaClient = HoodieTableMetaClient.builder()
+      .setBasePath(basePath)
+      .setConf(storage.getConf())
+      .build()
+
+    assertEquals(HoodieTableVersion.SIX, metaClient.getTableConfig.getTableVersion)
+    assertEquals("partition", HoodieTableConfig.getPartitionFieldPropForKeyGenerator(metaClient.getTableConfig).get())
+
+    val v6ArchivePath = new org.apache.hudi.storage.StoragePath(metaClient.getArchivePath, ".commits_.archive*")
+    val v6ArchivedFiles = metaClient.getStorage.globEntries(v6ArchivePath)
+
+    assertTrue(v6ArchivedFiles.size() > 0, "Downgrade should have archived files in V6 format")
+  }
+
   private def getWriteConfig(hudiOpts: Map[String, String], basePath: String): HoodieWriteConfig = {
     val props = TypedProperties.fromMap(hudiOpts.asJava)
     HoodieWriteConfig.newBuilder()
