This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 4575ccd  refactor: improve file system view's listing flow (#251)
4575ccd is described below

commit 4575ccd534c1d8e179773e61c8d785e314e37c2d
Author: Shiyan Xu <[email protected]>
AuthorDate: Sun Jan 19 21:11:19 2025 -0600

    refactor: improve file system view's listing flow (#251)
    
    - Move file listing logic from `FileSystemView` to `FileLister`
    - Change `FileGroup` and `FileSlice` `partition_path` from `Option<String>` to `String`, using an empty string for non-partitioned tables
    - The file system view now refreshes the relevant partitions, loading their latest file groups, whenever a query needs to scan them
    - Skip Iceberg's (`metadata`) and Delta Lake's (`_delta_log`) metadata dirs when listing top-level dirs
---
 crates/core/src/file_group/builder.rs    |  23 +--
 crates/core/src/file_group/file_slice.rs |  12 +-
 crates/core/src/file_group/mod.rs        |  29 ++--
 crates/core/src/metadata/mod.rs          |  10 ++
 crates/core/src/table/fs_view.rs         | 212 +++-------------------------
 crates/core/src/table/listing.rs         | 234 +++++++++++++++++++++++++++++++
 crates/core/src/table/mod.rs             |   1 +
 crates/core/src/table/partition.rs       |  18 +++
 python/src/internal.rs                   |   2 +-
 9 files changed, 313 insertions(+), 228 deletions(-)

