This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 64b1dc1  feat: add `hoodie.read.listing.parallelism` config (#235)
64b1dc1 is described below

commit 64b1dc11cb9f1ec9be472025ecc9a43cae49d6cb
Author: Shiyan Xu <[email protected]>
AuthorDate: Sun Jan 5 12:31:41 2025 -0600

    feat: add `hoodie.read.listing.parallelism` config (#235)
---
 crates/core/src/config/read.rs   | 27 ++++++++++++++--------
 crates/core/src/table/fs_view.rs | 49 +++++++++++++++++++++++-----------------
 2 files changed, 45 insertions(+), 31 deletions(-)

diff --git a/crates/core/src/config/read.rs b/crates/core/src/config/read.rs
index ece692e..4f131c2 100644
--- a/crates/core/src/config/read.rs
+++ b/crates/core/src/config/read.rs
@@ -42,23 +42,24 @@ use crate::config::{ConfigParser, HudiConfigValue};
 
 #[derive(Clone, Debug, PartialEq, Eq, Hash, EnumIter)]
 pub enum HudiReadConfig {
-    /// Define input splits
-    /// - Hoodie Key : hoodie.read.input.partitions
+    /// The query instant for time travel. Without specifying this option, we query the latest snapshot.
+    AsOfTimestamp,
+
+    /// Number of input partitions to read the data in parallel.
     ///
-    /// If has 100 files, [InputPartitions] is 5, will product 5 chunk,
-    /// every iter or task process 20 files
+    /// For processing 100 files, [InputPartitions] being 5 will produce 5 
partitions, with each partition having 20 files.
     InputPartitions,
 
-    /// The query instant for time travel. Without specified this option, we 
query the latest snapshot.
-    /// - Hoodie Key : hoodie.read.as.of.timestamp
-    AsOfTimestamp,
+    /// Parallelism for listing files on storage.
+    ListingParallelism,
 }
 
 impl AsRef<str> for HudiReadConfig {
     fn as_ref(&self) -> &str {
         match self {
-            Self::InputPartitions => "hoodie.read.input.partitions",
             Self::AsOfTimestamp => "hoodie.read.as.of.timestamp",
+            Self::InputPartitions => "hoodie.read.input.partitions",
+            Self::ListingParallelism => "hoodie.read.listing.parallelism",
         }
     }
 }
@@ -69,6 +70,7 @@ impl ConfigParser for HudiReadConfig {
     fn default_value(&self) -> Option<HudiConfigValue> {
         match self {
             HudiReadConfig::InputPartitions => 
Some(HudiConfigValue::UInteger(0usize)),
+            HudiReadConfig::ListingParallelism => 
Some(HudiConfigValue::UInteger(10usize)),
             _ => None,
         }
     }
@@ -80,12 +82,17 @@ impl ConfigParser for HudiReadConfig {
             .ok_or(NotFound(self.key()));
 
         match self {
+            Self::AsOfTimestamp => get_result.map(|v| 
HudiConfigValue::String(v.to_string())),
             Self::InputPartitions => get_result
                 .and_then(|v| {
                     usize::from_str(v).map_err(|e| ParseInt(self.key(), 
v.to_string(), e))
                 })
                 .map(HudiConfigValue::UInteger),
-            Self::AsOfTimestamp => get_result.map(|v| 
HudiConfigValue::String(v.to_string())),
+            Self::ListingParallelism => get_result
+                .and_then(|v| {
+                    usize::from_str(v).map_err(|e| ParseInt(self.key(), 
v.to_string(), e))
+                })
+                .map(HudiConfigValue::UInteger),
         }
     }
 }
@@ -99,7 +106,7 @@ mod tests {
     fn parse_valid_config_value() {
         let options = HashMap::from([(InputPartitions.as_ref().to_string(), 
"100".to_string())]);
         let value = 
InputPartitions.parse_value(&options).unwrap().to::<usize>();
-        assert_eq!(value, 100usize);
+        assert_eq!(value, 100);
     }
 
     #[test]
diff --git a/crates/core/src/table/fs_view.rs b/crates/core/src/table/fs_view.rs
index 6ca5d9b..eef3150 100644
--- a/crates/core/src/table/fs_view.rs
+++ b/crates/core/src/table/fs_view.rs
@@ -25,6 +25,7 @@ use crate::file_group::{BaseFile, FileGroup, FileSlice};
 use crate::storage::file_info::FileInfo;
 use crate::storage::{get_leaf_dirs, Storage};
 
+use crate::config::read::HudiReadConfig::ListingParallelism;
 use crate::error::CoreError;
 use crate::table::partition::PartitionPruner;
 use crate::Result;
@@ -55,11 +56,11 @@ impl FileSystemView {
         })
     }
 
-    async fn load_all_partition_paths(storage: &Storage) -> 
Result<Vec<String>> {
-        Self::load_partition_paths(storage, &PartitionPruner::empty()).await
+    async fn list_all_partition_paths(storage: &Storage) -> 
Result<Vec<String>> {
+        Self::list_partition_paths(storage, &PartitionPruner::empty()).await
     }
 
-    async fn load_partition_paths(
+    async fn list_partition_paths(
         storage: &Storage,
         partition_pruner: &PartitionPruner,
     ) -> Result<Vec<String>> {
@@ -86,7 +87,7 @@ impl FileSystemView {
             .collect())
     }
 
-    async fn load_file_groups_for_partition(
+    async fn list_file_groups_for_partition(
         storage: &Storage,
         partition_path: &str,
     ) -> Result<Vec<FileGroup>> {
@@ -118,35 +119,30 @@ impl FileSystemView {
         Ok(file_groups)
     }
 
-    pub async fn get_file_slices_as_of(
-        &self,
-        timestamp: &str,
-        partition_pruner: &PartitionPruner,
-        excluding_file_groups: &HashSet<FileGroup>,
-    ) -> Result<Vec<FileSlice>> {
-        let all_partition_paths = 
Self::load_all_partition_paths(&self.storage).await?;
+    async fn load_file_groups(&self, partition_pruner: &PartitionPruner) -> 
Result<()> {
+        let all_partition_paths = 
Self::list_all_partition_paths(&self.storage).await?;
 
-        let partition_paths_to_load = all_partition_paths
+        let partition_paths_to_list = all_partition_paths
             .into_iter()
             .filter(|p| !self.partition_to_file_groups.contains_key(p))
             .filter(|p| partition_pruner.should_include(p))
             .collect::<HashSet<_>>();
 
-        stream::iter(partition_paths_to_load)
+        let parallelism = self
+            .hudi_configs
+            .get_or_default(ListingParallelism)
+            .to::<usize>();
+        stream::iter(partition_paths_to_list)
             .map(|path| async move {
                 let file_groups =
-                    Self::load_file_groups_for_partition(&self.storage, 
&path).await?;
+                    Self::list_file_groups_for_partition(&self.storage, 
&path).await?;
                 Ok::<_, CoreError>((path, file_groups))
             })
-            // TODO parameterize the parallelism for partition loading
-            .buffer_unordered(10)
+            .buffer_unordered(parallelism)
             .try_for_each(|(path, file_groups)| async move {
                 self.partition_to_file_groups.insert(path, file_groups);
                 Ok(())
             })
-            .await?;
-
-        self.collect_file_slices_as_of(timestamp, partition_pruner, 
excluding_file_groups)
             .await
     }
 
@@ -174,6 +170,17 @@ impl FileSystemView {
         }
         Ok(file_slices)
     }
+
+    pub async fn get_file_slices_as_of(
+        &self,
+        timestamp: &str,
+        partition_pruner: &PartitionPruner,
+        excluding_file_groups: &HashSet<FileGroup>,
+    ) -> Result<Vec<FileSlice>> {
+        self.load_file_groups(partition_pruner).await?;
+        self.collect_file_slices_as_of(timestamp, partition_pruner, 
excluding_file_groups)
+            .await
+    }
 }
 
 #[cfg(test)]
@@ -205,7 +212,7 @@ mod tests {
         let base_url = TestTable::V6Nonpartitioned.url();
         let storage = Storage::new_with_base_url(base_url).unwrap();
         let partition_pruner = PartitionPruner::empty();
-        let partition_paths = FileSystemView::load_partition_paths(&storage, 
&partition_pruner)
+        let partition_paths = FileSystemView::list_partition_paths(&storage, 
&partition_pruner)
             .await
             .unwrap();
         let partition_path_set: HashSet<&str> =
@@ -218,7 +225,7 @@ mod tests {
         let base_url = TestTable::V6ComplexkeygenHivestyle.url();
         let storage = Storage::new_with_base_url(base_url).unwrap();
         let partition_pruner = PartitionPruner::empty();
-        let partition_paths = FileSystemView::load_partition_paths(&storage, 
&partition_pruner)
+        let partition_paths = FileSystemView::list_partition_paths(&storage, 
&partition_pruner)
             .await
             .unwrap();
         let partition_path_set: HashSet<&str> =

Reply via email to