This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 683c9d79d7fd [HUDI-9376] Support reading archived timeline with backwards compatibility (#13774)
683c9d79d7fd is described below
commit 683c9d79d7fdd8453b61131f3d840f005fe50a21
Author: Alex R <[email protected]>
AuthorDate: Tue Aug 26 16:58:01 2025 -0700
[HUDI-9376] Support reading archived timeline with backwards compatibility (#13774)
Co-authored-by: Jonathan Vexler <=>
---
.../common/table/log/block/HoodieDataBlock.java | 12 +++-
.../timeline/versioning/v1/ArchivedTimelineV1.java | 19 ++++--
.../table/timeline/TestArchivedTimelineV1.java | 72 +++++++++++++++++++++
.../hudi_0_13_0_archive/.commits_.archive.1_1-0-1 | Bin 0 -> 37721 bytes
.../hudi_0_8_0_archive/.commits_.archive.1_1-0-1 | Bin 0 -> 15533 bytes
5 files changed, 96 insertions(+), 7 deletions(-)
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
index f78f4f260bb8..084909860282 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
@@ -28,6 +28,7 @@ import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.io.SeekableDataInputStream;
import org.apache.hudi.storage.HoodieStorage;
+import org.apache.avro.AvroTypeException;
import org.apache.avro.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -371,7 +372,16 @@ public abstract class HoodieDataBlock extends HoodieLogBlock {
   protected Schema getSchemaFromHeader() {
     String schemaStr = getLogBlockHeader().get(HeaderMetadataType.SCHEMA);
-    SCHEMA_MAP.computeIfAbsent(schemaStr, (schemaString) -> new Schema.Parser().parse(schemaString));
+    SCHEMA_MAP.computeIfAbsent(schemaStr,
+        (schemaString) -> {
+          try {
+            return new Schema.Parser().parse(schemaStr);
+          } catch (AvroTypeException e) {
+            // Archived commits from earlier hudi versions fail the schema check
+            // So we retry in this one specific instance.
+            return new Schema.Parser().setValidateDefaults(false).parse(schemaStr);
+          }
+        });
     return SCHEMA_MAP.get(schemaStr);
   }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v1/ArchivedTimelineV1.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v1/ArchivedTimelineV1.java
index 7b1101e6f68d..e8f0407c300c 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v1/ArchivedTimelineV1.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v1/ArchivedTimelineV1.java
@@ -41,6 +41,7 @@ import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -58,7 +59,9 @@ public class ArchivedTimelineV1 extends BaseTimelineV1 implements HoodieArchived
   private static final String ACTION_STATE = "actionState";
   private static final String STATE_TRANSITION_TIME = "stateTransitionTime";
   private HoodieTableMetaClient metaClient;
-  private final Map<String, Map<HoodieInstant.State, byte[]>> readCommits = new HashMap<>();
+  // The first key is the timestamp -> multiple action types -> hoodie instant state and contents
+  private final Map<String, Map<String, Map<HoodieInstant.State, byte[]>>> readCommits = new HashMap<>();
   private final ArchivedTimelineLoaderV1 timelineLoader = new ArchivedTimelineLoaderV1();
   private static final Logger LOG = LoggerFactory.getLogger(org.apache.hudi.common.table.timeline.HoodieArchivedTimeline.class);
@@ -162,8 +165,12 @@ public class ArchivedTimelineV1 extends BaseTimelineV1 implements HoodieArchived
   }

   @Override
-  public Option<byte[]> getInstantDetails(HoodieInstant instant) {
-    return Option.ofNullable(readCommits.getOrDefault(instant.requestedTime(), new HashMap<>()).get(instant.getState()));
+  public Option<byte[]> getInstantDetails(HoodieInstant instant) {
+    return Option.ofNullable(
+        readCommits
+            .getOrDefault(instant.requestedTime(), Collections.emptyMap())
+            .getOrDefault(instant.getAction(), Collections.emptyMap())
+            .get(instant.getState()));
   }

   @Override
@@ -281,11 +288,11 @@ public class ArchivedTimelineV1 extends BaseTimelineV1 implements HoodieArchived
     getMetadataKey(hoodieInstant).map(key -> {
       Object actionData = record.get(key);
       if (actionData != null) {
-        this.readCommits.computeIfAbsent(instantTime, k -> new HashMap<>());
+        this.readCommits.computeIfAbsent(instantTime, k -> new HashMap<>()).computeIfAbsent(action, a -> new HashMap<>());
         if (action.equals(HoodieTimeline.COMPACTION_ACTION)) {
-          readCommits.get(instantTime).put(hoodieInstant.getState(), HoodieAvroUtils.avroToBytes((IndexedRecord) actionData));
+          readCommits.get(instantTime).get(action).put(hoodieInstant.getState(), HoodieAvroUtils.avroToBytes((IndexedRecord) actionData));
         } else {
-          readCommits.get(instantTime).put(hoodieInstant.getState(), actionData.toString().getBytes(StandardCharsets.UTF_8));
+          readCommits.get(instantTime).get(action).put(hoodieInstant.getState(), actionData.toString().getBytes(StandardCharsets.UTF_8));
         }
       }
       return null;
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/timeline/TestArchivedTimelineV1.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/timeline/TestArchivedTimelineV1.java
index 69cd03daf378..5e37cbf08ae4 100644
--- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/timeline/TestArchivedTimelineV1.java
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/timeline/TestArchivedTimelineV1.java
@@ -32,6 +32,7 @@ import org.apache.hudi.common.model.HoodieArchivedLogFile;
import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRollingStatMetadata;
import org.apache.hudi.common.model.WriteOperationType;
@@ -47,6 +48,8 @@ import org.apache.hudi.common.table.timeline.versioning.v1.InstantGeneratorV1;
import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.StoragePathFilter;
@@ -56,6 +59,8 @@ import org.apache.avro.generic.IndexedRecord;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import java.io.IOException;
import java.util.ArrayList;
@@ -73,6 +78,7 @@ import static org.apache.hudi.common.table.timeline.HoodieInstant.State.COMPLETE
 import static org.apache.hudi.common.table.timeline.HoodieInstant.State.INFLIGHT;
 import static org.apache.hudi.common.table.timeline.HoodieInstant.State.REQUESTED;
 import static org.apache.hudi.common.testutils.HoodieTestUtils.getDefaultStorageConf;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -434,6 +440,72 @@ public class TestArchivedTimelineV1 extends HoodieCommonTestHarness {
     assertEquals(3, timeline.getInstants().size());
   }

+  @ParameterizedTest
+  @ValueSource(strings = {"hudi_0_13_0_archive", "hudi_0_8_0_archive"})
+  void shouldReadArchivedFileAndValidateContent(String archivePath) {
+    String path = this.getClass().getClassLoader().getResource(archivePath).getPath() + "/.commits_.archive.1_1-0-1";
+    assertDoesNotThrow(() -> readAndValidateArchivedFile(path, metaClient.getStorage()));
+  }
+
+  @Test
+  public void testDuplicateTimestampDifferentActions() throws Exception {
+    // Two actions that share the very same commit time "30"
+    HoodieInstant commit30 = createInstantV1(COMPLETED, HoodieTimeline.COMMIT_ACTION, "30");
+    HoodieInstant clean30 = createInstantV1(COMPLETED, HoodieTimeline.CLEAN_ACTION, "30");
+
+    HoodieInstant commit31 = createInstantV1(COMPLETED, HoodieTimeline.COMMIT_ACTION, "31");
+
+    // Write them to a brand new archive log
+    try (HoodieLogFormat.Writer writer = buildWriter(
+        ArchivedTimelineV1.getArchiveLogPath(metaClient.getArchivePath()))) {
+      List<IndexedRecord> records = new ArrayList<>();
+      records.add(createArchivedMetaWrapper(commit30));
+      records.add(createArchivedMetaWrapper(clean30));
+      records.add(createArchivedMetaWrapper(commit31));
+      writeArchiveLog(writer, records);
+    }
+
+    // Build timeline (no filters), should read the new archive
+    HoodieArchivedTimeline dupTimeline = new ArchivedTimelineV1(metaClient);
+    dupTimeline.loadCompletedInstantDetailsInMemory();
+
+    // Both actions for 30 are present (positive case)
+    assertTrue(dupTimeline.getInstantDetails(commit30).isPresent(),
+        "commit metadata for ts=30 must be loaded");
+    assertTrue(dupTimeline.getInstantDetails(clean30).isPresent(),
+        "clean metadata for ts=30 must be loaded");
+    assertFalse(Arrays.equals(dupTimeline.getInstantDetails(commit30).get(),
+        dupTimeline.getInstantDetails(clean30).get()),
+        "clean and commit metadata should have different payloads");
+
+    // For ts=31 we only archived COMMIT, so CLEAN must be absent (negative case)
+    HoodieInstant fakeClean31 = createInstantV1(COMPLETED, HoodieTimeline.CLEAN_ACTION, "31");
+
+    assertTrue(dupTimeline.getInstantDetails(commit31).isPresent(),
+        "commit metadata for ts=31 must be loaded");
+    assertFalse(dupTimeline.getInstantDetails(fakeClean31).isPresent(),
+        "clean metadata for ts=31 must NOT be loaded");
+  }
+
+  private void readAndValidateArchivedFile(String path, HoodieStorage storage) throws IOException {
+    try (HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(
+        storage, new HoodieLogFile(path), HoodieArchivedMetaEntry.getClassSchema())) {
+
+      while (reader.hasNext()) {
+        HoodieLogBlock block = reader.next();
+        if (block instanceof HoodieAvroDataBlock) {
+          HoodieAvroDataBlock avroBlock = (HoodieAvroDataBlock) block;
+          try (ClosableIterator<HoodieRecord<IndexedRecord>> itr =
+              avroBlock.getRecordIterator(HoodieRecord.HoodieRecordType.AVRO)) {
+            if (itr.hasNext()) {
+              itr.next();
+            }
+          }
+        }
+      }
+    }
+  }
+
/**
* Validate whether the instants of given timestamps of the hudi archived
timeline are loaded to memory or not.
* @param hoodieArchivedTimeline archived timeline to test against
diff --git a/hudi-hadoop-common/src/test/resources/hudi_0_13_0_archive/.commits_.archive.1_1-0-1 b/hudi-hadoop-common/src/test/resources/hudi_0_13_0_archive/.commits_.archive.1_1-0-1
new file mode 100644
index 000000000000..9435bf20d70e
Binary files /dev/null and b/hudi-hadoop-common/src/test/resources/hudi_0_13_0_archive/.commits_.archive.1_1-0-1 differ
diff --git a/hudi-hadoop-common/src/test/resources/hudi_0_8_0_archive/.commits_.archive.1_1-0-1 b/hudi-hadoop-common/src/test/resources/hudi_0_8_0_archive/.commits_.archive.1_1-0-1
new file mode 100644
index 000000000000..7688ab268ac6
Binary files /dev/null and b/hudi-hadoop-common/src/test/resources/hudi_0_8_0_archive/.commits_.archive.1_1-0-1 differ