This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 1779e77 refactor: improve `BaseFile` APIs (#239)
1779e77 is described below
commit 1779e772b748a4c86bd6af13c0899c5168958f2f
Author: Shiyan Xu <[email protected]>
AuthorDate: Wed Jan 8 15:33:54 2025 -0600
refactor: improve `BaseFile` APIs (#239)
- Consolidate `FileInfo` and `FileStats` into `FileMetadata`
- Improve variable and struct property naming
- Make APIs more ergonomic
---
crates/core/src/file_group/base_file.rs | 134 +++++++++++++++
crates/core/src/file_group/mod.rs | 185 ++++++---------------
crates/core/src/file_group/reader.rs | 4 +-
.../src/storage/{file_info.rs => file_metadata.rs} | 28 +++-
crates/core/src/storage/file_stats.rs | 25 ---
crates/core/src/storage/mod.rs | 127 ++++++--------
crates/core/src/table/fs_view.rs | 19 ++-
crates/core/src/table/mod.rs | 18 +-
crates/datafusion/src/lib.rs | 28 +++-
python/hudi/_internal.pyi | 14 +-
python/src/internal.rs | 24 +--
python/tests/test_table_read.py | 4 +-
12 files changed, 325 insertions(+), 285 deletions(-)
diff --git a/crates/core/src/file_group/base_file.rs
b/crates/core/src/file_group/base_file.rs
new file mode 100644
index 0000000..6f26c85
--- /dev/null
+++ b/crates/core/src/file_group/base_file.rs
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use crate::error::CoreError;
+use crate::storage::file_metadata::FileMetadata;
+use crate::Result;
+
+/// Hudi Base file, part of a [FileSlice].
+#[derive(Clone, Debug)]
+pub struct BaseFile {
+ /// The file name of the base file.
+ pub file_name: String,
+
+ /// The id of the enclosing file group.
+ pub file_group_id: String,
+
+ /// The associated instant time of the base file.
+ pub instant_time: String,
+
+ /// The metadata about the file.
+ pub file_metadata: Option<FileMetadata>,
+}
+
+impl BaseFile {
+ /// Parse file name and extract `file_group_id` and `instant_time`.
+ fn parse_file_name(file_name: &str) -> Result<(String, String)> {
+ let err_msg = format!("Failed to parse file name '{file_name}' for
base file.");
+ let (name, _) = file_name
+ .rsplit_once('.')
+ .ok_or_else(|| CoreError::FileGroup(err_msg.clone()))?;
+ let parts: Vec<&str> = name.split('_').collect();
+ let file_group_id = parts
+ .first()
+ .ok_or_else(|| CoreError::FileGroup(err_msg.clone()))?
+ .to_string();
+ let instant_time = parts
+ .get(2)
+ .ok_or_else(|| CoreError::FileGroup(err_msg.clone()))?
+ .to_string();
+ Ok((file_group_id, instant_time))
+ }
+}
+
+impl TryFrom<&str> for BaseFile {
+ type Error = CoreError;
+
+ fn try_from(file_name: &str) -> Result<Self> {
+ let (file_group_id, instant_time) = Self::parse_file_name(file_name)?;
+ Ok(Self {
+ file_name: file_name.to_string(),
+ file_group_id,
+ instant_time,
+ file_metadata: None,
+ })
+ }
+}
+
+impl TryFrom<FileMetadata> for BaseFile {
+ type Error = CoreError;
+
+ fn try_from(metadata: FileMetadata) -> Result<Self> {
+ let file_name = metadata.name.clone();
+ let (file_group_id, instant_time) = Self::parse_file_name(&file_name)?;
+ Ok(Self {
+ file_name,
+ file_group_id,
+ instant_time,
+ file_metadata: Some(metadata),
+ })
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use hudi_tests::assert_not;
+
+ #[test]
+ fn test_create_base_file_from_file_name() {
+ let file_name =
"5a226868-2934-4f84-a16f-55124630c68d-0_0-7-24_20240402144910683.parquet";
+ let base_file = BaseFile::try_from(file_name).unwrap();
+ assert_eq!(
+ base_file.file_group_id,
+ "5a226868-2934-4f84-a16f-55124630c68d-0"
+ );
+ assert_eq!(base_file.instant_time, "20240402144910683");
+ assert!(base_file.file_metadata.is_none());
+ }
+
+ #[test]
+ fn test_create_base_file_from_metadata() {
+ let metadata = FileMetadata::new(
+
"5a226868-2934-4f84-a16f-55124630c68d-0_0-7-24_20240402144910683.parquet",
+ 1024,
+ );
+ let base_file = BaseFile::try_from(metadata).unwrap();
+ assert_eq!(
+ base_file.file_group_id,
+ "5a226868-2934-4f84-a16f-55124630c68d-0"
+ );
+ assert_eq!(base_file.instant_time, "20240402144910683");
+ let file_metadata = base_file.file_metadata.unwrap();
+ assert_eq!(file_metadata.size, 1024);
+ assert_not!(file_metadata.fully_populated);
+ }
+
+ #[test]
+ fn create_a_base_file_returns_error() {
+ let result = BaseFile::try_from("no_file_extension");
+ assert!(matches!(result.unwrap_err(), CoreError::FileGroup(_)));
+
+ let result = BaseFile::try_from(".parquet");
+ assert!(matches!(result.unwrap_err(), CoreError::FileGroup(_)));
+
+ let metadata = FileMetadata::new("no-valid-delimiter.parquet", 1024);
+ let result = BaseFile::try_from(metadata);
+ assert!(matches!(result.unwrap_err(), CoreError::FileGroup(_)));
+ }
+}
diff --git a/crates/core/src/file_group/mod.rs
b/crates/core/src/file_group/mod.rs
index f93caef..8c92f4e 100644
--- a/crates/core/src/file_group/mod.rs
+++ b/crates/core/src/file_group/mod.rs
@@ -20,12 +20,12 @@
//!
//! A set of data/base files + set of log files, that make up a unit for all
operations.
+pub mod base_file;
pub mod builder;
pub mod reader;
use crate::error::CoreError;
-use crate::storage::file_info::FileInfo;
-use crate::storage::file_stats::FileStats;
+use crate::file_group::base_file::BaseFile;
use crate::storage::Storage;
use crate::Result;
use std::collections::BTreeMap;
@@ -34,70 +34,7 @@ use std::fmt::Formatter;
use std::hash::{Hash, Hasher};
use std::path::PathBuf;
-/// Represents common metadata about a Hudi Base File.
-#[derive(Clone, Debug)]
-pub struct BaseFile {
- /// The file group id that is unique across the table.
- pub file_group_id: String,
-
- pub commit_time: String,
-
- pub info: FileInfo,
-
- pub stats: Option<FileStats>,
-}
-
-impl BaseFile {
- /// Parse file name and extract file_group_id and commit_time.
- fn parse_file_name(file_name: &str) -> Result<(String, String)> {
- let err_msg = format!("Failed to parse file name '{}' for base file.",
file_name);
- let (name, _) = file_name
- .rsplit_once('.')
- .ok_or(CoreError::FileGroup(err_msg.clone()))?;
- let parts: Vec<&str> = name.split('_').collect();
- let file_group_id = parts
- .first()
- .ok_or(CoreError::FileGroup(err_msg.clone()))?
- .to_string();
- let commit_time = parts
- .get(2)
- .ok_or(CoreError::FileGroup(err_msg.clone()))?
- .to_string();
- Ok((file_group_id, commit_time))
- }
-
- /// Construct [BaseFile] with the base file name.
- ///
- /// TODO: refactor such that file info size is optional and no one expects
it.
- pub fn from_file_name(file_name: &str) -> Result<Self> {
- let (file_group_id, commit_time) = Self::parse_file_name(file_name)?;
- let info = FileInfo {
- name: file_name.to_string(),
- size: 0,
- uri: "".to_string(),
- };
- Ok(Self {
- file_group_id,
- commit_time,
- info,
- stats: None,
- })
- }
-
- /// Construct [BaseFile] with the [FileInfo].
- pub fn from_file_info(info: FileInfo) -> Result<Self> {
- let (file_group_id, commit_time) = Self::parse_file_name(&info.name)?;
- Ok(Self {
- file_group_id,
- commit_time,
- info,
- stats: None,
- })
- }
-}
-
-/// Within a file group, a slice is a combination of data file written at a
commit time and list of log files,
-/// containing changes to the data file from that commit time.
+/// Within a [FileGroup], a [FileSlice] is a logical group of [BaseFile] and
log files.
///
/// [note] The log files are not yet supported.
#[derive(Clone, Debug)]
@@ -107,47 +44,56 @@ pub struct FileSlice {
}
impl FileSlice {
- #[cfg(test)]
- pub fn base_file_path(&self) -> &str {
- self.base_file.info.uri.as_str()
+ pub fn new(base_file: BaseFile, partition_path: Option<String>) -> Self {
+ Self {
+ base_file,
+ partition_path,
+ }
}
- pub fn base_file_relative_path(&self) -> String {
- let ptn = self.partition_path.as_deref().unwrap_or_default();
- let file_name = &self.base_file.info.name;
- PathBuf::from(ptn)
- .join(file_name)
- .to_str()
- .unwrap()
- .to_string()
+ /// Returns the relative path of the base file.
+ pub fn base_file_relative_path(&self) -> Result<String> {
+ let file_name = &self.base_file.file_name;
+ let path = PathBuf::from(self.partition_path()).join(file_name);
+ path.to_str().map(|s| s.to_string()).ok_or_else(|| {
+ CoreError::FileGroup(format!(
+ "Failed to get base file relative path for file slice: {:?}",
+ self
+ ))
+ })
}
+ /// Returns the enclosing [FileGroup]'s id.
+ #[inline]
pub fn file_group_id(&self) -> &str {
&self.base_file.file_group_id
}
- pub fn set_base_file(&mut self, base_file: BaseFile) {
- self.base_file = base_file
+ /// Returns the partition path of the [FileSlice].
+ #[inline]
+ pub fn partition_path(&self) -> &str {
+ self.partition_path.as_deref().unwrap_or_default()
}
- /// Load stats from storage layer for the base file if not already loaded.
- pub async fn load_stats(&mut self, storage: &Storage) -> Result<()> {
- if self.base_file.stats.is_none() {
- let parquet_meta = storage
- .get_parquet_file_metadata(&self.base_file_relative_path())
- .await?;
- let num_records = parquet_meta.file_metadata().num_rows();
- let size_bytes = parquet_meta
- .row_groups()
- .iter()
- .map(|rg| rg.total_byte_size())
- .sum::<i64>();
- let stats = FileStats {
- num_records,
- size_bytes,
- };
- self.base_file.stats = Some(stats);
+ /// Returns the instant time that marks the [FileSlice] creation.
+ ///
+ /// This is also an instant time stored in the [Timeline].
+ #[inline]
+ pub fn creation_instant_time(&self) -> &str {
+ &self.base_file.instant_time
+ }
+
+ /// Load [FileMetadata] from storage layer for the [BaseFile] if
`file_metadata` is [None]
+ /// or if `file_metadata` is not fully populated.
+ pub async fn load_metadata_if_needed(&mut self, storage: &Storage) ->
Result<()> {
+ if let Some(metadata) = &self.base_file.file_metadata {
+ if metadata.fully_populated {
+ return Ok(());
+ }
}
+ let relative_path = self.base_file_relative_path()?;
+ let fetched_metadata =
storage.get_file_metadata(&relative_path).await?;
+ self.base_file.file_metadata = Some(fetched_metadata);
Ok(())
}
}
@@ -207,25 +153,21 @@ impl FileGroup {
}
pub fn add_base_file_from_name(&mut self, file_name: &str) ->
Result<&Self> {
- let base_file = BaseFile::from_file_name(file_name)?;
+ let base_file = BaseFile::try_from(file_name)?;
self.add_base_file(base_file)
}
pub fn add_base_file(&mut self, base_file: BaseFile) -> Result<&Self> {
- let commit_time = base_file.commit_time.as_str();
- if self.file_slices.contains_key(commit_time) {
+ let instant_time = base_file.instant_time.as_str();
+ if self.file_slices.contains_key(instant_time) {
Err(CoreError::FileGroup(format!(
- "Commit time {0} is already present in File Group {1}",
- commit_time.to_owned(),
- self.id,
+ "Instant time {instant_time} is already present in File Group
{}",
+ self.id
)))
} else {
self.file_slices.insert(
- commit_time.to_owned(),
- FileSlice {
- partition_path: self.partition_path.clone(),
- base_file,
- },
+ instant_time.to_owned(),
+ FileSlice::new(base_file, self.partition_path.clone()),
);
Ok(self)
}
@@ -254,31 +196,6 @@ impl FileGroup {
mod tests {
use super::*;
- #[test]
- fn create_a_base_file_successfully() {
- let base_file = BaseFile::from_file_name(
-
"5a226868-2934-4f84-a16f-55124630c68d-0_0-7-24_20240402144910683.parquet",
- )
- .unwrap();
- assert_eq!(
- base_file.file_group_id,
- "5a226868-2934-4f84-a16f-55124630c68d-0"
- );
- assert_eq!(base_file.commit_time, "20240402144910683");
- }
-
- #[test]
- fn create_a_base_file_returns_error() {
- let result = BaseFile::from_file_name("no_file_extension");
- assert!(matches!(result.unwrap_err(), CoreError::FileGroup(_)));
-
- let result = BaseFile::from_file_name(".parquet");
- assert!(matches!(result.unwrap_err(), CoreError::FileGroup(_)));
-
- let result = BaseFile::from_file_name("no-valid-delimiter.parquet");
- assert!(matches!(result.unwrap_err(), CoreError::FileGroup(_)));
- }
-
#[test]
fn load_a_valid_file_group() {
let mut fg =
FileGroup::new("5a226868-2934-4f84-a16f-55124630c68d-0".to_owned(), None);
@@ -296,7 +213,7 @@ mod tests {
fg.get_file_slice_as_of("20240402123035233")
.unwrap()
.base_file
- .commit_time,
+ .instant_time,
"20240402123035233"
);
assert!(fg.get_file_slice_as_of("-1").is_none());
@@ -313,7 +230,7 @@ mod tests {
"5a226868-2934-4f84-a16f-55124630c68d-0_2-10-0_20240402144910683.parquet",
);
assert!(res2.is_err());
- assert_eq!(res2.unwrap_err().to_string(), "File group error: Commit
time 20240402144910683 is already present in File Group
5a226868-2934-4f84-a16f-55124630c68d-0");
+ assert_eq!(res2.unwrap_err().to_string(), "File group error: Instant
time 20240402144910683 is already present in File Group
5a226868-2934-4f84-a16f-55124630c68d-0");
}
#[test]
diff --git a/crates/core/src/file_group/reader.rs
b/crates/core/src/file_group/reader.rs
index a7884d0..49200f7 100644
--- a/crates/core/src/file_group/reader.rs
+++ b/crates/core/src/file_group/reader.rs
@@ -118,8 +118,8 @@ impl FileGroupReader {
}
pub async fn read_file_slice(&self, file_slice: &FileSlice) ->
Result<RecordBatch> {
-
self.read_file_slice_by_base_file_path(&file_slice.base_file_relative_path())
- .await
+ let relative_path = file_slice.base_file_relative_path()?;
+ self.read_file_slice_by_base_file_path(&relative_path).await
}
}
diff --git a/crates/core/src/storage/file_info.rs
b/crates/core/src/storage/file_metadata.rs
similarity index 62%
rename from crates/core/src/storage/file_info.rs
rename to crates/core/src/storage/file_metadata.rs
index a6f1e05..649a94a 100644
--- a/crates/core/src/storage/file_info.rs
+++ b/crates/core/src/storage/file_metadata.rs
@@ -17,10 +17,32 @@
* under the License.
*/
-/// File info that can be retrieved by listing operations without reading the
file.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
-pub struct FileInfo {
- pub uri: String,
+pub struct FileMetadata {
+ /// File name
pub name: String,
+
+ /// Size in bytes on storage
pub size: usize,
+
+ /// Size in bytes in memory
+ pub byte_size: i64,
+
+ /// Number of records in the file
+ pub num_records: i64,
+
+ /// Whether all the properties are populated or not
+ pub fully_populated: bool,
+}
+
+impl FileMetadata {
+ pub fn new(name: impl Into<String>, size: usize) -> Self {
+ Self {
+ name: name.into(),
+ size,
+ byte_size: 0,
+ num_records: 0,
+ fully_populated: false,
+ }
+ }
}
diff --git a/crates/core/src/storage/file_stats.rs
b/crates/core/src/storage/file_stats.rs
deleted file mode 100644
index 65fe1c5..0000000
--- a/crates/core/src/storage/file_stats.rs
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/// File stats that can be retrieved by reading the file's metadata.
-#[derive(Clone, Debug, Default)]
-pub struct FileStats {
- pub num_records: i64,
- pub size_bytes: i64,
-}
diff --git a/crates/core/src/storage/mod.rs b/crates/core/src/storage/mod.rs
index 48ab293..4f5aec3 100644
--- a/crates/core/src/storage/mod.rs
+++ b/crates/core/src/storage/mod.rs
@@ -38,12 +38,11 @@ use crate::config::table::HudiTableConfig;
use crate::config::HudiConfigs;
use crate::storage::error::Result;
use crate::storage::error::StorageError::{Creation, InvalidPath};
-use crate::storage::file_info::FileInfo;
+use crate::storage::file_metadata::FileMetadata;
use crate::storage::util::join_url_segments;
pub mod error;
-pub mod file_info;
-pub mod file_stats;
+pub mod file_metadata;
pub mod util;
#[allow(dead_code)]
@@ -105,22 +104,41 @@ impl Storage {
}
#[cfg(test)]
- async fn get_file_info(&self, relative_path: &str) -> Result<FileInfo> {
+ async fn get_file_metadata_not_populated(&self, relative_path: &str) ->
Result<FileMetadata> {
let obj_url = join_url_segments(&self.base_url, &[relative_path])?;
let obj_path = ObjPath::from_url_path(obj_url.path())?;
let meta = self.object_store.head(&obj_path).await?;
- let uri = obj_url.to_string();
- let name = obj_path
+ let name = meta.location.filename().ok_or_else(|| {
+ InvalidPath(format!("Failed to get file name from: {:?}",
meta.location))
+ })?;
+ Ok(FileMetadata::new(name.to_string(), meta.size))
+ }
+
+ pub async fn get_file_metadata(&self, relative_path: &str) ->
Result<FileMetadata> {
+ let obj_url = join_url_segments(&self.base_url, &[relative_path])?;
+ let obj_path = ObjPath::from_url_path(obj_url.path())?;
+ let obj_store = self.object_store.clone();
+ let obj_meta = obj_store.head(&obj_path).await?;
+ let location = obj_meta.location.clone();
+ let file_name = location
.filename()
- .ok_or(InvalidPath(format!(
- "Failed to get file name from {:?}",
- obj_path
- )))?
- .to_string();
- Ok(FileInfo {
- uri,
- name,
- size: meta.size,
+ .ok_or_else(|| InvalidPath(format!("Failed to get file name from:
{:?}", &obj_meta)))?;
+ let size = obj_meta.size;
+ let reader = ParquetObjectReader::new(obj_store, obj_meta);
+ let builder = ParquetRecordBatchStreamBuilder::new(reader).await?;
+ let parquet_meta = builder.metadata().clone();
+ let num_records = parquet_meta.file_metadata().num_rows();
+ let size_bytes = parquet_meta
+ .row_groups()
+ .iter()
+ .map(|rg| rg.total_byte_size())
+ .sum::<i64>();
+ Ok(FileMetadata {
+ name: file_name.to_string(),
+ size,
+ byte_size: size_bytes,
+ num_records,
+ fully_populated: true,
})
}
@@ -196,33 +214,22 @@ impl Storage {
Ok(list_res.common_prefixes)
}
- pub async fn list_files(&self, subdir: Option<&str>) ->
Result<Vec<FileInfo>> {
+ pub async fn list_files(&self, subdir: Option<&str>) ->
Result<Vec<FileMetadata>> {
let prefix_url = join_url_segments(&self.base_url,
&[subdir.unwrap_or_default()])?;
let prefix_path = ObjPath::from_url_path(prefix_url.path())?;
let list_res = self
.object_store
.list_with_delimiter(Some(&prefix_path))
.await?;
- let mut file_info = Vec::new();
+ let mut file_metadata = Vec::new();
for obj_meta in list_res.objects {
- let name = obj_meta
- .location
+ let location = obj_meta.location;
+ let name = location
.filename()
- .ok_or_else(|| {
- InvalidPath(format!(
- "Failed to get file name from {:?}",
- obj_meta.location
- ))
- })?
- .to_string();
- let uri = join_url_segments(&prefix_url, &[&name])?.to_string();
- file_info.push(FileInfo {
- uri,
- name,
- size: obj_meta.size,
- });
+ .ok_or_else(|| InvalidPath(format!("Failed to get file name
from {location:?}")))?;
+ file_metadata.push(FileMetadata::new(name.to_string(),
obj_meta.size));
}
- Ok(file_info)
+ Ok(file_metadata)
}
}
@@ -349,54 +356,27 @@ mod tests {
)
.unwrap();
let storage = Storage::new_with_base_url(base_url).unwrap();
- let file_info_1: Vec<FileInfo> = storage
+ let file_info_1: Vec<FileMetadata> = storage
.list_files(None)
.await
.unwrap()
.into_iter()
.collect();
- assert_eq!(
- file_info_1,
- vec![FileInfo {
- uri: join_url_segments(&storage.base_url, &["a.parquet"])
- .unwrap()
- .to_string(),
- name: "a.parquet".to_string(),
- size: 0,
- }]
- );
- let file_info_2: Vec<FileInfo> = storage
+ assert_eq!(file_info_1, vec![FileMetadata::new("a.parquet", 0)]);
+ let file_info_2: Vec<FileMetadata> = storage
.list_files(Some("part1"))
.await
.unwrap()
.into_iter()
.collect();
- assert_eq!(
- file_info_2,
- vec![FileInfo {
- uri: join_url_segments(&storage.base_url, &["part1/b.parquet"])
- .unwrap()
- .to_string(),
- name: "b.parquet".to_string(),
- size: 0,
- }]
- );
- let file_info_3: Vec<FileInfo> = storage
+ assert_eq!(file_info_2, vec![FileMetadata::new("b.parquet", 0)],);
+ let file_info_3: Vec<FileMetadata> = storage
.list_files(Some("part2/part22"))
.await
.unwrap()
.into_iter()
.collect();
- assert_eq!(
- file_info_3,
- vec![FileInfo {
- uri: join_url_segments(&storage.base_url,
&["part2/part22/c.parquet"])
- .unwrap()
- .to_string(),
- name: "c.parquet".to_string(),
- size: 0,
- }]
- );
+ assert_eq!(file_info_3, vec![FileMetadata::new("c.parquet", 0)],);
}
#[tokio::test]
@@ -432,15 +412,12 @@ mod tests {
let base_url =
Url::from_directory_path(canonicalize(Path::new("tests/data")).unwrap()).unwrap();
let storage = Storage::new_with_base_url(base_url).unwrap();
- let file_info = storage.get_file_info("a.parquet").await.unwrap();
- assert_eq!(file_info.name, "a.parquet");
- assert_eq!(
- file_info.uri,
- join_url_segments(&storage.base_url, &["a.parquet"])
- .unwrap()
- .to_string()
- );
- assert_eq!(file_info.size, 866);
+ let file_metadata = storage
+ .get_file_metadata_not_populated("a.parquet")
+ .await
+ .unwrap();
+ assert_eq!(file_metadata.name, "a.parquet");
+ assert_eq!(file_metadata.size, 866);
}
#[tokio::test]
diff --git a/crates/core/src/table/fs_view.rs b/crates/core/src/table/fs_view.rs
index eef3150..4e6aa1d 100644
--- a/crates/core/src/table/fs_view.rs
+++ b/crates/core/src/table/fs_view.rs
@@ -21,12 +21,13 @@ use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use crate::config::HudiConfigs;
-use crate::file_group::{BaseFile, FileGroup, FileSlice};
-use crate::storage::file_info::FileInfo;
+use crate::file_group::base_file::BaseFile;
+use crate::file_group::{FileGroup, FileSlice};
use crate::storage::{get_leaf_dirs, Storage};
use crate::config::read::HudiReadConfig::ListingParallelism;
use crate::error::CoreError;
+use crate::storage::file_metadata::FileMetadata;
use crate::table::partition::PartitionPruner;
use crate::Result;
use dashmap::DashMap;
@@ -91,7 +92,7 @@ impl FileSystemView {
storage: &Storage,
partition_path: &str,
) -> Result<Vec<FileGroup>> {
- let file_info: Vec<FileInfo> = storage
+ let file_metadata: Vec<FileMetadata> = storage
.list_files(Some(partition_path))
.await?
.into_iter()
@@ -99,8 +100,8 @@ impl FileSystemView {
.collect();
let mut fg_id_to_base_files: HashMap<String, Vec<BaseFile>> =
HashMap::new();
- for f in file_info {
- let base_file = BaseFile::from_file_info(f)?;
+ for metadata in file_metadata {
+ let base_file = BaseFile::try_from(metadata)?;
let fg_id = &base_file.file_group_id;
fg_id_to_base_files
.entry(fg_id.to_owned())
@@ -163,7 +164,7 @@ impl FileSystemView {
continue;
}
if let Some(fsl) = fg.get_file_slice_mut_as_of(timestamp) {
- fsl.load_stats(&self.storage).await?;
+ fsl.load_metadata_if_needed(&self.storage).await?;
file_slices.push(fsl.clone());
}
}
@@ -260,7 +261,7 @@ mod tests {
.collect::<Vec<_>>();
assert_eq!(fg_ids, vec!["a079bdb3-731c-4894-b855-abfcd6921007-0"]);
for fsl in file_slices.iter() {
- assert_eq!(fsl.base_file.stats.as_ref().unwrap().num_records, 4);
+
assert_eq!(fsl.base_file.file_metadata.as_ref().unwrap().num_records, 4);
}
}
@@ -289,7 +290,7 @@ mod tests {
.collect::<Vec<_>>();
assert_eq!(fg_ids, vec!["ebcb261d-62d3-4895-90ec-5b3c9622dff4-0"]);
for fsl in file_slices.iter() {
- assert_eq!(fsl.base_file.stats.as_ref().unwrap().num_records, 1);
+
assert_eq!(fsl.base_file.file_metadata.as_ref().unwrap().num_records, 1);
}
}
@@ -330,7 +331,7 @@ mod tests {
.collect::<Vec<_>>();
assert_eq!(fg_ids, vec!["a22e8257-e249-45e9-ba46-115bc85adcba-0"]);
for fsl in file_slices.iter() {
- assert_eq!(fsl.base_file.stats.as_ref().unwrap().num_records, 2);
+
assert_eq!(fsl.base_file.file_metadata.as_ref().unwrap().num_records, 2);
}
}
}
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index 6d05c71..2c50fc7 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -61,6 +61,7 @@
//! use url::Url;
//! use hudi_core::table::Table;
//! use hudi_core::storage::util::parse_uri;
+//! use hudi_core::storage::util::join_url_segments;
//!
//! pub async fn test() {
//! let base_uri = Url::from_file_path("/tmp/hudi_data").unwrap();
@@ -74,8 +75,8 @@
//! let file_group_vec = file_slice_vec
//! .iter()
//! .map(|f| {
-//! let url = parse_uri(&f.base_file.info.uri).unwrap();
-//! let size = f.base_file.info.size as u64;
+//! let relative_path =
f.base_file_relative_path().unwrap();
+//! let url = join_url_segments(&base_uri,
&[relative_path.as_str()]).unwrap();
//! url.path().to_string()
//! })
//! .collect();
@@ -372,8 +373,11 @@ mod tests {
/// Test helper to get relative file paths from the table with filters.
async fn get_file_paths_with_filters(table: &Table, filters: &[Filter]) ->
Result<Vec<String>> {
let mut file_paths = Vec::new();
+ let base_url = table.base_url()?;
for f in table.get_file_slices(filters).await? {
- file_paths.push(f.base_file_path().to_string());
+ let relative_path = f.base_file_relative_path()?;
+ let file_url = join_url_segments(&base_url,
&[relative_path.as_str()])?;
+ file_paths.push(file_url.to_string());
}
Ok(file_paths)
}
@@ -710,7 +714,7 @@ mod tests {
assert_eq!(
file_slices
.iter()
- .map(|f| f.base_file_relative_path())
+ .map(|f| f.base_file_relative_path().unwrap())
.collect::<Vec<_>>(),
vec!["a079bdb3-731c-4894-b855-abfcd6921007-0_0-203-274_20240418173551906.parquet",]
);
@@ -724,7 +728,7 @@ mod tests {
assert_eq!(
file_slices
.iter()
- .map(|f| f.base_file_relative_path())
+ .map(|f| f.base_file_relative_path().unwrap())
.collect::<Vec<_>>(),
vec!["a079bdb3-731c-4894-b855-abfcd6921007-0_0-203-274_20240418173551906.parquet",]
);
@@ -738,7 +742,7 @@ mod tests {
assert_eq!(
file_slices
.iter()
- .map(|f| f.base_file_relative_path())
+ .map(|f| f.base_file_relative_path().unwrap())
.collect::<Vec<_>>(),
vec!["a079bdb3-731c-4894-b855-abfcd6921007-0_0-182-253_20240418173550988.parquet",]
);
@@ -752,7 +756,7 @@ mod tests {
assert_eq!(
file_slices
.iter()
- .map(|f| f.base_file_relative_path())
+ .map(|f| f.base_file_relative_path().unwrap())
.collect::<Vec<_>>(),
Vec::<String>::new()
);
diff --git a/crates/datafusion/src/lib.rs b/crates/datafusion/src/lib.rs
index 718dc61..0c27e74 100644
--- a/crates/datafusion/src/lib.rs
+++ b/crates/datafusion/src/lib.rs
@@ -45,7 +45,7 @@ use datafusion_physical_expr::create_physical_expr;
use crate::util::expr::exprs_to_filters;
use hudi_core::config::read::HudiReadConfig::InputPartitions;
use hudi_core::config::util::empty_options;
-use hudi_core::storage::util::{get_scheme_authority, parse_uri};
+use hudi_core::storage::util::{get_scheme_authority, join_url_segments};
use hudi_core::table::Table as HudiTable;
/// Create a `HudiDataSource`.
@@ -181,16 +181,26 @@ impl TableProvider for HudiDataSource {
.get_file_slices_splits(self.get_input_partitions(),
pushdown_filters.as_slice())
.await
.map_err(|e| Execution(format!("Failed to get file slices from
Hudi table: {}", e)))?;
+ let base_url = self.table.base_url().map_err(|e| {
+ Execution(format!(
+ "Failed to get base path config from Hudi table: {e:?}"
+ ))
+ })?;
let mut parquet_file_groups: Vec<Vec<PartitionedFile>> = Vec::new();
for file_slice_vec in file_slices {
- let parquet_file_group_vec = file_slice_vec
- .iter()
- .map(|f| {
- let url = parse_uri(&f.base_file.info.uri).unwrap();
- let size = f.base_file.info.size as u64;
- PartitionedFile::new(url.path(), size)
- })
- .collect();
+ let mut parquet_file_group_vec = Vec::new();
+ for f in file_slice_vec {
+ let relative_path = f.base_file_relative_path().map_err(|e| {
+ Execution(format!(
+ "Failed to get base file relative path for {f:?} due
to {e:?}"
+ ))
+ })?;
+ let url = join_url_segments(&base_url,
&[relative_path.as_str()])
+ .map_err(|e| Execution(format!("Failed to join URL
segments: {e:?}")))?;
+ let size = f.base_file.file_metadata.as_ref().map_or(0, |m|
m.size);
+ let partitioned_file = PartitionedFile::new(url.path(), size
as u64);
+ parquet_file_group_vec.push(partitioned_file);
+ }
parquet_file_groups.push(parquet_file_group_vec)
}
diff --git a/python/hudi/_internal.pyi b/python/hudi/_internal.pyi
index 4235d17..d5e046f 100644
--- a/python/hudi/_internal.pyi
+++ b/python/hudi/_internal.pyi
@@ -60,22 +60,22 @@ class HudiFileSlice:
the partition it belongs to, and associated metadata.
Attributes:
- file_group_id (str): The ID of the file group this slice belongs to.
+ file_group_id (str): The id of the file group this file slice belongs
to.
partition_path (str): The path of the partition containing this file
slice.
- commit_time (str): The commit time of this file slice.
+ creation_instant_time (str): The creation instant time of this file
slice.
base_file_name (str): The name of the base file.
- base_file_size (int): The size of the base file.
- num_records (int): The number of records in the base file.
- size_bytes (int): The size of the file slice in bytes.
+ base_file_size (int): The on-disk size of the base file in bytes.
+ base_file_byte_size (int): The in-memory size of the base file in
bytes.
+ num_records (int): The number of records in the file slice.
"""
file_group_id: str
partition_path: str
- commit_time: str
+ creation_instant_time: str
base_file_name: str
base_file_size: int
+ base_file_byte_size: int
num_records: int
- size_bytes: int
def base_file_relative_path(self) -> str:
"""
diff --git a/python/src/internal.rs b/python/src/internal.rs
index 1aef081..4c9b7fd 100644
--- a/python/src/internal.rs
+++ b/python/src/internal.rs
@@ -94,15 +94,15 @@ pub struct HudiFileSlice {
#[pyo3(get)]
partition_path: String,
#[pyo3(get)]
- commit_time: String,
+ creation_instant_time: String,
#[pyo3(get)]
base_file_name: String,
#[pyo3(get)]
base_file_size: usize,
#[pyo3(get)]
- num_records: i64,
+ base_file_byte_size: i64,
#[pyo3(get)]
- size_bytes: i64,
+ num_records: i64,
}
#[cfg(not(tarpaulin))]
@@ -126,21 +126,21 @@ impl HudiFileSlice {
#[cfg(not(tarpaulin))]
fn convert_file_slice(f: &FileSlice) -> HudiFileSlice {
let file_group_id = f.file_group_id().to_string();
- let partition_path =
f.partition_path.as_deref().unwrap_or_default().to_string();
- let commit_time = f.base_file.commit_time.to_string();
- let base_file_name = f.base_file.info.name.clone();
- let base_file_size = f.base_file.info.size;
- let stats = f.base_file.stats.clone().unwrap_or_default();
- let num_records = stats.num_records;
- let size_bytes = stats.size_bytes;
+ let partition_path = f.partition_path().to_string();
+ let creation_instant_time = f.creation_instant_time().to_string();
+ let base_file_name = f.base_file.file_name.clone();
+ let file_metadata = f.base_file.file_metadata.clone().unwrap_or_default();
+ let base_file_size = file_metadata.size;
+ let base_file_byte_size = file_metadata.byte_size;
+ let num_records = file_metadata.num_records;
HudiFileSlice {
file_group_id,
partition_path,
- commit_time,
+ creation_instant_time,
base_file_name,
base_file_size,
+ base_file_byte_size,
num_records,
- size_bytes,
}
}
diff --git a/python/tests/test_table_read.py b/python/tests/test_table_read.py
index f8986a8..c35be59 100644
--- a/python/tests/test_table_read.py
+++ b/python/tests/test_table_read.py
@@ -51,12 +51,12 @@ def
test_read_table_returns_correct_file_slices(get_sample_table):
file_slices = table.get_file_slices()
assert len(file_slices) == 5
- assert set(f.commit_time for f in file_slices) == {
+ assert set(f.creation_instant_time for f in file_slices) == {
"20240402123035233",
"20240402144910683",
}
assert all(f.num_records == 1 for f in file_slices)
- assert all(f.size_bytes > 0 for f in file_slices)
+ assert all(f.base_file_byte_size > 0 for f in file_slices)
file_slice_paths = [f.base_file_relative_path() for f in file_slices]
assert set(file_slice_paths) == {
"chennai/68d3c349-f621-4cd8-9e8b-c6dd8eb20d08-0_4-12-0_20240402123035233.parquet",