codope commented on code in PR #395:
URL: https://github.com/apache/hudi-rs/pull/395#discussion_r2315147424


##########
crates/core/src/timeline/mod.rs:
##########
@@ -54,75 +61,70 @@ pub const DEFAULT_LOADING_ACTIONS: &[Action] =
     &[Action::Commit, Action::DeltaCommit, Action::ReplaceCommit];
 
 impl Timeline {
-    #[cfg(test)]
-    pub(crate) async fn new_from_completed_commits(
+    pub(crate) fn new(
         hudi_configs: Arc<HudiConfigs>,
-        storage_options: Arc<HashMap<String, String>>,
-        completed_commits: Vec<Instant>,
-    ) -> Result<Self> {
-        let storage = Storage::new(storage_options.clone(), 
hudi_configs.clone())?;
-        Ok(Self {
+        storage: Arc<Storage>,
+        active_loader: TimelineLoader,
+        archived_loader: Option<TimelineLoader>,
+    ) -> Self {
+        Self {
             hudi_configs,
             storage,
-            completed_commits,
-        })
+            active_loader,
+            archived_loader,
+            completed_commits: Vec::new(),
+        }
     }
 
     pub(crate) async fn new_from_storage(
         hudi_configs: Arc<HudiConfigs>,
         storage_options: Arc<HashMap<String, String>>,
     ) -> Result<Self> {
         let storage = Storage::new(storage_options.clone(), 
hudi_configs.clone())?;
+        let mut timeline = TimelineBuilder::new(hudi_configs, 
storage).build().await?;
         let selector = TimelineSelector::completed_actions_in_range(
             DEFAULT_LOADING_ACTIONS,
-            hudi_configs.clone(),
+            timeline.hudi_configs.clone(),
             None,
             None,
         )?;
-        let completed_commits = Self::load_instants(&selector, &storage, 
false).await?;
-        Ok(Self {
-            hudi_configs,
-            storage,
-            completed_commits,
-        })
+        timeline.completed_commits = timeline.load_instants(&selector, 
false).await?;
+        Ok(timeline)
+    }
+
+    pub async fn load_instants(
+        &self,
+        selector: &TimelineSelector,
+        desc: bool,
+    ) -> Result<Vec<Instant>> {
+        self.active_loader.load_instants(selector, desc).await
     }
 
-    async fn load_instants(
+    pub async fn load_instants_with_archive(
+        &self,
         selector: &TimelineSelector,
-        storage: &Storage,
         desc: bool,
+        include_archived: bool,
     ) -> Result<Vec<Instant>> {
-        let files = storage.list_files(Some(HUDI_METADATA_DIR)).await?;
-
-        // For most cases, we load completed instants, so we can pre-allocate 
the vector with a
-        // capacity of 1/3 of the total number of listed files,
-        // ignoring requested and inflight instants.
-        let mut instants = Vec::with_capacity(files.len() / 3);
-
-        for file_info in files {
-            match selector.try_create_instant(file_info.name.as_str()) {
-                Ok(instant) => instants.push(instant),
-                Err(e) => {
-                    // Ignore files that are not valid or desired instants.
-                    debug!(
-                        "Instant not created from file {:?} due to: {:?}",
-                        file_info, e
-                    );
+        // Always try active first
+        let mut instants = self.active_loader.load_instants(selector, 
desc).await?;
+
+        // Load archived/compacted if flag is set
+        if include_archived {
+            if let Some(archived_loader) = &self.archived_loader {
+                let archived_instants = archived_loader
+                    .load_archived_instants(selector, desc)
+                    .await?;
+                instants.extend(archived_instants);
+
+                // Re-sort after merging
+                instants.sort_unstable();

Review Comment:
   The loaders now return sorted lists, so once archived reading lands we'll 
replace this final re-sort with a merge of the two already-sorted lists.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to