This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new d5f42f6  feat: add `HudiFileGroupReader` with consolidated APIs to 
read records (#164)
d5f42f6 is described below

commit d5f42f68d42cd0469406e3f8307076a6f9a3b49a
Author: Shiyan Xu <[email protected]>
AuthorDate: Sat Oct 12 01:02:38 2024 -1000

    feat: add `HudiFileGroupReader` with consolidated APIs to read records 
(#164)
    
    - Add FileGroupReader with APIs to read file slices
    - Migrate existing APIs to use FileGroupReader
---
 crates/core/src/config/mod.rs        | 21 +++-----
 crates/core/src/config/utils.rs      |  4 +-
 crates/core/src/file_group/mod.rs    | 33 ++++++++++++-
 crates/core/src/file_group/reader.rs | 93 ++++++++++++++++++++++++++++++++++++
 crates/core/src/storage/mod.rs       | 19 ++++++++
 crates/core/src/table/fs_view.rs     | 13 -----
 crates/core/src/table/mod.rs         | 35 ++++----------
 python/hudi/__init__.py              |  1 +
 python/hudi/_internal.pyi            |  9 +++-
 python/src/internal.rs               | 34 +++++++++++--
 python/src/lib.rs                    |  3 +-
 python/tests/test_table_read.py      |  4 +-
 12 files changed, 207 insertions(+), 62 deletions(-)

diff --git a/crates/core/src/config/mod.rs b/crates/core/src/config/mod.rs
index 580be44..be6136f 100644
--- a/crates/core/src/config/mod.rs
+++ b/crates/core/src/config/mod.rs
@@ -23,6 +23,7 @@ use std::sync::Arc;
 
 use crate::storage::utils::parse_uri;
 use anyhow::Result;
+use serde::{Deserialize, Serialize};
 use url::Url;
 
 pub mod internal;
@@ -157,30 +158,22 @@ impl From<HudiConfigValue> for Vec<String> {
 }
 
 /// Hudi configuration container.
-#[derive(Clone, Debug)]
+#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
 pub struct HudiConfigs {
     raw_options: Arc<HashMap<String, String>>,
 }
 
-impl From<HashMap<String, String>> for HudiConfigs {
-    fn from(options: HashMap<String, String>) -> Self {
-        Self {
-            raw_options: Arc::new(options),
-        }
-    }
-}
-
 impl HudiConfigs {
     /// Create [HudiConfigs] using opitons in the form of key-value pairs.
     pub fn new<I, K, V>(options: I) -> Self
     where
         I: IntoIterator<Item = (K, V)>,
         K: AsRef<str>,
-        V: AsRef<str>,
+        V: Into<String>,
     {
         let raw_options = options
             .into_iter()
-            .map(|(k, v)| (k.as_ref().to_string(), v.as_ref().to_string()))
+            .map(|(k, v)| (k.as_ref().into(), v.into()))
             .collect();
         Self {
             raw_options: Arc::new(raw_options),
@@ -242,12 +235,12 @@ mod tests {
     use super::*;
 
     #[test]
-    fn test_from_hashmap() {
+    fn test_new_using_hashmap() {
         let mut options = HashMap::new();
         options.insert("key1".to_string(), "value1".to_string());
         options.insert("key2".to_string(), "value2".to_string());
 
-        let config = HudiConfigs::from(options.clone());
+        let config = HudiConfigs::new(options.clone());
 
         assert_eq!(*config.raw_options, options);
     }
@@ -281,7 +274,7 @@ mod tests {
         options.insert("key1".to_string(), "value1".to_string());
         options.insert("key2".to_string(), "value2".to_string());
 
-        let config = HudiConfigs::from(options.clone());
+        let config = HudiConfigs::new(options.clone());
 
         let result = config.as_options();
 
diff --git a/crates/core/src/config/utils.rs b/crates/core/src/config/utils.rs
index 5fd84cd..98ea6b1 100644
--- a/crates/core/src/config/utils.rs
+++ b/crates/core/src/config/utils.rs
@@ -38,9 +38,9 @@ where
     let mut others = HashMap::new();
     for (k, v) in all_options {
         if k.as_ref().starts_with("hoodie.") {
-            hudi_options.insert(k.as_ref().to_string(), v.into());
+            hudi_options.insert(k.as_ref().into(), v.into());
         } else {
-            others.insert(k.as_ref().to_string(), v.into());
+            others.insert(k.as_ref().into(), v.into());
         }
     }
 
diff --git a/crates/core/src/file_group/mod.rs 
b/crates/core/src/file_group/mod.rs
index 4605052..6cd1248 100644
--- a/crates/core/src/file_group/mod.rs
+++ b/crates/core/src/file_group/mod.rs
@@ -20,6 +20,8 @@
 //!
 //! A set of data/base files + set of log files, that make up a unit for all 
operations.
 
+pub mod reader;
+
 use std::collections::BTreeMap;
 use std::fmt;
 use std::fmt::Formatter;
@@ -223,7 +225,7 @@ impl FileGroup {
 
 #[cfg(test)]
 mod tests {
-    use crate::file_group::{BaseFile, FileGroup};
+    use super::*;
 
     #[test]
     fn create_a_base_file_successfully() {
@@ -274,4 +276,33 @@ mod tests {
         assert!(res2.is_err());
         assert_eq!(res2.unwrap_err().to_string(), "Commit time 
20240402144910683 is already present in File Group 
5a226868-2934-4f84-a16f-55124630c68d-0");
     }
+
+    #[test]
+    fn test_file_group_display() {
+        let file_group = FileGroup {
+            id: "group123".to_string(),
+            partition_path: Some("part/2023-01-01".to_string()),
+            file_slices: BTreeMap::new(),
+        };
+
+        let display_string = format!("{}", file_group);
+
+        assert_eq!(
+            display_string,
+            "File Group: partition Some(\"part/2023-01-01\") id group123"
+        );
+
+        let file_group_no_partition = FileGroup {
+            id: "group456".to_string(),
+            partition_path: None,
+            file_slices: BTreeMap::new(),
+        };
+
+        let display_string_no_partition = format!("{}", 
file_group_no_partition);
+
+        assert_eq!(
+            display_string_no_partition,
+            "File Group: partition None id group456"
+        );
+    }
 }
diff --git a/crates/core/src/file_group/reader.rs 
b/crates/core/src/file_group/reader.rs
new file mode 100644
index 0000000..0c97dd5
--- /dev/null
+++ b/crates/core/src/file_group/reader.rs
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use crate::config::table::HudiTableConfig;
+use crate::config::utils::split_hudi_options_from_others;
+use crate::config::HudiConfigs;
+use crate::file_group::FileSlice;
+use crate::storage::Storage;
+use anyhow::Result;
+use arrow_array::RecordBatch;
+use std::sync::Arc;
+
+#[derive(Clone, Debug)]
+pub struct FileGroupReader {
+    storage: Arc<Storage>,
+}
+
+impl FileGroupReader {
+    pub fn new(storage: Arc<Storage>) -> Self {
+        Self { storage }
+    }
+
+    pub fn new_with_options<I, K, V>(base_uri: &str, options: I) -> 
Result<Self>
+    where
+        I: IntoIterator<Item = (K, V)>,
+        K: AsRef<str>,
+        V: Into<String>,
+    {
+        let (mut hudi_opts, others) = split_hudi_options_from_others(options);
+        hudi_opts.insert(
+            HudiTableConfig::BasePath.as_ref().to_string(),
+            base_uri.to_string(),
+        );
+
+        let hudi_configs = Arc::new(HudiConfigs::new(hudi_opts));
+
+        let storage = Storage::new(Arc::new(others), hudi_configs)?;
+        Ok(Self { storage })
+    }
+
+    pub async fn read_file_slice_by_base_file_path(
+        &self,
+        relative_path: &str,
+    ) -> Result<RecordBatch> {
+        self.storage.get_parquet_file_data(relative_path).await
+    }
+
+    pub async fn read_file_slice(&self, file_slice: &FileSlice) -> 
Result<RecordBatch> {
+        
self.read_file_slice_by_base_file_path(&file_slice.base_file_relative_path())
+            .await
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use url::Url;
+
+    #[test]
+    fn test_new() {
+        let base_url = Url::parse("file:///tmp/hudi_data").unwrap();
+        let storage = Storage::new_with_base_url(base_url).unwrap();
+        let fg_reader = FileGroupReader::new(storage.clone());
+        assert!(Arc::ptr_eq(&fg_reader.storage, &storage));
+    }
+
+    #[test]
+    fn test_new_with_options() -> Result<()> {
+        let options = vec![("key1", "value1"), ("key2", "value2")];
+        let reader = FileGroupReader::new_with_options("/tmp/hudi_data", 
options)?;
+        assert!(!reader.storage.options.is_empty());
+        assert!(reader
+            .storage
+            .hudi_configs
+            .contains(HudiTableConfig::BasePath));
+        Ok(())
+    }
+}
diff --git a/crates/core/src/storage/mod.rs b/crates/core/src/storage/mod.rs
index 45c0699..2b4c118 100644
--- a/crates/core/src/storage/mod.rs
+++ b/crates/core/src/storage/mod.rs
@@ -278,6 +278,25 @@ mod tests {
             .contains("Failed to create storage"));
     }
 
+    #[test]
+    fn test_storage_new_error_invalid_url() {
+        let options = Arc::new(HashMap::new());
+        let hudi_configs = Arc::new(HudiConfigs::new([(
+            HudiTableConfig::BasePath,
+            "http://invalid_url",
+        )]));
+        let result = Storage::new(options, hudi_configs);
+
+        assert!(
+            result.is_err(),
+            "Should return error when no base path is invalid."
+        );
+        assert!(result
+            .unwrap_err()
+            .to_string()
+            .contains("Failed to create storage"));
+    }
+
     #[tokio::test]
     async fn storage_list_dirs() {
         let base_url = Url::from_directory_path(
diff --git a/crates/core/src/table/fs_view.rs b/crates/core/src/table/fs_view.rs
index 0cc6ad1..2278d6c 100644
--- a/crates/core/src/table/fs_view.rs
+++ b/crates/core/src/table/fs_view.rs
@@ -26,7 +26,6 @@ use crate::storage::file_info::FileInfo;
 use crate::storage::{get_leaf_dirs, Storage};
 use crate::table::partition::PartitionPruner;
 use anyhow::Result;
-use arrow::record_batch::RecordBatch;
 use dashmap::DashMap;
 use futures::stream::{self, StreamExt, TryStreamExt};
 
@@ -171,18 +170,6 @@ impl FileSystemView {
         }
         Ok(file_slices)
     }
-
-    pub async fn read_file_slice_by_path_unchecked(
-        &self,
-        relative_path: &str,
-    ) -> Result<RecordBatch> {
-        self.storage.get_parquet_file_data(relative_path).await
-    }
-
-    pub async fn read_file_slice_unchecked(&self, file_slice: &FileSlice) -> 
Result<RecordBatch> {
-        
self.read_file_slice_by_path_unchecked(&file_slice.base_file_relative_path())
-            .await
-    }
 }
 
 #[cfg(test)]
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index 9ca5817..fa694e4 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -95,6 +95,7 @@ use arrow::record_batch::RecordBatch;
 use arrow_schema::{Field, Schema};
 use strum::IntoEnumIterator;
 use url::Url;
+
 use HudiInternalConfig::SkipConfigValidation;
 use HudiTableConfig::{DropsPartitionFields, TableType, TableVersion};
 use TableTypeValue::CopyOnWrite;
@@ -108,6 +109,7 @@ use crate::config::utils::parse_data_for_options;
 use crate::config::utils::{empty_options, split_hudi_options_from_others};
 use crate::config::HudiConfigs;
 use crate::config::HUDI_CONF_DIR;
+use crate::file_group::reader::FileGroupReader;
 use crate::file_group::FileSlice;
 use crate::storage::Storage;
 use crate::table::fs_view::FileSystemView;
@@ -198,7 +200,7 @@ impl Table {
         other_options.extend(others);
 
         hudi_options.insert(
-            HudiTableConfig::BasePath.as_ref().to_string(),
+            HudiTableConfig::BasePath.as_ref().into(),
             base_uri.to_string(),
         );
 
@@ -404,8 +406,9 @@ impl Table {
             .await
             .context(format!("Failed to get file slices as of {}", 
timestamp))?;
         let mut batches = Vec::new();
+        let fg_reader = self.create_file_group_reader();
         for f in file_slices {
-            match self.file_system_view.read_file_slice_unchecked(&f).await {
+            match fg_reader.read_file_slice(&f).await {
                 Ok(batch) => batches.push(batch),
                 Err(e) => return Err(anyhow!("Failed to read file slice {:?} - 
{}", f, e)),
             }
@@ -422,29 +425,8 @@ impl Table {
         Ok(file_paths)
     }
 
-    /// Read records from a [FileSlice] by its relative path.
-    ///
-    /// **Example**
-    ///
-    /// ```rust
-    /// use url::Url;
-    /// use hudi_core::table::Table;
-    ///
-    /// pub async fn test() {
-    ///     let base_uri = Url::from_file_path("/tmp/hudi_data").unwrap();
-    ///     let hudi_table = Table::new(base_uri.path()).await.unwrap();
-    ///     let batches = hudi_table
-    ///         .read_file_slice_by_path(
-    ///             
"a079bdb3-731c-4894-b855-abfcd6921007-0_0-203-274_20240418173551906.parquet",
-    ///         )
-    ///         .await
-    ///         .unwrap();
-    /// }
-    /// ```
-    pub async fn read_file_slice_by_path(&self, relative_path: &str) -> 
Result<RecordBatch> {
-        self.file_system_view
-            .read_file_slice_by_path_unchecked(relative_path)
-            .await
+    pub fn create_file_group_reader(&self) -> FileGroupReader {
+        FileGroupReader::new(self.file_system_view.storage.clone())
     }
 }
 
@@ -730,7 +712,8 @@ mod tests {
         let base_url = TestTable::V6Nonpartitioned.url();
         let hudi_table = Table::new(base_url.path()).await.unwrap();
         let batches = hudi_table
-            .read_file_slice_by_path(
+            .create_file_group_reader()
+            .read_file_slice_by_base_file_path(
                 
"a079bdb3-731c-4894-b855-abfcd6921007-0_0-203-274_20240418173551906.parquet",
             )
             .await
diff --git a/python/hudi/__init__.py b/python/hudi/__init__.py
index b0a792e..37738c7 100644
--- a/python/hudi/__init__.py
+++ b/python/hudi/__init__.py
@@ -15,6 +15,7 @@
 #  specific language governing permissions and limitations
 #  under the License.
 
+from ._internal import HudiFileGroupReader as HudiFileGroupReader
 from ._internal import HudiFileSlice as HudiFileSlice
 from ._internal import HudiTable as HudiTable
 from ._internal import __version__ as __version__
diff --git a/python/hudi/_internal.pyi b/python/hudi/_internal.pyi
index 47da6aa..e0acc99 100644
--- a/python/hudi/_internal.pyi
+++ b/python/hudi/_internal.pyi
@@ -21,6 +21,13 @@ import pyarrow  # type: ignore
 
 __version__: str
 
+@dataclass(init=False)
+class HudiFileGroupReader:
+    def __init__(self, base_uri: str, options: Optional[Dict[str, str]] = 
None): ...
+    def read_file_slice_by_base_file_path(
+        self, relative_path: str
+    ) -> "pyarrow.RecordBatch": ...
+
 @dataclass(init=False)
 class HudiFileSlice:
     file_group_id: str
@@ -46,7 +53,7 @@ class HudiTable:
         self, n: int, filters: Optional[List[str]]
     ) -> List[List[HudiFileSlice]]: ...
     def get_file_slices(self, filters: Optional[List[str]]) -> 
List[HudiFileSlice]: ...
-    def read_file_slice(self, base_file_relative_path: str) -> 
pyarrow.RecordBatch: ...
+    def create_file_group_reader(self) -> HudiFileGroupReader: ...
     def read_snapshot(
         self, filters: Optional[List[str]]
     ) -> List["pyarrow.RecordBatch"]: ...
diff --git a/python/src/internal.rs b/python/src/internal.rs
index 4c448b5..f303134 100644
--- a/python/src/internal.rs
+++ b/python/src/internal.rs
@@ -25,6 +25,7 @@ use arrow::pyarrow::ToPyArrow;
 use pyo3::{pyclass, pymethods, PyErr, PyObject, PyResult, Python};
 use tokio::runtime::Runtime;
 
+use hudi::file_group::reader::FileGroupReader;
 use hudi::file_group::FileSlice;
 use hudi::table::Table;
 
@@ -34,6 +35,33 @@ macro_rules! vec_string_to_slice {
     };
 }
 
+#[cfg(not(tarpaulin))]
+#[derive(Clone, Debug)]
+#[pyclass]
+pub struct HudiFileGroupReader {
+    inner: FileGroupReader,
+}
+
+#[cfg(not(tarpaulin))]
+#[pymethods]
+impl HudiFileGroupReader {
+    #[new]
+    #[pyo3(signature = (base_uri, options=None))]
+    fn new(base_uri: &str, options: Option<HashMap<String, String>>) -> 
PyResult<Self> {
+        let inner = FileGroupReader::new_with_options(base_uri, 
options.unwrap_or_default())?;
+        Ok(HudiFileGroupReader { inner })
+    }
+
+    fn read_file_slice_by_base_file_path(
+        &self,
+        relative_path: &str,
+        py: Python,
+    ) -> PyResult<PyObject> {
+        
rt().block_on(self.inner.read_file_slice_by_base_file_path(relative_path))?
+            .to_pyarrow(py)
+    }
+}
+
 #[cfg(not(tarpaulin))]
 #[derive(Clone, Debug)]
 #[pyclass]
@@ -156,9 +184,9 @@ impl HudiTable {
         })
     }
 
-    fn read_file_slice(&self, relative_path: &str, py: Python) -> 
PyResult<PyObject> {
-        rt().block_on(self.inner.read_file_slice_by_path(relative_path))?
-            .to_pyarrow(py)
+    fn create_file_group_reader(&self) -> PyResult<HudiFileGroupReader> {
+        let fg_reader = self.inner.create_file_group_reader();
+        Ok(HudiFileGroupReader { inner: fg_reader })
     }
 
     #[pyo3(signature = (filters=None))]
diff --git a/python/src/lib.rs b/python/src/lib.rs
index ad96dc6..817d936 100644
--- a/python/src/lib.rs
+++ b/python/src/lib.rs
@@ -25,7 +25,8 @@ mod internal;
 fn _internal(_py: Python, m: &Bound<'_, PyModule>) -> PyResult<()> {
     m.add("__version__", env!("CARGO_PKG_VERSION"))?;
 
-    use internal::{HudiFileSlice, HudiTable};
+    use internal::{HudiFileGroupReader, HudiFileSlice, HudiTable};
+    m.add_class::<HudiFileGroupReader>()?;
     m.add_class::<HudiFileSlice>()?;
     m.add_class::<HudiTable>()?;
     Ok(())
diff --git a/python/tests/test_table_read.py b/python/tests/test_table_read.py
index 2d1fd74..2b30a92 100644
--- a/python/tests/test_table_read.py
+++ b/python/tests/test_table_read.py
@@ -66,7 +66,9 @@ def test_sample_table(get_sample_table):
         
"sao_paulo/ee915c68-d7f8-44f6-9759-e691add290d8-0_3-11-0_20240402123035233.parquet",
     }
 
-    batch = table.read_file_slice(file_slice_paths[0])
+    batch = table.create_file_group_reader().read_file_slice_by_base_file_path(
+        file_slice_paths[0]
+    )
     t = pa.Table.from_batches([batch])
     assert t.num_rows == 1
     assert t.num_columns == 11

Reply via email to