diff --git a/crates/core/src/file_group/builder.rs b/crates/core/src/file_group/builder.rs
index 680b55d..4f73aa1 100644
--- a/crates/core/src/file_group/builder.rs
+++ b/crates/core/src/file_group/builder.rs
@@ -38,8 +38,6 @@ pub fn build_file_groups(commit_metadata: &Map<String, 
Value>) -> Result<HashSet
             .as_array()
             .ok_or_else(|| CoreError::CommitMetadata("Invalid write stats 
array".into()))?;
 
-        let partition = (!partition.is_empty()).then(|| partition.to_string());
-
         for stat in write_stats {
             let file_id = stat
                 .get("fileId")
@@ -85,8 +83,6 @@ pub fn build_replaced_file_groups(
             .as_array()
             .ok_or_else(|| CoreError::CommitMetadata("Invalid file group ids 
array".into()))?;
 
-        let partition = (!partition.is_empty()).then(|| partition.to_string());
-
         for file_id in file_ids {
             let id = file_id
                 .as_str()
@@ -269,17 +265,15 @@ mod tests {
                 "byteField=20/shortField=100",
                 "byteField=10/shortField=300",
             ]);
-            let actual_partitions = HashSet::<&str>::from_iter(
-                file_groups
-                    .iter()
-                    .map(|fg| fg.partition_path.as_ref().unwrap().as_str()),
-            );
+            let actual_partitions =
+                HashSet::<&str>::from_iter(file_groups.iter().map(|fg| 
fg.partition_path.as_str()));
             assert_eq!(actual_partitions, expected_partitions);
         }
     }
 
     mod test_build_replaced_file_groups {
         use super::super::*;
+        use crate::table::partition::EMPTY_PARTITION_PATH;
         use serde_json::{json, Map, Value};
 
         #[test]
@@ -369,7 +363,7 @@ mod tests {
             let file_groups = result.unwrap();
             assert_eq!(file_groups.len(), 1);
             let file_group = file_groups.iter().next().unwrap();
-            assert!(file_group.partition_path.is_none());
+            assert_eq!(file_group.partition_path, EMPTY_PARTITION_PATH);
         }
 
         #[test]
@@ -391,7 +385,7 @@ mod tests {
             let file_groups = result.unwrap();
             let actual_partition_paths = file_groups
                 .iter()
-                .map(|fg| fg.partition_path.as_ref().unwrap().as_str())
+                .map(|fg| fg.partition_path.as_str())
                 .collect::<Vec<_>>();
             assert_eq!(actual_partition_paths, &["20", "20"]);
         }
@@ -432,11 +426,8 @@ mod tests {
             assert_eq!(file_groups.len(), 3);
 
             let expected_partitions = HashSet::from_iter(vec!["10", "20", 
"30"]);
-            let actual_partitions = HashSet::<&str>::from_iter(
-                file_groups
-                    .iter()
-                    .map(|fg| fg.partition_path.as_ref().unwrap().as_str()),
-            );
+            let actual_partitions =
+                HashSet::<&str>::from_iter(file_groups.iter().map(|fg| 
fg.partition_path.as_str()));
             assert_eq!(actual_partitions, expected_partitions);
         }
     }
diff --git a/crates/core/src/file_group/file_slice.rs b/crates/core/src/file_group/file_slice.rs
index c8ca359..b9df522 100644
--- a/crates/core/src/file_group/file_slice.rs
+++ b/crates/core/src/file_group/file_slice.rs
@@ -30,11 +30,11 @@ use std::path::PathBuf;
 pub struct FileSlice {
     pub base_file: BaseFile,
     pub log_files: BTreeSet<LogFile>,
-    pub partition_path: Option<String>,
+    pub partition_path: String,
 }
 
 impl FileSlice {
-    pub fn new(base_file: BaseFile, partition_path: Option<String>) -> Self {
+    pub fn new(base_file: BaseFile, partition_path: String) -> Self {
         Self {
             base_file,
             log_files: BTreeSet::new(),
@@ -43,7 +43,7 @@ impl FileSlice {
     }
 
     fn relative_path_for_file(&self, file_name: &str) -> Result<String> {
-        let path = PathBuf::from(self.partition_path()).join(file_name);
+        let path = PathBuf::from(self.partition_path.as_str()).join(file_name);
         path.to_str().map(|s| s.to_string()).ok_or_else(|| {
             CoreError::FileGroup(format!("Failed to get relative path for 
file: {file_name}",))
         })
@@ -67,12 +67,6 @@ impl FileSlice {
         &self.base_file.file_id
     }
 
-    /// Returns the partition path of the [FileSlice].
-    #[inline]
-    pub fn partition_path(&self) -> &str {
-        self.partition_path.as_deref().unwrap_or_default()
-    }
-
     /// Returns the instant time that marks the [FileSlice] creation.
     ///
     /// This is also an instant time stored in the [Timeline].
diff --git a/crates/core/src/file_group/mod.rs b/crates/core/src/file_group/mod.rs
index 7d3bcc7..54c83fc 100644
--- a/crates/core/src/file_group/mod.rs
+++ b/crates/core/src/file_group/mod.rs
@@ -41,7 +41,7 @@ use std::str::FromStr;
 #[derive(Clone, Debug)]
 pub struct FileGroup {
     pub file_id: String,
-    pub partition_path: Option<String>,
+    pub partition_path: String,
     pub file_slices: BTreeMap<String, FileSlice>,
 }
 
@@ -64,7 +64,7 @@ impl fmt::Display for FileGroup {
     fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
         f.write_str(
             format!(
-                "File Group: partition {:?} id {}",
+                "File Group: partition={}, id={}",
                 &self.partition_path, &self.file_id
             )
             .as_str(),
@@ -73,7 +73,7 @@ impl fmt::Display for FileGroup {
 }
 
 impl FileGroup {
-    pub fn new(file_id: String, partition_path: Option<String>) -> Self {
+    pub fn new(file_id: String, partition_path: String) -> Self {
         Self {
             file_id,
             partition_path,
@@ -83,7 +83,7 @@ impl FileGroup {
 
     pub fn new_with_base_file_name(
         id: String,
-        partition_path: Option<String>,
+        partition_path: String,
         file_name: &str,
     ) -> Result<Self> {
         let mut file_group = Self::new(id, partition_path);
@@ -175,10 +175,14 @@ impl FileGroup {
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::table::partition::EMPTY_PARTITION_PATH;
 
     #[test]
     fn load_a_valid_file_group() {
-        let mut fg = 
FileGroup::new("5a226868-2934-4f84-a16f-55124630c68d-0".to_owned(), None);
+        let mut fg = FileGroup::new(
+            "5a226868-2934-4f84-a16f-55124630c68d-0".to_owned(),
+            EMPTY_PARTITION_PATH.to_string(),
+        );
         let _ = fg.add_base_file_from_name(
             
"5a226868-2934-4f84-a16f-55124630c68d-0_0-7-24_20240402144910683.parquet",
         );
@@ -186,7 +190,7 @@ mod tests {
             
"5a226868-2934-4f84-a16f-55124630c68d-0_2-10-0_20240402123035233.parquet",
         );
         assert_eq!(fg.file_slices.len(), 2);
-        assert!(fg.partition_path.is_none());
+        assert_eq!(fg.partition_path, EMPTY_PARTITION_PATH);
         let commit_times: Vec<&str> = fg.file_slices.keys().map(|k| 
k.as_str()).collect();
         assert_eq!(commit_times, vec!["20240402123035233", 
"20240402144910683"]);
         assert_eq!(
@@ -201,7 +205,10 @@ mod tests {
 
     #[test]
     fn add_base_file_with_same_commit_time_should_fail() {
-        let mut fg = 
FileGroup::new("5a226868-2934-4f84-a16f-55124630c68d-0".to_owned(), None);
+        let mut fg = FileGroup::new(
+            "5a226868-2934-4f84-a16f-55124630c68d-0".to_owned(),
+            EMPTY_PARTITION_PATH.to_string(),
+        );
         let res1 = fg.add_base_file_from_name(
             
"5a226868-2934-4f84-a16f-55124630c68d-0_0-7-24_20240402144910683.parquet",
         );
@@ -217,7 +224,7 @@ mod tests {
     fn test_file_group_display() {
         let file_group = FileGroup {
             file_id: "group123".to_string(),
-            partition_path: Some("part/2023-01-01".to_string()),
+            partition_path: "part/2023-01-01".to_string(),
             file_slices: BTreeMap::new(),
         };
 
@@ -225,12 +232,12 @@ mod tests {
 
         assert_eq!(
             display_string,
-            "File Group: partition Some(\"part/2023-01-01\") id group123"
+            "File Group: partition=part/2023-01-01, id=group123"
         );
 
         let file_group_no_partition = FileGroup {
             file_id: "group456".to_string(),
-            partition_path: None,
+            partition_path: EMPTY_PARTITION_PATH.to_string(),
             file_slices: BTreeMap::new(),
         };
 
@@ -238,7 +245,7 @@ mod tests {
 
         assert_eq!(
             display_string_no_partition,
-            "File Group: partition None id group456"
+            "File Group: partition=, id=group456"
         );
     }
 }
diff --git a/crates/core/src/metadata/mod.rs b/crates/core/src/metadata/mod.rs
index 48133fd..e7ae3b1 100644
--- a/crates/core/src/metadata/mod.rs
+++ b/crates/core/src/metadata/mod.rs
@@ -17,3 +17,13 @@
  * under the License.
  */
 pub mod meta_field;
+
+pub const HUDI_METADATA_DIR: &str = ".hoodie";
+pub const DELTALAKE_METADATA_DIR: &str = "_delta_log";
+pub const ICEBERG_METADATA_DIR: &str = "metadata";
+
+pub const LAKE_FORMAT_METADATA_DIRS: &[&str; 3] = &[
+    HUDI_METADATA_DIR,
+    DELTALAKE_METADATA_DIR,
+    ICEBERG_METADATA_DIR,
+];
diff --git a/crates/core/src/table/fs_view.rs b/crates/core/src/table/fs_view.rs
index 0abbe08..e5fa7e3 100644
--- a/crates/core/src/table/fs_view.rs
+++ b/crates/core/src/table/fs_view.rs
@@ -21,26 +21,21 @@ use std::collections::{HashMap, HashSet};
 use std::sync::Arc;
 
 use crate::config::HudiConfigs;
-use crate::file_group::base_file::BaseFile;
 use crate::file_group::FileGroup;
-use crate::storage::{get_leaf_dirs, Storage};
+use crate::storage::Storage;
 
-use crate::config::read::HudiReadConfig::ListingParallelism;
-use crate::config::table::HudiTableConfig::BaseFileFormat;
-use crate::error::CoreError;
 use crate::file_group::file_slice::FileSlice;
-use crate::file_group::log_file::LogFile;
-use crate::table::partition::{PartitionPruner, PARTITION_METAFIELD_PREFIX};
+use crate::table::listing::FileLister;
+use crate::table::partition::PartitionPruner;
 use crate::Result;
 use dashmap::DashMap;
-use futures::stream::{self, StreamExt, TryStreamExt};
 
 /// A view of the Hudi table's data files (files stored outside the `.hoodie/` 
directory) in the file system. It provides APIs to load and
 /// access the file groups and file slices.
 #[derive(Clone, Debug)]
 #[allow(dead_code)]
 pub struct FileSystemView {
-    hudi_configs: Arc<HudiConfigs>,
+    pub(crate) hudi_configs: Arc<HudiConfigs>,
     pub(crate) storage: Arc<Storage>,
     partition_to_file_groups: Arc<DashMap<String, Vec<FileGroup>>>,
 }
@@ -59,136 +54,18 @@ impl FileSystemView {
         })
     }
 
-    fn should_exclude_for_listing(file_name: &str) -> bool {
-        file_name.starts_with(PARTITION_METAFIELD_PREFIX) || 
file_name.ends_with(".crc")
-    }
-
-    async fn list_all_partition_paths(storage: &Storage) -> 
Result<Vec<String>> {
-        Self::list_partition_paths(storage, &PartitionPruner::empty()).await
-    }
-
-    async fn list_partition_paths(
-        storage: &Storage,
-        partition_pruner: &PartitionPruner,
-    ) -> Result<Vec<String>> {
-        let top_level_dirs: Vec<String> = storage
-            .list_dirs(None)
-            .await?
-            .into_iter()
-            .filter(|dir| dir != ".hoodie")
-            .collect();
-        let mut partition_paths = Vec::new();
-        for dir in top_level_dirs {
-            partition_paths.extend(get_leaf_dirs(storage, Some(&dir)).await?);
-        }
-        if partition_paths.is_empty() {
-            partition_paths.push("".to_string())
-        }
-        if partition_pruner.is_empty() {
-            return Ok(partition_paths);
-        }
-
-        Ok(partition_paths
-            .into_iter()
-            .filter(|path_str| partition_pruner.should_include(path_str))
-            .collect())
-    }
-
-    async fn list_file_groups_for_partition(
-        storage: &Storage,
-        partition_path: &str,
-        base_file_format: &str,
-    ) -> Result<Vec<FileGroup>> {
-        let listed_file_metadata = 
storage.list_files(Some(partition_path)).await?;
-
-        let mut file_id_to_base_files: HashMap<String, Vec<BaseFile>> = 
HashMap::new();
-        let mut file_id_to_log_files: HashMap<String, Vec<LogFile>> = 
HashMap::new();
-
-        for file_metadata in listed_file_metadata {
-            if Self::should_exclude_for_listing(&file_metadata.name) {
-                continue;
-            }
-
-            let base_file_extension = format!(".{}", base_file_format);
-            if file_metadata.name.ends_with(&base_file_extension) {
-                // After excluding the unintended files,
-                // we expect a file that has the base file extension to be a 
valid base file.
-                let base_file = BaseFile::try_from(file_metadata)?;
-                let file_id = &base_file.file_id;
-                file_id_to_base_files
-                    .entry(file_id.to_owned())
-                    .or_default()
-                    .push(base_file);
-            } else {
-                match LogFile::try_from(file_metadata) {
-                    Ok(log_file) => {
-                        let file_id = &log_file.file_id;
-                        file_id_to_log_files
-                            .entry(file_id.to_owned())
-                            .or_default()
-                            .push(log_file);
-                    }
-                    Err(e) => {
-                        // We don't support cdc log files yet, hence skipping 
error when parsing
-                        // fails. However, once we support all data files, we 
should return error
-                        // here because we expect all files to be either base 
files or log files,
-                        // after excluding the unintended files.
-                        log::warn!("Failed to create a log file: {}", e);
-                        continue;
-                    }
-                }
-            }
-        }
-
-        let mut file_groups: Vec<FileGroup> = Vec::new();
-        // TODO support creating file groups without base files
-        for (file_id, base_files) in file_id_to_base_files.into_iter() {
-            let mut file_group =
-                FileGroup::new(file_id.to_owned(), 
Some(partition_path.to_owned()));
-
-            file_group.add_base_files(base_files)?;
-
-            let log_files = 
file_id_to_log_files.remove(&file_id).unwrap_or_default();
-            file_group.add_log_files(log_files)?;
-
-            file_groups.push(file_group);
-        }
-        Ok(file_groups)
-    }
-
     async fn load_file_groups(&self, partition_pruner: &PartitionPruner) -> 
Result<()> {
-        let all_partition_paths = 
Self::list_all_partition_paths(&self.storage).await?;
-
-        let partition_paths_to_list = all_partition_paths
-            .into_iter()
-            .filter(|p| !self.partition_to_file_groups.contains_key(p))
-            .filter(|p| partition_pruner.should_include(p))
-            .collect::<HashSet<_>>();
-
-        let base_file_format = self
-            .hudi_configs
-            .get_or_default(BaseFileFormat)
-            .to::<String>();
-        let parallelism = self
-            .hudi_configs
-            .get_or_default(ListingParallelism)
-            .to::<usize>();
-        stream::iter(partition_paths_to_list)
-            .map(|path| {
-                let base_file_format = base_file_format.clone();
-                async move {
-                    let format = base_file_format.as_str();
-                    let file_groups =
-                        Self::list_file_groups_for_partition(&self.storage, 
&path, format).await?;
-                    Ok::<_, CoreError>((path, file_groups))
-                }
-            })
-            .buffer_unordered(parallelism)
-            .try_for_each(|(path, file_groups)| async move {
-                self.partition_to_file_groups.insert(path, file_groups);
-                Ok(())
-            })
-            .await
+        let lister = FileLister::new(
+            self.hudi_configs.clone(),
+            self.storage.clone(),
+            partition_pruner.to_owned(),
+        );
+        let file_groups_map = 
lister.list_file_groups_for_relevant_partitions().await?;
+        for (partition_path, file_groups) in file_groups_map {
+            self.partition_to_file_groups
+                .insert(partition_path, file_groups);
+        }
+        Ok(())
     }
 
     async fn collect_file_slices_as_of(
@@ -230,65 +107,18 @@ impl FileSystemView {
 
 #[cfg(test)]
 mod tests {
-    use crate::config::table::HudiTableConfig;
-    use crate::config::HudiConfigs;
+    use super::*;
     use crate::expr::filter::Filter;
-    use crate::storage::Storage;
-    use crate::table::fs_view::FileSystemView;
-    use crate::table::partition::PartitionPruner;
     use crate::table::Table;
 
     use hudi_tests::SampleTable;
-    use std::collections::{HashMap, HashSet};
-    use std::sync::Arc;
-    use url::Url;
-
-    async fn create_test_fs_view(base_url: Url) -> FileSystemView {
-        FileSystemView::new(
-            Arc::new(HudiConfigs::new([(HudiTableConfig::BasePath, 
base_url)])),
-            Arc::new(HashMap::new()),
-        )
-        .await
-        .unwrap()
-    }
-
-    #[tokio::test]
-    async fn get_partition_paths_for_nonpartitioned_table() {
-        let base_url = SampleTable::V6Nonpartitioned.url_to_cow();
-        let storage = Storage::new_with_base_url(base_url).unwrap();
-        let partition_pruner = PartitionPruner::empty();
-        let partition_paths = FileSystemView::list_partition_paths(&storage, 
&partition_pruner)
-            .await
-            .unwrap();
-        let partition_path_set: HashSet<&str> =
-            HashSet::from_iter(partition_paths.iter().map(|p| p.as_str()));
-        assert_eq!(partition_path_set, HashSet::from([""]))
-    }
-
-    #[tokio::test]
-    async fn get_partition_paths_for_complexkeygen_table() {
-        let base_url = SampleTable::V6ComplexkeygenHivestyle.url_to_cow();
-        let storage = Storage::new_with_base_url(base_url).unwrap();
-        let partition_pruner = PartitionPruner::empty();
-        let partition_paths = FileSystemView::list_partition_paths(&storage, 
&partition_pruner)
-            .await
-            .unwrap();
-        let partition_path_set: HashSet<&str> =
-            HashSet::from_iter(partition_paths.iter().map(|p| p.as_str()));
-        assert_eq!(
-            partition_path_set,
-            HashSet::from_iter(vec![
-                "byteField=10/shortField=300",
-                "byteField=20/shortField=100",
-                "byteField=30/shortField=100"
-            ])
-        )
-    }
+    use std::collections::HashSet;
 
     #[tokio::test]
     async fn fs_view_get_latest_file_slices() {
         let base_url = SampleTable::V6Nonpartitioned.url_to_cow();
-        let fs_view = create_test_fs_view(base_url).await;
+        let hudi_table = Table::new(base_url.path()).await.unwrap();
+        let fs_view = &hudi_table.file_system_view;
 
         assert!(fs_view.partition_to_file_groups.is_empty());
         let partition_pruner = PartitionPruner::empty();
@@ -313,7 +143,7 @@ mod tests {
     async fn fs_view_get_latest_file_slices_with_replace_commit() {
         let base_url = 
SampleTable::V6SimplekeygenNonhivestyleOverwritetable.url_to_cow();
         let hudi_table = Table::new(base_url.path()).await.unwrap();
-        let fs_view = create_test_fs_view(base_url).await;
+        let fs_view = &hudi_table.file_system_view;
 
         assert_eq!(fs_view.partition_to_file_groups.len(), 0);
         let partition_pruner = PartitionPruner::empty();
@@ -342,7 +172,7 @@ mod tests {
     async fn fs_view_get_latest_file_slices_with_partition_filters() {
         let base_url = SampleTable::V6ComplexkeygenHivestyle.url_to_cow();
         let hudi_table = Table::new(base_url.path()).await.unwrap();
-        let fs_view = create_test_fs_view(base_url).await;
+        let fs_view = &hudi_table.file_system_view;
 
         assert_eq!(fs_view.partition_to_file_groups.len(), 0);
 
diff --git a/crates/core/src/table/listing.rs b/crates/core/src/table/listing.rs
new file mode 100644
index 0000000..f0c0ca8
--- /dev/null
+++ b/crates/core/src/table/listing.rs
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use crate::config::read::HudiReadConfig::ListingParallelism;
+use crate::config::table::HudiTableConfig::BaseFileFormat;
+use crate::config::HudiConfigs;
+use crate::error::CoreError;
+use crate::file_group::base_file::BaseFile;
+use crate::file_group::log_file::LogFile;
+use crate::file_group::FileGroup;
+use crate::metadata::LAKE_FORMAT_METADATA_DIRS;
+use crate::storage::{get_leaf_dirs, Storage};
+use crate::table::partition::{
+    is_table_partitioned, PartitionPruner, EMPTY_PARTITION_PATH, 
PARTITION_METAFIELD_PREFIX,
+};
+use crate::Result;
+use dashmap::DashMap;
+use futures::{stream, StreamExt, TryStreamExt};
+use std::collections::HashMap;
+use std::string::ToString;
+use std::sync::Arc;
+
+#[derive(Clone, Debug)]
+#[allow(dead_code)]
+pub struct FileLister {
+    hudi_configs: Arc<HudiConfigs>,
+    storage: Arc<Storage>,
+    partition_pruner: PartitionPruner,
+}
+
+impl FileLister {
+    pub fn new(
+        hudi_configs: Arc<HudiConfigs>,
+        storage: Arc<Storage>,
+        partition_pruner: PartitionPruner,
+    ) -> Self {
+        Self {
+            hudi_configs,
+            storage,
+            partition_pruner,
+        }
+    }
+
+    fn should_exclude_for_listing(file_name: &str) -> bool {
+        file_name.starts_with(PARTITION_METAFIELD_PREFIX) || 
file_name.ends_with(".crc")
+    }
+
+    async fn list_file_groups_for_partition(&self, partition_path: &str) -> 
Result<Vec<FileGroup>> {
+        let base_file_format = self
+            .hudi_configs
+            .get_or_default(BaseFileFormat)
+            .to::<String>();
+
+        let listed_file_metadata = 
self.storage.list_files(Some(partition_path)).await?;
+
+        let mut file_id_to_base_files: HashMap<String, Vec<BaseFile>> = 
HashMap::new();
+        let mut file_id_to_log_files: HashMap<String, Vec<LogFile>> = 
HashMap::new();
+
+        for file_metadata in listed_file_metadata {
+            if FileLister::should_exclude_for_listing(&file_metadata.name) {
+                continue;
+            }
+
+            let base_file_extension = format!(".{}", base_file_format);
+            if file_metadata.name.ends_with(&base_file_extension) {
+                // After excluding the unintended files,
+                // we expect a file that has the base file extension to be a 
valid base file.
+                let base_file = BaseFile::try_from(file_metadata)?;
+                let file_id = &base_file.file_id;
+                file_id_to_base_files
+                    .entry(file_id.to_owned())
+                    .or_default()
+                    .push(base_file);
+            } else {
+                match LogFile::try_from(file_metadata) {
+                    Ok(log_file) => {
+                        let file_id = &log_file.file_id;
+                        file_id_to_log_files
+                            .entry(file_id.to_owned())
+                            .or_default()
+                            .push(log_file);
+                    }
+                    Err(e) => {
+                        // We don't support cdc log files yet, hence skipping 
error when parsing
+                        // fails. However, once we support all data files, we 
should return error
+                        // here because we expect all files to be either base 
files or log files,
+                        // after excluding the unintended files.
+                        log::warn!("Failed to create a log file: {}", e);
+                        continue;
+                    }
+                }
+            }
+        }
+
+        let mut file_groups: Vec<FileGroup> = Vec::new();
+        // TODO support creating file groups without base files
+        for (file_id, base_files) in file_id_to_base_files.into_iter() {
+            let mut file_group = FileGroup::new(file_id.to_owned(), 
partition_path.to_string());
+
+            file_group.add_base_files(base_files)?;
+
+            let log_files = 
file_id_to_log_files.remove(&file_id).unwrap_or_default();
+            file_group.add_log_files(log_files)?;
+
+            file_groups.push(file_group);
+        }
+        Ok(file_groups)
+    }
+
+    async fn list_relevant_partition_paths(&self) -> Result<Vec<String>> {
+        if !is_table_partitioned(&self.hudi_configs) {
+            return Ok(vec![EMPTY_PARTITION_PATH.to_string()]);
+        }
+
+        let top_level_dirs: Vec<String> = self
+            .storage
+            .list_dirs(None)
+            .await?
+            .into_iter()
+            .filter(|dir| !LAKE_FORMAT_METADATA_DIRS.contains(&dir.as_str()))
+            .collect();
+
+        let mut partition_paths = Vec::new();
+        for dir in top_level_dirs {
+            partition_paths.extend(get_leaf_dirs(&self.storage, 
Some(&dir)).await?);
+        }
+
+        if partition_paths.is_empty() || self.partition_pruner.is_empty() {
+            return Ok(partition_paths);
+        }
+
+        Ok(partition_paths
+            .into_iter()
+            .filter(|path_str| self.partition_pruner.should_include(path_str))
+            .collect())
+    }
+
+    pub async fn list_file_groups_for_relevant_partitions(
+        &self,
+    ) -> Result<DashMap<String, Vec<FileGroup>>> {
+        if !is_table_partitioned(&self.hudi_configs) {
+            let file_groups = self
+                .list_file_groups_for_partition(EMPTY_PARTITION_PATH)
+                .await?;
+            let file_groups_map = DashMap::with_capacity(1);
+            file_groups_map.insert(EMPTY_PARTITION_PATH.to_string(), 
file_groups);
+            return Ok(file_groups_map);
+        }
+
+        let pruned_partition_paths = 
self.list_relevant_partition_paths().await?;
+        let file_groups_map = 
Arc::new(DashMap::with_capacity(pruned_partition_paths.len()));
+        let parallelism = self
+            .hudi_configs
+            .get_or_default(ListingParallelism)
+            .to::<usize>();
+        stream::iter(pruned_partition_paths)
+            .map(|p| async move {
+                let file_groups = 
self.list_file_groups_for_partition(&p).await?;
+                Ok::<_, CoreError>((p, file_groups))
+            })
+            .buffer_unordered(parallelism)
+            .try_for_each(|(p, file_groups)| {
+                let file_groups_map = file_groups_map.clone();
+                async move {
+                    file_groups_map.insert(p, file_groups);
+                    Ok(())
+                }
+            })
+            .await?;
+
+        Ok(file_groups_map.as_ref().to_owned())
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+    use crate::table::Table;
+    use hudi_tests::SampleTable;
+    use std::collections::HashSet;
+
+    #[tokio::test]
+    async fn list_partition_paths_for_nonpartitioned_table() {
+        let base_url = SampleTable::V6Nonpartitioned.url_to_cow();
+        let hudi_table = Table::new(base_url.path()).await.unwrap();
+        let lister = FileLister::new(
+            hudi_table.hudi_configs.clone(),
+            hudi_table.file_system_view.storage.clone(),
+            PartitionPruner::empty(),
+        );
+        let partition_paths = 
lister.list_relevant_partition_paths().await.unwrap();
+        let partition_path_set: HashSet<&str> =
+            HashSet::from_iter(partition_paths.iter().map(|p| p.as_str()));
+        assert_eq!(partition_path_set, HashSet::from([""]))
+    }
+
+    #[tokio::test]
+    async fn list_partition_paths_for_complexkeygen_table() {
+        let base_url = SampleTable::V6ComplexkeygenHivestyle.url_to_cow();
+        let hudi_table = Table::new(base_url.path()).await.unwrap();
+        let fs_view = &hudi_table.file_system_view;
+        let lister = FileLister::new(
+            fs_view.hudi_configs.clone(),
+            fs_view.storage.clone(),
+            PartitionPruner::empty(),
+        );
+        let partition_paths = 
lister.list_relevant_partition_paths().await.unwrap();
+        let partition_path_set: HashSet<&str> =
+            HashSet::from_iter(partition_paths.iter().map(|p| p.as_str()));
+        assert_eq!(
+            partition_path_set,
+            HashSet::from_iter(vec![
+                "byteField=10/shortField=300",
+                "byteField=20/shortField=100",
+                "byteField=30/shortField=100"
+            ])
+        )
+    }
+}
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index 0fdd46b..c1db54d 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -87,6 +87,7 @@
 
 pub mod builder;
 mod fs_view;
+mod listing;
 pub mod partition;
 
 use crate::config::read::HudiReadConfig::AsOfTimestamp;
diff --git a/crates/core/src/table/partition.rs b/crates/core/src/table/partition.rs
index be541c2..ace4a49 100644
--- a/crates/core/src/table/partition.rs
+++ b/crates/core/src/table/partition.rs
@@ -25,10 +25,28 @@ use crate::Result;
 use arrow_array::{ArrayRef, Scalar};
 use arrow_schema::Schema;
 
+use crate::config::table::HudiTableConfig::{KeyGeneratorClass, 
PartitionFields};
 use std::collections::HashMap;
 use std::sync::Arc;
 
 pub const PARTITION_METAFIELD_PREFIX: &str = ".hoodie_partition_metadata";
+pub const EMPTY_PARTITION_PATH: &str = "";
+
+pub fn is_table_partitioned(hudi_configs: &HudiConfigs) -> bool {
+    let has_partition_fields = !hudi_configs
+        .get_or_default(PartitionFields)
+        .to::<Vec<String>>()
+        .is_empty();
+
+    let uses_non_partitioned_key_gen = hudi_configs
+        .try_get(KeyGeneratorClass)
+        .map(|key_gen| {
+            key_gen.to::<String>() == 
"org.apache.hudi.keygen.NonpartitionedKeyGenerator"
+        })
+        .unwrap_or(false);
+
+    has_partition_fields && !uses_non_partitioned_key_gen
+}
 
 /// A partition pruner that filters partitions based on the partition path and 
its filters.
 #[derive(Debug, Clone)]
diff --git a/python/src/internal.rs b/python/src/internal.rs
index b2a667b..9d300fd 100644
--- a/python/src/internal.rs
+++ b/python/src/internal.rs
@@ -126,7 +126,7 @@ impl HudiFileSlice {
 #[cfg(not(tarpaulin))]
 fn convert_file_slice(f: &FileSlice) -> HudiFileSlice {
     let file_id = f.file_id().to_string();
-    let partition_path = f.partition_path().to_string();
+    let partition_path = f.partition_path.to_string();
     let creation_instant_time = f.creation_instant_time().to_string();
     let base_file_name = f.base_file.file_name();
     let file_metadata = f.base_file.file_metadata.clone().unwrap_or_default();

Reply via email to