This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 884a7f8 fix: handle as-of timestamp for excluding file groups (#268)
884a7f8 is described below
commit 884a7f86321bca110b9cbe6e7e2d5f65d0307118
Author: Shiyan Xu <[email protected]>
AuthorDate: Sun Jan 26 21:58:40 2025 -0600
fix: handle as-of timestamp for excluding file groups (#268)
When getting excluding file groups, it should consider the as-of timestamp
if provided.
---
crates/core/src/table/fs_view.rs | 31 ++++++++++++++++++++----
crates/core/src/table/mod.rs | 47 +++++++++++++++++++++++++++++++++---
crates/core/src/timeline/mod.rs | 11 +++++++--
crates/core/src/timeline/selector.rs | 33 ++++++++++++++++---------
4 files changed, 99 insertions(+), 23 deletions(-)
diff --git a/crates/core/src/table/fs_view.rs b/crates/core/src/table/fs_view.rs
index d96cf62..d25389b 100644
--- a/crates/core/src/table/fs_view.rs
+++ b/crates/core/src/table/fs_view.rs
@@ -118,13 +118,20 @@ mod tests {
async fn fs_view_get_latest_file_slices() {
let base_url = SampleTable::V6Nonpartitioned.url_to_cow();
let hudi_table = Table::new(base_url.path()).await.unwrap();
+ let latest_timestamp = hudi_table
+ .timeline
+ .completed_commits
+ .iter()
+ .next_back()
+ .map(|i| i.timestamp.clone())
+ .unwrap();
let fs_view = &hudi_table.file_system_view;
assert!(fs_view.partition_to_file_groups.is_empty());
let partition_pruner = PartitionPruner::empty();
let excludes = HashSet::new();
let file_slices = fs_view
- .get_file_slices_as_of("20240418173551906", &partition_pruner, &excludes)
+ .get_file_slices_as_of(&latest_timestamp, &partition_pruner, &excludes)
.await
.unwrap();
assert_eq!(fs_view.partition_to_file_groups.len(), 1);
@@ -143,17 +150,24 @@ mod tests {
async fn fs_view_get_latest_file_slices_with_replace_commit() {
let base_url = SampleTable::V6SimplekeygenNonhivestyleOverwritetable.url_to_cow();
let hudi_table = Table::new(base_url.path()).await.unwrap();
+ let latest_timestamp = hudi_table
+ .timeline
+ .completed_commits
+ .iter()
+ .next_back()
+ .map(|i| i.timestamp.clone())
+ .unwrap();
let fs_view = &hudi_table.file_system_view;
assert_eq!(fs_view.partition_to_file_groups.len(), 0);
let partition_pruner = PartitionPruner::empty();
let excludes = &hudi_table
.timeline
- .get_replaced_file_groups()
+ .get_replaced_file_groups_as_of(&latest_timestamp)
.await
.unwrap();
let file_slices = fs_view
- .get_file_slices_as_of("20240707001303088", &partition_pruner, excludes)
+ .get_file_slices_as_of(&latest_timestamp, &partition_pruner, excludes)
.await
.unwrap();
assert_eq!(fs_view.partition_to_file_groups.len(), 3);
@@ -172,13 +186,20 @@ mod tests {
async fn fs_view_get_latest_file_slices_with_partition_filters() {
let base_url = SampleTable::V6ComplexkeygenHivestyle.url_to_cow();
let hudi_table = Table::new(base_url.path()).await.unwrap();
+ let latest_timestamp = hudi_table
+ .timeline
+ .completed_commits
+ .iter()
+ .next_back()
+ .map(|i| i.timestamp.clone())
+ .unwrap();
let fs_view = &hudi_table.file_system_view;
assert_eq!(fs_view.partition_to_file_groups.len(), 0);
let excludes = &hudi_table
.timeline
- .get_replaced_file_groups()
+ .get_replaced_file_groups_as_of(&latest_timestamp)
.await
.unwrap();
let partition_schema = hudi_table.get_partition_schema().await.unwrap();
@@ -193,7 +214,7 @@ mod tests {
.unwrap();
let file_slices = fs_view
- .get_file_slices_as_of("20240418173235694", &partition_pruner, excludes)
+ .get_file_slices_as_of(&latest_timestamp, &partition_pruner, excludes)
.await
.unwrap();
assert_eq!(fs_view.partition_to_file_groups.len(), 1);
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index ee8114d..7a2cf09 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -295,7 +295,10 @@ impl Table {
timestamp: &str,
filters: &[Filter],
) -> Result<Vec<FileSlice>> {
- let excludes = self.timeline.get_replaced_file_groups().await?;
+ let excludes = self
+ .timeline
+ .get_replaced_file_groups_as_of(timestamp)
+ .await?;
let partition_schema = self.get_partition_schema().await?;
let partition_pruner = PartitionPruner::new(filters, &partition_schema, self.hudi_configs.as_ref())?;
@@ -829,10 +832,46 @@ mod tests {
#[tokio::test]
async fn hudi_table_get_file_slices_splits_as_of() {
let base_url = SampleTable::V6SimplekeygenNonhivestyleOverwritetable.url_to_mor();
-
let hudi_table = Table::new(base_url.path()).await.unwrap();
+
+ // before replacecommit (insert overwrite table)
+ let second_latest_timestamp = "20250121000656060";
+ let file_slices_splits = hudi_table
+ .get_file_slices_splits_as_of(2, second_latest_timestamp, &[])
+ .await
+ .unwrap();
+ assert_eq!(file_slices_splits.len(), 2);
+ assert_eq!(file_slices_splits[0].len(), 2);
+ assert_eq!(file_slices_splits[1].len(), 1);
+ let file_slices = file_slices_splits
+ .iter()
+ .flatten()
+ .filter(|f| f.partition_path == "10")
+ .collect::<Vec<_>>();
+ assert_eq!(
+ file_slices.len(),
+ 1,
+ "Partition 10 should have 1 file slice"
+ );
+ let file_slice = file_slices[0];
+ assert_eq!(
+ file_slice.base_file.file_name(),
+ "92e64357-e4d1-4639-a9d3-c3535829d0aa-0_1-53-79_20250121000647668.parquet"
+ );
+ assert_eq!(
+ file_slice.log_files.len(),
+ 1,
+ "File slice should have 1 log file"
+ );
+ assert_eq!(
+ file_slice.log_files.iter().next().unwrap().file_name(),
+ ".92e64357-e4d1-4639-a9d3-c3535829d0aa-0_20250121000647668.log.1_0-73-101"
+ );
+
+ // as of replacecommit (insert overwrite table)
+ let latest_timestamp = "20250121000702475";
let file_slices_splits = hudi_table
- .get_file_slices_splits_as_of(2, "20250121000702475", &[])
+ .get_file_slices_splits_as_of(2, latest_timestamp, &[])
.await
.unwrap();
assert_eq!(file_slices_splits.len(), 1);
@@ -882,7 +921,7 @@ mod tests {
);
// as of non-exist old timestamp
- let opts = [(AsOfTimestamp.as_ref(), "0")];
+ let opts = [(AsOfTimestamp.as_ref(), "19700101000000")];
let hudi_table = Table::new_with_options(base_url.path(), opts)
.await
.unwrap();
diff --git a/crates/core/src/timeline/mod.rs b/crates/core/src/timeline/mod.rs
index 4125641..dccd08a 100644
--- a/crates/core/src/timeline/mod.rs
+++ b/crates/core/src/timeline/mod.rs
@@ -181,9 +181,16 @@ impl Timeline {
}
}
- pub async fn get_replaced_file_groups(&self) -> Result<HashSet<FileGroup>> {
+ pub async fn get_replaced_file_groups_as_of(
+ &self,
+ timestamp: &str,
+ ) -> Result<HashSet<FileGroup>> {
let mut file_groups: HashSet<FileGroup> = HashSet::new();
- let selector = TimelineSelector::completed_replacecommits(self.hudi_configs.clone());
+ let selector = TimelineSelector::completed_replacecommits_in_range(
+ self.hudi_configs.clone(),
+ None,
+ Some(timestamp),
+ )?;
for instant in selector.select(self)? {
let commit_metadata = self.get_commit_metadata(&instant).await?;
file_groups.extend(build_replaced_file_groups(&commit_metadata)?);
diff --git a/crates/core/src/timeline/selector.rs b/crates/core/src/timeline/selector.rs
index 7ce5aff..ff02910 100644
--- a/crates/core/src/timeline/selector.rs
+++ b/crates/core/src/timeline/selector.rs
@@ -152,6 +152,12 @@ impl TimelineSelector {
.to::<String>()
}
+ fn parse_datetime(timezone: &str, timestamp: Option<&str>) -> Result<Option<DateTime<Utc>>> {
+ timestamp
+ .map(|e| Instant::parse_datetime(e, timezone))
+ .transpose()
+ }
+
pub fn completed_commits(hudi_configs: Arc<HudiConfigs>) -> Result<Self> {
Self::completed_commits_in_range(hudi_configs, None, None)
}
@@ -162,12 +168,8 @@ impl TimelineSelector {
end: Option<&str>,
) -> Result<Self> {
let timezone = Self::get_timezone_from_configs(&hudi_configs);
- let start_datetime = start
- .map(|s| Instant::parse_datetime(s, &timezone))
- .transpose()?;
- let end_datetime = end
- .map(|e| Instant::parse_datetime(e, &timezone))
- .transpose()?;
+ let start_datetime = Self::parse_datetime(&timezone, start)?;
+ let end_datetime = Self::parse_datetime(&timezone, end)?;
Ok(Self {
timezone,
start_datetime,
@@ -178,15 +180,22 @@ impl TimelineSelector {
})
}
- pub fn completed_replacecommits(hudi_configs: Arc<HudiConfigs>) -> Self {
- Self {
- timezone: Self::get_timezone_from_configs(&hudi_configs),
- start_datetime: None,
- end_datetime: None,
+ pub fn completed_replacecommits_in_range(
+ hudi_configs: Arc<HudiConfigs>,
+ start: Option<&str>,
+ end: Option<&str>,
+ ) -> Result<Self> {
+ let timezone = Self::get_timezone_from_configs(&hudi_configs);
+ let start_datetime = Self::parse_datetime(&timezone, start)?;
+ let end_datetime = Self::parse_datetime(&timezone, end)?;
+ Ok(Self {
+ timezone,
+ start_datetime,
+ end_datetime,
states: vec![State::Completed],
actions: vec![Action::ReplaceCommit],
include_archived: false,
- }
+ })
}
pub fn should_include_action(&self, action: &Action) -> bool {