This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new b4f96440b56 [HUDI-7019] Add instant details consumer to HoodieArchivedTimeline (#9969)
b4f96440b56 is described below
commit b4f96440b562d575878afc5ff09435637cdac8d0
Author: Danny Chan <[email protected]>
AuthorDate: Thu Nov 2 09:17:36 2023 +0800
[HUDI-7019] Add instant details consumer to HoodieArchivedTimeline (#9969)
---
.../table/timeline/HoodieArchivedTimeline.java | 38 ++++++++++++----------
1 file changed, 21 insertions(+), 17 deletions(-)
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
index cdffd4c0b3c..c489362ae29 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
@@ -150,31 +150,34 @@ public class HoodieArchivedTimeline extends HoodieDefaultTimeline {
return new HoodieArchivedTimeline(metaClient);
}
- private HoodieInstant readCommit(String instantTime, GenericRecord record,
LoadMode loadMode) {
+ private HoodieInstant readCommit(String instantTime, GenericRecord record,
Option<BiConsumer<String, GenericRecord>> instantDetailsConsumer) {
final String action = record.get(ACTION_ARCHIVED_META_FIELD).toString();
final String completionTime =
record.get(COMPLETION_TIME_ARCHIVED_META_FIELD).toString();
- loadInstantDetails(record, instantTime, loadMode);
+ instantDetailsConsumer.ifPresent(consumer -> consumer.accept(instantTime,
record));
return new HoodieInstant(HoodieInstant.State.COMPLETED, action,
instantTime, completionTime);
}
- private void loadInstantDetails(GenericRecord record, String instantTime,
LoadMode loadMode) {
+ @Nullable
+ private BiConsumer<String, GenericRecord> getInstantDetailsFunc(LoadMode
loadMode) {
switch (loadMode) {
case METADATA:
- ByteBuffer commitMeta = (ByteBuffer)
record.get(METADATA_ARCHIVED_META_FIELD);
- if (commitMeta != null) {
- // in case the entry comes from an empty completed meta file
- this.readCommits.put(instantTime, commitMeta.array());
- }
- break;
+ return (instant, record) -> {
+ ByteBuffer commitMeta = (ByteBuffer)
record.get(METADATA_ARCHIVED_META_FIELD);
+ if (commitMeta != null) {
+ // in case the entry comes from an empty completed meta file
+ this.readCommits.put(instant, commitMeta.array());
+ }
+ };
case PLAN:
- ByteBuffer plan = (ByteBuffer) record.get(PLAN_ARCHIVED_META_FIELD);
- if (plan != null) {
- // in case the entry comes from an empty completed meta file
- this.readCommits.put(instantTime, plan.array());
- }
- break;
+ return (instant, record) -> {
+ ByteBuffer plan = (ByteBuffer) record.get(PLAN_ARCHIVED_META_FIELD);
+ if (plan != null) {
+ // in case the entry comes from an empty completed meta file
+ this.readCommits.put(instant, plan.array());
+ }
+ };
default:
- // no operation
+ return null;
}
}
@@ -201,7 +204,8 @@ public class HoodieArchivedTimeline extends HoodieDefaultTimeline {
LoadMode loadMode,
Function<GenericRecord, Boolean> commitsFilter) {
Map<String, HoodieInstant> instantsInRange = new ConcurrentHashMap<>();
- loadInstants(metaClient, filter, loadMode, commitsFilter, (instantTime,
avroRecord) -> instantsInRange.putIfAbsent(instantTime, readCommit(instantTime,
avroRecord, loadMode)));
+ Option<BiConsumer<String, GenericRecord>> instantDetailsConsumer =
Option.ofNullable(getInstantDetailsFunc(loadMode));
+ loadInstants(metaClient, filter, loadMode, commitsFilter, (instantTime,
avroRecord) -> instantsInRange.putIfAbsent(instantTime, readCommit(instantTime,
avroRecord, instantDetailsConsumer)));
ArrayList<HoodieInstant> result = new
ArrayList<>(instantsInRange.values());
Collections.sort(result);
return result;