This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 4836a37  fix: make partition loading more efficient (#152)
4836a37 is described below

commit 4836a37c65139d4ed48aa9a37a71f119bb03f00f
Author: Shiyan Xu <[email protected]>
AuthorDate: Sun Oct 6 14:28:58 2024 -1000

    fix: make partition loading more efficient (#152)
    
    Fix the partition loading logic such that it
    
    - does not skip relevant partitions
    - makes file group loading parallelized
---
 crates/core/src/table/fs_view.rs |  81 +++++++++++++++------------
 crates/core/src/table/mod.rs     | 118 ++++++++++++++++++++++-----------------
 2 files changed, 113 insertions(+), 86 deletions(-)

diff --git a/crates/core/src/table/fs_view.rs b/crates/core/src/table/fs_view.rs
index 9d249b8..4172f12 100644
--- a/crates/core/src/table/fs_view.rs
+++ b/crates/core/src/table/fs_view.rs
@@ -25,9 +25,10 @@ use crate::file_group::{BaseFile, FileGroup, FileSlice};
 use crate::storage::file_info::FileInfo;
 use crate::storage::{get_leaf_dirs, Storage};
 use crate::table::partition::PartitionPruner;
-use anyhow::{anyhow, Result};
+use anyhow::Result;
 use arrow::record_batch::RecordBatch;
 use dashmap::DashMap;
+use futures::stream::{self, StreamExt, TryStreamExt};
 use url::Url;
 
 #[derive(Clone, Debug)]
@@ -53,6 +54,10 @@ impl FileSystemView {
         })
     }
 
+    async fn load_all_partition_paths(storage: &Storage) -> 
Result<Vec<String>> {
+        Self::load_partition_paths(storage, &PartitionPruner::empty()).await
+    }
+
     async fn load_partition_paths(
         storage: &Storage,
         partition_pruner: &PartitionPruner,
@@ -80,22 +85,6 @@ impl FileSystemView {
             .collect())
     }
 
