This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 683c9d79d7fd [HUDI-9376] Support reading archived timeline with backwards compatibility (#13774)
683c9d79d7fd is described below

commit 683c9d79d7fdd8453b61131f3d840f005fe50a21
Author: Alex R <[email protected]>
AuthorDate: Tue Aug 26 16:58:01 2025 -0700

    [HUDI-9376] Support reading archived timeline with backwards compatibility (#13774)
    
    Co-authored-by: Jonathan Vexler <=>
---
 .../common/table/log/block/HoodieDataBlock.java    |  12 +++-
 .../timeline/versioning/v1/ArchivedTimelineV1.java |  19 ++++--
 .../table/timeline/TestArchivedTimelineV1.java     |  72 +++++++++++++++++++++
 .../hudi_0_13_0_archive/.commits_.archive.1_1-0-1  | Bin 0 -> 37721 bytes
 .../hudi_0_8_0_archive/.commits_.archive.1_1-0-1   | Bin 0 -> 15533 bytes
 5 files changed, 96 insertions(+), 7 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
index f78f4f260bb8..084909860282 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
@@ -28,6 +28,7 @@ import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.io.SeekableDataInputStream;
 import org.apache.hudi.storage.HoodieStorage;
 
+import org.apache.avro.AvroTypeException;
 import org.apache.avro.Schema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -371,7 +372,16 @@ public abstract class HoodieDataBlock extends HoodieLogBlock {
 
   protected Schema getSchemaFromHeader() {
     String schemaStr = getLogBlockHeader().get(HeaderMetadataType.SCHEMA);
-    SCHEMA_MAP.computeIfAbsent(schemaStr, (schemaString) -> new Schema.Parser().parse(schemaString));
+    SCHEMA_MAP.computeIfAbsent(schemaStr,
+        (schemaString) -> {
+          try {
+            return new Schema.Parser().parse(schemaStr);
+          } catch (AvroTypeException e) {
+            // Archived commits from earlier hudi versions fail the schema check
+            // So we retry in this one specific instance.
+            return new Schema.Parser().setValidateDefaults(false).parse(schemaStr);
+          }
+        });
     return SCHEMA_MAP.get(schemaStr);
   }
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v1/ArchivedTimelineV1.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v1/ArchivedTimelineV1.java
index 7b1101e6f68d..e8f0407c300c 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v1/ArchivedTimelineV1.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v1/ArchivedTimelineV1.java
@@ -41,6 +41,7 @@ import java.io.InputStream;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -58,7 +59,9 @@ public class ArchivedTimelineV1 extends BaseTimelineV1 implements HoodieArchived
   private static final String ACTION_STATE = "actionState";
   private static final String STATE_TRANSITION_TIME = "stateTransitionTime";
   private HoodieTableMetaClient metaClient;
-  private final Map<String, Map<HoodieInstant.State, byte[]>> readCommits = new HashMap<>();
+  // The first key is the timestamp -> multiple action types -> hoodie instant state and contents
+  private final Map<String, Map<String, Map<HoodieInstant.State, byte[]>>>
+      readCommits = new HashMap<>();
   private final ArchivedTimelineLoaderV1 timelineLoader = new ArchivedTimelineLoaderV1();
 
   private static final Logger LOG = LoggerFactory.getLogger(org.apache.hudi.common.table.timeline.HoodieArchivedTimeline.class);
@@ -162,8 +165,12 @@ public class ArchivedTimelineV1 extends BaseTimelineV1 implements HoodieArchived
   }
 
   @Override
-  public Option<byte[]> getInstantDetails(HoodieInstant instant) {
-    return Option.ofNullable(readCommits.getOrDefault(instant.requestedTime(), new HashMap<>()).get(instant.getState()));
+  public Option<byte[]> getInstantDetails(HoodieInstant instant) {
+    return Option.ofNullable(
+        readCommits
+            .getOrDefault(instant.requestedTime(), Collections.emptyMap())
+            .getOrDefault(instant.getAction(), Collections.emptyMap())
+            .get(instant.getState()));
   }
 
   @Override
@@ -281,11 +288,11 @@ public class ArchivedTimelineV1 extends BaseTimelineV1 implements HoodieArchived
       getMetadataKey(hoodieInstant).map(key -> {
         Object actionData = record.get(key);
         if (actionData != null) {
-          this.readCommits.computeIfAbsent(instantTime, k -> new HashMap<>());
+          this.readCommits.computeIfAbsent(instantTime, k -> new HashMap<>()).computeIfAbsent(action, a -> new HashMap<>());
           if (action.equals(HoodieTimeline.COMPACTION_ACTION)) {
-            readCommits.get(instantTime).put(hoodieInstant.getState(), HoodieAvroUtils.avroToBytes((IndexedRecord) actionData));
+            readCommits.get(instantTime).get(action).put(hoodieInstant.getState(), HoodieAvroUtils.avroToBytes((IndexedRecord) actionData));
           } else {
-            readCommits.get(instantTime).put(hoodieInstant.getState(), actionData.toString().getBytes(StandardCharsets.UTF_8));
+            readCommits.get(instantTime).get(action).put(hoodieInstant.getState(), actionData.toString().getBytes(StandardCharsets.UTF_8));
           }
         }
         return null;
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/timeline/TestArchivedTimelineV1.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/timeline/TestArchivedTimelineV1.java
index 69cd03daf378..5e37cbf08ae4 100644
--- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/timeline/TestArchivedTimelineV1.java
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/timeline/TestArchivedTimelineV1.java
@@ -32,6 +32,7 @@ import org.apache.hudi.common.model.HoodieArchivedLogFile;
 import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
 import org.apache.hudi.common.model.HoodieCleaningPolicy;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRollingStatMetadata;
 import org.apache.hudi.common.model.WriteOperationType;
@@ -47,6 +48,8 @@ import org.apache.hudi.common.table.timeline.versioning.v1.InstantGeneratorV1;
 import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
 import org.apache.hudi.common.util.CompactionUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.storage.StoragePathFilter;
 
@@ -56,6 +59,8 @@ import org.apache.avro.generic.IndexedRecord;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -73,6 +78,7 @@ import static org.apache.hudi.common.table.timeline.HoodieInstant.State.COMPLETE
 import static org.apache.hudi.common.table.timeline.HoodieInstant.State.INFLIGHT;
 import static org.apache.hudi.common.table.timeline.HoodieInstant.State.REQUESTED;
 import static org.apache.hudi.common.testutils.HoodieTestUtils.getDefaultStorageConf;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -434,6 +440,72 @@ public class TestArchivedTimelineV1 extends HoodieCommonTestHarness {
     assertEquals(3, timeline.getInstants().size());
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = {"hudi_0_13_0_archive", "hudi_0_8_0_archive"})
+  void shouldReadArchivedFileAndValidateContent(String archivePath) {
+    String path = this.getClass().getClassLoader().getResource(archivePath).getPath() + "/.commits_.archive.1_1-0-1";
+    assertDoesNotThrow(() -> readAndValidateArchivedFile(path, metaClient.getStorage()));
+  }
+
+  @Test
+  public void testDuplicateTimestampDifferentActions() throws Exception {
+    // Two actions that share the very same commit time "30"
+    HoodieInstant commit30 = createInstantV1(COMPLETED, HoodieTimeline.COMMIT_ACTION, "30");
+    HoodieInstant clean30  = createInstantV1(COMPLETED, HoodieTimeline.CLEAN_ACTION,  "30");
+
+    HoodieInstant commit31 = createInstantV1(COMPLETED, HoodieTimeline.COMMIT_ACTION, "31");
+
+    // Write them to a brand new archive log
+    try (HoodieLogFormat.Writer writer = buildWriter(
+            ArchivedTimelineV1.getArchiveLogPath(metaClient.getArchivePath()))) {
+      List<IndexedRecord> records = new ArrayList<>();
+      records.add(createArchivedMetaWrapper(commit30));
+      records.add(createArchivedMetaWrapper(clean30));
+      records.add(createArchivedMetaWrapper(commit31));
+      writeArchiveLog(writer, records);
+    }
+
+    // Build timeline (no filters), should read the new archive
+    HoodieArchivedTimeline dupTimeline = new ArchivedTimelineV1(metaClient);
+    dupTimeline.loadCompletedInstantDetailsInMemory();
+
+    // Both actions for 30 are present (positive case)
+    assertTrue(dupTimeline.getInstantDetails(commit30).isPresent(),
+        "commit metadata for ts=30 must be loaded");
+    assertTrue(dupTimeline.getInstantDetails(clean30).isPresent(),
+        "clean metadata for ts=30 must be loaded");
+    assertFalse(Arrays.equals(dupTimeline.getInstantDetails(commit30).get(),
+        dupTimeline.getInstantDetails(clean30).get()),
+        "clean and commit metadata should have different payloads");
+
+    // For ts=31 we only archived COMMIT, so CLEAN must be absent (negative case)
+    HoodieInstant fakeClean31 = createInstantV1(COMPLETED, HoodieTimeline.CLEAN_ACTION, "31");
+
+    assertTrue(dupTimeline.getInstantDetails(commit31).isPresent(),
+        "commit metadata for ts=31 must be loaded");
+    assertFalse(dupTimeline.getInstantDetails(fakeClean31).isPresent(),
+        "clean metadata for ts=31 must NOT be loaded");
+  }
+
+  private void readAndValidateArchivedFile(String path, HoodieStorage storage) throws IOException {
+    try (HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(
+        storage, new HoodieLogFile(path), HoodieArchivedMetaEntry.getClassSchema())) {
+
+      while (reader.hasNext()) {
+        HoodieLogBlock block = reader.next();
+        if (block instanceof HoodieAvroDataBlock) {
+          HoodieAvroDataBlock avroBlock = (HoodieAvroDataBlock) block;
+          try (ClosableIterator<HoodieRecord<IndexedRecord>> itr =
+                   avroBlock.getRecordIterator(HoodieRecord.HoodieRecordType.AVRO)) {
+            if (itr.hasNext()) {
+              itr.next();
+            }
+          }
+        }
+      }
+    }
+  }
+
   /**
    * Validate whether the instants of given timestamps of the hudi archived timeline are loaded to memory or not.
    * @param hoodieArchivedTimeline archived timeline to test against
diff --git a/hudi-hadoop-common/src/test/resources/hudi_0_13_0_archive/.commits_.archive.1_1-0-1 b/hudi-hadoop-common/src/test/resources/hudi_0_13_0_archive/.commits_.archive.1_1-0-1
new file mode 100644
index 000000000000..9435bf20d70e
Binary files /dev/null and b/hudi-hadoop-common/src/test/resources/hudi_0_13_0_archive/.commits_.archive.1_1-0-1 differ
diff --git a/hudi-hadoop-common/src/test/resources/hudi_0_8_0_archive/.commits_.archive.1_1-0-1 b/hudi-hadoop-common/src/test/resources/hudi_0_8_0_archive/.commits_.archive.1_1-0-1
new file mode 100644
index 000000000000..7688ab268ac6
Binary files /dev/null and b/hudi-hadoop-common/src/test/resources/hudi_0_8_0_archive/.commits_.archive.1_1-0-1 differ

Reply via email to