This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 884a7f8  fix: handle as-of timestamp for excluding file groups (#268)
884a7f8 is described below

commit 884a7f86321bca110b9cbe6e7e2d5f65d0307118
Author: Shiyan Xu <[email protected]>
AuthorDate: Sun Jan 26 21:58:40 2025 -0600

    fix: handle as-of timestamp for excluding file groups (#268)
    
    When retrieving the excluded file groups, the as-of timestamp should be taken into account 
if one is provided.
---
 crates/core/src/table/fs_view.rs     | 31 ++++++++++++++++++++----
 crates/core/src/table/mod.rs         | 47 +++++++++++++++++++++++++++++++++---
 crates/core/src/timeline/mod.rs      | 11 +++++++--
 crates/core/src/timeline/selector.rs | 33 ++++++++++++++++---------
 4 files changed, 99 insertions(+), 23 deletions(-)

diff --git a/crates/core/src/table/fs_view.rs b/crates/core/src/table/fs_view.rs
index d96cf62..d25389b 100644
--- a/crates/core/src/table/fs_view.rs
+++ b/crates/core/src/table/fs_view.rs
@@ -118,13 +118,20 @@ mod tests {
     async fn fs_view_get_latest_file_slices() {
         let base_url = SampleTable::V6Nonpartitioned.url_to_cow();
         let hudi_table = Table::new(base_url.path()).await.unwrap();
+        let latest_timestamp = hudi_table
+            .timeline
+            .completed_commits
+            .iter()
+            .next_back()
+            .map(|i| i.timestamp.clone())
+            .unwrap();
         let fs_view = &hudi_table.file_system_view;
 
         assert!(fs_view.partition_to_file_groups.is_empty());
         let partition_pruner = PartitionPruner::empty();
         let excludes = HashSet::new();
         let file_slices = fs_view
-            .get_file_slices_as_of("20240418173551906", &partition_pruner, 
&excludes)
+            .get_file_slices_as_of(&latest_timestamp, &partition_pruner, 
&excludes)
             .await
             .unwrap();
         assert_eq!(fs_view.partition_to_file_groups.len(), 1);
@@ -143,17 +150,24 @@ mod tests {
     async fn fs_view_get_latest_file_slices_with_replace_commit() {
         let base_url = 
SampleTable::V6SimplekeygenNonhivestyleOverwritetable.url_to_cow();
         let hudi_table = Table::new(base_url.path()).await.unwrap();
+        let latest_timestamp = hudi_table
+            .timeline
+            .completed_commits
+            .iter()
+            .next_back()
+            .map(|i| i.timestamp.clone())
+            .unwrap();
         let fs_view = &hudi_table.file_system_view;
 
         assert_eq!(fs_view.partition_to_file_groups.len(), 0);
         let partition_pruner = PartitionPruner::empty();
         let excludes = &hudi_table
             .timeline
-            .get_replaced_file_groups()
+            .get_replaced_file_groups_as_of(&latest_timestamp)
             .await
             .unwrap();
         let file_slices = fs_view
-            .get_file_slices_as_of("20240707001303088", &partition_pruner, 
excludes)
+            .get_file_slices_as_of(&latest_timestamp, &partition_pruner, 
excludes)
             .await
             .unwrap();
         assert_eq!(fs_view.partition_to_file_groups.len(), 3);
@@ -172,13 +186,20 @@ mod tests {
     async fn fs_view_get_latest_file_slices_with_partition_filters() {
         let base_url = SampleTable::V6ComplexkeygenHivestyle.url_to_cow();
         let hudi_table = Table::new(base_url.path()).await.unwrap();
+        let latest_timestamp = hudi_table
+            .timeline
+            .completed_commits
+            .iter()
+            .next_back()
+            .map(|i| i.timestamp.clone())
+            .unwrap();
         let fs_view = &hudi_table.file_system_view;
 
         assert_eq!(fs_view.partition_to_file_groups.len(), 0);
 
         let excludes = &hudi_table
             .timeline
-            .get_replaced_file_groups()
+            .get_replaced_file_groups_as_of(&latest_timestamp)
             .await
             .unwrap();
         let partition_schema = 
hudi_table.get_partition_schema().await.unwrap();
@@ -193,7 +214,7 @@ mod tests {
         .unwrap();
 
         let file_slices = fs_view
-            .get_file_slices_as_of("20240418173235694", &partition_pruner, 
excludes)
+            .get_file_slices_as_of(&latest_timestamp, &partition_pruner, 
excludes)
             .await
             .unwrap();
         assert_eq!(fs_view.partition_to_file_groups.len(), 1);
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index ee8114d..7a2cf09 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -295,7 +295,10 @@ impl Table {
         timestamp: &str,
         filters: &[Filter],
     ) -> Result<Vec<FileSlice>> {
-        let excludes = self.timeline.get_replaced_file_groups().await?;
+        let excludes = self
+            .timeline
+            .get_replaced_file_groups_as_of(timestamp)
+            .await?;
         let partition_schema = self.get_partition_schema().await?;
         let partition_pruner =
             PartitionPruner::new(filters, &partition_schema, 
self.hudi_configs.as_ref())?;
@@ -829,10 +832,46 @@ mod tests {
     #[tokio::test]
     async fn hudi_table_get_file_slices_splits_as_of() {
         let base_url = 
SampleTable::V6SimplekeygenNonhivestyleOverwritetable.url_to_mor();
-
         let hudi_table = Table::new(base_url.path()).await.unwrap();
+
+        // before replacecommit (insert overwrite table)
+        let second_latest_timestamp = "20250121000656060";
+        let file_slices_splits = hudi_table
+            .get_file_slices_splits_as_of(2, second_latest_timestamp, &[])
+            .await
+            .unwrap();
+        assert_eq!(file_slices_splits.len(), 2);
+        assert_eq!(file_slices_splits[0].len(), 2);
+        assert_eq!(file_slices_splits[1].len(), 1);
+        let file_slices = file_slices_splits
+            .iter()
+            .flatten()
+            .filter(|f| f.partition_path == "10")
+            .collect::<Vec<_>>();
+        assert_eq!(
+            file_slices.len(),
+            1,
+            "Partition 10 should have 1 file slice"
+        );
+        let file_slice = file_slices[0];
+        assert_eq!(
+            file_slice.base_file.file_name(),
+            
"92e64357-e4d1-4639-a9d3-c3535829d0aa-0_1-53-79_20250121000647668.parquet"
+        );
+        assert_eq!(
+            file_slice.log_files.len(),
+            1,
+            "File slice should have 1 log file"
+        );
+        assert_eq!(
+            file_slice.log_files.iter().next().unwrap().file_name(),
+            
".92e64357-e4d1-4639-a9d3-c3535829d0aa-0_20250121000647668.log.1_0-73-101"
+        );
+
+        // as of replacecommit (insert overwrite table)
+        let latest_timestamp = "20250121000702475";
         let file_slices_splits = hudi_table
-            .get_file_slices_splits_as_of(2, "20250121000702475", &[])
+            .get_file_slices_splits_as_of(2, latest_timestamp, &[])
             .await
             .unwrap();
         assert_eq!(file_slices_splits.len(), 1);
@@ -882,7 +921,7 @@ mod tests {
         );
 
         // as of non-exist old timestamp
-        let opts = [(AsOfTimestamp.as_ref(), "0")];
+        let opts = [(AsOfTimestamp.as_ref(), "19700101000000")];
         let hudi_table = Table::new_with_options(base_url.path(), opts)
             .await
             .unwrap();
diff --git a/crates/core/src/timeline/mod.rs b/crates/core/src/timeline/mod.rs
index 4125641..dccd08a 100644
--- a/crates/core/src/timeline/mod.rs
+++ b/crates/core/src/timeline/mod.rs
@@ -181,9 +181,16 @@ impl Timeline {
         }
     }
 
-    pub async fn get_replaced_file_groups(&self) -> Result<HashSet<FileGroup>> 
{
+    pub async fn get_replaced_file_groups_as_of(
+        &self,
+        timestamp: &str,
+    ) -> Result<HashSet<FileGroup>> {
         let mut file_groups: HashSet<FileGroup> = HashSet::new();
-        let selector = 
TimelineSelector::completed_replacecommits(self.hudi_configs.clone());
+        let selector = TimelineSelector::completed_replacecommits_in_range(
+            self.hudi_configs.clone(),
+            None,
+            Some(timestamp),
+        )?;
         for instant in selector.select(self)? {
             let commit_metadata = self.get_commit_metadata(&instant).await?;
             file_groups.extend(build_replaced_file_groups(&commit_metadata)?);
diff --git a/crates/core/src/timeline/selector.rs 
b/crates/core/src/timeline/selector.rs
index 7ce5aff..ff02910 100644
--- a/crates/core/src/timeline/selector.rs
+++ b/crates/core/src/timeline/selector.rs
@@ -152,6 +152,12 @@ impl TimelineSelector {
             .to::<String>()
     }
 
+    fn parse_datetime(timezone: &str, timestamp: Option<&str>) -> 
Result<Option<DateTime<Utc>>> {
+        timestamp
+            .map(|e| Instant::parse_datetime(e, timezone))
+            .transpose()
+    }
+
     pub fn completed_commits(hudi_configs: Arc<HudiConfigs>) -> Result<Self> {
         Self::completed_commits_in_range(hudi_configs, None, None)
     }
@@ -162,12 +168,8 @@ impl TimelineSelector {
         end: Option<&str>,
     ) -> Result<Self> {
         let timezone = Self::get_timezone_from_configs(&hudi_configs);
-        let start_datetime = start
-            .map(|s| Instant::parse_datetime(s, &timezone))
-            .transpose()?;
-        let end_datetime = end
-            .map(|e| Instant::parse_datetime(e, &timezone))
-            .transpose()?;
+        let start_datetime = Self::parse_datetime(&timezone, start)?;
+        let end_datetime = Self::parse_datetime(&timezone, end)?;
         Ok(Self {
             timezone,
             start_datetime,
@@ -178,15 +180,22 @@ impl TimelineSelector {
         })
     }
 
-    pub fn completed_replacecommits(hudi_configs: Arc<HudiConfigs>) -> Self {
-        Self {
-            timezone: Self::get_timezone_from_configs(&hudi_configs),
-            start_datetime: None,
-            end_datetime: None,
+    pub fn completed_replacecommits_in_range(
+        hudi_configs: Arc<HudiConfigs>,
+        start: Option<&str>,
+        end: Option<&str>,
+    ) -> Result<Self> {
+        let timezone = Self::get_timezone_from_configs(&hudi_configs);
+        let start_datetime = Self::parse_datetime(&timezone, start)?;
+        let end_datetime = Self::parse_datetime(&timezone, end)?;
+        Ok(Self {
+            timezone,
+            start_datetime,
+            end_datetime,
             states: vec![State::Completed],
             actions: vec![Action::ReplaceCommit],
             include_archived: false,
-        }
+        })
     }
 
     pub fn should_include_action(&self, action: &Action) -> bool {

Reply via email to