This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 1779e77  refactor: improve `BaseFile` APIs (#239)
1779e77 is described below

commit 1779e772b748a4c86bd6af13c0899c5168958f2f
Author: Shiyan Xu <[email protected]>
AuthorDate: Wed Jan 8 15:33:54 2025 -0600

    refactor: improve `BaseFile` APIs (#239)
    
    - Consolidate `FileInfo` and `FileStats` into `FileMetadata`
    - Improve variable and struct property naming
    - Make APIs more ergonomic
---
 crates/core/src/file_group/base_file.rs            | 134 +++++++++++++++
 crates/core/src/file_group/mod.rs                  | 185 ++++++---------------
 crates/core/src/file_group/reader.rs               |   4 +-
 .../src/storage/{file_info.rs => file_metadata.rs} |  28 +++-
 crates/core/src/storage/file_stats.rs              |  25 ---
 crates/core/src/storage/mod.rs                     | 127 ++++++--------
 crates/core/src/table/fs_view.rs                   |  19 ++-
 crates/core/src/table/mod.rs                       |  18 +-
 crates/datafusion/src/lib.rs                       |  28 +++-
 python/hudi/_internal.pyi                          |  14 +-
 python/src/internal.rs                             |  24 +--
 python/tests/test_table_read.py                    |   4 +-
 12 files changed, 325 insertions(+), 285 deletions(-)

diff --git a/crates/core/src/file_group/base_file.rs b/crates/core/src/file_group/base_file.rs
new file mode 100644
index 0000000..6f26c85
--- /dev/null
+++ b/crates/core/src/file_group/base_file.rs
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use crate::error::CoreError;
+use crate::storage::file_metadata::FileMetadata;
+use crate::Result;
+
+/// Hudi Base file, part of a [FileSlice].
+#[derive(Clone, Debug)]
+pub struct BaseFile {
+    /// The file name of the base file.
+    pub file_name: String,
+
+    /// The id of the enclosing file group.
+    pub file_group_id: String,
+
+    /// The associated instant time of the base file.
+    pub instant_time: String,
+
+    /// The metadata about the file.
+    pub file_metadata: Option<FileMetadata>,
+}
+
+impl BaseFile {
+    /// Parse file name and extract `file_group_id` and `instant_time`.
+    fn parse_file_name(file_name: &str) -> Result<(String, String)> {
+        let err_msg = format!("Failed to parse file name '{file_name}' for base file.");
+        let (name, _) = file_name
+            .rsplit_once('.')
+            .ok_or_else(|| CoreError::FileGroup(err_msg.clone()))?;
+        let parts: Vec<&str> = name.split('_').collect();
+        let file_group_id = parts
+            .first()
+            .ok_or_else(|| CoreError::FileGroup(err_msg.clone()))?
+            .to_string();
+        let instant_time = parts
+            .get(2)
+            .ok_or_else(|| CoreError::FileGroup(err_msg.clone()))?
+            .to_string();
+        Ok((file_group_id, instant_time))
+    }
+}
+
+impl TryFrom<&str> for BaseFile {
+    type Error = CoreError;
+
+    fn try_from(file_name: &str) -> Result<Self> {
+        let (file_group_id, instant_time) = Self::parse_file_name(file_name)?;
+        Ok(Self {
+            file_name: file_name.to_string(),
+            file_group_id,
+            instant_time,
+            file_metadata: None,
+        })
+    }
+}
+
+impl TryFrom<FileMetadata> for BaseFile {
+    type Error = CoreError;
+
+    fn try_from(metadata: FileMetadata) -> Result<Self> {
+        let file_name = metadata.name.clone();
+        let (file_group_id, instant_time) = Self::parse_file_name(&file_name)?;
+        Ok(Self {
+            file_name,
+            file_group_id,
+            instant_time,
+            file_metadata: Some(metadata),
+        })
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use hudi_tests::assert_not;
+
+    #[test]
+    fn test_create_base_file_from_file_name() {
+        let file_name = "5a226868-2934-4f84-a16f-55124630c68d-0_0-7-24_20240402144910683.parquet";
+        let base_file = BaseFile::try_from(file_name).unwrap();
+        assert_eq!(
+            base_file.file_group_id,
+            "5a226868-2934-4f84-a16f-55124630c68d-0"
+        );
+        assert_eq!(base_file.instant_time, "20240402144910683");
+        assert!(base_file.file_metadata.is_none());
+    }
+
+    #[test]
+    fn test_create_base_file_from_metadata() {
+        let metadata = FileMetadata::new(
+            "5a226868-2934-4f84-a16f-55124630c68d-0_0-7-24_20240402144910683.parquet",
+            1024,
+        );
+        let base_file = BaseFile::try_from(metadata).unwrap();
+        assert_eq!(
+            base_file.file_group_id,
+            "5a226868-2934-4f84-a16f-55124630c68d-0"
+        );
+        assert_eq!(base_file.instant_time, "20240402144910683");
+        let file_metadata = base_file.file_metadata.unwrap();
+        assert_eq!(file_metadata.size, 1024);
+        assert_not!(file_metadata.fully_populated);
+    }
+
+    #[test]
+    fn create_a_base_file_returns_error() {
+        let result = BaseFile::try_from("no_file_extension");
+        assert!(matches!(result.unwrap_err(), CoreError::FileGroup(_)));
+
+        let result = BaseFile::try_from(".parquet");
+        assert!(matches!(result.unwrap_err(), CoreError::FileGroup(_)));
+
+        let metadata = FileMetadata::new("no-valid-delimiter.parquet", 1024);
+        let result = BaseFile::try_from(metadata);
+        assert!(matches!(result.unwrap_err(), CoreError::FileGroup(_)));
+    }
+}
diff --git a/crates/core/src/file_group/mod.rs b/crates/core/src/file_group/mod.rs
index f93caef..8c92f4e 100644
--- a/crates/core/src/file_group/mod.rs
+++ b/crates/core/src/file_group/mod.rs
@@ -20,12 +20,12 @@
 //!
 //! A set of data/base files + set of log files, that make up a unit for all operations.
 
+pub mod base_file;
 pub mod builder;
 pub mod reader;
 
 use crate::error::CoreError;
-use crate::storage::file_info::FileInfo;
-use crate::storage::file_stats::FileStats;
+use crate::file_group::base_file::BaseFile;
 use crate::storage::Storage;
 use crate::Result;
 use std::collections::BTreeMap;
@@ -34,70 +34,7 @@ use std::fmt::Formatter;
 use std::hash::{Hash, Hasher};
 use std::path::PathBuf;
 
-/// Represents common metadata about a Hudi Base File.
-#[derive(Clone, Debug)]
-pub struct BaseFile {
-    /// The file group id that is unique across the table.
-    pub file_group_id: String,
-
-    pub commit_time: String,
-
-    pub info: FileInfo,
-
-    pub stats: Option<FileStats>,
-}
-
-impl BaseFile {
-    /// Parse file name and extract file_group_id and commit_time.
-    fn parse_file_name(file_name: &str) -> Result<(String, String)> {
-        let err_msg = format!("Failed to parse file name '{}' for base file.", file_name);
-        let (name, _) = file_name
-            .rsplit_once('.')
-            .ok_or(CoreError::FileGroup(err_msg.clone()))?;
-        let parts: Vec<&str> = name.split('_').collect();
-        let file_group_id = parts
-            .first()
-            .ok_or(CoreError::FileGroup(err_msg.clone()))?
-            .to_string();
-        let commit_time = parts
-            .get(2)
-            .ok_or(CoreError::FileGroup(err_msg.clone()))?
-            .to_string();
-        Ok((file_group_id, commit_time))
-    }
-
-    /// Construct [BaseFile] with the base file name.
-    ///
-    /// TODO: refactor such that file info size is optional and no one expects it.
-    pub fn from_file_name(file_name: &str) -> Result<Self> {
-        let (file_group_id, commit_time) = Self::parse_file_name(file_name)?;
-        let info = FileInfo {
-            name: file_name.to_string(),
-            size: 0,
-            uri: "".to_string(),
-        };
-        Ok(Self {
-            file_group_id,
-            commit_time,
-            info,
-            stats: None,
-        })
-    }
-
-    /// Construct [BaseFile] with the [FileInfo].
-    pub fn from_file_info(info: FileInfo) -> Result<Self> {
-        let (file_group_id, commit_time) = Self::parse_file_name(&info.name)?;
-        Ok(Self {
-            file_group_id,
-            commit_time,
-            info,
-            stats: None,
-        })
-    }
-}
-
-/// Within a file group, a slice is a combination of data file written at a commit time and list of log files,
-/// containing changes to the data file from that commit time.
+/// Within a [FileGroup], a [FileSlice] is a logical group of [BaseFile] and log files.
 ///
 /// [note] The log files are not yet supported.
 #[derive(Clone, Debug)]
@@ -107,47 +44,56 @@ pub struct FileSlice {
 }
 
 impl FileSlice {
-    #[cfg(test)]
-    pub fn base_file_path(&self) -> &str {
-        self.base_file.info.uri.as_str()
+    pub fn new(base_file: BaseFile, partition_path: Option<String>) -> Self {
+        Self {
+            base_file,
+            partition_path,
+        }
     }
 
-    pub fn base_file_relative_path(&self) -> String {
-        let ptn = self.partition_path.as_deref().unwrap_or_default();
-        let file_name = &self.base_file.info.name;
-        PathBuf::from(ptn)
-            .join(file_name)
-            .to_str()
-            .unwrap()
-            .to_string()
+    /// Returns the relative path of the base file.
+    pub fn base_file_relative_path(&self) -> Result<String> {
+        let file_name = &self.base_file.file_name;
+        let path = PathBuf::from(self.partition_path()).join(file_name);
+        path.to_str().map(|s| s.to_string()).ok_or_else(|| {
+            CoreError::FileGroup(format!(
+                "Failed to get base file relative path for file slice: {:?}",
+                self
+            ))
+        })
     }
 
+    /// Returns the enclosing [FileGroup]'s id.
+    #[inline]
     pub fn file_group_id(&self) -> &str {
         &self.base_file.file_group_id
     }
 
-    pub fn set_base_file(&mut self, base_file: BaseFile) {
-        self.base_file = base_file
+    /// Returns the partition path of the [FileSlice].
+    #[inline]
+    pub fn partition_path(&self) -> &str {
+        self.partition_path.as_deref().unwrap_or_default()
     }
 
-    /// Load stats from storage layer for the base file if not already loaded.
-    pub async fn load_stats(&mut self, storage: &Storage) -> Result<()> {
-        if self.base_file.stats.is_none() {
-            let parquet_meta = storage
-                .get_parquet_file_metadata(&self.base_file_relative_path())
-                .await?;
-            let num_records = parquet_meta.file_metadata().num_rows();
-            let size_bytes = parquet_meta
-                .row_groups()
-                .iter()
-                .map(|rg| rg.total_byte_size())
-                .sum::<i64>();
-            let stats = FileStats {
-                num_records,
-                size_bytes,
-            };
-            self.base_file.stats = Some(stats);
+    /// Returns the instant time that marks the [FileSlice] creation.
+    ///
+    /// This is also an instant time stored in the [Timeline].
+    #[inline]
+    pub fn creation_instant_time(&self) -> &str {
+        &self.base_file.instant_time
+    }
+
+    /// Load [FileMetadata] from storage layer for the [BaseFile] if `file_metadata` is [None]
+    /// or if `file_metadata` is not fully populated.
+    pub async fn load_metadata_if_needed(&mut self, storage: &Storage) -> Result<()> {
+        if let Some(metadata) = &self.base_file.file_metadata {
+            if metadata.fully_populated {
+                return Ok(());
+            }
         }
+        let relative_path = self.base_file_relative_path()?;
+        let fetched_metadata = storage.get_file_metadata(&relative_path).await?;
+        self.base_file.file_metadata = Some(fetched_metadata);
         Ok(())
     }
 }
@@ -207,25 +153,21 @@ impl FileGroup {
     }
 
     pub fn add_base_file_from_name(&mut self, file_name: &str) -> Result<&Self> {
-        let base_file = BaseFile::from_file_name(file_name)?;
+        let base_file = BaseFile::try_from(file_name)?;
         self.add_base_file(base_file)
     }
 
     pub fn add_base_file(&mut self, base_file: BaseFile) -> Result<&Self> {
-        let commit_time = base_file.commit_time.as_str();
-        if self.file_slices.contains_key(commit_time) {
+        let instant_time = base_file.instant_time.as_str();
+        if self.file_slices.contains_key(instant_time) {
             Err(CoreError::FileGroup(format!(
-                "Commit time {0} is already present in File Group {1}",
-                commit_time.to_owned(),
-                self.id,
+                "Instant time {instant_time} is already present in File Group {}",
+                self.id
             )))
         } else {
             self.file_slices.insert(
-                commit_time.to_owned(),
-                FileSlice {
-                    partition_path: self.partition_path.clone(),
-                    base_file,
-                },
+                instant_time.to_owned(),
+                FileSlice::new(base_file, self.partition_path.clone()),
             );
             Ok(self)
         }
@@ -254,31 +196,6 @@ impl FileGroup {
 mod tests {
     use super::*;
 
-    #[test]
-    fn create_a_base_file_successfully() {
-        let base_file = BaseFile::from_file_name(
-            "5a226868-2934-4f84-a16f-55124630c68d-0_0-7-24_20240402144910683.parquet",
-        )
-        .unwrap();
-        assert_eq!(
-            base_file.file_group_id,
-            "5a226868-2934-4f84-a16f-55124630c68d-0"
-        );
-        assert_eq!(base_file.commit_time, "20240402144910683");
-    }
-
-    #[test]
-    fn create_a_base_file_returns_error() {
-        let result = BaseFile::from_file_name("no_file_extension");
-        assert!(matches!(result.unwrap_err(), CoreError::FileGroup(_)));
-
-        let result = BaseFile::from_file_name(".parquet");
-        assert!(matches!(result.unwrap_err(), CoreError::FileGroup(_)));
-
-        let result = BaseFile::from_file_name("no-valid-delimiter.parquet");
-        assert!(matches!(result.unwrap_err(), CoreError::FileGroup(_)));
-    }
-
     #[test]
     fn load_a_valid_file_group() {
         let mut fg = FileGroup::new("5a226868-2934-4f84-a16f-55124630c68d-0".to_owned(), None);
@@ -296,7 +213,7 @@ mod tests {
             fg.get_file_slice_as_of("20240402123035233")
                 .unwrap()
                 .base_file
-                .commit_time,
+                .instant_time,
             "20240402123035233"
         );
         assert!(fg.get_file_slice_as_of("-1").is_none());
@@ -313,7 +230,7 @@ mod tests {
             "5a226868-2934-4f84-a16f-55124630c68d-0_2-10-0_20240402144910683.parquet",
         );
         assert!(res2.is_err());
-        assert_eq!(res2.unwrap_err().to_string(), "File group error: Commit time 20240402144910683 is already present in File Group 5a226868-2934-4f84-a16f-55124630c68d-0");
+        assert_eq!(res2.unwrap_err().to_string(), "File group error: Instant time 20240402144910683 is already present in File Group 5a226868-2934-4f84-a16f-55124630c68d-0");
     }
 
     #[test]
diff --git a/crates/core/src/file_group/reader.rs b/crates/core/src/file_group/reader.rs
index a7884d0..49200f7 100644
--- a/crates/core/src/file_group/reader.rs
+++ b/crates/core/src/file_group/reader.rs
@@ -118,8 +118,8 @@ impl FileGroupReader {
     }
 
     pub async fn read_file_slice(&self, file_slice: &FileSlice) -> Result<RecordBatch> {
-        self.read_file_slice_by_base_file_path(&file_slice.base_file_relative_path())
-            .await
+        let relative_path = file_slice.base_file_relative_path()?;
+        self.read_file_slice_by_base_file_path(&relative_path).await
     }
 }
 
diff --git a/crates/core/src/storage/file_info.rs b/crates/core/src/storage/file_metadata.rs
similarity index 62%
rename from crates/core/src/storage/file_info.rs
rename to crates/core/src/storage/file_metadata.rs
index a6f1e05..649a94a 100644
--- a/crates/core/src/storage/file_info.rs
+++ b/crates/core/src/storage/file_metadata.rs
@@ -17,10 +17,32 @@
  * under the License.
  */
 
-/// File info that can be retrieved by listing operations without reading the file.
 #[derive(Clone, Debug, Default, Eq, PartialEq)]
-pub struct FileInfo {
-    pub uri: String,
+pub struct FileMetadata {
+    /// File name
     pub name: String,
+
+    /// Size in bytes on storage
     pub size: usize,
+
+    /// Size in bytes in memory
+    pub byte_size: i64,
+
+    /// Number of records in the file
+    pub num_records: i64,
+
+    /// Whether all the properties are populated or not
+    pub fully_populated: bool,
+}
+
+impl FileMetadata {
+    pub fn new(name: impl Into<String>, size: usize) -> Self {
+        Self {
+            name: name.into(),
+            size,
+            byte_size: 0,
+            num_records: 0,
+            fully_populated: false,
+        }
+    }
 }
diff --git a/crates/core/src/storage/file_stats.rs b/crates/core/src/storage/file_stats.rs
deleted file mode 100644
index 65fe1c5..0000000
--- a/crates/core/src/storage/file_stats.rs
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/// File stats that can be retrieved by reading the file's metadata.
-#[derive(Clone, Debug, Default)]
-pub struct FileStats {
-    pub num_records: i64,
-    pub size_bytes: i64,
-}
diff --git a/crates/core/src/storage/mod.rs b/crates/core/src/storage/mod.rs
index 48ab293..4f5aec3 100644
--- a/crates/core/src/storage/mod.rs
+++ b/crates/core/src/storage/mod.rs
@@ -38,12 +38,11 @@ use crate::config::table::HudiTableConfig;
 use crate::config::HudiConfigs;
 use crate::storage::error::Result;
 use crate::storage::error::StorageError::{Creation, InvalidPath};
-use crate::storage::file_info::FileInfo;
+use crate::storage::file_metadata::FileMetadata;
 use crate::storage::util::join_url_segments;
 
 pub mod error;
-pub mod file_info;
-pub mod file_stats;
+pub mod file_metadata;
 pub mod util;
 
 #[allow(dead_code)]
@@ -105,22 +104,41 @@ impl Storage {
     }
 
     #[cfg(test)]
-    async fn get_file_info(&self, relative_path: &str) -> Result<FileInfo> {
+    async fn get_file_metadata_not_populated(&self, relative_path: &str) -> Result<FileMetadata> {
         let obj_url = join_url_segments(&self.base_url, &[relative_path])?;
         let obj_path = ObjPath::from_url_path(obj_url.path())?;
         let meta = self.object_store.head(&obj_path).await?;
-        let uri = obj_url.to_string();
-        let name = obj_path
+        let name = meta.location.filename().ok_or_else(|| {
+            InvalidPath(format!("Failed to get file name from: {:?}", meta.location))
+        })?;
+        Ok(FileMetadata::new(name.to_string(), meta.size))
+    }
+
+    pub async fn get_file_metadata(&self, relative_path: &str) -> Result<FileMetadata> {
+        let obj_url = join_url_segments(&self.base_url, &[relative_path])?;
+        let obj_path = ObjPath::from_url_path(obj_url.path())?;
+        let obj_store = self.object_store.clone();
+        let obj_meta = obj_store.head(&obj_path).await?;
+        let location = obj_meta.location.clone();
+        let file_name = location
             .filename()
-            .ok_or(InvalidPath(format!(
-                "Failed to get file name from {:?}",
-                obj_path
-            )))?
-            .to_string();
-        Ok(FileInfo {
-            uri,
-            name,
-            size: meta.size,
+            .ok_or_else(|| InvalidPath(format!("Failed to get file name from: {:?}", &obj_meta)))?;
+        let size = obj_meta.size;
+        let reader = ParquetObjectReader::new(obj_store, obj_meta);
+        let builder = ParquetRecordBatchStreamBuilder::new(reader).await?;
+        let parquet_meta = builder.metadata().clone();
+        let num_records = parquet_meta.file_metadata().num_rows();
+        let size_bytes = parquet_meta
+            .row_groups()
+            .iter()
+            .map(|rg| rg.total_byte_size())
+            .sum::<i64>();
+        Ok(FileMetadata {
+            name: file_name.to_string(),
+            size,
+            byte_size: size_bytes,
+            num_records,
+            fully_populated: true,
         })
     }
 
@@ -196,33 +214,22 @@ impl Storage {
         Ok(list_res.common_prefixes)
     }
 
-    pub async fn list_files(&self, subdir: Option<&str>) -> Result<Vec<FileInfo>> {
+    pub async fn list_files(&self, subdir: Option<&str>) -> Result<Vec<FileMetadata>> {
         let prefix_url = join_url_segments(&self.base_url, &[subdir.unwrap_or_default()])?;
         let prefix_path = ObjPath::from_url_path(prefix_url.path())?;
         let list_res = self
             .object_store
             .list_with_delimiter(Some(&prefix_path))
             .await?;
-        let mut file_info = Vec::new();
+        let mut file_metadata = Vec::new();
         for obj_meta in list_res.objects {
-            let name = obj_meta
-                .location
+            let location = obj_meta.location;
+            let name = location
                 .filename()
-                .ok_or_else(|| {
-                    InvalidPath(format!(
-                        "Failed to get file name from {:?}",
-                        obj_meta.location
-                    ))
-                })?
-                .to_string();
-            let uri = join_url_segments(&prefix_url, &[&name])?.to_string();
-            file_info.push(FileInfo {
-                uri,
-                name,
-                size: obj_meta.size,
-            });
+                .ok_or_else(|| InvalidPath(format!("Failed to get file name from {location:?}")))?;
+            file_metadata.push(FileMetadata::new(name.to_string(), obj_meta.size));
         }
-        Ok(file_info)
+        Ok(file_metadata)
     }
 }
 
@@ -349,54 +356,27 @@ mod tests {
         )
         .unwrap();
         let storage = Storage::new_with_base_url(base_url).unwrap();
-        let file_info_1: Vec<FileInfo> = storage
+        let file_info_1: Vec<FileMetadata> = storage
             .list_files(None)
             .await
             .unwrap()
             .into_iter()
             .collect();
-        assert_eq!(
-            file_info_1,
-            vec![FileInfo {
-                uri: join_url_segments(&storage.base_url, &["a.parquet"])
-                    .unwrap()
-                    .to_string(),
-                name: "a.parquet".to_string(),
-                size: 0,
-            }]
-        );
-        let file_info_2: Vec<FileInfo> = storage
+        assert_eq!(file_info_1, vec![FileMetadata::new("a.parquet", 0)]);
+        let file_info_2: Vec<FileMetadata> = storage
             .list_files(Some("part1"))
             .await
             .unwrap()
             .into_iter()
             .collect();
-        assert_eq!(
-            file_info_2,
-            vec![FileInfo {
-                uri: join_url_segments(&storage.base_url, &["part1/b.parquet"])
-                    .unwrap()
-                    .to_string(),
-                name: "b.parquet".to_string(),
-                size: 0,
-            }]
-        );
-        let file_info_3: Vec<FileInfo> = storage
+        assert_eq!(file_info_2, vec![FileMetadata::new("b.parquet", 0)],);
+        let file_info_3: Vec<FileMetadata> = storage
             .list_files(Some("part2/part22"))
             .await
             .unwrap()
             .into_iter()
             .collect();
-        assert_eq!(
-            file_info_3,
-            vec![FileInfo {
-                uri: join_url_segments(&storage.base_url, &["part2/part22/c.parquet"])
-                    .unwrap()
-                    .to_string(),
-                name: "c.parquet".to_string(),
-                size: 0,
-            }]
-        );
+        assert_eq!(file_info_3, vec![FileMetadata::new("c.parquet", 0)],);
     }
 
     #[tokio::test]
@@ -432,15 +412,12 @@ mod tests {
         let base_url =
             Url::from_directory_path(canonicalize(Path::new("tests/data")).unwrap()).unwrap();
         let storage = Storage::new_with_base_url(base_url).unwrap();
-        let file_info = storage.get_file_info("a.parquet").await.unwrap();
-        assert_eq!(file_info.name, "a.parquet");
-        assert_eq!(
-            file_info.uri,
-            join_url_segments(&storage.base_url, &["a.parquet"])
-                .unwrap()
-                .to_string()
-        );
-        assert_eq!(file_info.size, 866);
+        let file_metadata = storage
+            .get_file_metadata_not_populated("a.parquet")
+            .await
+            .unwrap();
+        assert_eq!(file_metadata.name, "a.parquet");
+        assert_eq!(file_metadata.size, 866);
     }
 
     #[tokio::test]
diff --git a/crates/core/src/table/fs_view.rs b/crates/core/src/table/fs_view.rs
index eef3150..4e6aa1d 100644
--- a/crates/core/src/table/fs_view.rs
+++ b/crates/core/src/table/fs_view.rs
@@ -21,12 +21,13 @@ use std::collections::{HashMap, HashSet};
 use std::sync::Arc;
 
 use crate::config::HudiConfigs;
-use crate::file_group::{BaseFile, FileGroup, FileSlice};
-use crate::storage::file_info::FileInfo;
+use crate::file_group::base_file::BaseFile;
+use crate::file_group::{FileGroup, FileSlice};
 use crate::storage::{get_leaf_dirs, Storage};
 
 use crate::config::read::HudiReadConfig::ListingParallelism;
 use crate::error::CoreError;
+use crate::storage::file_metadata::FileMetadata;
 use crate::table::partition::PartitionPruner;
 use crate::Result;
 use dashmap::DashMap;
@@ -91,7 +92,7 @@ impl FileSystemView {
         storage: &Storage,
         partition_path: &str,
     ) -> Result<Vec<FileGroup>> {
-        let file_info: Vec<FileInfo> = storage
+        let file_metadata: Vec<FileMetadata> = storage
             .list_files(Some(partition_path))
             .await?
             .into_iter()
@@ -99,8 +100,8 @@ impl FileSystemView {
             .collect();
 
         let mut fg_id_to_base_files: HashMap<String, Vec<BaseFile>> = HashMap::new();
-        for f in file_info {
-            let base_file = BaseFile::from_file_info(f)?;
+        for metadata in file_metadata {
+            let base_file = BaseFile::try_from(metadata)?;
             let fg_id = &base_file.file_group_id;
             fg_id_to_base_files
                 .entry(fg_id.to_owned())
@@ -163,7 +164,7 @@ impl FileSystemView {
                     continue;
                 }
                 if let Some(fsl) = fg.get_file_slice_mut_as_of(timestamp) {
-                    fsl.load_stats(&self.storage).await?;
+                    fsl.load_metadata_if_needed(&self.storage).await?;
                     file_slices.push(fsl.clone());
                 }
             }
@@ -260,7 +261,7 @@ mod tests {
             .collect::<Vec<_>>();
         assert_eq!(fg_ids, vec!["a079bdb3-731c-4894-b855-abfcd6921007-0"]);
         for fsl in file_slices.iter() {
-            assert_eq!(fsl.base_file.stats.as_ref().unwrap().num_records, 4);
+            assert_eq!(fsl.base_file.file_metadata.as_ref().unwrap().num_records, 4);
         }
     }
 
@@ -289,7 +290,7 @@ mod tests {
             .collect::<Vec<_>>();
         assert_eq!(fg_ids, vec!["ebcb261d-62d3-4895-90ec-5b3c9622dff4-0"]);
         for fsl in file_slices.iter() {
-            assert_eq!(fsl.base_file.stats.as_ref().unwrap().num_records, 1);
+            assert_eq!(fsl.base_file.file_metadata.as_ref().unwrap().num_records, 1);
         }
     }
 
@@ -330,7 +331,7 @@ mod tests {
             .collect::<Vec<_>>();
         assert_eq!(fg_ids, vec!["a22e8257-e249-45e9-ba46-115bc85adcba-0"]);
         for fsl in file_slices.iter() {
-            assert_eq!(fsl.base_file.stats.as_ref().unwrap().num_records, 2);
+            assert_eq!(fsl.base_file.file_metadata.as_ref().unwrap().num_records, 2);
         }
     }
 }
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index 6d05c71..2c50fc7 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -61,6 +61,7 @@
 //! use url::Url;
 //! use hudi_core::table::Table;
 //! use hudi_core::storage::util::parse_uri;
+//! use hudi_core::storage::util::join_url_segments;
 //!
 //! pub async fn test() {
 //!     let base_uri = Url::from_file_path("/tmp/hudi_data").unwrap();
@@ -74,8 +75,8 @@
 //!             let file_group_vec = file_slice_vec
 //!                 .iter()
 //!                 .map(|f| {
-//!                     let url = parse_uri(&f.base_file.info.uri).unwrap();
-//!                     let size = f.base_file.info.size as u64;
+//!                     let relative_path = f.base_file_relative_path().unwrap();
+//!                     let url = join_url_segments(&base_uri, &[relative_path.as_str()]).unwrap();
 //!                     url.path().to_string()
 //!                 })
 //!                 .collect();
@@ -372,8 +373,11 @@ mod tests {
     /// Test helper to get relative file paths from the table with filters.
     async fn get_file_paths_with_filters(table: &Table, filters: &[Filter]) -> Result<Vec<String>> {
         let mut file_paths = Vec::new();
+        let base_url = table.base_url()?;
         for f in table.get_file_slices(filters).await? {
-            file_paths.push(f.base_file_path().to_string());
+            let relative_path = f.base_file_relative_path()?;
+            let file_url = join_url_segments(&base_url, &[relative_path.as_str()])?;
+            file_paths.push(file_url.to_string());
         }
         Ok(file_paths)
     }
@@ -710,7 +714,7 @@ mod tests {
         assert_eq!(
             file_slices
                 .iter()
-                .map(|f| f.base_file_relative_path())
+                .map(|f| f.base_file_relative_path().unwrap())
                 .collect::<Vec<_>>(),
             vec!["a079bdb3-731c-4894-b855-abfcd6921007-0_0-203-274_20240418173551906.parquet",]
         );
@@ -724,7 +728,7 @@ mod tests {
         assert_eq!(
             file_slices
                 .iter()
-                .map(|f| f.base_file_relative_path())
+                .map(|f| f.base_file_relative_path().unwrap())
                 .collect::<Vec<_>>(),
             vec!["a079bdb3-731c-4894-b855-abfcd6921007-0_0-203-274_20240418173551906.parquet",]
         );
@@ -738,7 +742,7 @@ mod tests {
         assert_eq!(
             file_slices
                 .iter()
-                .map(|f| f.base_file_relative_path())
+                .map(|f| f.base_file_relative_path().unwrap())
                 .collect::<Vec<_>>(),
             vec!["a079bdb3-731c-4894-b855-abfcd6921007-0_0-182-253_20240418173550988.parquet",]
         );
@@ -752,7 +756,7 @@ mod tests {
         assert_eq!(
             file_slices
                 .iter()
-                .map(|f| f.base_file_relative_path())
+                .map(|f| f.base_file_relative_path().unwrap())
                 .collect::<Vec<_>>(),
             Vec::<String>::new()
         );
diff --git a/crates/datafusion/src/lib.rs b/crates/datafusion/src/lib.rs
index 718dc61..0c27e74 100644
--- a/crates/datafusion/src/lib.rs
+++ b/crates/datafusion/src/lib.rs
@@ -45,7 +45,7 @@ use datafusion_physical_expr::create_physical_expr;
 use crate::util::expr::exprs_to_filters;
 use hudi_core::config::read::HudiReadConfig::InputPartitions;
 use hudi_core::config::util::empty_options;
-use hudi_core::storage::util::{get_scheme_authority, parse_uri};
+use hudi_core::storage::util::{get_scheme_authority, join_url_segments};
 use hudi_core::table::Table as HudiTable;
 
 /// Create a `HudiDataSource`.
@@ -181,16 +181,26 @@ impl TableProvider for HudiDataSource {
             .get_file_slices_splits(self.get_input_partitions(), pushdown_filters.as_slice())
             .await
             .map_err(|e| Execution(format!("Failed to get file slices from Hudi table: {}", e)))?;
+        let base_url = self.table.base_url().map_err(|e| {
+            Execution(format!(
+                "Failed to get base path config from Hudi table: {e:?}"
+            ))
+        })?;
         let mut parquet_file_groups: Vec<Vec<PartitionedFile>> = Vec::new();
         for file_slice_vec in file_slices {
-            let parquet_file_group_vec = file_slice_vec
-                .iter()
-                .map(|f| {
-                    let url = parse_uri(&f.base_file.info.uri).unwrap();
-                    let size = f.base_file.info.size as u64;
-                    PartitionedFile::new(url.path(), size)
-                })
-                .collect();
+            let mut parquet_file_group_vec = Vec::new();
+            for f in file_slice_vec {
+                let relative_path = f.base_file_relative_path().map_err(|e| {
+                    Execution(format!(
+                        "Failed to get base file relative path for {f:?} due to {e:?}"
+                    ))
+                })?;
+                let url = join_url_segments(&base_url, &[relative_path.as_str()])
+                    .map_err(|e| Execution(format!("Failed to join URL segments: {e:?}")))?;
+                let size = f.base_file.file_metadata.as_ref().map_or(0, |m| m.size);
+                let partitioned_file = PartitionedFile::new(url.path(), size as u64);
+                parquet_file_group_vec.push(partitioned_file);
+            }
             parquet_file_groups.push(parquet_file_group_vec)
         }
 
diff --git a/python/hudi/_internal.pyi b/python/hudi/_internal.pyi
index 4235d17..d5e046f 100644
--- a/python/hudi/_internal.pyi
+++ b/python/hudi/_internal.pyi
@@ -60,22 +60,22 @@ class HudiFileSlice:
     the partition it belongs to, and associated metadata.
 
     Attributes:
-        file_group_id (str): The ID of the file group this slice belongs to.
+        file_group_id (str): The id of the file group this file slice belongs to.
         partition_path (str): The path of the partition containing this file slice.
-        commit_time (str): The commit time of this file slice.
+        creation_instant_time (str): The creation instant time of this file slice.
         base_file_name (str): The name of the base file.
-        base_file_size (int): The size of the base file.
-        num_records (int): The number of records in the base file.
-        size_bytes (int): The size of the file slice in bytes.
+        base_file_size (int): The on-disk size of the base file in bytes.
+        base_file_byte_size (int): The in-memory size of the base file in bytes.
+        num_records (int): The number of records in the file slice.
     """
 
     file_group_id: str
     partition_path: str
-    commit_time: str
+    creation_instant_time: str
     base_file_name: str
     base_file_size: int
+    base_file_byte_size: int
     num_records: int
-    size_bytes: int
 
     def base_file_relative_path(self) -> str:
         """
diff --git a/python/src/internal.rs b/python/src/internal.rs
index 1aef081..4c9b7fd 100644
--- a/python/src/internal.rs
+++ b/python/src/internal.rs
@@ -94,15 +94,15 @@ pub struct HudiFileSlice {
     #[pyo3(get)]
     partition_path: String,
     #[pyo3(get)]
-    commit_time: String,
+    creation_instant_time: String,
     #[pyo3(get)]
     base_file_name: String,
     #[pyo3(get)]
     base_file_size: usize,
     #[pyo3(get)]
-    num_records: i64,
+    base_file_byte_size: i64,
     #[pyo3(get)]
-    size_bytes: i64,
+    num_records: i64,
 }
 
 #[cfg(not(tarpaulin))]
@@ -126,21 +126,21 @@ impl HudiFileSlice {
 #[cfg(not(tarpaulin))]
 fn convert_file_slice(f: &FileSlice) -> HudiFileSlice {
     let file_group_id = f.file_group_id().to_string();
-    let partition_path = f.partition_path.as_deref().unwrap_or_default().to_string();
-    let commit_time = f.base_file.commit_time.to_string();
-    let base_file_name = f.base_file.info.name.clone();
-    let base_file_size = f.base_file.info.size;
-    let stats = f.base_file.stats.clone().unwrap_or_default();
-    let num_records = stats.num_records;
-    let size_bytes = stats.size_bytes;
+    let partition_path = f.partition_path().to_string();
+    let creation_instant_time = f.creation_instant_time().to_string();
+    let base_file_name = f.base_file.file_name.clone();
+    let file_metadata = f.base_file.file_metadata.clone().unwrap_or_default();
+    let base_file_size = file_metadata.size;
+    let base_file_byte_size = file_metadata.byte_size;
+    let num_records = file_metadata.num_records;
     HudiFileSlice {
         file_group_id,
         partition_path,
-        commit_time,
+        creation_instant_time,
         base_file_name,
         base_file_size,
+        base_file_byte_size,
         num_records,
-        size_bytes,
     }
 }
 
diff --git a/python/tests/test_table_read.py b/python/tests/test_table_read.py
index f8986a8..c35be59 100644
--- a/python/tests/test_table_read.py
+++ b/python/tests/test_table_read.py
@@ -51,12 +51,12 @@ def test_read_table_returns_correct_file_slices(get_sample_table):
 
     file_slices = table.get_file_slices()
     assert len(file_slices) == 5
-    assert set(f.commit_time for f in file_slices) == {
+    assert set(f.creation_instant_time for f in file_slices) == {
         "20240402123035233",
         "20240402144910683",
     }
     assert all(f.num_records == 1 for f in file_slices)
-    assert all(f.size_bytes > 0 for f in file_slices)
+    assert all(f.base_file_byte_size > 0 for f in file_slices)
     file_slice_paths = [f.base_file_relative_path() for f in file_slices]
     assert set(file_slice_paths) == {
         "chennai/68d3c349-f621-4cd8-9e8b-c6dd8eb20d08-0_4-12-0_20240402123035233.parquet",


Reply via email to