This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new cf3a2ea [HUDI-2401] Load archived instants for flink streaming reader (#3610)
cf3a2ea is described below
commit cf3a2ead32f432757668d49a9138f891110aa9a5
Author: Danny Chan <[email protected]>
AuthorDate: Wed Sep 8 10:43:54 2021 +0800
[HUDI-2401] Load archived instants for flink streaming reader (#3610)
---
.../org/apache/hudi/source/StreamReadMonitoringFunction.java | 10 ++++++++--
1 file changed, 8 insertions(+), 2 deletions(-)
diff --git a/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java b/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
index 112dfda..ec56903 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
@@ -352,10 +352,16 @@ public class StreamReadMonitoringFunction
// 1. the start commit is 'earliest';
// 2. the start instant is archived.
HoodieArchivedTimeline archivedTimeline =
metaClient.getArchivedTimeline();
- if (!metaClient.getArchivedTimeline().empty()) {
- Stream<HoodieInstant> instantStream =
archivedTimeline.getCommitsTimeline().filterCompletedInstants().getInstants();
+ HoodieTimeline archivedCompleteTimeline =
archivedTimeline.getCommitsTimeline().filterCompletedInstants();
+ if (!archivedCompleteTimeline.empty()) {
+ final String endTs =
archivedCompleteTimeline.lastInstant().get().getTimestamp();
+ Stream<HoodieInstant> instantStream =
archivedCompleteTimeline.getInstants();
if (instantRange != null) {
+
archivedTimeline.loadInstantDetailsInMemory(instantRange.getStartInstant(),
endTs);
instantStream = instantStream.filter(s ->
HoodieTimeline.compareTimestamps(s.getTimestamp(), GREATER_THAN_OR_EQUALS,
instantRange.getStartInstant()));
+ } else {
+ final String startTs =
archivedCompleteTimeline.firstInstant().get().getTimestamp();
+ archivedTimeline.loadInstantDetailsInMemory(startTs, endTs);
}
return instantStream
.map(instant -> WriteProfiles.getCommitMetadata(tableName, path,
instant, archivedTimeline)).collect(Collectors.toList());