This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 4836a37 fix: make partition loading more efficient (#152)
4836a37 is described below
commit 4836a37c65139d4ed48aa9a37a71f119bb03f00f
Author: Shiyan Xu <[email protected]>
AuthorDate: Sun Oct 6 14:28:58 2024 -1000
fix: make partition loading more efficient (#152)
Fix the partition loading logic such that it
- does not skip relevant partitions
- parallelizes file group loading (see the sketch below)
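For illustration, here is a minimal, self-contained sketch of the concurrency
pattern this commit adopts. The partition names and the load_partition helper
are hypothetical stand-ins; the real logic is in fs_view.rs below.

    use futures::stream::{self, StreamExt, TryStreamExt};

    // Hypothetical stand-in for loading one partition's file groups.
    async fn load_partition(path: String) -> Result<(String, usize), String> {
        Ok((path, 0))
    }

    fn main() {
        let partitions = vec!["10".to_string(), "20".to_string(), "30".to_string()];
        futures::executor::block_on(async {
            stream::iter(partitions)
                // Turn each partition path into a pending load future.
                .map(load_partition)
                // Keep up to 10 loads in flight, completing in any order;
                // this mirrors the buffer_unordered(10) call in the diff.
                .buffer_unordered(10)
                // Short-circuit on the first error, as try_for_each does below.
                .try_for_each(|(path, n)| async move {
                    println!("loaded {n} file groups for partition {path}");
                    Ok(())
                })
                .await
                .unwrap();
        });
    }

Compared with the removed sequential for-loop, the stream keeps several loads
in flight at once while still propagating the first error it encounters.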
---
crates/core/src/table/fs_view.rs | 81 +++++++++++++++------------
crates/core/src/table/mod.rs | 118 ++++++++++++++++++++++-----------------
2 files changed, 113 insertions(+), 86 deletions(-)
diff --git a/crates/core/src/table/fs_view.rs b/crates/core/src/table/fs_view.rs
index 9d249b8..4172f12 100644
--- a/crates/core/src/table/fs_view.rs
+++ b/crates/core/src/table/fs_view.rs
@@ -25,9 +25,10 @@ use crate::file_group::{BaseFile, FileGroup, FileSlice};
use crate::storage::file_info::FileInfo;
use crate::storage::{get_leaf_dirs, Storage};
use crate::table::partition::PartitionPruner;
-use anyhow::{anyhow, Result};
+use anyhow::Result;
use arrow::record_batch::RecordBatch;
use dashmap::DashMap;
+use futures::stream::{self, StreamExt, TryStreamExt};
use url::Url;
#[derive(Clone, Debug)]
@@ -53,6 +54,10 @@ impl FileSystemView {
})
}
+ async fn load_all_partition_paths(storage: &Storage) -> Result<Vec<String>> {
+ Self::load_partition_paths(storage, &PartitionPruner::empty()).await
+ }
+
async fn load_partition_paths(
storage: &Storage,
partition_pruner: &PartitionPruner,
@@ -80,22 +85,6 @@ impl FileSystemView {
.collect())
}
- async fn load_file_groups_for_partitions(
- storage: &Storage,
- partition_paths: Vec<String>,
- ) -> Result<HashMap<String, Vec<FileGroup>>> {
- let mut partition_to_file_groups = HashMap::new();
- for p in partition_paths {
- match Self::load_file_groups_for_partition(storage, p.as_str()).await {
- Ok(file_groups) => {
- partition_to_file_groups.insert(p, file_groups);
- }
- Err(e) => return Err(anyhow!("Failed to load partitions: {}", e)),
- }
- }
- Ok(partition_to_file_groups)
- }
-
async fn load_file_groups_for_partition(
storage: &Storage,
partition_path: &str,
@@ -133,32 +122,52 @@ impl FileSystemView {
timestamp: &str,
partition_pruner: &PartitionPruner,
excluding_file_groups: &HashSet<FileGroup>,
+ ) -> Result<Vec<FileSlice>> {
+ let all_partition_paths = Self::load_all_partition_paths(&self.storage).await?;
+
+ let partition_paths_to_load = all_partition_paths
+ .into_iter()
+ .filter(|p| !self.partition_to_file_groups.contains_key(p))
+ .filter(|p| partition_pruner.should_include(p))
+ .collect::<HashSet<_>>();
+
+ stream::iter(partition_paths_to_load)
+ .map(|path| async move {
+ let file_groups =
+ Self::load_file_groups_for_partition(&self.storage, &path).await?;
+ Ok::<_, anyhow::Error>((path, file_groups))
+ })
+ // TODO parameterize the parallelism for partition loading
+ .buffer_unordered(10)
+ .try_for_each(|(path, file_groups)| async move {
+ self.partition_to_file_groups.insert(path, file_groups);
+ Ok(())
+ })
+ .await?;
+
+ self.collect_file_slices_as_of(timestamp, partition_pruner, excluding_file_groups)
+ .await
+ }
+
+ async fn collect_file_slices_as_of(
+ &self,
+ timestamp: &str,
+ partition_pruner: &PartitionPruner,
+ excluding_file_groups: &HashSet<FileGroup>,
) -> Result<Vec<FileSlice>> {
let mut file_slices = Vec::new();
- if self.partition_to_file_groups.is_empty() {
- let partition_paths =
- Self::load_partition_paths(&self.storage, partition_pruner).await?;
- let partition_to_file_groups =
- Self::load_file_groups_for_partitions(&self.storage, partition_paths).await?;
- partition_to_file_groups.into_iter().for_each(|pair| {
- self.partition_to_file_groups.insert(pair.0, pair.1);
- });
- }
- for mut fgs in self
- .partition_to_file_groups
- .iter_mut()
- .filter(|item| partition_pruner.should_include(item.key()))
- {
- let fgs_ref = fgs.value_mut();
- for fg in fgs_ref {
+ for mut partition_entry in self.partition_to_file_groups.iter_mut() {
+ if !partition_pruner.should_include(partition_entry.key()) {
+ continue;
+ }
+ let file_groups = partition_entry.value_mut();
+ for fg in file_groups.iter_mut() {
if excluding_file_groups.contains(fg) {
continue;
}
if let Some(fsl) = fg.get_file_slice_mut_as_of(timestamp) {
- // TODO: pass ref instead of copying
fsl.load_stats(&self.storage).await?;
- let immut_fsl: &FileSlice = fsl;
- file_slices.push(immut_fsl.clone());
+ file_slices.push(fsl.clone());
}
}
}
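As a side note, here is a minimal sketch of the DashMap access pattern that
collect_file_slices_as_of relies on. The keys and values are placeholders,
not the real FileGroup types.

    use dashmap::DashMap;

    fn main() {
        let map: DashMap<String, Vec<u32>> = DashMap::new();
        map.insert("10".to_string(), vec![1, 2]);
        map.insert("20".to_string(), vec![3]);

        // iter_mut yields guarded entries; each guard holds its shard's lock,
        // so values can be mutated in place while iterating.
        for mut entry in map.iter_mut() {
            // Skip entries, much like the partition_pruner.should_include filter.
            if entry.key().as_str() == "20" {
                continue;
            }
            entry.value_mut().push(99);
        }

        assert_eq!(*map.get("10").unwrap(), vec![1, 2, 99]);
    }

Because DashMap shards its locks, in-place mutation stays safe even while
other tasks insert partitions concurrently, which the parallel loading above
takes advantage of.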
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index 0d802e4..931f2c2 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -793,88 +793,106 @@ mod tests {
}
#[tokio::test]
- async fn hudi_table_get_file_paths_for_simple_key_and_non_hive_style() {
+ async fn hudi_table_get_file_paths_for_simple_keygen_non_hive_style() {
let base_url = TestTable::V6SimplekeygenNonhivestyle.url();
let hudi_table = Table::new(base_url.path()).await.unwrap();
assert_eq!(hudi_table.timeline.instants.len(), 2);
let partition_filters = &[];
- let actual: HashSet<String> = HashSet::from_iter(
- hudi_table
- .get_file_paths_with_filters(partition_filters)
- .await
- .unwrap(),
- );
- let expected: HashSet<String> = HashSet::from_iter(
- vec![
- "10/97de74b1-2a8e-4bb7-874c-0a74e1f42a77-0_0-119-166_20240418172804498.parquet",
- "20/76e0556b-390d-4249-b7ad-9059e2bc2cbd-0_0-98-141_20240418172802262.parquet",
- "30/6db57019-98ee-480e-8eb1-fb3de48e1c24-0_1-119-167_20240418172804498.parquet",
- ]
+ let actual = hudi_table
+ .get_file_paths_with_filters(partition_filters)
+ .await
+ .unwrap()
.into_iter()
- .map(|f| join_url_segments(&base_url, &[f]).unwrap().to_string())
- .collect::<Vec<_>>(),
- );
+ .collect::<HashSet<_>>();
+ let expected = [
+ "10/97de74b1-2a8e-4bb7-874c-0a74e1f42a77-0_0-119-166_20240418172804498.parquet",
+ "20/76e0556b-390d-4249-b7ad-9059e2bc2cbd-0_0-98-141_20240418172802262.parquet",
+ "30/6db57019-98ee-480e-8eb1-fb3de48e1c24-0_1-119-167_20240418172804498.parquet",
+ ]
+ .map(|f| join_url_segments(&base_url, &[f]).unwrap().to_string())
+ .into_iter()
+ .collect::<HashSet<_>>();
assert_eq!(actual, expected);
let partition_filters = &["byteField >= 10", "byteField < 30"];
- let actual: HashSet<String> = HashSet::from_iter(
- hudi_table
- .get_file_paths_with_filters(partition_filters)
- .await
- .unwrap(),
- );
- let expected: HashSet<String> = HashSet::from_iter(
- vec![
- "10/97de74b1-2a8e-4bb7-874c-0a74e1f42a77-0_0-119-166_20240418172804498.parquet",
- "20/76e0556b-390d-4249-b7ad-9059e2bc2cbd-0_0-98-141_20240418172802262.parquet",
- ]
+ let actual = hudi_table
+ .get_file_paths_with_filters(partition_filters)
+ .await
+ .unwrap()
.into_iter()
- .map(|f| join_url_segments(&base_url, &[f]).unwrap().to_string())
- .collect::<Vec<_>>(),
- );
+ .collect::<HashSet<_>>();
+ let expected = [
+ "10/97de74b1-2a8e-4bb7-874c-0a74e1f42a77-0_0-119-166_20240418172804498.parquet",
+ "20/76e0556b-390d-4249-b7ad-9059e2bc2cbd-0_0-98-141_20240418172802262.parquet",
+ ]
+ .map(|f| join_url_segments(&base_url, &[f]).unwrap().to_string())
+ .into_iter()
+ .collect::<HashSet<_>>();
+ assert_eq!(actual, expected);
+
+ let partition_filters = &["byteField > 30"];
+ let actual = hudi_table
+ .get_file_paths_with_filters(partition_filters)
+ .await
+ .unwrap()
+ .into_iter()
+ .collect::<HashSet<_>>();
+ let expected = HashSet::new();
assert_eq!(actual, expected);
}
#[tokio::test]
- async fn hudi_table_get_file_paths_for_complex_keygen_and_hive_style() {
+ async fn hudi_table_get_file_paths_for_complex_keygen_hive_style() {
let base_url = TestTable::V6ComplexkeygenHivestyle.url();
let hudi_table = Table::new(base_url.path()).await.unwrap();
assert_eq!(hudi_table.timeline.instants.len(), 2);
let partition_filters = &[];
- let actual: HashSet<String> = HashSet::from_iter(
- hudi_table
- .get_file_paths_with_filters(partition_filters)
- .await
- .unwrap(),
- );
- let expected: HashSet<String> = HashSet::from_iter(vec![
+ let actual = hudi_table
+ .get_file_paths_with_filters(partition_filters)
+ .await
+ .unwrap()
+ .into_iter()
+ .collect::<HashSet<_>>();
+ let expected = [
"byteField=10/shortField=300/a22e8257-e249-45e9-ba46-115bc85adcba-0_0-161-223_20240418173235694.parquet",
"byteField=20/shortField=100/bb7c3a45-387f-490d-aab2-981c3f1a8ada-0_0-140-198_20240418173213674.parquet",
"byteField=30/shortField=100/4668e35e-bff8-4be9-9ff2-e7fb17ecb1a7-0_1-161-224_20240418173235694.parquet",
]
- .into_iter().map(|f| { join_url_segments(&base_url, &[f]).unwrap().to_string() })
- .collect::<Vec<_>>());
+ .map(|f| { join_url_segments(&base_url, &[f]).unwrap().to_string() })
+ .into_iter()
+ .collect::<HashSet<_>>();
assert_eq!(actual, expected);
let partition_filters = &["byteField >= 10", "byteField < 20", "shortField != 100"];
- let actual: HashSet<String> = HashSet::from_iter(
- hudi_table
- .get_file_paths_with_filters(partition_filters)
- .await
- .unwrap(),
- );
- let expected: HashSet<String> = HashSet::from_iter(vec![
+ let actual = hudi_table
+ .get_file_paths_with_filters(partition_filters)
+ .await
+ .unwrap()
+ .into_iter()
+ .collect::<HashSet<_>>();
+ let expected = [
"byteField=10/shortField=300/a22e8257-e249-45e9-ba46-115bc85adcba-0_0-161-223_20240418173235694.parquet",
]
- .into_iter().map(|f| { join_url_segments(&base_url, &[f]).unwrap().to_string() })
- .collect::<Vec<_>>());
+ .map(|f| { join_url_segments(&base_url, &[f]).unwrap().to_string() })
+ .into_iter()
+ .collect::<HashSet<_>>();
+ assert_eq!(actual, expected);
+
+ let partition_filters = &["byteField > 20", "shortField = 300"];
+ let actual = hudi_table
+ .get_file_paths_with_filters(partition_filters)
+ .await
+ .unwrap()
+ .into_iter()
+ .collect::<HashSet<_>>();
+ let expected = HashSet::new();
assert_eq!(actual, expected);
}
#[tokio::test]
- async fn hudi_table_read_snapshot_for_complex_keygen_and_hive_style() {
+ async fn hudi_table_read_snapshot_for_complex_keygen_hive_style() {
let base_url = TestTable::V6ComplexkeygenHivestyle.url();
let hudi_table = Table::new(base_url.path()).await.unwrap();
let partition_filters = &["byteField >= 10", "byteField < 20", "shortField != 100"];