This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new e7b360df7bdf [HUDI-9794] Convert the avro into file bytes in LegacyArchivedMetaEntryReader to keep forward compatibility (#13852)
e7b360df7bdf is described below
commit e7b360df7bdf42a6cac23d60b41731205812e0dc
Author: Vamshi Krishna Kyatham <[email protected]>
AuthorDate: Mon Sep 8 00:01:17 2025 -0700
[HUDI-9794] Convert the avro into file bytes in LegacyArchivedMetaEntryReader to keep forward compatibility (#13852)
---
.../timeline/versioning/v1/TimelineArchiverV1.java | 12 ++-
.../utils/LegacyArchivedMetaEntryReader.java | 2 +-
.../java/org/apache/hudi/avro/HoodieAvroUtils.java | 14 ++++
.../table/timeline/MetadataConversionUtils.java | 4 +-
.../hudi/functional/TestSevenToEightUpgrade.scala | 91 +++++++++++++++++++++-
5 files changed, 117 insertions(+), 6 deletions(-)
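Context for the change below: the new avroToFileBytes helper serializes a record in the Avro Object Container File format, which embeds the writer schema in the bytes, instead of writing raw datum bytes whose schema must be known out of band. The following standalone sketch (not part of the commit; the ArchivedEntry schema and field names are made up for illustration) shows the same write pattern and how such bytes can be read back later without supplying the original schema, which is the forward-compatibility property the commit title refers to.

    import java.io.ByteArrayOutputStream;
    import org.apache.avro.Schema;
    import org.apache.avro.SchemaBuilder;
    import org.apache.avro.file.DataFileReader;
    import org.apache.avro.file.DataFileWriter;
    import org.apache.avro.file.SeekableByteArrayInput;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericDatumWriter;
    import org.apache.avro.generic.GenericRecord;

    public class AvroFileBytesDemo {
      public static void main(String[] args) throws Exception {
        // Hypothetical schema standing in for an archived timeline entry.
        Schema schema = SchemaBuilder.record("ArchivedEntry").fields()
            .requiredString("actionType")
            .endRecord();
        GenericRecord entry = new GenericData.Record(schema);
        entry.put("actionType", "clean");

        // Same pattern as avroToFileBytes: container format, schema embedded.
        byte[] fileBytes;
        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
             DataFileWriter<GenericRecord> writer =
                 new DataFileWriter<>(new GenericDatumWriter<>(schema))) {
          writer.create(schema, baos);
          writer.append(entry);
          writer.flush();
          fileBytes = baos.toByteArray();
        }

        // The reader does not need the writer schema up front; it is taken
        // from the file header, so older archived bytes stay decodable.
        try (DataFileReader<GenericRecord> reader = new DataFileReader<>(
            new SeekableByteArrayInput(fileBytes), new GenericDatumReader<>())) {
          System.out.println("writer schema: " + reader.getSchema());
          while (reader.hasNext()) {
            System.out.println(reader.next());
          }
        }
      }
    }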
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v1/TimelineArchiverV1.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v1/TimelineArchiverV1.java
index b8c94ab2804a..345946faaada 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v1/TimelineArchiverV1.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v1/TimelineArchiverV1.java
@@ -71,6 +71,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -442,9 +443,14 @@ public class TimelineArchiverV1<T extends HoodieAvroPayload, I, K, O> implements
Map<HeaderMetadataType, String> header = new HashMap<>();
header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, wrapperSchema.toString());
final String keyField = table.getMetaClient().getTableConfig().getRecordKeyFieldProp();
- List<HoodieRecord> indexRecords = records.stream().map(HoodieAvroIndexedRecord::new).collect(Collectors.toList());
- HoodieAvroDataBlock block = new HoodieAvroDataBlock(indexRecords, header, keyField);
- writer.appendBlock(block);
+ List<HoodieRecord> indexRecords = records.stream()
+ .filter(Objects::nonNull)
+ .map(HoodieAvroIndexedRecord::new)
+ .collect(Collectors.toList());
+ if (!indexRecords.isEmpty()) {
+ HoodieAvroDataBlock block = new HoodieAvroDataBlock(indexRecords, header, keyField);
+ writer.appendBlock(block);
+ }
records.clear();
}
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/LegacyArchivedMetaEntryReader.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/LegacyArchivedMetaEntryReader.java
index c75561b61d02..94a676ac5d0f 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/LegacyArchivedMetaEntryReader.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/LegacyArchivedMetaEntryReader.java
@@ -100,7 +100,7 @@ public class LegacyArchivedMetaEntryReader {
Object actionData = record.get(key);
if (actionData != null) {
if (actionData instanceof IndexedRecord) {
- return HoodieAvroUtils.avroToBytes((IndexedRecord) actionData);
+ return HoodieAvroUtils.avroToFileBytes((IndexedRecord) actionData);
} else {
// should be json bytes.
try {
diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index ff35d70158c7..8658d8f3960b 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -40,6 +40,7 @@ import org.apache.avro.LogicalTypes;
import org.apache.avro.LogicalTypes.Decimal;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
+import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericData.Record;
import org.apache.avro.generic.GenericDatumReader;
@@ -161,6 +162,19 @@ public class HoodieAvroUtils {
}
}
+ public static byte[] avroToFileBytes(IndexedRecord record) {
+ try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+ try (DataFileWriter<IndexedRecord> writer = new DataFileWriter<>(new GenericDatumWriter<>(record.getSchema()))) {
+ writer.create(record.getSchema(), baos);
+ writer.append(record);
+ writer.flush();
+ return baos.toByteArray();
+ }
+ } catch (IOException e) {
+ throw new HoodieIOException("Failed to convert IndexedRecord to Avro file format", e);
+ }
+ }
+
/**
* Convert a given avro record to json and return the string
*
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/MetadataConversionUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/MetadataConversionUtils.java
index 06be0391f2cd..e28af75a1ef7 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/MetadataConversionUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/MetadataConversionUtils.java
@@ -183,7 +183,9 @@ public class MetadataConversionUtils {
switch (actionType) {
case HoodieTimeline.CLEAN_ACTION: {
archivedMetaWrapper.setHoodieCleanMetadata(CleanerUtils.getCleanerMetadata(metaClient, new ByteArrayInputStream(instantDetails.get())));
- archivedMetaWrapper.setHoodieCleanerPlan(CleanerUtils.getCleanerPlan(metaClient, new ByteArrayInputStream(planBytes.get())));
+ if (planBytes.isPresent()) {
+   archivedMetaWrapper.setHoodieCleanerPlan(CleanerUtils.getCleanerPlan(metaClient, new ByteArrayInputStream(planBytes.get())));
+ }
archivedMetaWrapper.setActionType(ActionType.clean.name());
break;
}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSevenToEightUpgrade.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSevenToEightUpgrade.scala
index 55f0a68bf2d6..d5b1ce003664 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSevenToEightUpgrade.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSevenToEightUpgrade.scala
@@ -25,8 +25,9 @@ import org.apache.hudi.common.config.{HoodieMetadataConfig, RecordMergeMode, Typ
import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, HoodieRecordMerger, HoodieRecordPayload, HoodieTableType, OverwriteWithLatestAvroPayload, TableServiceType}
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, HoodieTableVersion}
import org.apache.hudi.common.table.timeline.InstantComparison.{compareTimestamps, GREATER_THAN_OR_EQUALS}
+import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion
import org.apache.hudi.common.util.{Option, StringUtils}
-import org.apache.hudi.config.{HoodieCleanConfig, HoodieCompactionConfig, HoodieLockConfig, HoodieWriteConfig}
+import org.apache.hudi.config.{HoodieArchivalConfig, HoodieCleanConfig, HoodieCompactionConfig, HoodieLockConfig, HoodieWriteConfig}
import org.apache.hudi.keygen.NonpartitionedKeyGenerator
import org.apache.hudi.keygen.constant.KeyGeneratorType
import org.apache.hudi.table.upgrade.{SparkUpgradeDowngradeHelper, UpgradeDowngrade}
@@ -285,6 +286,94 @@ class TestSevenToEightUpgrade extends RecordLevelIndexTestBase {
assertEquals(1, df2.filter("price = 35").count())
}
+ @Test
+ def testV6TableUpgradeToV9ToV6(): Unit = {
+ val partitionFields = "partition:simple"
+
+ val hudiOptsV6 = commonOpts ++ Map(
+ TABLE_TYPE.key -> HoodieTableType.COPY_ON_WRITE.name(),
+ KEYGENERATOR_CLASS_NAME.key -> KeyGeneratorType.CUSTOM.getClassName,
+ PARTITIONPATH_FIELD.key -> partitionFields,
+ "hoodie.metadata.enable" -> "true",
+ PAYLOAD_CLASS_NAME.key -> classOf[OverwriteWithLatestAvroPayload].getName,
+ RECORD_MERGE_MODE.key -> RecordMergeMode.COMMIT_TIME_ORDERING.name,
+ HoodieWriteConfig.WRITE_TABLE_VERSION.key -> "6",
+ HoodieWriteConfig.TIMELINE_LAYOUT_VERSION_NUM.key() -> Integer.toString(TimelineLayoutVersion.VERSION_1),
+ HoodieWriteConfig.AUTO_UPGRADE_VERSION.key -> "false",
+ HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS.key -> "15",
+ HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key -> "3",
+ HoodieArchivalConfig.MIN_COMMITS_TO_KEEP.key -> "4",
+ HoodieArchivalConfig.MAX_COMMITS_TO_KEEP.key -> "5"
+ )
+
+ doWriteAndValidateDataAndRecordIndex(hudiOptsV6,
+ operation = INSERT_OPERATION_OPT_VAL,
+ saveMode = SaveMode.Overwrite,
+ validate = false)
+
+ for (i <- 1 to 10) {
+ doWriteAndValidateDataAndRecordIndex(hudiOptsV6,
+ operation = INSERT_OPERATION_OPT_VAL,
+ saveMode = SaveMode.Append,
+ validate = false)
+ }
+ metaClient = getLatestMetaClient(true)
+
+ assertEquals(HoodieTableVersion.SIX, metaClient.getTableConfig.getTableVersion)
+ assertEquals("partition", HoodieTableConfig.getPartitionFieldPropForKeyGenerator(metaClient.getTableConfig).get())
+
+ val archivePath = new org.apache.hudi.storage.StoragePath(metaClient.getArchivePath, ".commits_.archive*")
+ val archivedFiles = metaClient.getStorage.globEntries(archivePath)
+ println(s"Archived files found ${archivedFiles.size()}")
+
+ metaClient = HoodieTableMetaClient.builder()
+ .setBasePath(basePath)
+ .setConf(storage.getConf())
+ .build()
+
+ val hudiOptsUpgrade = hudiOptsV6 ++ Map(
+ HoodieWriteConfig.WRITE_TABLE_VERSION.key -> HoodieTableVersion.current().versionCode().toString
+ ) - HoodieWriteConfig.AUTO_UPGRADE_VERSION.key
+
+ doWriteAndValidateDataAndRecordIndex(hudiOptsUpgrade,
+ operation = UPSERT_OPERATION_OPT_VAL,
+ saveMode = SaveMode.Append,
+ validate = false)
+
+ metaClient = HoodieTableMetaClient.builder()
+ .setBasePath(basePath)
+ .setConf(storage.getConf())
+ .build()
+
+ assertEquals(HoodieTableVersion.current(), metaClient.getTableConfig.getTableVersion)
+ assertEquals(partitionFields, HoodieTableConfig.getPartitionFieldPropForKeyGenerator(metaClient.getTableConfig).get())
+
+ val archivedFilesAfterUpgrade = metaClient.getStorage.globEntries(archivePath)
+
+ assertTrue(archivedFilesAfterUpgrade.size() > 0,
+   "Even after upgrade, fresh table with ~12 commits should have archived files")
+
+ val hudiOptsDowngrade = hudiOptsV6 ++ Map(
+ HoodieWriteConfig.WRITE_TABLE_VERSION.key -> HoodieTableVersion.SIX.versionCode().toString
+ )
+
+ new UpgradeDowngrade(metaClient, getWriteConfig(hudiOptsDowngrade, basePath), context, SparkUpgradeDowngradeHelper.getInstance)
+ .run(HoodieTableVersion.SIX, null)
+
+ metaClient = HoodieTableMetaClient.builder()
+ .setBasePath(basePath)
+ .setConf(storage.getConf())
+ .build()
+
+ assertEquals(HoodieTableVersion.SIX, metaClient.getTableConfig.getTableVersion)
+ assertEquals("partition", HoodieTableConfig.getPartitionFieldPropForKeyGenerator(metaClient.getTableConfig).get())
+
+ val v6ArchivePath = new org.apache.hudi.storage.StoragePath(metaClient.getArchivePath, ".commits_.archive*")
+ val v6ArchivedFiles = metaClient.getStorage.globEntries(v6ArchivePath)
+
+ assertTrue(v6ArchivedFiles.size() > 0, "Downgrade should have archived files in V6 format")
+ }
+
private def getWriteConfig(hudiOpts: Map[String, String], basePath: String): HoodieWriteConfig = {
val props = TypedProperties.fromMap(hudiOpts.asJava)
HoodieWriteConfig.newBuilder()