This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new b4f96440b56 [HUDI-7019] Add instant details consumer to HoodieArchivedTimeline (#9969)
b4f96440b56 is described below

commit b4f96440b562d575878afc5ff09435637cdac8d0
Author: Danny Chan <[email protected]>
AuthorDate: Thu Nov 2 09:17:36 2023 +0800

    [HUDI-7019] Add instant details consumer to HoodieArchivedTimeline (#9969)
---
 .../table/timeline/HoodieArchivedTimeline.java     | 38 ++++++++++++----------
 1 file changed, 21 insertions(+), 17 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
index cdffd4c0b3c..c489362ae29 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
@@ -150,31 +150,34 @@ public class HoodieArchivedTimeline extends HoodieDefaultTimeline {
     return new HoodieArchivedTimeline(metaClient);
   }
 
-  private HoodieInstant readCommit(String instantTime, GenericRecord record, 
LoadMode loadMode) {
+  private HoodieInstant readCommit(String instantTime, GenericRecord record, 
Option<BiConsumer<String, GenericRecord>> instantDetailsConsumer) {
     final String action = record.get(ACTION_ARCHIVED_META_FIELD).toString();
     final String completionTime = 
record.get(COMPLETION_TIME_ARCHIVED_META_FIELD).toString();
-    loadInstantDetails(record, instantTime, loadMode);
+    instantDetailsConsumer.ifPresent(consumer -> consumer.accept(instantTime, 
record));
     return new HoodieInstant(HoodieInstant.State.COMPLETED, action, 
instantTime, completionTime);
   }
 
-  private void loadInstantDetails(GenericRecord record, String instantTime, 
LoadMode loadMode) {
+  @Nullable
+  private BiConsumer<String, GenericRecord> getInstantDetailsFunc(LoadMode 
loadMode) {
     switch (loadMode) {
       case METADATA:
-        ByteBuffer commitMeta = (ByteBuffer) 
record.get(METADATA_ARCHIVED_META_FIELD);
-        if (commitMeta != null) {
-          // in case the entry comes from an empty completed meta file
-          this.readCommits.put(instantTime, commitMeta.array());
-        }
-        break;
+        return (instant, record) -> {
+          ByteBuffer commitMeta = (ByteBuffer) 
record.get(METADATA_ARCHIVED_META_FIELD);
+          if (commitMeta != null) {
+            // in case the entry comes from an empty completed meta file
+            this.readCommits.put(instant, commitMeta.array());
+          }
+        };
       case PLAN:
-        ByteBuffer plan = (ByteBuffer) record.get(PLAN_ARCHIVED_META_FIELD);
-        if (plan != null) {
-          // in case the entry comes from an empty completed meta file
-          this.readCommits.put(instantTime, plan.array());
-        }
-        break;
+        return (instant, record) -> {
+          ByteBuffer plan = (ByteBuffer) record.get(PLAN_ARCHIVED_META_FIELD);
+          if (plan != null) {
+            // in case the entry comes from an empty completed meta file
+            this.readCommits.put(instant, plan.array());
+          }
+        };
       default:
-        // no operation
+        return null;
     }
   }
 
@@ -201,7 +204,8 @@ public class HoodieArchivedTimeline extends HoodieDefaultTimeline {
       LoadMode loadMode,
       Function<GenericRecord, Boolean> commitsFilter) {
     Map<String, HoodieInstant> instantsInRange = new ConcurrentHashMap<>();
-    loadInstants(metaClient, filter, loadMode, commitsFilter, (instantTime, 
avroRecord) -> instantsInRange.putIfAbsent(instantTime, readCommit(instantTime, 
avroRecord, loadMode)));
+    Option<BiConsumer<String, GenericRecord>> instantDetailsConsumer = 
Option.ofNullable(getInstantDetailsFunc(loadMode));
+    loadInstants(metaClient, filter, loadMode, commitsFilter, (instantTime, 
avroRecord) -> instantsInRange.putIfAbsent(instantTime, readCommit(instantTime, 
avroRecord, instantDetailsConsumer)));
     ArrayList<HoodieInstant> result = new 
ArrayList<>(instantsInRange.values());
     Collections.sort(result);
     return result;

Reply via email to