This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git
The following commit(s) were added to refs/heads/main by this push:
new d5f42f6 feat: add `HudiFileGroupReader` with consolidated APIs to
read records (#164)
d5f42f6 is described below
commit d5f42f68d42cd0469406e3f8307076a6f9a3b49a
Author: Shiyan Xu <[email protected]>
AuthorDate: Sat Oct 12 01:02:38 2024 -1000
feat: add `HudiFileGroupReader` with consolidated APIs to read records
(#164)
- Add FileGroupReader with APIs to read file slices
- Migrate existing APIs to use FileGroupReader
---
crates/core/src/config/mod.rs | 21 +++-----
crates/core/src/config/utils.rs | 4 +-
crates/core/src/file_group/mod.rs | 33 ++++++++++++-
crates/core/src/file_group/reader.rs | 93 ++++++++++++++++++++++++++++++++++++
crates/core/src/storage/mod.rs | 19 ++++++++
crates/core/src/table/fs_view.rs | 13 -----
crates/core/src/table/mod.rs | 35 ++++----------
python/hudi/__init__.py | 1 +
python/hudi/_internal.pyi | 9 +++-
python/src/internal.rs | 34 +++++++++++--
python/src/lib.rs | 3 +-
python/tests/test_table_read.py | 4 +-
12 files changed, 207 insertions(+), 62 deletions(-)
diff --git a/crates/core/src/config/mod.rs b/crates/core/src/config/mod.rs
index 580be44..be6136f 100644
--- a/crates/core/src/config/mod.rs
+++ b/crates/core/src/config/mod.rs
@@ -23,6 +23,7 @@ use std::sync::Arc;
use crate::storage::utils::parse_uri;
use anyhow::Result;
+use serde::{Deserialize, Serialize};
use url::Url;
pub mod internal;
@@ -157,30 +158,22 @@ impl From<HudiConfigValue> for Vec<String> {
}
/// Hudi configuration container.
-#[derive(Clone, Debug)]
+#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct HudiConfigs {
raw_options: Arc<HashMap<String, String>>,
}
-impl From<HashMap<String, String>> for HudiConfigs {
- fn from(options: HashMap<String, String>) -> Self {
- Self {
- raw_options: Arc::new(options),
- }
- }
-}
-
impl HudiConfigs {
    /// Create [HudiConfigs] using options in the form of key-value pairs.
pub fn new<I, K, V>(options: I) -> Self
where
I: IntoIterator<Item = (K, V)>,
K: AsRef<str>,
- V: AsRef<str>,
+ V: Into<String>,
{
let raw_options = options
.into_iter()
- .map(|(k, v)| (k.as_ref().to_string(), v.as_ref().to_string()))
+ .map(|(k, v)| (k.as_ref().into(), v.into()))
.collect();
Self {
raw_options: Arc::new(raw_options),
@@ -242,12 +235,12 @@ mod tests {
use super::*;
#[test]
- fn test_from_hashmap() {
+ fn test_new_using_hashmap() {
let mut options = HashMap::new();
options.insert("key1".to_string(), "value1".to_string());
options.insert("key2".to_string(), "value2".to_string());
- let config = HudiConfigs::from(options.clone());
+ let config = HudiConfigs::new(options.clone());
assert_eq!(*config.raw_options, options);
}
@@ -281,7 +274,7 @@ mod tests {
options.insert("key1".to_string(), "value1".to_string());
options.insert("key2".to_string(), "value2".to_string());
- let config = HudiConfigs::from(options.clone());
+ let config = HudiConfigs::new(options.clone());
let result = config.as_options();
diff --git a/crates/core/src/config/utils.rs b/crates/core/src/config/utils.rs
index 5fd84cd..98ea6b1 100644
--- a/crates/core/src/config/utils.rs
+++ b/crates/core/src/config/utils.rs
@@ -38,9 +38,9 @@ where
let mut others = HashMap::new();
for (k, v) in all_options {
if k.as_ref().starts_with("hoodie.") {
- hudi_options.insert(k.as_ref().to_string(), v.into());
+ hudi_options.insert(k.as_ref().into(), v.into());
} else {
- others.insert(k.as_ref().to_string(), v.into());
+ others.insert(k.as_ref().into(), v.into());
}
}
diff --git a/crates/core/src/file_group/mod.rs
b/crates/core/src/file_group/mod.rs
index 4605052..6cd1248 100644
--- a/crates/core/src/file_group/mod.rs
+++ b/crates/core/src/file_group/mod.rs
@@ -20,6 +20,8 @@
//!
//! A set of data/base files + set of log files, that make up a unit for all
operations.
+pub mod reader;
+
use std::collections::BTreeMap;
use std::fmt;
use std::fmt::Formatter;
@@ -223,7 +225,7 @@ impl FileGroup {
#[cfg(test)]
mod tests {
- use crate::file_group::{BaseFile, FileGroup};
+ use super::*;
#[test]
fn create_a_base_file_successfully() {
@@ -274,4 +276,33 @@ mod tests {
assert!(res2.is_err());
assert_eq!(res2.unwrap_err().to_string(), "Commit time
20240402144910683 is already present in File Group
5a226868-2934-4f84-a16f-55124630c68d-0");
}
+
+ #[test]
+ fn test_file_group_display() {
+ let file_group = FileGroup {
+ id: "group123".to_string(),
+ partition_path: Some("part/2023-01-01".to_string()),
+ file_slices: BTreeMap::new(),
+ };
+
+ let display_string = format!("{}", file_group);
+
+ assert_eq!(
+ display_string,
+ "File Group: partition Some(\"part/2023-01-01\") id group123"
+ );
+
+ let file_group_no_partition = FileGroup {
+ id: "group456".to_string(),
+ partition_path: None,
+ file_slices: BTreeMap::new(),
+ };
+
+ let display_string_no_partition = format!("{}",
file_group_no_partition);
+
+ assert_eq!(
+ display_string_no_partition,
+ "File Group: partition None id group456"
+ );
+ }
}
diff --git a/crates/core/src/file_group/reader.rs
b/crates/core/src/file_group/reader.rs
new file mode 100644
index 0000000..0c97dd5
--- /dev/null
+++ b/crates/core/src/file_group/reader.rs
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use crate::config::table::HudiTableConfig;
+use crate::config::utils::split_hudi_options_from_others;
+use crate::config::HudiConfigs;
+use crate::file_group::FileSlice;
+use crate::storage::Storage;
+use anyhow::Result;
+use arrow_array::RecordBatch;
+use std::sync::Arc;
+
+#[derive(Clone, Debug)]
+pub struct FileGroupReader {
+ storage: Arc<Storage>,
+}
+
+impl FileGroupReader {
+ pub fn new(storage: Arc<Storage>) -> Self {
+ Self { storage }
+ }
+
+ pub fn new_with_options<I, K, V>(base_uri: &str, options: I) ->
Result<Self>
+ where
+ I: IntoIterator<Item = (K, V)>,
+ K: AsRef<str>,
+ V: Into<String>,
+ {
+ let (mut hudi_opts, others) = split_hudi_options_from_others(options);
+ hudi_opts.insert(
+ HudiTableConfig::BasePath.as_ref().to_string(),
+ base_uri.to_string(),
+ );
+
+ let hudi_configs = Arc::new(HudiConfigs::new(hudi_opts));
+
+ let storage = Storage::new(Arc::new(others), hudi_configs)?;
+ Ok(Self { storage })
+ }
+
+ pub async fn read_file_slice_by_base_file_path(
+ &self,
+ relative_path: &str,
+ ) -> Result<RecordBatch> {
+ self.storage.get_parquet_file_data(relative_path).await
+ }
+
+ pub async fn read_file_slice(&self, file_slice: &FileSlice) ->
Result<RecordBatch> {
+
self.read_file_slice_by_base_file_path(&file_slice.base_file_relative_path())
+ .await
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use url::Url;
+
+ #[test]
+ fn test_new() {
+ let base_url = Url::parse("file:///tmp/hudi_data").unwrap();
+ let storage = Storage::new_with_base_url(base_url).unwrap();
+ let fg_reader = FileGroupReader::new(storage.clone());
+ assert!(Arc::ptr_eq(&fg_reader.storage, &storage));
+ }
+
+ #[test]
+ fn test_new_with_options() -> Result<()> {
+ let options = vec![("key1", "value1"), ("key2", "value2")];
+ let reader = FileGroupReader::new_with_options("/tmp/hudi_data",
options)?;
+ assert!(!reader.storage.options.is_empty());
+ assert!(reader
+ .storage
+ .hudi_configs
+ .contains(HudiTableConfig::BasePath));
+ Ok(())
+ }
+}
diff --git a/crates/core/src/storage/mod.rs b/crates/core/src/storage/mod.rs
index 45c0699..2b4c118 100644
--- a/crates/core/src/storage/mod.rs
+++ b/crates/core/src/storage/mod.rs
@@ -278,6 +278,25 @@ mod tests {
.contains("Failed to create storage"));
}
+ #[test]
+ fn test_storage_new_error_invalid_url() {
+ let options = Arc::new(HashMap::new());
+ let hudi_configs = Arc::new(HudiConfigs::new([(
+ HudiTableConfig::BasePath,
+ "http://invalid_url",
+ )]));
+ let result = Storage::new(options, hudi_configs);
+
+ assert!(
+ result.is_err(),
+            "Should return error when base path is invalid."
+ );
+ assert!(result
+ .unwrap_err()
+ .to_string()
+ .contains("Failed to create storage"));
+ }
+
#[tokio::test]
async fn storage_list_dirs() {
let base_url = Url::from_directory_path(
diff --git a/crates/core/src/table/fs_view.rs b/crates/core/src/table/fs_view.rs
index 0cc6ad1..2278d6c 100644
--- a/crates/core/src/table/fs_view.rs
+++ b/crates/core/src/table/fs_view.rs
@@ -26,7 +26,6 @@ use crate::storage::file_info::FileInfo;
use crate::storage::{get_leaf_dirs, Storage};
use crate::table::partition::PartitionPruner;
use anyhow::Result;
-use arrow::record_batch::RecordBatch;
use dashmap::DashMap;
use futures::stream::{self, StreamExt, TryStreamExt};
@@ -171,18 +170,6 @@ impl FileSystemView {
}
Ok(file_slices)
}
-
- pub async fn read_file_slice_by_path_unchecked(
- &self,
- relative_path: &str,
- ) -> Result<RecordBatch> {
- self.storage.get_parquet_file_data(relative_path).await
- }
-
- pub async fn read_file_slice_unchecked(&self, file_slice: &FileSlice) ->
Result<RecordBatch> {
-
self.read_file_slice_by_path_unchecked(&file_slice.base_file_relative_path())
- .await
- }
}
#[cfg(test)]
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index 9ca5817..fa694e4 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -95,6 +95,7 @@ use arrow::record_batch::RecordBatch;
use arrow_schema::{Field, Schema};
use strum::IntoEnumIterator;
use url::Url;
+
use HudiInternalConfig::SkipConfigValidation;
use HudiTableConfig::{DropsPartitionFields, TableType, TableVersion};
use TableTypeValue::CopyOnWrite;
@@ -108,6 +109,7 @@ use crate::config::utils::parse_data_for_options;
use crate::config::utils::{empty_options, split_hudi_options_from_others};
use crate::config::HudiConfigs;
use crate::config::HUDI_CONF_DIR;
+use crate::file_group::reader::FileGroupReader;
use crate::file_group::FileSlice;
use crate::storage::Storage;
use crate::table::fs_view::FileSystemView;
@@ -198,7 +200,7 @@ impl Table {
other_options.extend(others);
hudi_options.insert(
- HudiTableConfig::BasePath.as_ref().to_string(),
+ HudiTableConfig::BasePath.as_ref().into(),
base_uri.to_string(),
);
@@ -404,8 +406,9 @@ impl Table {
.await
.context(format!("Failed to get file slices as of {}",
timestamp))?;
let mut batches = Vec::new();
+ let fg_reader = self.create_file_group_reader();
for f in file_slices {
- match self.file_system_view.read_file_slice_unchecked(&f).await {
+ match fg_reader.read_file_slice(&f).await {
Ok(batch) => batches.push(batch),
Err(e) => return Err(anyhow!("Failed to read file slice {:?} -
{}", f, e)),
}
@@ -422,29 +425,8 @@ impl Table {
Ok(file_paths)
}
- /// Read records from a [FileSlice] by its relative path.
- ///
- /// **Example**
- ///
- /// ```rust
- /// use url::Url;
- /// use hudi_core::table::Table;
- ///
- /// pub async fn test() {
- /// let base_uri = Url::from_file_path("/tmp/hudi_data").unwrap();
- /// let hudi_table = Table::new(base_uri.path()).await.unwrap();
- /// let batches = hudi_table
- /// .read_file_slice_by_path(
- ///
"a079bdb3-731c-4894-b855-abfcd6921007-0_0-203-274_20240418173551906.parquet",
- /// )
- /// .await
- /// .unwrap();
- /// }
- /// ```
- pub async fn read_file_slice_by_path(&self, relative_path: &str) ->
Result<RecordBatch> {
- self.file_system_view
- .read_file_slice_by_path_unchecked(relative_path)
- .await
+ pub fn create_file_group_reader(&self) -> FileGroupReader {
+ FileGroupReader::new(self.file_system_view.storage.clone())
}
}
@@ -730,7 +712,8 @@ mod tests {
let base_url = TestTable::V6Nonpartitioned.url();
let hudi_table = Table::new(base_url.path()).await.unwrap();
let batches = hudi_table
- .read_file_slice_by_path(
+ .create_file_group_reader()
+ .read_file_slice_by_base_file_path(
"a079bdb3-731c-4894-b855-abfcd6921007-0_0-203-274_20240418173551906.parquet",
)
.await
diff --git a/python/hudi/__init__.py b/python/hudi/__init__.py
index b0a792e..37738c7 100644
--- a/python/hudi/__init__.py
+++ b/python/hudi/__init__.py
@@ -15,6 +15,7 @@
# specific language governing permissions and limitations
# under the License.
+from ._internal import HudiFileGroupReader as HudiFileGroupReader
from ._internal import HudiFileSlice as HudiFileSlice
from ._internal import HudiTable as HudiTable
from ._internal import __version__ as __version__
diff --git a/python/hudi/_internal.pyi b/python/hudi/_internal.pyi
index 47da6aa..e0acc99 100644
--- a/python/hudi/_internal.pyi
+++ b/python/hudi/_internal.pyi
@@ -21,6 +21,13 @@ import pyarrow # type: ignore
__version__: str
+@dataclass(init=False)
+class HudiFileGroupReader:
+ def __init__(self, base_uri: str, options: Optional[Dict[str, str]] =
None): ...
+ def read_file_slice_by_base_file_path(
+ self, relative_path: str
+ ) -> "pyarrow.RecordBatch": ...
+
@dataclass(init=False)
class HudiFileSlice:
file_group_id: str
@@ -46,7 +53,7 @@ class HudiTable:
self, n: int, filters: Optional[List[str]]
) -> List[List[HudiFileSlice]]: ...
def get_file_slices(self, filters: Optional[List[str]]) ->
List[HudiFileSlice]: ...
- def read_file_slice(self, base_file_relative_path: str) ->
pyarrow.RecordBatch: ...
+ def create_file_group_reader(self) -> HudiFileGroupReader: ...
def read_snapshot(
self, filters: Optional[List[str]]
) -> List["pyarrow.RecordBatch"]: ...
diff --git a/python/src/internal.rs b/python/src/internal.rs
index 4c448b5..f303134 100644
--- a/python/src/internal.rs
+++ b/python/src/internal.rs
@@ -25,6 +25,7 @@ use arrow::pyarrow::ToPyArrow;
use pyo3::{pyclass, pymethods, PyErr, PyObject, PyResult, Python};
use tokio::runtime::Runtime;
+use hudi::file_group::reader::FileGroupReader;
use hudi::file_group::FileSlice;
use hudi::table::Table;
@@ -34,6 +35,33 @@ macro_rules! vec_string_to_slice {
};
}
+#[cfg(not(tarpaulin))]
+#[derive(Clone, Debug)]
+#[pyclass]
+pub struct HudiFileGroupReader {
+ inner: FileGroupReader,
+}
+
+#[cfg(not(tarpaulin))]
+#[pymethods]
+impl HudiFileGroupReader {
+ #[new]
+ #[pyo3(signature = (base_uri, options=None))]
+ fn new(base_uri: &str, options: Option<HashMap<String, String>>) ->
PyResult<Self> {
+ let inner = FileGroupReader::new_with_options(base_uri,
options.unwrap_or_default())?;
+ Ok(HudiFileGroupReader { inner })
+ }
+
+ fn read_file_slice_by_base_file_path(
+ &self,
+ relative_path: &str,
+ py: Python,
+ ) -> PyResult<PyObject> {
+
rt().block_on(self.inner.read_file_slice_by_base_file_path(relative_path))?
+ .to_pyarrow(py)
+ }
+}
+
#[cfg(not(tarpaulin))]
#[derive(Clone, Debug)]
#[pyclass]
@@ -156,9 +184,9 @@ impl HudiTable {
})
}
- fn read_file_slice(&self, relative_path: &str, py: Python) ->
PyResult<PyObject> {
- rt().block_on(self.inner.read_file_slice_by_path(relative_path))?
- .to_pyarrow(py)
+ fn create_file_group_reader(&self) -> PyResult<HudiFileGroupReader> {
+ let fg_reader = self.inner.create_file_group_reader();
+ Ok(HudiFileGroupReader { inner: fg_reader })
}
#[pyo3(signature = (filters=None))]
diff --git a/python/src/lib.rs b/python/src/lib.rs
index ad96dc6..817d936 100644
--- a/python/src/lib.rs
+++ b/python/src/lib.rs
@@ -25,7 +25,8 @@ mod internal;
fn _internal(_py: Python, m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add("__version__", env!("CARGO_PKG_VERSION"))?;
- use internal::{HudiFileSlice, HudiTable};
+ use internal::{HudiFileGroupReader, HudiFileSlice, HudiTable};
+ m.add_class::<HudiFileGroupReader>()?;
m.add_class::<HudiFileSlice>()?;
m.add_class::<HudiTable>()?;
Ok(())
diff --git a/python/tests/test_table_read.py b/python/tests/test_table_read.py
index 2d1fd74..2b30a92 100644
--- a/python/tests/test_table_read.py
+++ b/python/tests/test_table_read.py
@@ -66,7 +66,9 @@ def test_sample_table(get_sample_table):
"sao_paulo/ee915c68-d7f8-44f6-9759-e691add290d8-0_3-11-0_20240402123035233.parquet",
}
- batch = table.read_file_slice(file_slice_paths[0])
+ batch = table.create_file_group_reader().read_file_slice_by_base_file_path(
+ file_slice_paths[0]
+ )
t = pa.Table.from_batches([batch])
assert t.num_rows == 1
assert t.num_columns == 11