This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 64b1dc1 feat: add `hoodie.read.listing.parallelism` config (#235)
64b1dc1 is described below
commit 64b1dc11cb9f1ec9be472025ecc9a43cae49d6cb
Author: Shiyan Xu <[email protected]>
AuthorDate: Sun Jan 5 12:31:41 2025 -0600
feat: add `hoodie.read.listing.parallelism` config (#235)
---
crates/core/src/config/read.rs | 27 ++++++++++++++--------
crates/core/src/table/fs_view.rs | 49 +++++++++++++++++++++++-----------------
2 files changed, 45 insertions(+), 31 deletions(-)
diff --git a/crates/core/src/config/read.rs b/crates/core/src/config/read.rs
index ece692e..4f131c2 100644
--- a/crates/core/src/config/read.rs
+++ b/crates/core/src/config/read.rs
@@ -42,23 +42,24 @@ use crate::config::{ConfigParser, HudiConfigValue};
#[derive(Clone, Debug, PartialEq, Eq, Hash, EnumIter)]
pub enum HudiReadConfig {
- /// Define input splits
- /// - Hoodie Key : hoodie.read.input.partitions
+ /// The query instant for time travel. If this option is not specified, the latest snapshot is queried.
+ AsOfTimestamp,
+
+ /// Number of input partitions to read the data in parallel.
///
- /// If has 100 files, [InputPartitions] is 5, will product 5 chunk,
- /// every iter or task process 20 files
+ /// For processing 100 files, [InputPartitions] being 5 will produce 5 partitions, with each partition having 20 files.
InputPartitions,
- /// The query instant for time travel. Without specified this option, we query the latest snapshot.
- /// - Hoodie Key : hoodie.read.as.of.timestamp
- AsOfTimestamp,
+ /// Parallelism for listing files on storage.
+ ListingParallelism,
}
impl AsRef<str> for HudiReadConfig {
fn as_ref(&self) -> &str {
match self {
- Self::InputPartitions => "hoodie.read.input.partitions",
Self::AsOfTimestamp => "hoodie.read.as.of.timestamp",
+ Self::InputPartitions => "hoodie.read.input.partitions",
+ Self::ListingParallelism => "hoodie.read.listing.parallelism",
}
}
}
@@ -69,6 +70,7 @@ impl ConfigParser for HudiReadConfig {
fn default_value(&self) -> Option<HudiConfigValue> {
match self {
HudiReadConfig::InputPartitions => Some(HudiConfigValue::UInteger(0usize)),
+ HudiReadConfig::ListingParallelism => Some(HudiConfigValue::UInteger(10usize)),
_ => None,
}
}
@@ -80,12 +82,17 @@ impl ConfigParser for HudiReadConfig {
.ok_or(NotFound(self.key()));
match self {
+ Self::AsOfTimestamp => get_result.map(|v| HudiConfigValue::String(v.to_string())),
Self::InputPartitions => get_result
.and_then(|v| {
usize::from_str(v).map_err(|e| ParseInt(self.key(), v.to_string(), e))
})
.map(HudiConfigValue::UInteger),
- Self::AsOfTimestamp => get_result.map(|v| HudiConfigValue::String(v.to_string())),
+ Self::ListingParallelism => get_result
+ .and_then(|v| {
+ usize::from_str(v).map_err(|e| ParseInt(self.key(), v.to_string(), e))
+ })
+ .map(HudiConfigValue::UInteger),
}
}
}
@@ -99,7 +106,7 @@ mod tests {
fn parse_valid_config_value() {
let options = HashMap::from([(InputPartitions.as_ref().to_string(), "100".to_string())]);
let value = InputPartitions.parse_value(&options).unwrap().to::<usize>();
- assert_eq!(value, 100usize);
+ assert_eq!(value, 100);
}
#[test]
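For reference, the new option parses the same way as the `InputPartitions` test above. A minimal sketch (the value 32 is arbitrary; the imports assume the module layout shown in this diff):

    use std::collections::HashMap;
    use crate::config::read::HudiReadConfig::ListingParallelism;
    use crate::config::ConfigParser;

    // Build an options map keyed by "hoodie.read.listing.parallelism".
    let options =
        HashMap::from([(ListingParallelism.as_ref().to_string(), "32".to_string())]);
    // parse_value goes through the UInteger branch added above.
    let parallelism = ListingParallelism.parse_value(&options).unwrap().to::<usize>();
    assert_eq!(parallelism, 32);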
diff --git a/crates/core/src/table/fs_view.rs b/crates/core/src/table/fs_view.rs
index 6ca5d9b..eef3150 100644
--- a/crates/core/src/table/fs_view.rs
+++ b/crates/core/src/table/fs_view.rs
@@ -25,6 +25,7 @@ use crate::file_group::{BaseFile, FileGroup, FileSlice};
use crate::storage::file_info::FileInfo;
use crate::storage::{get_leaf_dirs, Storage};
+use crate::config::read::HudiReadConfig::ListingParallelism;
use crate::error::CoreError;
use crate::table::partition::PartitionPruner;
use crate::Result;
@@ -55,11 +56,11 @@ impl FileSystemView {
})
}
- async fn load_all_partition_paths(storage: &Storage) -> Result<Vec<String>> {
- Self::load_partition_paths(storage, &PartitionPruner::empty()).await
+ async fn list_all_partition_paths(storage: &Storage) -> Result<Vec<String>> {
+ Self::list_partition_paths(storage, &PartitionPruner::empty()).await
}
- async fn load_partition_paths(
+ async fn list_partition_paths(
storage: &Storage,
partition_pruner: &PartitionPruner,
) -> Result<Vec<String>> {
@@ -86,7 +87,7 @@ impl FileSystemView {
.collect())
}
- async fn load_file_groups_for_partition(
+ async fn list_file_groups_for_partition(
storage: &Storage,
partition_path: &str,
) -> Result<Vec<FileGroup>> {
@@ -118,35 +119,30 @@ impl FileSystemView {
Ok(file_groups)
}
- pub async fn get_file_slices_as_of(
- &self,
- timestamp: &str,
- partition_pruner: &PartitionPruner,
- excluding_file_groups: &HashSet<FileGroup>,
- ) -> Result<Vec<FileSlice>> {
- let all_partition_paths = Self::load_all_partition_paths(&self.storage).await?;
+ async fn load_file_groups(&self, partition_pruner: &PartitionPruner) -> Result<()> {
+ let all_partition_paths = Self::list_all_partition_paths(&self.storage).await?;
- let partition_paths_to_load = all_partition_paths
+ let partition_paths_to_list = all_partition_paths
.into_iter()
.filter(|p| !self.partition_to_file_groups.contains_key(p))
.filter(|p| partition_pruner.should_include(p))
.collect::<HashSet<_>>();
- stream::iter(partition_paths_to_load)
+ let parallelism = self
+ .hudi_configs
+ .get_or_default(ListingParallelism)
+ .to::<usize>();
+ stream::iter(partition_paths_to_list)
.map(|path| async move {
let file_groups =
- Self::load_file_groups_for_partition(&self.storage, &path).await?;
+ Self::list_file_groups_for_partition(&self.storage, &path).await?;
Ok::<_, CoreError>((path, file_groups))
})
- // TODO parameterize the parallelism for partition loading
- .buffer_unordered(10)
+ .buffer_unordered(parallelism)
.try_for_each(|(path, file_groups)| async move {
self.partition_to_file_groups.insert(path, file_groups);
Ok(())
})
- .await?;
-
- self.collect_file_slices_as_of(timestamp, partition_pruner, excluding_file_groups)
.await
}
@@ -174,6 +170,17 @@ impl FileSystemView {
}
Ok(file_slices)
}
+
+ pub async fn get_file_slices_as_of(
+ &self,
+ timestamp: &str,
+ partition_pruner: &PartitionPruner,
+ excluding_file_groups: &HashSet<FileGroup>,
+ ) -> Result<Vec<FileSlice>> {
+ self.load_file_groups(partition_pruner).await?;
+ self.collect_file_slices_as_of(timestamp, partition_pruner, excluding_file_groups)
+ .await
+ }
}
#[cfg(test)]
@@ -205,7 +212,7 @@ mod tests {
let base_url = TestTable::V6Nonpartitioned.url();
let storage = Storage::new_with_base_url(base_url).unwrap();
let partition_pruner = PartitionPruner::empty();
- let partition_paths = FileSystemView::load_partition_paths(&storage, &partition_pruner)
+ let partition_paths = FileSystemView::list_partition_paths(&storage, &partition_pruner)
.await
.unwrap();
let partition_path_set: HashSet<&str> =
@@ -218,7 +225,7 @@ mod tests {
let base_url = TestTable::V6ComplexkeygenHivestyle.url();
let storage = Storage::new_with_base_url(base_url).unwrap();
let partition_pruner = PartitionPruner::empty();
- let partition_paths = FileSystemView::load_partition_paths(&storage, &partition_pruner)
+ let partition_paths = FileSystemView::list_partition_paths(&storage, &partition_pruner)
.await
.unwrap();
let partition_path_set: HashSet<&str> =
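A side note on the `buffer_unordered(parallelism)` change in `load_file_groups`: `buffer_unordered(n)` drives at most `n` of the mapped listing futures concurrently and yields results in completion order, which is what bounds the listing parallelism. Below is a self-contained sketch of the same pattern; the `list_partition` helper, the partition paths, and the tokio/futures setup are illustrative assumptions, not hudi-rs APIs:

    use futures::stream::{self, StreamExt, TryStreamExt};

    // Hypothetical stand-in for listing one partition on storage.
    async fn list_partition(path: String) -> Result<(String, usize), std::io::Error> {
        Ok((path, 20)) // pretend every partition holds 20 files
    }

    #[tokio::main]
    async fn main() -> Result<(), std::io::Error> {
        let paths = (0..100).map(|i| format!("part={i}"));
        let parallelism = 10; // would come from hoodie.read.listing.parallelism
        stream::iter(paths)
            .map(list_partition)           // stream of pending listing futures
            .buffer_unordered(parallelism) // at most `parallelism` in flight
            .try_for_each(|(path, n)| async move {
                println!("{path}: {n} files");
                Ok(())
            })
            .await
    }

With the default of 10, the new config reproduces the previously hard-coded `buffer_unordered(10)`, so existing behavior is unchanged unless the option is set.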