This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 4575ccd refactor: improve file system view's listing flow (#251)
4575ccd is described below
commit 4575ccd534c1d8e179773e61c8d785e314e37c2d
Author: Shiyan Xu <[email protected]>
AuthorDate: Sun Jan 19 21:11:19 2025 -0600
refactor: improve file system view's listing flow (#251)
- Move file listing logic from `FileSystemView` to `FileLister`
- Change `partition_path` in `FileGroup` and `FileSlice` from `Option<String>` to
`String`, using an empty string for non-partitioned tables
- The file system view now re-lists the relevant partitions to load the latest
file groups whenever a query needs to scan them
- Skip Delta Lake's (`_delta_log`) and Iceberg's (`metadata`) metadata dirs, in
addition to Hudi's `.hoodie`, when listing top-level dirs
---
crates/core/src/file_group/builder.rs | 23 +--
crates/core/src/file_group/file_slice.rs | 12 +-
crates/core/src/file_group/mod.rs | 29 ++--
crates/core/src/metadata/mod.rs | 10 ++
crates/core/src/table/fs_view.rs | 212 +++-------------------------
crates/core/src/table/listing.rs | 234 +++++++++++++++++++++++++++++++
crates/core/src/table/mod.rs | 1 +
crates/core/src/table/partition.rs | 18 +++
python/src/internal.rs | 2 +-
9 files changed, 313 insertions(+), 228 deletions(-)
diff --git a/crates/core/src/file_group/builder.rs b/crates/core/src/file_group/builder.rs
index 680b55d..4f73aa1 100644
--- a/crates/core/src/file_group/builder.rs
+++ b/crates/core/src/file_group/builder.rs
@@ -38,8 +38,6 @@ pub fn build_file_groups(commit_metadata: &Map<String, Value>) -> Result<HashSet
.as_array()
.ok_or_else(|| CoreError::CommitMetadata("Invalid write stats
array".into()))?;
- let partition = (!partition.is_empty()).then(|| partition.to_string());
-
for stat in write_stats {
let file_id = stat
.get("fileId")
@@ -85,8 +83,6 @@ pub fn build_replaced_file_groups(
.as_array()
.ok_or_else(|| CoreError::CommitMetadata("Invalid file group ids
array".into()))?;
- let partition = (!partition.is_empty()).then(|| partition.to_string());
-
for file_id in file_ids {
let id = file_id
.as_str()
@@ -269,17 +265,15 @@ mod tests {
"byteField=20/shortField=100",
"byteField=10/shortField=300",
]);
- let actual_partitions = HashSet::<&str>::from_iter(
- file_groups
- .iter()
- .map(|fg| fg.partition_path.as_ref().unwrap().as_str()),
- );
+ let actual_partitions =
+ HashSet::<&str>::from_iter(file_groups.iter().map(|fg|
fg.partition_path.as_str()));
assert_eq!(actual_partitions, expected_partitions);
}
}
mod test_build_replaced_file_groups {
use super::super::*;
+ use crate::table::partition::EMPTY_PARTITION_PATH;
use serde_json::{json, Map, Value};
#[test]
@@ -369,7 +363,7 @@ mod tests {
let file_groups = result.unwrap();
assert_eq!(file_groups.len(), 1);
let file_group = file_groups.iter().next().unwrap();
- assert!(file_group.partition_path.is_none());
+ assert_eq!(file_group.partition_path, EMPTY_PARTITION_PATH);
}
#[test]
@@ -391,7 +385,7 @@ mod tests {
let file_groups = result.unwrap();
let actual_partition_paths = file_groups
.iter()
- .map(|fg| fg.partition_path.as_ref().unwrap().as_str())
+ .map(|fg| fg.partition_path.as_str())
.collect::<Vec<_>>();
assert_eq!(actual_partition_paths, &["20", "20"]);
}
@@ -432,11 +426,8 @@ mod tests {
assert_eq!(file_groups.len(), 3);
let expected_partitions = HashSet::from_iter(vec!["10", "20",
"30"]);
- let actual_partitions = HashSet::<&str>::from_iter(
- file_groups
- .iter()
- .map(|fg| fg.partition_path.as_ref().unwrap().as_str()),
- );
+ let actual_partitions =
+ HashSet::<&str>::from_iter(file_groups.iter().map(|fg|
fg.partition_path.as_str()));
assert_eq!(actual_partitions, expected_partitions);
}
}
diff --git a/crates/core/src/file_group/file_slice.rs b/crates/core/src/file_group/file_slice.rs
index c8ca359..b9df522 100644
--- a/crates/core/src/file_group/file_slice.rs
+++ b/crates/core/src/file_group/file_slice.rs
@@ -30,11 +30,11 @@ use std::path::PathBuf;
pub struct FileSlice {
pub base_file: BaseFile,
pub log_files: BTreeSet<LogFile>,
- pub partition_path: Option<String>,
+ pub partition_path: String,
}
impl FileSlice {
- pub fn new(base_file: BaseFile, partition_path: Option<String>) -> Self {
+ pub fn new(base_file: BaseFile, partition_path: String) -> Self {
Self {
base_file,
log_files: BTreeSet::new(),
@@ -43,7 +43,7 @@ impl FileSlice {
}
fn relative_path_for_file(&self, file_name: &str) -> Result<String> {
- let path = PathBuf::from(self.partition_path()).join(file_name);
+ let path = PathBuf::from(self.partition_path.as_str()).join(file_name);
path.to_str().map(|s| s.to_string()).ok_or_else(|| {
CoreError::FileGroup(format!("Failed to get relative path for
file: {file_name}",))
})
@@ -67,12 +67,6 @@ impl FileSlice {
&self.base_file.file_id
}
- /// Returns the partition path of the [FileSlice].
- #[inline]
- pub fn partition_path(&self) -> &str {
- self.partition_path.as_deref().unwrap_or_default()
- }
-
/// Returns the instant time that marks the [FileSlice] creation.
///
/// This is also an instant time stored in the [Timeline].
diff --git a/crates/core/src/file_group/mod.rs b/crates/core/src/file_group/mod.rs
index 7d3bcc7..54c83fc 100644
--- a/crates/core/src/file_group/mod.rs
+++ b/crates/core/src/file_group/mod.rs
@@ -41,7 +41,7 @@ use std::str::FromStr;
#[derive(Clone, Debug)]
pub struct FileGroup {
pub file_id: String,
- pub partition_path: Option<String>,
+ pub partition_path: String,
pub file_slices: BTreeMap<String, FileSlice>,
}
@@ -64,7 +64,7 @@ impl fmt::Display for FileGroup {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.write_str(
format!(
- "File Group: partition {:?} id {}",
+ "File Group: partition={}, id={}",
&self.partition_path, &self.file_id
)
.as_str(),
@@ -73,7 +73,7 @@ impl fmt::Display for FileGroup {
}
impl FileGroup {
- pub fn new(file_id: String, partition_path: Option<String>) -> Self {
+ pub fn new(file_id: String, partition_path: String) -> Self {
Self {
file_id,
partition_path,
@@ -83,7 +83,7 @@ impl FileGroup {
pub fn new_with_base_file_name(
id: String,
- partition_path: Option<String>,
+ partition_path: String,
file_name: &str,
) -> Result<Self> {
let mut file_group = Self::new(id, partition_path);
@@ -175,10 +175,14 @@ impl FileGroup {
#[cfg(test)]
mod tests {
use super::*;
+ use crate::table::partition::EMPTY_PARTITION_PATH;
#[test]
fn load_a_valid_file_group() {
- let mut fg =
FileGroup::new("5a226868-2934-4f84-a16f-55124630c68d-0".to_owned(), None);
+ let mut fg = FileGroup::new(
+ "5a226868-2934-4f84-a16f-55124630c68d-0".to_owned(),
+ EMPTY_PARTITION_PATH.to_string(),
+ );
let _ = fg.add_base_file_from_name(
"5a226868-2934-4f84-a16f-55124630c68d-0_0-7-24_20240402144910683.parquet",
);
@@ -186,7 +190,7 @@ mod tests {
"5a226868-2934-4f84-a16f-55124630c68d-0_2-10-0_20240402123035233.parquet",
);
assert_eq!(fg.file_slices.len(), 2);
- assert!(fg.partition_path.is_none());
+ assert_eq!(fg.partition_path, EMPTY_PARTITION_PATH);
let commit_times: Vec<&str> = fg.file_slices.keys().map(|k|
k.as_str()).collect();
assert_eq!(commit_times, vec!["20240402123035233",
"20240402144910683"]);
assert_eq!(
@@ -201,7 +205,10 @@ mod tests {
#[test]
fn add_base_file_with_same_commit_time_should_fail() {
- let mut fg =
FileGroup::new("5a226868-2934-4f84-a16f-55124630c68d-0".to_owned(), None);
+ let mut fg = FileGroup::new(
+ "5a226868-2934-4f84-a16f-55124630c68d-0".to_owned(),
+ EMPTY_PARTITION_PATH.to_string(),
+ );
let res1 = fg.add_base_file_from_name(
"5a226868-2934-4f84-a16f-55124630c68d-0_0-7-24_20240402144910683.parquet",
);
@@ -217,7 +224,7 @@ mod tests {
fn test_file_group_display() {
let file_group = FileGroup {
file_id: "group123".to_string(),
- partition_path: Some("part/2023-01-01".to_string()),
+ partition_path: "part/2023-01-01".to_string(),
file_slices: BTreeMap::new(),
};
@@ -225,12 +232,12 @@ mod tests {
assert_eq!(
display_string,
- "File Group: partition Some(\"part/2023-01-01\") id group123"
+ "File Group: partition=part/2023-01-01, id=group123"
);
let file_group_no_partition = FileGroup {
file_id: "group456".to_string(),
- partition_path: None,
+ partition_path: EMPTY_PARTITION_PATH.to_string(),
file_slices: BTreeMap::new(),
};
@@ -238,7 +245,7 @@ mod tests {
assert_eq!(
display_string_no_partition,
- "File Group: partition None id group456"
+ "File Group: partition=, id=group456"
);
}
}
diff --git a/crates/core/src/metadata/mod.rs b/crates/core/src/metadata/mod.rs
index 48133fd..e7ae3b1 100644
--- a/crates/core/src/metadata/mod.rs
+++ b/crates/core/src/metadata/mod.rs
@@ -17,3 +17,13 @@
* under the License.
*/
pub mod meta_field;
+
+pub const HUDI_METADATA_DIR: &str = ".hoodie";
+pub const DELTALAKE_METADATA_DIR: &str = "_delta_log";
+pub const ICEBERG_METADATA_DIR: &str = "metadata";
+
+pub const LAKE_FORMAT_METADATA_DIRS: &[&str; 3] = &[
+ HUDI_METADATA_DIR,
+ DELTALAKE_METADATA_DIR,
+ ICEBERG_METADATA_DIR,
+];
diff --git a/crates/core/src/table/fs_view.rs b/crates/core/src/table/fs_view.rs
index 0abbe08..e5fa7e3 100644
--- a/crates/core/src/table/fs_view.rs
+++ b/crates/core/src/table/fs_view.rs
@@ -21,26 +21,21 @@ use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use crate::config::HudiConfigs;
-use crate::file_group::base_file::BaseFile;
use crate::file_group::FileGroup;
-use crate::storage::{get_leaf_dirs, Storage};
+use crate::storage::Storage;
-use crate::config::read::HudiReadConfig::ListingParallelism;
-use crate::config::table::HudiTableConfig::BaseFileFormat;
-use crate::error::CoreError;
use crate::file_group::file_slice::FileSlice;
-use crate::file_group::log_file::LogFile;
-use crate::table::partition::{PartitionPruner, PARTITION_METAFIELD_PREFIX};
+use crate::table::listing::FileLister;
+use crate::table::partition::PartitionPruner;
use crate::Result;
use dashmap::DashMap;
-use futures::stream::{self, StreamExt, TryStreamExt};
/// A view of the Hudi table's data files (files stored outside the `.hoodie/`
directory) in the file system. It provides APIs to load and
/// access the file groups and file slices.
#[derive(Clone, Debug)]
#[allow(dead_code)]
pub struct FileSystemView {
- hudi_configs: Arc<HudiConfigs>,
+ pub(crate) hudi_configs: Arc<HudiConfigs>,
pub(crate) storage: Arc<Storage>,
partition_to_file_groups: Arc<DashMap<String, Vec<FileGroup>>>,
}
@@ -59,136 +54,18 @@ impl FileSystemView {
})
}
- fn should_exclude_for_listing(file_name: &str) -> bool {
- file_name.starts_with(PARTITION_METAFIELD_PREFIX) ||
file_name.ends_with(".crc")
- }
-
- async fn list_all_partition_paths(storage: &Storage) ->
Result<Vec<String>> {
- Self::list_partition_paths(storage, &PartitionPruner::empty()).await
- }
-
- async fn list_partition_paths(
- storage: &Storage,
- partition_pruner: &PartitionPruner,
- ) -> Result<Vec<String>> {
- let top_level_dirs: Vec<String> = storage
- .list_dirs(None)
- .await?
- .into_iter()
- .filter(|dir| dir != ".hoodie")
- .collect();
- let mut partition_paths = Vec::new();
- for dir in top_level_dirs {
- partition_paths.extend(get_leaf_dirs(storage, Some(&dir)).await?);
- }
- if partition_paths.is_empty() {
- partition_paths.push("".to_string())
- }
- if partition_pruner.is_empty() {
- return Ok(partition_paths);
- }
-
- Ok(partition_paths
- .into_iter()
- .filter(|path_str| partition_pruner.should_include(path_str))
- .collect())
- }
-
- async fn list_file_groups_for_partition(
- storage: &Storage,
- partition_path: &str,
- base_file_format: &str,
- ) -> Result<Vec<FileGroup>> {
- let listed_file_metadata =
storage.list_files(Some(partition_path)).await?;
-
- let mut file_id_to_base_files: HashMap<String, Vec<BaseFile>> =
HashMap::new();
- let mut file_id_to_log_files: HashMap<String, Vec<LogFile>> =
HashMap::new();
-
- for file_metadata in listed_file_metadata {
- if Self::should_exclude_for_listing(&file_metadata.name) {
- continue;
- }
-
- let base_file_extension = format!(".{}", base_file_format);
- if file_metadata.name.ends_with(&base_file_extension) {
- // After excluding the unintended files,
- // we expect a file that has the base file extension to be a
valid base file.
- let base_file = BaseFile::try_from(file_metadata)?;
- let file_id = &base_file.file_id;
- file_id_to_base_files
- .entry(file_id.to_owned())
- .or_default()
- .push(base_file);
- } else {
- match LogFile::try_from(file_metadata) {
- Ok(log_file) => {
- let file_id = &log_file.file_id;
- file_id_to_log_files
- .entry(file_id.to_owned())
- .or_default()
- .push(log_file);
- }
- Err(e) => {
- // We don't support cdc log files yet, hence skipping
error when parsing
- // fails. However, once we support all data files, we
should return error
- // here because we expect all files to be either base
files or log files,
- // after excluding the unintended files.
- log::warn!("Failed to create a log file: {}", e);
- continue;
- }
- }
- }
- }
-
- let mut file_groups: Vec<FileGroup> = Vec::new();
- // TODO support creating file groups without base files
- for (file_id, base_files) in file_id_to_base_files.into_iter() {
- let mut file_group =
- FileGroup::new(file_id.to_owned(),
Some(partition_path.to_owned()));
-
- file_group.add_base_files(base_files)?;
-
- let log_files =
file_id_to_log_files.remove(&file_id).unwrap_or_default();
- file_group.add_log_files(log_files)?;
-
- file_groups.push(file_group);
- }
- Ok(file_groups)
- }
-
async fn load_file_groups(&self, partition_pruner: &PartitionPruner) ->
Result<()> {
- let all_partition_paths =
Self::list_all_partition_paths(&self.storage).await?;
-
- let partition_paths_to_list = all_partition_paths
- .into_iter()
- .filter(|p| !self.partition_to_file_groups.contains_key(p))
- .filter(|p| partition_pruner.should_include(p))
- .collect::<HashSet<_>>();
-
- let base_file_format = self
- .hudi_configs
- .get_or_default(BaseFileFormat)
- .to::<String>();
- let parallelism = self
- .hudi_configs
- .get_or_default(ListingParallelism)
- .to::<usize>();
- stream::iter(partition_paths_to_list)
- .map(|path| {
- let base_file_format = base_file_format.clone();
- async move {
- let format = base_file_format.as_str();
- let file_groups =
- Self::list_file_groups_for_partition(&self.storage,
&path, format).await?;
- Ok::<_, CoreError>((path, file_groups))
- }
- })
- .buffer_unordered(parallelism)
- .try_for_each(|(path, file_groups)| async move {
- self.partition_to_file_groups.insert(path, file_groups);
- Ok(())
- })
- .await
+ let lister = FileLister::new(
+ self.hudi_configs.clone(),
+ self.storage.clone(),
+ partition_pruner.to_owned(),
+ );
+ let file_groups_map =
lister.list_file_groups_for_relevant_partitions().await?;
+ for (partition_path, file_groups) in file_groups_map {
+ self.partition_to_file_groups
+ .insert(partition_path, file_groups);
+ }
+ Ok(())
}
async fn collect_file_slices_as_of(
@@ -230,65 +107,18 @@ impl FileSystemView {
#[cfg(test)]
mod tests {
- use crate::config::table::HudiTableConfig;
- use crate::config::HudiConfigs;
+ use super::*;
use crate::expr::filter::Filter;
- use crate::storage::Storage;
- use crate::table::fs_view::FileSystemView;
- use crate::table::partition::PartitionPruner;
use crate::table::Table;
use hudi_tests::SampleTable;
- use std::collections::{HashMap, HashSet};
- use std::sync::Arc;
- use url::Url;
-
- async fn create_test_fs_view(base_url: Url) -> FileSystemView {
- FileSystemView::new(
- Arc::new(HudiConfigs::new([(HudiTableConfig::BasePath,
base_url)])),
- Arc::new(HashMap::new()),
- )
- .await
- .unwrap()
- }
-
- #[tokio::test]
- async fn get_partition_paths_for_nonpartitioned_table() {
- let base_url = SampleTable::V6Nonpartitioned.url_to_cow();
- let storage = Storage::new_with_base_url(base_url).unwrap();
- let partition_pruner = PartitionPruner::empty();
- let partition_paths = FileSystemView::list_partition_paths(&storage,
&partition_pruner)
- .await
- .unwrap();
- let partition_path_set: HashSet<&str> =
- HashSet::from_iter(partition_paths.iter().map(|p| p.as_str()));
- assert_eq!(partition_path_set, HashSet::from([""]))
- }
-
- #[tokio::test]
- async fn get_partition_paths_for_complexkeygen_table() {
- let base_url = SampleTable::V6ComplexkeygenHivestyle.url_to_cow();
- let storage = Storage::new_with_base_url(base_url).unwrap();
- let partition_pruner = PartitionPruner::empty();
- let partition_paths = FileSystemView::list_partition_paths(&storage,
&partition_pruner)
- .await
- .unwrap();
- let partition_path_set: HashSet<&str> =
- HashSet::from_iter(partition_paths.iter().map(|p| p.as_str()));
- assert_eq!(
- partition_path_set,
- HashSet::from_iter(vec![
- "byteField=10/shortField=300",
- "byteField=20/shortField=100",
- "byteField=30/shortField=100"
- ])
- )
- }
+ use std::collections::HashSet;
#[tokio::test]
async fn fs_view_get_latest_file_slices() {
let base_url = SampleTable::V6Nonpartitioned.url_to_cow();
- let fs_view = create_test_fs_view(base_url).await;
+ let hudi_table = Table::new(base_url.path()).await.unwrap();
+ let fs_view = &hudi_table.file_system_view;
assert!(fs_view.partition_to_file_groups.is_empty());
let partition_pruner = PartitionPruner::empty();
@@ -313,7 +143,7 @@ mod tests {
async fn fs_view_get_latest_file_slices_with_replace_commit() {
let base_url =
SampleTable::V6SimplekeygenNonhivestyleOverwritetable.url_to_cow();
let hudi_table = Table::new(base_url.path()).await.unwrap();
- let fs_view = create_test_fs_view(base_url).await;
+ let fs_view = &hudi_table.file_system_view;
assert_eq!(fs_view.partition_to_file_groups.len(), 0);
let partition_pruner = PartitionPruner::empty();
@@ -342,7 +172,7 @@ mod tests {
async fn fs_view_get_latest_file_slices_with_partition_filters() {
let base_url = SampleTable::V6ComplexkeygenHivestyle.url_to_cow();
let hudi_table = Table::new(base_url.path()).await.unwrap();
- let fs_view = create_test_fs_view(base_url).await;
+ let fs_view = &hudi_table.file_system_view;
assert_eq!(fs_view.partition_to_file_groups.len(), 0);
diff --git a/crates/core/src/table/listing.rs b/crates/core/src/table/listing.rs
new file mode 100644
index 0000000..f0c0ca8
--- /dev/null
+++ b/crates/core/src/table/listing.rs
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use crate::config::read::HudiReadConfig::ListingParallelism;
+use crate::config::table::HudiTableConfig::BaseFileFormat;
+use crate::config::HudiConfigs;
+use crate::error::CoreError;
+use crate::file_group::base_file::BaseFile;
+use crate::file_group::log_file::LogFile;
+use crate::file_group::FileGroup;
+use crate::metadata::LAKE_FORMAT_METADATA_DIRS;
+use crate::storage::{get_leaf_dirs, Storage};
+use crate::table::partition::{
+ is_table_partitioned, PartitionPruner, EMPTY_PARTITION_PATH,
PARTITION_METAFIELD_PREFIX,
+};
+use crate::Result;
+use dashmap::DashMap;
+use futures::{stream, StreamExt, TryStreamExt};
+use std::collections::HashMap;
+use std::string::ToString;
+use std::sync::Arc;
+
+#[derive(Clone, Debug)]
+#[allow(dead_code)]
+pub struct FileLister {
+ hudi_configs: Arc<HudiConfigs>,
+ storage: Arc<Storage>,
+ partition_pruner: PartitionPruner,
+}
+
+impl FileLister {
+ pub fn new(
+ hudi_configs: Arc<HudiConfigs>,
+ storage: Arc<Storage>,
+ partition_pruner: PartitionPruner,
+ ) -> Self {
+ Self {
+ hudi_configs,
+ storage,
+ partition_pruner,
+ }
+ }
+
+ fn should_exclude_for_listing(file_name: &str) -> bool {
+ file_name.starts_with(PARTITION_METAFIELD_PREFIX) ||
file_name.ends_with(".crc")
+ }
+
+ async fn list_file_groups_for_partition(&self, partition_path: &str) ->
Result<Vec<FileGroup>> {
+ let base_file_format = self
+ .hudi_configs
+ .get_or_default(BaseFileFormat)
+ .to::<String>();
+
+ let listed_file_metadata =
self.storage.list_files(Some(partition_path)).await?;
+
+ let mut file_id_to_base_files: HashMap<String, Vec<BaseFile>> =
HashMap::new();
+ let mut file_id_to_log_files: HashMap<String, Vec<LogFile>> =
HashMap::new();
+
+ for file_metadata in listed_file_metadata {
+ if FileLister::should_exclude_for_listing(&file_metadata.name) {
+ continue;
+ }
+
+ let base_file_extension = format!(".{}", base_file_format);
+ if file_metadata.name.ends_with(&base_file_extension) {
+ // After excluding the unintended files,
+ // we expect a file that has the base file extension to be a
valid base file.
+ let base_file = BaseFile::try_from(file_metadata)?;
+ let file_id = &base_file.file_id;
+ file_id_to_base_files
+ .entry(file_id.to_owned())
+ .or_default()
+ .push(base_file);
+ } else {
+ match LogFile::try_from(file_metadata) {
+ Ok(log_file) => {
+ let file_id = &log_file.file_id;
+ file_id_to_log_files
+ .entry(file_id.to_owned())
+ .or_default()
+ .push(log_file);
+ }
+ Err(e) => {
+ // We don't support cdc log files yet, hence skipping
error when parsing
+ // fails. However, once we support all data files, we
should return error
+ // here because we expect all files to be either base
files or log files,
+ // after excluding the unintended files.
+ log::warn!("Failed to create a log file: {}", e);
+ continue;
+ }
+ }
+ }
+ }
+
+ let mut file_groups: Vec<FileGroup> = Vec::new();
+ // TODO support creating file groups without base files
+ for (file_id, base_files) in file_id_to_base_files.into_iter() {
+ let mut file_group = FileGroup::new(file_id.to_owned(),
partition_path.to_string());
+
+ file_group.add_base_files(base_files)?;
+
+ let log_files =
file_id_to_log_files.remove(&file_id).unwrap_or_default();
+ file_group.add_log_files(log_files)?;
+
+ file_groups.push(file_group);
+ }
+ Ok(file_groups)
+ }
+
+ async fn list_relevant_partition_paths(&self) -> Result<Vec<String>> {
+ if !is_table_partitioned(&self.hudi_configs) {
+ return Ok(vec![EMPTY_PARTITION_PATH.to_string()]);
+ }
+
+ let top_level_dirs: Vec<String> = self
+ .storage
+ .list_dirs(None)
+ .await?
+ .into_iter()
+ .filter(|dir| !LAKE_FORMAT_METADATA_DIRS.contains(&dir.as_str()))
+ .collect();
+
+ let mut partition_paths = Vec::new();
+ for dir in top_level_dirs {
+ partition_paths.extend(get_leaf_dirs(&self.storage,
Some(&dir)).await?);
+ }
+
+ if partition_paths.is_empty() || self.partition_pruner.is_empty() {
+ return Ok(partition_paths);
+ }
+
+ Ok(partition_paths
+ .into_iter()
+ .filter(|path_str| self.partition_pruner.should_include(path_str))
+ .collect())
+ }
+
+ pub async fn list_file_groups_for_relevant_partitions(
+ &self,
+ ) -> Result<DashMap<String, Vec<FileGroup>>> {
+ if !is_table_partitioned(&self.hudi_configs) {
+ let file_groups = self
+ .list_file_groups_for_partition(EMPTY_PARTITION_PATH)
+ .await?;
+ let file_groups_map = DashMap::with_capacity(1);
+ file_groups_map.insert(EMPTY_PARTITION_PATH.to_string(),
file_groups);
+ return Ok(file_groups_map);
+ }
+
+ let pruned_partition_paths =
self.list_relevant_partition_paths().await?;
+ let file_groups_map =
Arc::new(DashMap::with_capacity(pruned_partition_paths.len()));
+ let parallelism = self
+ .hudi_configs
+ .get_or_default(ListingParallelism)
+ .to::<usize>();
+ stream::iter(pruned_partition_paths)
+ .map(|p| async move {
+ let file_groups =
self.list_file_groups_for_partition(&p).await?;
+ Ok::<_, CoreError>((p, file_groups))
+ })
+ .buffer_unordered(parallelism)
+ .try_for_each(|(p, file_groups)| {
+ let file_groups_map = file_groups_map.clone();
+ async move {
+ file_groups_map.insert(p, file_groups);
+ Ok(())
+ }
+ })
+ .await?;
+
+ Ok(file_groups_map.as_ref().to_owned())
+ }
+}
+
+#[cfg(test)]
+mod test {
+ use super::*;
+ use crate::table::Table;
+ use hudi_tests::SampleTable;
+ use std::collections::HashSet;
+
+ #[tokio::test]
+ async fn list_partition_paths_for_nonpartitioned_table() {
+ let base_url = SampleTable::V6Nonpartitioned.url_to_cow();
+ let hudi_table = Table::new(base_url.path()).await.unwrap();
+ let lister = FileLister::new(
+ hudi_table.hudi_configs.clone(),
+ hudi_table.file_system_view.storage.clone(),
+ PartitionPruner::empty(),
+ );
+ let partition_paths =
lister.list_relevant_partition_paths().await.unwrap();
+ let partition_path_set: HashSet<&str> =
+ HashSet::from_iter(partition_paths.iter().map(|p| p.as_str()));
+ assert_eq!(partition_path_set, HashSet::from([""]))
+ }
+
+ #[tokio::test]
+ async fn list_partition_paths_for_complexkeygen_table() {
+ let base_url = SampleTable::V6ComplexkeygenHivestyle.url_to_cow();
+ let hudi_table = Table::new(base_url.path()).await.unwrap();
+ let fs_view = &hudi_table.file_system_view;
+ let lister = FileLister::new(
+ fs_view.hudi_configs.clone(),
+ fs_view.storage.clone(),
+ PartitionPruner::empty(),
+ );
+ let partition_paths =
lister.list_relevant_partition_paths().await.unwrap();
+ let partition_path_set: HashSet<&str> =
+ HashSet::from_iter(partition_paths.iter().map(|p| p.as_str()));
+ assert_eq!(
+ partition_path_set,
+ HashSet::from_iter(vec![
+ "byteField=10/shortField=300",
+ "byteField=20/shortField=100",
+ "byteField=30/shortField=100"
+ ])
+ )
+ }
+}
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index 0fdd46b..c1db54d 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -87,6 +87,7 @@
pub mod builder;
mod fs_view;
+mod listing;
pub mod partition;
use crate::config::read::HudiReadConfig::AsOfTimestamp;
diff --git a/crates/core/src/table/partition.rs b/crates/core/src/table/partition.rs
index be541c2..ace4a49 100644
--- a/crates/core/src/table/partition.rs
+++ b/crates/core/src/table/partition.rs
@@ -25,10 +25,28 @@ use crate::Result;
use arrow_array::{ArrayRef, Scalar};
use arrow_schema::Schema;
+use crate::config::table::HudiTableConfig::{KeyGeneratorClass,
PartitionFields};
use std::collections::HashMap;
use std::sync::Arc;
pub const PARTITION_METAFIELD_PREFIX: &str = ".hoodie_partition_metadata";
+pub const EMPTY_PARTITION_PATH: &str = "";
+
+pub fn is_table_partitioned(hudi_configs: &HudiConfigs) -> bool {
+ let has_partition_fields = !hudi_configs
+ .get_or_default(PartitionFields)
+ .to::<Vec<String>>()
+ .is_empty();
+
+ let uses_non_partitioned_key_gen = hudi_configs
+ .try_get(KeyGeneratorClass)
+ .map(|key_gen| {
+ key_gen.to::<String>() ==
"org.apache.hudi.keygen.NonpartitionedKeyGenerator"
+ })
+ .unwrap_or(false);
+
+ has_partition_fields && !uses_non_partitioned_key_gen
+}
/// A partition pruner that filters partitions based on the partition path and
its filters.
#[derive(Debug, Clone)]
diff --git a/python/src/internal.rs b/python/src/internal.rs
index b2a667b..9d300fd 100644
--- a/python/src/internal.rs
+++ b/python/src/internal.rs
@@ -126,7 +126,7 @@ impl HudiFileSlice {
#[cfg(not(tarpaulin))]
fn convert_file_slice(f: &FileSlice) -> HudiFileSlice {
let file_id = f.file_id().to_string();
- let partition_path = f.partition_path().to_string();
+ let partition_path = f.partition_path.to_string();
let creation_instant_time = f.creation_instant_time().to_string();
let base_file_name = f.base_file.file_name();
let file_metadata = f.base_file.file_metadata.clone().unwrap_or_default();