nsivabalan commented on code in PR #14261:
URL: https://github.com/apache/hudi/pull/14261#discussion_r2579119997
##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v2/ArchivedTimelineV2.java:
##########
@@ -236,12 +272,27 @@ private List<HoodieInstant> loadInstants(
Map<String, HoodieInstant> instantsInRange = new ConcurrentHashMap<>();
Option<BiConsumer<String, GenericRecord>> instantDetailsConsumer =
Option.ofNullable(getInstantDetailsFunc(loadMode));
timelineLoader.loadInstants(metaClient, filter, loadMode, commitsFilter,
- (instantTime, avroRecord) -> instantsInRange.putIfAbsent(instantTime,
readCommit(instantTime, avroRecord, instantDetailsConsumer)));
+ (instantTime, avroRecord) -> instantsInRange.putIfAbsent(instantTime,
readCommit(instantTime, avroRecord, instantDetailsConsumer)), -1);
List<HoodieInstant> result = new ArrayList<>(instantsInRange.values());
Collections.sort(result);
return result;
}
+ /**
+ * Loads instants with a limit on the number of instants to load.
+ * This is used for limit-based loading where we only want to load the N
most recent instants.
+ */
+ private void loadInstantsWithLimit(int limit,
HoodieArchivedTimeline.LoadMode loadMode,
+ Function<GenericRecord, Boolean> commitsFilter) {
+ Map<String, HoodieInstant> instantsInRange = new ConcurrentHashMap<>();
+ Option<BiConsumer<String, GenericRecord>> instantDetailsConsumer =
Option.ofNullable(getInstantDetailsFunc(loadMode));
+ timelineLoader.loadInstants(metaClient, null, loadMode, commitsFilter,
+ (instantTime, avroRecord) -> instantsInRange.putIfAbsent(instantTime,
readCommit(instantTime, avroRecord, instantDetailsConsumer)), limit);
+ List<HoodieInstant> collectedInstants = new
ArrayList<>(instantsInRange.values());
+ Collections.sort(collectedInstants);
+ appendLoadedInstants(collectedInstants);
Review Comment:
This differs from what we saw in the v1 archived timeline.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/BaseHoodieTimeline.java:
##########
@@ -114,6 +114,22 @@ protected void appendInstants(List<HoodieInstant>
newInstants) {
clearState();
}
+ /**
+ * Helper method to append loaded instants to the timeline, filtering out
duplicates.
+ * This is used by both time-range and limit-based loading to avoid code
duplication.
+ *
+ * @param loadedInstants The list of instants that were loaded to readCommit
field of timeline
+ */
+ protected void appendLoadedInstants(List<HoodieInstant> loadedInstants) {
Review Comment:
We can name this `appendInstants`.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/ArchivedTimelineLoader.java:
##########
@@ -45,4 +45,25 @@ void loadInstants(
HoodieArchivedTimeline.LoadMode loadMode,
Function<GenericRecord, Boolean> commitsFilter,
BiConsumer<String, GenericRecord> recordConsumer);
+
+ /**
+ * Loads the instants from the timeline with optional limit for early
termination.
+ *
+ * @param metaClient The meta client.
+ * @param filter The time range filter where the target instant
belongs to.
+ * @param loadMode The load mode.
+ * @param commitsFilter Filter of the instant type.
+ * @param recordConsumer Consumer of the instant record payload.
+ * @param limit Maximum number of instants to load. Use -1 for no
limit.
Review Comment:
If we describe the limit as "optional", then the arg should be `Option<Integer>`.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v1/ArchivedTimelineLoaderV1.java:
##########
@@ -100,6 +119,9 @@ public void loadInstants(HoodieTableMetaClient metaClient,
int instantsInPreviousFile = instantsInRange.size();
// Read the avro blocks
while (reader.hasNext()) {
+ if (hasLimit && loadedCount.get() >= limit) {
Review Comment:
We can fold this check into the `while` condition itself.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v1/ArchivedTimelineV1.java:
##########
@@ -209,13 +222,44 @@ public void loadCompactionDetailsInMemory(String
compactionInstantTime) {
@Override
public void loadCompactionDetailsInMemory(String startTs, String endTs) {
// load compactionPlan
- loadInstants(new ClosedClosedTimeRangeFilter(startTs, endTs), null, true,
+ List<HoodieInstant> loadedInstants = loadInstants(new
ClosedClosedTimeRangeFilter(startTs, endTs), null, true,
record -> {
// Older files don't have action state set.
Object action = record.get(ACTION_STATE);
return
record.get(ACTION_TYPE_KEY).toString().equals(HoodieTimeline.COMPACTION_ACTION)
&& (action == null ||
org.apache.hudi.common.table.timeline.HoodieInstant.State.INFLIGHT.toString().equals(action.toString()));
});
+ appendLoadedInstants(loadedInstants);
+ }
+
+ @Override
+ public void loadCompactionDetailsInMemory(int limit) {
+ loadInstantsWithLimit(limit, true,
+ record -> {
+ Object actionState = record.get(ACTION_STATE);
+ // Older files & archivedTimelineV2 don't have action state set.
+ return
record.get(ACTION_TYPE_KEY).toString().equals(HoodieTimeline.COMPACTION_ACTION)
+ && (actionState == null ||
org.apache.hudi.common.table.timeline.HoodieInstant.State.INFLIGHT.toString().equals(actionState.toString()));
+ });
Review Comment:
So, for the limit-based APIs the result is streamed, while for the "start and end time"-based APIs the response is cached — is that right?
Is this an intentional choice?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]