-    async fn load_file_groups_for_partitions(
-        storage: &Storage,
-        partition_paths: Vec<String>,
-    ) -> Result<HashMap<String, Vec<FileGroup>>> {
-        let mut partition_to_file_groups = HashMap::new();
-        for p in partition_paths {
-            match Self::load_file_groups_for_partition(storage, 
p.as_str()).await {
-                Ok(file_groups) => {
-                    partition_to_file_groups.insert(p, file_groups);
-                }
-                Err(e) => return Err(anyhow!("Failed to load partitions: {}", 
e)),
-            }
-        }
-        Ok(partition_to_file_groups)
-    }
-
     async fn load_file_groups_for_partition(
         storage: &Storage,
         partition_path: &str,
@@ -133,32 +122,52 @@ impl FileSystemView {
         timestamp: &str,
         partition_pruner: &PartitionPruner,
         excluding_file_groups: &HashSet<FileGroup>,
+    ) -> Result<Vec<FileSlice>> {
+        let all_partition_paths = 
Self::load_all_partition_paths(&self.storage).await?;
+
+        let partition_paths_to_load = all_partition_paths
+            .into_iter()
+            .filter(|p| !self.partition_to_file_groups.contains_key(p))
+            .filter(|p| partition_pruner.should_include(p))
+            .collect::<HashSet<_>>();
+
+        stream::iter(partition_paths_to_load)
+            .map(|path| async move {
+                let file_groups =
+                    Self::load_file_groups_for_partition(&self.storage, 
&path).await?;
+                Ok::<_, anyhow::Error>((path, file_groups))
+            })
+            // TODO parameterize the parallelism for partition loading
+            .buffer_unordered(10)
+            .try_for_each(|(path, file_groups)| async move {
+                self.partition_to_file_groups.insert(path, file_groups);
+                Ok(())
+            })
+            .await?;
+
+        self.collect_file_slices_as_of(timestamp, partition_pruner, 
excluding_file_groups)
+            .await
+    }
+
+    async fn collect_file_slices_as_of(
+        &self,
+        timestamp: &str,
+        partition_pruner: &PartitionPruner,
+        excluding_file_groups: &HashSet<FileGroup>,
     ) -> Result<Vec<FileSlice>> {
         let mut file_slices = Vec::new();
-        if self.partition_to_file_groups.is_empty() {
-            let partition_paths =
-                Self::load_partition_paths(&self.storage, 
partition_pruner).await?;
-            let partition_to_file_groups =
-                Self::load_file_groups_for_partitions(&self.storage, 
partition_paths).await?;
-            partition_to_file_groups.into_iter().for_each(|pair| {
-                self.partition_to_file_groups.insert(pair.0, pair.1);
-            });
-        }
-        for mut fgs in self
-            .partition_to_file_groups
-            .iter_mut()
-            .filter(|item| partition_pruner.should_include(item.key()))
-        {
-            let fgs_ref = fgs.value_mut();
-            for fg in fgs_ref {
+        for mut partition_entry in self.partition_to_file_groups.iter_mut() {
+            if !partition_pruner.should_include(partition_entry.key()) {
+                continue;
+            }
+            let file_groups = partition_entry.value_mut();
+            for fg in file_groups.iter_mut() {
                 if excluding_file_groups.contains(fg) {
                     continue;
                 }
                 if let Some(fsl) = fg.get_file_slice_mut_as_of(timestamp) {
-                    // TODO: pass ref instead of copying
                     fsl.load_stats(&self.storage).await?;
-                    let immut_fsl: &FileSlice = fsl;
-                    file_slices.push(immut_fsl.clone());
+                    file_slices.push(fsl.clone());
                 }
             }
         }
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index 0d802e4..931f2c2 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -793,88 +793,106 @@ mod tests {
     }
 
     #[tokio::test]
-    async fn hudi_table_get_file_paths_for_simple_key_and_non_hive_style() {
+    async fn hudi_table_get_file_paths_for_simple_keygen_non_hive_style() {
         let base_url = TestTable::V6SimplekeygenNonhivestyle.url();
         let hudi_table = Table::new(base_url.path()).await.unwrap();
         assert_eq!(hudi_table.timeline.instants.len(), 2);
 
         let partition_filters = &[];
-        let actual: HashSet<String> = HashSet::from_iter(
-            hudi_table
-                .get_file_paths_with_filters(partition_filters)
-                .await
-                .unwrap(),
-        );
-        let expected: HashSet<String> = HashSet::from_iter(
-            vec![
-                
"10/97de74b1-2a8e-4bb7-874c-0a74e1f42a77-0_0-119-166_20240418172804498.parquet",
-                
"20/76e0556b-390d-4249-b7ad-9059e2bc2cbd-0_0-98-141_20240418172802262.parquet",
-                
"30/6db57019-98ee-480e-8eb1-fb3de48e1c24-0_1-119-167_20240418172804498.parquet",
-            ]
+        let actual = hudi_table
+            .get_file_paths_with_filters(partition_filters)
+            .await
+            .unwrap()
             .into_iter()
-            .map(|f| join_url_segments(&base_url, &[f]).unwrap().to_string())
-            .collect::<Vec<_>>(),
-        );
+            .collect::<HashSet<_>>();
+        let expected = [
+            
"10/97de74b1-2a8e-4bb7-874c-0a74e1f42a77-0_0-119-166_20240418172804498.parquet",
+            
"20/76e0556b-390d-4249-b7ad-9059e2bc2cbd-0_0-98-141_20240418172802262.parquet",
+            
"30/6db57019-98ee-480e-8eb1-fb3de48e1c24-0_1-119-167_20240418172804498.parquet",
+        ]
+        .map(|f| join_url_segments(&base_url, &[f]).unwrap().to_string())
+        .into_iter()
+        .collect::<HashSet<_>>();
         assert_eq!(actual, expected);
 
         let partition_filters = &["byteField >= 10", "byteField < 30"];
-        let actual: HashSet<String> = HashSet::from_iter(
-            hudi_table
-                .get_file_paths_with_filters(partition_filters)
-                .await
-                .unwrap(),
-        );
-        let expected: HashSet<String> = HashSet::from_iter(
-            vec![
-                
"10/97de74b1-2a8e-4bb7-874c-0a74e1f42a77-0_0-119-166_20240418172804498.parquet",
-                
"20/76e0556b-390d-4249-b7ad-9059e2bc2cbd-0_0-98-141_20240418172802262.parquet",
-            ]
+        let actual = hudi_table
+            .get_file_paths_with_filters(partition_filters)
+            .await
+            .unwrap()
             .into_iter()
-            .map(|f| join_url_segments(&base_url, &[f]).unwrap().to_string())
-            .collect::<Vec<_>>(),
-        );
+            .collect::<HashSet<_>>();
+        let expected = [
+            
"10/97de74b1-2a8e-4bb7-874c-0a74e1f42a77-0_0-119-166_20240418172804498.parquet",
+            
"20/76e0556b-390d-4249-b7ad-9059e2bc2cbd-0_0-98-141_20240418172802262.parquet",
+        ]
+        .map(|f| join_url_segments(&base_url, &[f]).unwrap().to_string())
+        .into_iter()
+        .collect::<HashSet<_>>();
+        assert_eq!(actual, expected);
+
+        let partition_filters = &["byteField > 30"];
+        let actual = hudi_table
+            .get_file_paths_with_filters(partition_filters)
+            .await
+            .unwrap()
+            .into_iter()
+            .collect::<HashSet<_>>();
+        let expected = HashSet::new();
         assert_eq!(actual, expected);
     }
 
     #[tokio::test]
-    async fn hudi_table_get_file_paths_for_complex_keygen_and_hive_style() {
+    async fn hudi_table_get_file_paths_for_complex_keygen_hive_style() {
         let base_url = TestTable::V6ComplexkeygenHivestyle.url();
         let hudi_table = Table::new(base_url.path()).await.unwrap();
         assert_eq!(hudi_table.timeline.instants.len(), 2);
 
         let partition_filters = &[];
-        let actual: HashSet<String> = HashSet::from_iter(
-            hudi_table
-                .get_file_paths_with_filters(partition_filters)
-                .await
-                .unwrap(),
-        );
-        let expected: HashSet<String> = HashSet::from_iter(vec![
+        let actual = hudi_table
+            .get_file_paths_with_filters(partition_filters)
+            .await
+            .unwrap()
+            .into_iter()
+            .collect::<HashSet<_>>();
+        let expected= [
             
"byteField=10/shortField=300/a22e8257-e249-45e9-ba46-115bc85adcba-0_0-161-223_20240418173235694.parquet",
             
"byteField=20/shortField=100/bb7c3a45-387f-490d-aab2-981c3f1a8ada-0_0-140-198_20240418173213674.parquet",
             
"byteField=30/shortField=100/4668e35e-bff8-4be9-9ff2-e7fb17ecb1a7-0_1-161-224_20240418173235694.parquet",
         ]
-            .into_iter().map(|f| { join_url_segments(&base_url, 
&[f]).unwrap().to_string() })
-            .collect::<Vec<_>>());
+            .map(|f| { join_url_segments(&base_url, &[f]).unwrap().to_string() 
})
+            .into_iter()
+            .collect::<HashSet<_>>();
         assert_eq!(actual, expected);
 
         let partition_filters = &["byteField >= 10", "byteField < 20", 
"shortField != 100"];
-        let actual: HashSet<String> = HashSet::from_iter(
-            hudi_table
-                .get_file_paths_with_filters(partition_filters)
-                .await
-                .unwrap(),
-        );
-        let expected: HashSet<String> = HashSet::from_iter(vec![
+        let actual = hudi_table
+            .get_file_paths_with_filters(partition_filters)
+            .await
+            .unwrap()
+            .into_iter()
+            .collect::<HashSet<_>>();
+        let expected = [
             
"byteField=10/shortField=300/a22e8257-e249-45e9-ba46-115bc85adcba-0_0-161-223_20240418173235694.parquet",
         ]
-            .into_iter().map(|f| { join_url_segments(&base_url, 
&[f]).unwrap().to_string() })
-            .collect::<Vec<_>>());
+            .map(|f| { join_url_segments(&base_url, &[f]).unwrap().to_string() 
})
+            .into_iter()
+            .collect::<HashSet<_>>();
+        assert_eq!(actual, expected);
+
+        let partition_filters = &["byteField > 20", "shortField = 300"];
+        let actual = hudi_table
+            .get_file_paths_with_filters(partition_filters)
+            .await
+            .unwrap()
+            .into_iter()
+            .collect::<HashSet<_>>();
+        let expected = HashSet::new();
         assert_eq!(actual, expected);
     }
 
     #[tokio::test]
-    async fn hudi_table_read_snapshot_for_complex_keygen_and_hive_style() {
+    async fn hudi_table_read_snapshot_for_complex_keygen_hive_style() {
         let base_url = TestTable::V6ComplexkeygenHivestyle.url();
         let hudi_table = Table::new(base_url.path()).await.unwrap();
         let partition_filters = &["byteField >= 10", "byteField < 20", 
"shortField != 100"];

Reply via email to