This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git
The following commit(s) were added to refs/heads/main by this push:
new e15174d feat: implement Rust and Python APIs to read file slices (#28)
e15174d is described below
commit e15174d505ab22407007fdc009adce0d5fd6cb7d
Author: Shiyan Xu <[email protected]>
AuthorDate: Wed Jun 26 23:29:02 2024 -0500
feat: implement Rust and Python APIs to read file slices (#28)
---
Cargo.toml | 2 +-
crates/core/Cargo.toml | 1 +
crates/core/src/file_group/mod.rs | 65 ++++++---
crates/core/src/lib.rs | 2 +-
crates/core/src/{lib.rs => storage/file_info.rs} | 16 +-
crates/core/src/{lib.rs => storage/file_stats.rs} | 14 +-
crates/core/src/storage/mod.rs | 145 ++++++++++--------
.../src/storage/{file_metadata.rs => utils.rs} | 37 ++---
crates/core/src/table/fs_view.rs | 95 +++++++++---
crates/core/src/table/mod.rs | 162 +++++++++++++++------
crates/core/src/timeline/mod.rs | 38 ++---
crates/datafusion/src/lib.rs | 10 +-
python/Cargo.toml | 16 +-
python/hudi/__init__.py | 1 +
python/hudi/_internal.pyi | 26 +++-
python/hudi/{_internal.pyi => _utils.py} | 17 +--
python/hudi/table.py | 31 +++-
python/pyproject.toml | 15 ++
python/src/lib.rs | 75 ++++++++--
python/{hudi => tests}/__init__.py | 4 -
python/{hudi/table.py => tests/conftest.py} | 26 ++--
python/tests/test_table_read.py | 53 +++++++
22 files changed, 595 insertions(+), 256 deletions(-)
diff --git a/Cargo.toml b/Cargo.toml
index 28340c9..6d21195 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -30,7 +30,7 @@ rust-version = "1.75.0"
[workspace.dependencies]
# arrow
-arrow = { version = "50" }
+arrow = { version = "50", features = ["pyarrow"] }
arrow-arith = { version = "50" }
arrow-array = { version = "50", features = ["chrono-tz"] }
arrow-buffer = { version = "50" }
diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml
index bdf05bb..98c12ab 100644
--- a/crates/core/Cargo.toml
+++ b/crates/core/Cargo.toml
@@ -67,6 +67,7 @@ url = { workspace = true }
async-recursion = { workspace = true }
async-trait = { workspace = true }
tokio = { workspace = true }
+futures = { workspace = true }
# test
tempfile = "3.10.1"
diff --git a/crates/core/src/file_group/mod.rs b/crates/core/src/file_group/mod.rs
index b0d791f..ec2e171 100644
--- a/crates/core/src/file_group/mod.rs
+++ b/crates/core/src/file_group/mod.rs
@@ -21,33 +21,50 @@ use std::collections::BTreeMap;
use std::fmt;
use std::fmt::Formatter;
-use crate::storage::file_metadata::FileMetadata;
+use crate::storage::file_info::FileInfo;
+use crate::storage::file_stats::FileStats;
use anyhow::{anyhow, Result};
#[derive(Clone, Debug)]
pub struct BaseFile {
pub file_group_id: String,
pub commit_time: String,
- pub metadata: Option<FileMetadata>,
+ pub info: FileInfo,
+ pub stats: Option<FileStats>,
}
impl BaseFile {
- pub fn new(file_name: &str) -> Self {
- let (name, _) = file_name.rsplit_once('.').unwrap();
+ fn parse_file_name(file_name: &str) -> Result<(String, String)> {
+        let err_msg = format!("Failed to parse file name '{}' for base file.", file_name);
+        let (name, _) = file_name.rsplit_once('.').ok_or(anyhow!(err_msg.clone()))?;
let parts: Vec<&str> = name.split('_').collect();
- let file_group_id = parts[0].to_owned();
- let commit_time = parts[2].to_owned();
- Self {
+        let file_group_id = parts.first().ok_or(anyhow!(err_msg.clone()))?.to_string();
+        let commit_time = parts.get(2).ok_or(anyhow!(err_msg.clone()))?.to_string();
+ Ok((file_group_id, commit_time))
+ }
+
+ pub fn from_file_name(file_name: &str) -> Result<Self> {
+ let (file_group_id, commit_time) = Self::parse_file_name(file_name)?;
+ Ok(Self {
file_group_id,
commit_time,
- metadata: None,
- }
+ info: FileInfo::default(),
+ stats: None,
+ })
}
- pub fn from_file_metadata(file_metadata: FileMetadata) -> Self {
- let mut base_file = Self::new(file_metadata.name.as_str());
- base_file.metadata = Some(file_metadata);
- base_file
+ pub fn from_file_info(info: FileInfo) -> Result<Self> {
+ let (file_group_id, commit_time) = Self::parse_file_name(&info.name)?;
+ Ok(Self {
+ file_group_id,
+ commit_time,
+ info,
+ stats: None,
+ })
+ }
+
+ pub fn populate_stats(&mut self, stats: FileStats) {
+ self.stats = Some(stats)
}
}
@@ -58,11 +75,8 @@ pub struct FileSlice {
}
impl FileSlice {
- pub fn base_file_path(&self) -> Option<&str> {
- match &self.base_file.metadata {
- None => None,
- Some(file_metadata) => Some(file_metadata.path.as_str()),
- }
+ pub fn base_file_path(&self) -> &str {
+ self.base_file.info.uri.as_str()
}
pub fn file_group_id(&self) -> &str {
@@ -102,9 +116,9 @@ impl FileGroup {
}
}
- #[allow(dead_code)]
-    pub fn add_base_file_from_name(&mut self, file_name: &str) -> Result<&Self> {
- let base_file = BaseFile::new(file_name);
+ #[cfg(test)]
+ fn add_base_file_from_name(&mut self, file_name: &str) -> Result<&Self> {
+ let base_file = BaseFile::from_file_name(file_name)?;
self.add_base_file(base_file)
}
@@ -131,6 +145,10 @@ impl FileGroup {
pub fn get_latest_file_slice(&self) -> Option<&FileSlice> {
return self.file_slices.values().next_back();
}
+
+ pub fn get_latest_file_slice_mut(&mut self) -> Option<&mut FileSlice> {
+ return self.file_slices.values_mut().next_back();
+ }
}
#[cfg(test)]
@@ -139,9 +157,10 @@ mod tests {
#[test]
fn create_a_base_file_successfully() {
- let base_file = BaseFile::new(
+ let base_file = BaseFile::from_file_name(
"5a226868-2934-4f84-a16f-55124630c68d-0_0-7-24_20240402144910683.parquet",
- );
+ )
+ .unwrap();
assert_eq!(
base_file.file_group_id,
"5a226868-2934-4f84-a16f-55124630c68d-0"
diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs
index 0d56755..d2c53ee 100644
--- a/crates/core/src/lib.rs
+++ b/crates/core/src/lib.rs
@@ -19,7 +19,7 @@
use crate::table::Table;
-mod file_group;
+pub mod file_group;
pub mod table;
pub type HudiTable = Table;
mod storage;
diff --git a/crates/core/src/lib.rs b/crates/core/src/storage/file_info.rs
similarity index 79%
copy from crates/core/src/lib.rs
copy to crates/core/src/storage/file_info.rs
index 0d56755..4bd178d 100644
--- a/crates/core/src/lib.rs
+++ b/crates/core/src/storage/file_info.rs
@@ -17,15 +17,9 @@
* under the License.
*/
-use crate::table::Table;
-
-mod file_group;
-pub mod table;
-pub type HudiTable = Table;
-mod storage;
-pub mod test_utils;
-mod timeline;
-
-pub fn crate_version() -> &'static str {
- env!("CARGO_PKG_VERSION")
+#[derive(Clone, Debug, Default)]
+pub struct FileInfo {
+ pub uri: String,
+ pub name: String,
+ pub size: usize,
}
diff --git a/crates/core/src/lib.rs b/crates/core/src/storage/file_stats.rs
similarity index 79%
copy from crates/core/src/lib.rs
copy to crates/core/src/storage/file_stats.rs
index 0d56755..ec63c14 100644
--- a/crates/core/src/lib.rs
+++ b/crates/core/src/storage/file_stats.rs
@@ -17,15 +17,7 @@
* under the License.
*/
-use crate::table::Table;
-
-mod file_group;
-pub mod table;
-pub type HudiTable = Table;
-mod storage;
-pub mod test_utils;
-mod timeline;
-
-pub fn crate_version() -> &'static str {
- env!("CARGO_PKG_VERSION")
+#[derive(Clone, Debug, Default)]
+pub struct FileStats {
+ pub num_records: i64,
}
diff --git a/crates/core/src/storage/mod.rs b/crates/core/src/storage/mod.rs
index 3de77a4..c8b7b34 100644
--- a/crates/core/src/storage/mod.rs
+++ b/crates/core/src/storage/mod.rs
@@ -21,8 +21,10 @@ use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
+use arrow::record_batch::RecordBatch;
use async_recursion::async_recursion;
use bytes::Bytes;
+use futures::StreamExt;
use object_store::path::Path as ObjPath;
use object_store::{parse_url_opts, ObjectStore};
use parquet::arrow::async_reader::ParquetObjectReader;
@@ -30,9 +32,12 @@ use parquet::arrow::ParquetRecordBatchStreamBuilder;
use parquet::file::metadata::ParquetMetaData;
use url::Url;
-use crate::storage::file_metadata::FileMetadata;
+use crate::storage::file_info::FileInfo;
+use crate::storage::utils::join_url_segments;
-pub(crate) mod file_metadata;
+pub(crate) mod file_info;
+pub(crate) mod file_stats;
+pub(crate) mod utils;
#[allow(dead_code)]
pub struct Storage {
@@ -41,10 +46,8 @@ pub struct Storage {
options: HashMap<String, String>,
}
-#[allow(dead_code)]
impl Storage {
-    pub fn new(base_uri: &str, options: HashMap<String, String>) -> Box<Storage> {
-        let base_url = Url::from_file_path(PathBuf::from(base_uri).as_path()).unwrap();
+    pub fn new(base_url: Url, options: HashMap<String, String>) -> Box<Storage> {
let object_store = parse_url_opts(&base_url, &options).unwrap().0;
Box::from(Storage {
base_url,
@@ -53,76 +56,88 @@ impl Storage {
})
}
-    pub async fn get_file_metadata(&self, relative_path: &str) -> FileMetadata {
- let mut obj_url = self.base_url.clone();
- obj_url.path_segments_mut().unwrap().push(relative_path);
+ #[allow(dead_code)]
+ pub async fn get_file_info(&self, relative_path: &str) -> FileInfo {
+        let obj_url = join_url_segments(&self.base_url, &[relative_path]).unwrap();
let obj_path = ObjPath::from_url_path(obj_url.path()).unwrap();
let meta = self.object_store.head(&obj_path).await.unwrap();
- FileMetadata {
- path: meta.location.to_string(),
+ FileInfo {
+ uri: obj_url.to_string(),
name: obj_path.filename().unwrap().to_string(),
size: meta.size,
- num_records: None,
}
}
pub async fn get_parquet_file_metadata(&self, relative_path: &str) -> ParquetMetaData {
- let mut obj_url = self.base_url.clone();
- obj_url.path_segments_mut().unwrap().push(relative_path);
+        let obj_url = join_url_segments(&self.base_url, &[relative_path]).unwrap();
let obj_path = ObjPath::from_url_path(obj_url.path()).unwrap();
- let meta = self.object_store.head(&obj_path).await.unwrap();
- let reader = ParquetObjectReader::new(self.object_store.clone(), meta);
+ let obj_store = self.object_store.clone();
+ let meta = obj_store.head(&obj_path).await.unwrap();
+ let reader = ParquetObjectReader::new(obj_store, meta);
let builder = ParquetRecordBatchStreamBuilder::new(reader).await.unwrap();
builder.metadata().as_ref().to_owned()
}
pub async fn get_file_data(&self, relative_path: &str) -> Bytes {
- let mut obj_url = self.base_url.clone();
- obj_url.path_segments_mut().unwrap().push(relative_path);
+        let obj_url = join_url_segments(&self.base_url, &[relative_path]).unwrap();
let obj_path = ObjPath::from_url_path(obj_url.path()).unwrap();
let result = self.object_store.get(&obj_path).await.unwrap();
result.bytes().await.unwrap()
}
+    pub async fn get_parquet_file_data(&self, relative_path: &str) -> Vec<RecordBatch> {
+        let obj_url = join_url_segments(&self.base_url, &[relative_path]).unwrap();
+ let obj_path = ObjPath::from_url_path(obj_url.path()).unwrap();
+ let obj_store = self.object_store.clone();
+ let meta = obj_store.head(&obj_path).await.unwrap();
+ let reader = ParquetObjectReader::new(obj_store, meta);
+ let stream = ParquetRecordBatchStreamBuilder::new(reader)
+ .await
+ .unwrap()
+ .build()
+ .unwrap();
+ stream
+ .collect::<Vec<_>>()
+ .await
+ .into_iter()
+ .map(|r| r.unwrap())
+ .collect()
+ }
+
pub async fn list_dirs(&self, subdir: Option<&str>) -> Vec<String> {
- self.list_dirs_as_paths(subdir)
+ self.list_dirs_as_obj_paths(subdir)
.await
.into_iter()
.map(|p| p.filename().unwrap().to_string())
.collect()
}
-    pub async fn list_dirs_as_paths(&self, subdir: Option<&str>) -> Vec<ObjPath> {
- let mut prefix_url = self.base_url.clone();
- if let Some(subdir) = subdir {
- prefix_url.path_segments_mut().unwrap().push(subdir);
- }
- let prefix = ObjPath::from_url_path(prefix_url.path()).unwrap();
+    async fn list_dirs_as_obj_paths(&self, subdir: Option<&str>) -> Vec<ObjPath> {
+        let prefix_url = join_url_segments(&self.base_url, &[subdir.unwrap_or_default()]).unwrap();
+ let prefix_path = ObjPath::from_url_path(prefix_url.path()).unwrap();
self.object_store
- .list_with_delimiter(Some(&prefix))
+ .list_with_delimiter(Some(&prefix_path))
.await
.unwrap()
.common_prefixes
}
- pub async fn list_files(&self, subdir: Option<&str>) -> Vec<FileMetadata> {
- let mut prefix_url = self.base_url.clone();
- if let Some(subdir) = subdir {
- prefix_url.path_segments_mut().unwrap().push(subdir);
- }
- let prefix = ObjPath::from_url_path(prefix_url.path()).unwrap();
+ pub async fn list_files(&self, subdir: Option<&str>) -> Vec<FileInfo> {
+        let prefix_url = join_url_segments(&self.base_url, &[subdir.unwrap_or_default()]).unwrap();
+ let prefix_path = ObjPath::from_url_path(prefix_url.path()).unwrap();
self.object_store
- .list_with_delimiter(Some(&prefix))
+ .list_with_delimiter(Some(&prefix_path))
.await
.unwrap()
.objects
.into_iter()
- .map(|obj_meta| {
- FileMetadata::new(
- obj_meta.location.to_string(),
- obj_meta.location.filename().unwrap().to_string(),
- obj_meta.size,
- )
+ .map(|obj_meta| FileInfo {
+ uri: prefix_url
+ .join(obj_meta.location.filename().unwrap())
+ .unwrap()
+ .to_string(),
+ name: obj_meta.location.filename().unwrap().to_string(),
+ size: obj_meta.size,
})
.collect()
}
@@ -157,6 +172,7 @@ mod tests {
use object_store::path::Path as ObjPath;
use url::Url;
+ use crate::storage::utils::join_url_segments;
use crate::storage::{get_leaf_dirs, Storage};
#[tokio::test]
@@ -165,7 +181,7 @@ mod tests {
canonicalize(Path::new("fixtures/timeline/commits_stub")).unwrap(),
)
.unwrap();
- let storage = Storage::new(base_url.path(), HashMap::new());
+ let storage = Storage::new(base_url, HashMap::new());
let first_level_dirs: HashSet<String> =
storage.list_dirs(None).await.into_iter().collect();
assert_eq!(
first_level_dirs,
@@ -186,12 +202,18 @@ mod tests {
canonicalize(Path::new("fixtures/timeline/commits_stub")).unwrap(),
)
.unwrap();
- let storage = Storage::new(base_url.path(), HashMap::new());
- let first_level_dirs: HashSet<ObjPath> =
- storage.list_dirs_as_paths(None).await.into_iter().collect();
+ let storage = Storage::new(base_url, HashMap::new());
+ let first_level_dirs: HashSet<ObjPath> = storage
+ .list_dirs_as_obj_paths(None)
+ .await
+ .into_iter()
+ .collect();
let expected_paths: HashSet<ObjPath> = vec![".hoodie", "part1", "part2", "part3"]
.into_iter()
-            .map(|dir| ObjPath::from_url_path(base_url.join(dir).unwrap().path()).unwrap())
+ .map(|dir| {
+                ObjPath::from_url_path(join_url_segments(&storage.base_url, &[dir]).unwrap().path())
+ .unwrap()
+ })
.collect();
assert_eq!(first_level_dirs, expected_paths);
}
@@ -202,26 +224,26 @@ mod tests {
canonicalize(Path::new("fixtures/timeline/commits_stub")).unwrap(),
)
.unwrap();
- let storage = Storage::new(base_url.path(), HashMap::new());
+ let storage = Storage::new(base_url, HashMap::new());
let file_names_1: Vec<String> = storage
.list_files(None)
.await
.into_iter()
- .map(|file_metadata| file_metadata.name)
+ .map(|file_info| file_info.name)
.collect();
assert_eq!(file_names_1, vec!["a.parquet"]);
let file_names_2: Vec<String> = storage
.list_files(Some("part1"))
.await
.into_iter()
- .map(|file_metadata| file_metadata.name)
+ .map(|file_info| file_info.name)
.collect();
assert_eq!(file_names_2, vec!["b.parquet"]);
let file_names_3: Vec<String> = storage
.list_files(Some("part2/part22"))
.await
.into_iter()
- .map(|file_metadata| file_metadata.name)
+ .map(|file_info| file_info.name)
.collect();
assert_eq!(file_names_3, vec!["c.parquet"]);
}
@@ -232,7 +254,7 @@ mod tests {
canonicalize(Path::new("fixtures/timeline/commits_stub")).unwrap(),
)
.unwrap();
- let storage = Storage::new(base_url.path(), HashMap::new());
+ let storage = Storage::new(base_url, HashMap::new());
let leaf_dirs = get_leaf_dirs(&storage, None).await;
assert_eq!(
leaf_dirs,
@@ -241,19 +263,26 @@ mod tests {
}
#[tokio::test]
- async fn get_file_metadata() {
+ async fn storage_get_file_info() {
let base_url =
    Url::from_directory_path(canonicalize(Path::new("fixtures")).unwrap()).unwrap();
- let storage = Storage::new(base_url.path(), HashMap::new());
- let file_metadata = storage.get_file_metadata("a.parquet").await;
- assert_eq!(file_metadata.name, "a.parquet");
+ let storage = Storage::new(base_url, HashMap::new());
+ let file_info = storage.get_file_info("a.parquet").await;
+ assert_eq!(file_info.name, "a.parquet");
assert_eq!(
- file_metadata.path,
- ObjPath::from_url_path(base_url.join("a.parquet").unwrap().path())
- .unwrap()
- .to_string()
+ file_info.uri,
+ storage.base_url.join("a.parquet").unwrap().to_string()
);
- assert_eq!(file_metadata.size, 866);
- assert_eq!(file_metadata.num_records, None);
+ assert_eq!(file_info.size, 866);
+ }
+
+ #[tokio::test]
+ async fn storage_get_parquet_file_data() {
+        let base_url =
+            Url::from_directory_path(canonicalize(Path::new("fixtures")).unwrap()).unwrap();
+ let storage = Storage::new(base_url, HashMap::new());
+ let file_data = storage.get_parquet_file_data("a.parquet").await;
+ assert_eq!(file_data.len(), 1);
+ assert_eq!(file_data.first().unwrap().num_rows(), 5);
}
}
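The new get_parquet_file_data builds an async record-batch stream and drains
it with futures::StreamExt, mirroring get_parquet_file_metadata's reader
setup. A condensed sketch of that read path against a local object store
(the file location is a placeholder, and tokio's "macros" feature is assumed
for #[tokio::main]):

    use std::sync::Arc;

    use arrow::record_batch::RecordBatch;
    use futures::StreamExt;
    use object_store::local::LocalFileSystem;
    use object_store::path::Path as ObjPath;
    use object_store::ObjectStore;
    use parquet::arrow::async_reader::ParquetObjectReader;
    use parquet::arrow::ParquetRecordBatchStreamBuilder;

    #[tokio::main]
    async fn main() {
        let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
        let location = ObjPath::from("tmp/a.parquet"); // placeholder path
        // head() fetches the object size the reader needs for range requests.
        let meta = store.head(&location).await.unwrap();
        let reader = ParquetObjectReader::new(store, meta);
        let stream = ParquetRecordBatchStreamBuilder::new(reader)
            .await
            .unwrap()
            .build()
            .unwrap();
        // Drain the stream into fully materialized record batches.
        let batches: Vec<RecordBatch> = stream.map(|b| b.unwrap()).collect().await;
        println!("read {} record batch(es)", batches.len());
    }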
diff --git a/crates/core/src/storage/file_metadata.rs b/crates/core/src/storage/utils.rs
similarity index 74%
rename from crates/core/src/storage/file_metadata.rs
rename to crates/core/src/storage/utils.rs
index a7b8f28..cf81dc0 100644
--- a/crates/core/src/storage/file_metadata.rs
+++ b/crates/core/src/storage/utils.rs
@@ -17,28 +17,9 @@
* under the License.
*/
-use anyhow::anyhow;
-use anyhow::Result;
+use anyhow::{anyhow, Result};
use std::path::Path;
-
-#[derive(Clone, Debug, Default, Eq, PartialEq)]
-pub struct FileMetadata {
- pub path: String,
- pub name: String,
- pub size: usize,
- pub num_records: Option<usize>,
-}
-
-impl FileMetadata {
- pub fn new(path: String, name: String, size: usize) -> FileMetadata {
- FileMetadata {
- path,
- name,
- size,
- num_records: None,
- }
- }
-}
+use url::{ParseError, Url};
pub fn split_filename(filename: &str) -> Result<(String, String)> {
let path = Path::new(filename);
@@ -57,3 +38,17 @@ pub fn split_filename(filename: &str) -> Result<(String, String)> {
Ok((stem, extension))
}
+
+pub fn join_url_segments(base_url: &Url, segments: &[&str]) -> Result<Url> {
+ let mut url = base_url.clone();
+
+ if url.path().ends_with('/') {
+ url.path_segments_mut().unwrap().pop();
+ }
+
+ url.path_segments_mut()
+ .map_err(|_| ParseError::RelativeUrlWithoutBase)?
+ .extend(segments);
+
+ Ok(url)
+}
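join_url_segments exists because Url::join treats ".../table" and
".../table/" differently; popping the empty segment a trailing slash leaves
behind makes both bases join the same way. A small sketch of that
pop-then-extend behavior using the url crate directly:

    use url::Url;

    fn main() {
        // Both bases should resolve to the same child URL.
        for base in ["file:///tmp/table", "file:///tmp/table/"] {
            let mut url = Url::parse(base).unwrap();
            // Drop the empty trailing segment, if any...
            if url.path().ends_with('/') {
                url.path_segments_mut().unwrap().pop();
            }
            // ...then append the new segments.
            url.path_segments_mut().unwrap().extend(&["part1", "b.parquet"]);
            assert_eq!(url.as_str(), "file:///tmp/table/part1/b.parquet");
        }
    }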
diff --git a/crates/core/src/table/fs_view.rs b/crates/core/src/table/fs_view.rs
index 2f3b981..5f9cf0f 100644
--- a/crates/core/src/table/fs_view.rs
+++ b/crates/core/src/table/fs_view.rs
@@ -18,30 +18,33 @@
*/
use std::collections::HashMap;
-use std::path::{Path, PathBuf};
+use std::path::PathBuf;
use anyhow::{anyhow, Result};
+use arrow::record_batch::RecordBatch;
+use url::Url;
use crate::file_group::{BaseFile, FileGroup, FileSlice};
-use crate::storage::file_metadata::FileMetadata;
+use crate::storage::file_info::FileInfo;
+use crate::storage::file_stats::FileStats;
use crate::storage::{get_leaf_dirs, Storage};
#[derive(Clone, Debug)]
pub struct FileSystemView {
- pub base_path: PathBuf,
+ pub base_url: Url,
partition_to_file_groups: HashMap<String, Vec<FileGroup>>,
}
impl FileSystemView {
- pub fn new(base_path: &Path) -> Self {
+ pub fn new(base_url: Url) -> Self {
FileSystemView {
- base_path: base_path.to_path_buf(),
+ base_url,
partition_to_file_groups: HashMap::new(),
}
}
async fn get_partition_paths(&self) -> Result<Vec<String>> {
-        let storage = Storage::new(self.base_path.to_str().unwrap(), HashMap::new());
+ let storage = Storage::new(self.base_url.clone(), HashMap::new());
let top_level_dirs: Vec<String> = storage
.list_dirs(None)
.await
@@ -56,16 +59,16 @@ impl FileSystemView {
}
async fn get_file_groups(&self, partition_path: &str) -> Result<Vec<FileGroup>> {
-        let storage = Storage::new(self.base_path.to_str().unwrap(), HashMap::new());
- let file_metadata: Vec<FileMetadata> = storage
+ let storage = Storage::new(self.base_url.clone(), HashMap::new());
+ let file_info: Vec<FileInfo> = storage
.list_files(Some(partition_path))
.await
.into_iter()
.filter(|f| f.name.ends_with(".parquet"))
.collect();
let mut fg_id_to_base_files: HashMap<String, Vec<BaseFile>> = HashMap::new();
- for f in file_metadata {
- let base_file = BaseFile::from_file_metadata(f);
+ for f in file_info {
+ let base_file = BaseFile::from_file_info(f)?;
let fg_id = &base_file.file_group_id;
fg_id_to_base_files
.entry(fg_id.to_owned())
@@ -84,8 +87,7 @@ impl FileSystemView {
Ok(file_groups)
}
- pub fn get_latest_file_slices(&mut self) -> Vec<&FileSlice> {
- let mut file_slices = Vec::new();
+ pub fn load_file_groups(&mut self) {
let fs_view = self.clone();
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
@@ -96,6 +98,10 @@ impl FileSystemView {
for (k, v) in result {
self.partition_to_file_groups.insert(k, v);
}
+ }
+
+ pub fn get_latest_file_slices(&self) -> Vec<&FileSlice> {
+ let mut file_slices = Vec::new();
for fgs in self.partition_to_file_groups.values() {
for fg in fgs {
if let Some(file_slice) = fg.get_latest_file_slice() {
@@ -105,6 +111,52 @@ impl FileSystemView {
}
file_slices
}
+
+    pub fn get_latest_file_slices_with_stats(&mut self) -> Vec<&mut FileSlice> {
+ let rt = tokio::runtime::Builder::new_current_thread()
+ .enable_all()
+ .build()
+ .unwrap();
+ let mut file_slices = Vec::new();
+ let file_groups = &mut self.partition_to_file_groups.values_mut();
+ for fgs in file_groups {
+ for fg in fgs {
+ if let Some(file_slice) = fg.get_latest_file_slice_mut() {
+                    let wrapper = async { load_file_slice_stats(&self.base_url, file_slice).await };
+ let _ = rt.block_on(wrapper);
+ file_slices.push(file_slice)
+ }
+ }
+ }
+ file_slices
+ }
+
+ pub fn read_file_slice(&self, relative_path: &str) -> Vec<RecordBatch> {
+ let rt = tokio::runtime::Builder::new_current_thread()
+ .enable_all()
+ .build()
+ .unwrap();
+ let storage = Storage::new(self.base_url.clone(), HashMap::new());
+        let wrapper = async { storage.get_parquet_file_data(relative_path).await };
+ rt.block_on(wrapper)
+ }
+}
+
+async fn load_file_slice_stats(base_url: &Url, file_slice: &mut FileSlice) -> Result<()> {
+ let base_file = &mut file_slice.base_file;
+ if base_file.stats.is_none() {
+ let storage = Storage::new(base_url.clone(), HashMap::new());
+ let ptn = file_slice.partition_path.clone();
+ let mut relative_path = PathBuf::from(ptn.unwrap_or("".to_string()));
+ let base_file_name = &base_file.info.name;
+ relative_path.push(base_file_name);
+ let parquet_meta = storage
+ .get_parquet_file_metadata(relative_path.to_str().unwrap())
+ .await;
+ let num_records = parquet_meta.file_metadata().num_rows();
+ base_file.populate_stats(FileStats { num_records });
+ }
+ Ok(())
}
async fn get_partitions_and_file_groups(
@@ -133,17 +185,20 @@ async fn get_partitions_and_file_groups(
#[cfg(test)]
mod tests {
use std::collections::HashSet;
+ use std::fs::canonicalize;
use std::path::Path;
- use crate::test_utils::extract_test_table;
+ use url::Url;
use crate::table::fs_view::FileSystemView;
+ use crate::test_utils::extract_test_table;
#[tokio::test]
async fn get_partition_paths() {
- let fixture_path = Path::new("fixtures/table/0.x_cow_partitioned.zip");
- let target_table_path = extract_test_table(fixture_path);
- let fs_view = FileSystemView::new(&target_table_path);
+        let fixture_path =
+            canonicalize(Path::new("fixtures/table/0.x_cow_partitioned.zip")).unwrap();
+        let base_url = Url::from_file_path(extract_test_table(&fixture_path)).unwrap();
+ let fs_view = FileSystemView::new(base_url);
let partition_paths = fs_view.get_partition_paths().await.unwrap();
let partition_path_set: HashSet<&str> =
HashSet::from_iter(partition_paths.iter().map(|p| p.as_str()));
@@ -155,9 +210,11 @@ mod tests {
#[test]
fn get_latest_file_slices() {
- let fixture_path = Path::new("fixtures/table/0.x_cow_partitioned.zip");
- let target_table_path = extract_test_table(fixture_path);
- let mut fs_view = FileSystemView::new(&target_table_path);
+        let fixture_path =
+            canonicalize(Path::new("fixtures/table/0.x_cow_partitioned.zip")).unwrap();
+        let base_url = Url::from_file_path(extract_test_table(&fixture_path)).unwrap();
+ let mut fs_view = FileSystemView::new(base_url);
+ fs_view.load_file_groups();
let file_slices = fs_view.get_latest_file_slices();
assert_eq!(file_slices.len(), 5);
let mut fg_ids = Vec::new();
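A pattern worth noting in this file (and in table/mod.rs below): the sync
entry points bridge into async storage calls by building a throwaway
current-thread Tokio runtime and blocking on the future. Reduced to its
essentials (the async fn here is a stand-in, not the crate's API):

    async fn fetch_size(data: &[u8]) -> usize {
        data.len() // stand-in for an actual async storage call
    }

    fn fetch_size_blocking(data: &[u8]) -> usize {
        // Same bridge as load_file_groups/read_file_slice above.
        let rt = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap();
        rt.block_on(async { fetch_size(data).await })
    }

    fn main() {
        assert_eq!(fetch_size_blocking(b"hudi"), 4);
    }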
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index 9191aa4..681ef05 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -17,16 +17,18 @@
* under the License.
*/
-use anyhow::Result;
use std::collections::HashMap;
-use std::fs::File;
use std::io::{BufRead, BufReader};
-use std::path::{Path, PathBuf};
+use std::path::PathBuf;
use std::str::FromStr;
+use anyhow::Result;
+use arrow::record_batch::RecordBatch;
use arrow_schema::SchemaRef;
+use url::Url;
use crate::file_group::FileSlice;
+use crate::storage::Storage;
use crate::table::config::BaseFileFormat;
use crate::table::config::{ConfigKey, TableType};
use crate::table::fs_view::FileSystemView;
@@ -38,26 +40,44 @@ mod fs_view;
mod metadata;
#[derive(Debug, Clone)]
+#[allow(dead_code)]
pub struct Table {
- pub base_path: PathBuf,
+ pub base_url: Url,
pub props: HashMap<String, String>,
+ pub file_system_view: Option<FileSystemView>,
+ pub storage_options: HashMap<String, String>,
}
impl Table {
- pub fn new(table_base_path: &str) -> Self {
- let base_path = PathBuf::from(table_base_path);
- let props_path = base_path.join(".hoodie").join("hoodie.properties");
- match Self::load_properties(props_path.as_path()) {
- Ok(props) => Self { base_path, props },
+    pub fn new(base_uri: &str, storage_options: HashMap<String, String>) -> Self {
+        let base_url = Url::from_file_path(PathBuf::from(base_uri).as_path()).unwrap();
+        match Self::load_properties(&base_url, ".hoodie/hoodie.properties", &storage_options) {
+ Ok(props) => Self {
+ base_url,
+ props,
+ file_system_view: None,
+ storage_options,
+ },
Err(e) => {
panic!("Failed to load table properties: {}", e)
}
}
}
- fn load_properties(path: &Path) -> Result<HashMap<String, String>> {
- let file = File::open(path)?;
- let reader = BufReader::new(file);
+ fn load_properties(
+ base_url: &Url,
+ props_path: &str,
+ storage_options: &HashMap<String, String>,
+ ) -> Result<HashMap<String, String>> {
+ let rt = tokio::runtime::Builder::new_current_thread()
+ .enable_all()
+ .build()
+ .unwrap();
+ let storage = Storage::new(base_url.clone(), storage_options.clone());
+ let get_data = async { storage.get_file_data(props_path).await };
+ let data = rt.block_on(get_data);
+ let cursor = std::io::Cursor::new(data);
+ let reader = BufReader::new(cursor);
let lines = reader.lines();
let mut properties: HashMap<String, String> = HashMap::new();
for line in lines {
@@ -81,58 +101,70 @@ impl Table {
}
}
- pub fn get_timeline(&self) -> Result<Timeline> {
+ #[cfg(test)]
+ fn get_timeline(&self) -> Result<Timeline> {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
-        let f = async { Timeline::new(self.base_path.to_str().unwrap()).await };
- rt.block_on(f)
+        let init_timeline = async { Timeline::new(self.base_url.clone()).await };
+ rt.block_on(init_timeline)
}
- pub fn schema(&self) -> SchemaRef {
+ pub fn get_latest_schema(&self) -> SchemaRef {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
-        let f = async { Timeline::new(self.base_path.to_str().unwrap()).await };
- let timeline = rt.block_on(f);
+        let init_timeline = async { Timeline::new(self.base_url.clone()).await };
+ let timeline = rt.block_on(init_timeline);
match timeline {
Ok(timeline) => {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
- let wrapper = async { timeline.get_latest_schema().await };
- let result = rt.block_on(wrapper);
- match result {
+ let get_schema = async { timeline.get_latest_schema().await };
+ match rt.block_on(get_schema) {
Ok(schema) => SchemaRef::from(schema),
- Err(e) => {
- panic!("Failed to resolve table schema: {}", e)
- }
+ Err(e) => panic!("Failed to resolve table schema: {}", e),
}
}
- Err(e) => {
- panic!("Failed to resolve table schema: {}", e)
- }
+ Err(e) => panic!("Failed to resolve table schema: {}", e),
}
}
- pub fn get_latest_file_slices(&self) -> Result<Vec<FileSlice>> {
+ pub fn get_latest_file_slices(&mut self) -> Result<Vec<FileSlice>> {
+ if self.file_system_view.is_none() {
+ let mut new_fs_view = FileSystemView::new(self.base_url.clone());
+ new_fs_view.load_file_groups();
+ self.file_system_view = Some(new_fs_view);
+ }
+
+ let fs_view = self.file_system_view.as_mut().unwrap();
+
let mut file_slices = Vec::new();
- let mut fs_view = FileSystemView::new(self.base_path.as_path());
- for f in fs_view.get_latest_file_slices() {
+ for f in fs_view.get_latest_file_slices_with_stats() {
file_slices.push(f.clone());
}
Ok(file_slices)
}
- pub fn get_latest_file_paths(&self) -> Result<Vec<String>> {
+    pub fn read_file_slice(&mut self, relative_path: &str) -> Vec<RecordBatch> {
+ if self.file_system_view.is_none() {
+ let mut new_fs_view = FileSystemView::new(self.base_url.clone());
+ new_fs_view.load_file_groups();
+ self.file_system_view = Some(new_fs_view);
+ }
+
+ let fs_view = self.file_system_view.as_ref().unwrap();
+ fs_view.read_file_slice(relative_path)
+ }
+
+ pub fn get_latest_file_paths(&mut self) -> Result<Vec<String>> {
let mut file_paths = Vec::new();
for f in self.get_latest_file_slices()? {
- if let Some(f) = f.base_file_path() {
- file_paths.push(f.to_string());
- }
+ file_paths.push(f.base_file_path().to_string());
}
Ok(file_paths)
}
@@ -178,7 +210,7 @@ impl ProvidesTableMetadata for Table {
}
fn location(&self) -> String {
- self.base_path.to_str().unwrap().to_string()
+ self.base_url.path().to_string()
}
fn partition_fields(&self) -> Vec<String> {
@@ -223,7 +255,10 @@ impl ProvidesTableMetadata for Table {
#[cfg(test)]
mod tests {
+ use std::collections::HashMap;
+ use std::fs::canonicalize;
use std::path::Path;
+ use url::Url;
use crate::table::config::BaseFileFormat::Parquet;
use crate::table::config::TableType::CopyOnWrite;
@@ -231,20 +266,62 @@ mod tests {
use crate::table::Table;
use crate::test_utils::extract_test_table;
+ #[test]
+ fn hudi_table_get_latest_schema() {
+ let fixture_path = Path::new("fixtures/table/0.x_cow_partitioned.zip");
+        let base_url = Url::from_file_path(extract_test_table(fixture_path)).unwrap();
+ let hudi_table = Table::new(base_url.path(), HashMap::new());
+ let fields: Vec<String> = hudi_table
+ .get_latest_schema()
+ .all_fields()
+ .into_iter()
+ .map(|f| f.name().to_string())
+ .collect();
+ assert_eq!(
+ fields,
+ Vec::from([
+ "_hoodie_commit_time",
+ "_hoodie_commit_seqno",
+ "_hoodie_record_key",
+ "_hoodie_partition_path",
+ "_hoodie_file_name",
+ "ts",
+ "uuid",
+ "rider",
+ "driver",
+ "fare",
+ "city"
+ ])
+ );
+ }
+
+ #[test]
+ fn hudi_table_read_file_slice() {
+ let fixture_path = Path::new("fixtures/table/0.x_cow_partitioned.zip");
+        let base_url = Url::from_file_path(extract_test_table(fixture_path)).unwrap();
+ let mut hudi_table = Table::new(base_url.path(), HashMap::new());
+ let batches = hudi_table.read_file_slice(
+            "san_francisco/780b8586-3ad0-48ef-a6a1-d2217845ce4a-0_0-8-0_20240402123035233.parquet",
+ );
+ assert_eq!(batches.len(), 1);
+ assert_eq!(batches.first().unwrap().num_rows(), 1);
+ assert_eq!(batches.first().unwrap().num_columns(), 11);
+ }
+
#[test]
fn hudi_table_get_latest_file_paths() {
let fixture_path = Path::new("fixtures/table/0.x_cow_partitioned.zip");
- let target_table_path = extract_test_table(fixture_path);
- let hudi_table = Table::new(target_table_path.to_str().unwrap());
+        let base_url = Url::from_file_path(extract_test_table(fixture_path)).unwrap();
+ let mut hudi_table = Table::new(base_url.path(), HashMap::new());
assert_eq!(hudi_table.get_timeline().unwrap().instants.len(), 2);
assert_eq!(hudi_table.get_latest_file_paths().unwrap().len(), 5);
- println!("{}", hudi_table.schema());
}
#[test]
fn hudi_table_get_table_metadata() {
-        let fixture_path = Path::new("fixtures/table_metadata/sample_table_properties");
- let table = Table::new(fixture_path.to_str().unwrap());
+        let base_path =
+            canonicalize(Path::new("fixtures/table_metadata/sample_table_properties")).unwrap();
+ let table = Table::new(base_path.to_str().unwrap(), HashMap::new());
assert_eq!(table.base_file_format(), Parquet);
assert_eq!(table.checksum(), 3761586722);
assert_eq!(table.database_name(), "default");
@@ -256,10 +333,7 @@ mod tests {
table.key_generator_class(),
"org.apache.hudi.keygen.SimpleKeyGenerator"
);
- assert_eq!(
- table.location(),
- "fixtures/table_metadata/sample_table_properties"
- );
+ assert_eq!(table.location(), base_path.to_str().unwrap());
assert_eq!(table.partition_fields(), vec!["city"]);
assert_eq!(table.precombine_field(), "ts");
assert!(table.populates_meta_fields());
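Taken together, the reworked table module gives roughly this sync usage from
Rust (a sketch only: the table path is a placeholder, and per the code above
Table::new panics if no hoodie.properties is found under it):

    use std::collections::HashMap;

    use hudi::HudiTable;

    fn main() {
        // Point this at an existing Hudi table directory.
        let mut table = HudiTable::new("/tmp/trips_table", HashMap::new());
        println!("schema: {}", table.get_latest_schema());
        for file_slice in table.get_latest_file_slices().unwrap() {
            println!(
                "file group {} -> {}",
                file_slice.file_group_id(),
                file_slice.base_file_path()
            );
        }
    }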
diff --git a/crates/core/src/timeline/mod.rs b/crates/core/src/timeline/mod.rs
index 3ac0a70..e7f8010 100644
--- a/crates/core/src/timeline/mod.rs
+++ b/crates/core/src/timeline/mod.rs
@@ -25,8 +25,9 @@ use anyhow::{anyhow, Result};
use arrow_schema::SchemaRef;
use parquet::arrow::parquet_to_arrow_schema;
use serde_json::{Map, Value};
+use url::Url;
-use crate::storage::file_metadata::split_filename;
+use crate::storage::utils::split_filename;
use crate::storage::Storage;
#[allow(dead_code)]
@@ -60,24 +61,21 @@ impl Instant {
#[derive(Debug, Clone)]
pub struct Timeline {
- pub base_path: String,
+ pub base_url: Url,
pub instants: Vec<Instant>,
}
impl Timeline {
- pub async fn new(base_path: &str) -> Result<Self> {
- let instants = Self::load_completed_commit_instants(base_path).await?;
- Ok(Self {
- base_path: base_path.to_string(),
- instants,
- })
+ pub async fn new(base_url: Url) -> Result<Self> {
+ let instants = Self::load_completed_commit_instants(&base_url).await?;
+ Ok(Self { base_url, instants })
}
-    async fn load_completed_commit_instants(base_path: &str) -> Result<Vec<Instant>> {
- let storage = Storage::new(base_path, HashMap::new());
+    async fn load_completed_commit_instants(base_url: &Url) -> Result<Vec<Instant>> {
+ let storage = Storage::new(base_url.clone(), HashMap::new());
let mut completed_commits = Vec::new();
- for file_metadata in storage.list_files(Some(".hoodie")).await {
-            let (file_stem, file_ext) = split_filename(file_metadata.name.as_str())?;
+ for file_info in storage.list_files(Some(".hoodie")).await {
+            let (file_stem, file_ext) = split_filename(file_info.name.as_str())?;
if file_ext == "commit" {
completed_commits.push(Instant {
state: State::Completed,
@@ -96,7 +94,7 @@ impl Timeline {
Some(instant) => {
let mut commit_file_path = PathBuf::from(".hoodie");
commit_file_path.push(instant.file_name());
- let storage = Storage::new(&self.base_path, HashMap::new());
+                let storage = Storage::new(self.base_url.clone(), HashMap::new());
let bytes = storage
.get_file_data(commit_file_path.to_str().unwrap())
.await;
@@ -118,7 +116,7 @@ impl Timeline {
if let Some((_, value)) = partition_to_write_stats.iter().next() {
if let Some(first_value) = value.as_array().and_then(|arr| arr.first()) {
if let Some(path) = first_value["path"].as_str() {
-                        let storage = Storage::new(&self.base_path, HashMap::new());
+                        let storage = Storage::new(self.base_url.clone(), HashMap::new());
let parquet_meta = storage.get_parquet_file_metadata(path).await;
let arrow_schema = parquet_to_arrow_schema(
parquet_meta.file_metadata().schema_descr(),
@@ -138,6 +136,8 @@ mod tests {
use std::fs::canonicalize;
use std::path::Path;
+ use url::Url;
+
use crate::test_utils::extract_test_table;
use crate::timeline::{Instant, State, Timeline};
@@ -145,16 +145,18 @@ mod tests {
async fn read_latest_schema() {
let fixture_path = Path::new("fixtures/table/0.x_cow_partitioned.zip");
let target_table_path = extract_test_table(fixture_path);
- let base_path = canonicalize(target_table_path).unwrap();
-        let timeline = Timeline::new(base_path.to_str().unwrap()).await.unwrap();
+        let base_url = Url::from_file_path(canonicalize(target_table_path).unwrap()).unwrap();
+ let timeline = Timeline::new(base_url).await.unwrap();
let table_schema = timeline.get_latest_schema().await.unwrap();
assert_eq!(table_schema.fields.len(), 11)
}
#[tokio::test]
async fn init_commits_timeline() {
-        let base_path = canonicalize(Path::new("fixtures/timeline/commits_stub")).unwrap();
-        let timeline = Timeline::new(base_path.to_str().unwrap()).await.unwrap();
+        let base_url =
+            Url::from_file_path(canonicalize(Path::new("fixtures/timeline/commits_stub")).unwrap())
+ .unwrap();
+ let timeline = Timeline::new(base_url).await.unwrap();
assert_eq!(
timeline.instants,
vec![
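The timeline still derives completed commits purely from the listing of
".commit" files under .hoodie. A toy sketch of that filter, using
strip_suffix as a stand-in for the crate's split_filename-based check:

    fn main() {
        let listing = [
            "20240402123035233.commit",
            "20240402144910683.commit.requested",
            "hoodie.properties",
        ];
        // Keep only completed commits; the file stem is the instant time.
        let completed: Vec<&str> = listing
            .iter()
            .filter_map(|name| name.strip_suffix(".commit"))
            .collect();
        assert_eq!(completed, vec!["20240402123035233"]);
    }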
diff --git a/crates/datafusion/src/lib.rs b/crates/datafusion/src/lib.rs
index 56aae9d..7025a8c 100644
--- a/crates/datafusion/src/lib.rs
+++ b/crates/datafusion/src/lib.rs
@@ -19,6 +19,7 @@
use arrow_array::RecordBatch;
use std::any::Any;
+use std::collections::HashMap;
use std::fmt::Debug;
use std::fs::File;
use std::sync::Arc;
@@ -45,7 +46,7 @@ pub struct HudiDataSource {
impl HudiDataSource {
pub fn new(base_path: &str) -> Self {
Self {
- table: HudiTable::new(base_path),
+ table: HudiTable::new(base_path, HashMap::new()),
}
}
pub(crate) async fn create_physical_plan(
@@ -56,7 +57,7 @@ impl HudiDataSource {
Ok(Arc::new(HudiExec::new(projections, schema, self.clone())))
}
-    fn get_record_batches(&self) -> datafusion_common::Result<Vec<RecordBatch>> {
+    fn get_record_batches(&mut self) -> datafusion_common::Result<Vec<RecordBatch>> {
match self.table.get_latest_file_paths() {
Ok(file_paths) => {
let mut record_batches = Vec::new();
@@ -84,7 +85,7 @@ impl TableProvider for HudiDataSource {
}
fn schema(&self) -> SchemaRef {
- self.table.schema()
+ self.table.get_latest_schema()
}
fn table_type(&self) -> TableType {
@@ -161,7 +162,8 @@ impl ExecutionPlan for HudiExec {
_partition: usize,
_context: Arc<TaskContext>,
) -> datafusion_common::Result<SendableRecordBatchStream> {
- let data = self.data_source.get_record_batches()?;
+ let mut data_source = self.data_source.clone();
+ let data = data_source.get_record_batches()?;
Ok(Box::pin(MemoryStream::try_new(data, self.schema(), None)?))
}
}
diff --git a/python/Cargo.toml b/python/Cargo.toml
index 6c5702b..613b010 100644
--- a/python/Cargo.toml
+++ b/python/Cargo.toml
@@ -27,10 +27,22 @@ doc = false
[dependencies]
object_store = { workspace = true }
+# arrow
+arrow = { workspace = true }
+arrow-arith = { workspace = true }
+arrow-array = { workspace = true }
+arrow-buffer = { workspace = true }
+arrow-cast = { workspace = true }
+arrow-ipc = { workspace = true }
+arrow-json = { workspace = true }
+arrow-ord = { workspace = true }
+arrow-row = { workspace = true }
+arrow-schema = { workspace = true }
+arrow-select = { workspace = true }
[dependencies.pyo3]
-version = "0.21.2"
-features = ["extension-module", "abi3", "abi3-py38", "gil-refs"]
+version = "0.20.3"
+features = ["extension-module", "abi3", "abi3-py38"]
[dependencies.hudi]
path = "../crates/hudi"
diff --git a/python/hudi/__init__.py b/python/hudi/__init__.py
index fa70485..8054368 100644
--- a/python/hudi/__init__.py
+++ b/python/hudi/__init__.py
@@ -17,4 +17,5 @@
from ._internal import __version__ as __version__
from ._internal import rust_core_version as rust_core_version
+from ._internal import HudiFileSlice as HudiFileSlice
from .table import HudiTable as HudiTable
diff --git a/python/hudi/_internal.pyi b/python/hudi/_internal.pyi
index 8759c2e..b34c25a 100644
--- a/python/hudi/_internal.pyi
+++ b/python/hudi/_internal.pyi
@@ -15,7 +15,9 @@
# specific language governing permissions and limitations
# under the License.
-from typing import List
+from typing import List, Dict, Optional
+
+import pyarrow
__version__: str
@@ -23,8 +25,26 @@ __version__: str
def rust_core_version() -> str: ...
+class HudiFileSlice:
+ file_group_id: str
+ partition_path: str
+ commit_time: str
+ base_file_name: str
+ base_file_path: str
+ base_file_size: int
+ num_records: int
+
+
class BindingHudiTable:
- def __init__(self, table_uri: str): ...
+ def __init__(
+ self,
+ table_uri: str,
+ storage_options: Optional[Dict[str, str]] = None,
+ ): ...
+
+ def schema(self) -> "pyarrow.Schema": ...
+
+ def get_latest_file_slices(self) -> List[HudiFileSlice]: ...
- def get_latest_file_paths(self) -> List[str]: ...
+    def read_file_slice(self, relative_path) -> List["pyarrow.RecordBatch"]: ...
diff --git a/python/hudi/_internal.pyi b/python/hudi/_utils.py
similarity index 78%
copy from python/hudi/_internal.pyi
copy to python/hudi/_utils.py
index 8759c2e..779a14b 100644
--- a/python/hudi/_internal.pyi
+++ b/python/hudi/_utils.py
@@ -14,17 +14,10 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+from typing import List, Any, Iterator
-from typing import List
-__version__: str
-
-
-def rust_core_version() -> str: ...
-
-
-class BindingHudiTable:
-
- def __init__(self, table_uri: str): ...
-
- def get_latest_file_paths(self) -> List[str]: ...
+def split_list(lst: List[Any], n: int) -> Iterator[List[Any]]:
+ split_size = (len(lst) + n - 1) // n
+ for i in range(0, len(lst), split_size):
+ yield lst[i: i + split_size]
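split_list hands back at most n chunks of ceil(len/n) items each, which is
what produces the 3-then-2 split asserted in the new test below. The same
chunking arithmetic sketched in Rust:

    fn main() {
        // 5 items split 2 ways -> chunk sizes 3 and 2.
        let lst: Vec<i32> = (0..5).collect();
        let n: usize = 2;
        let split_size = (lst.len() + n - 1) / n; // ceiling division
        let chunks: Vec<&[i32]> = lst.chunks(split_size).collect();
        assert_eq!(chunks.len(), 2);
        assert_eq!((chunks[0].len(), chunks[1].len()), (3, 2));
    }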
diff --git a/python/hudi/table.py b/python/hudi/table.py
index a8cfbcb..c9cab1b 100644
--- a/python/hudi/table.py
+++ b/python/hudi/table.py
@@ -15,18 +15,37 @@
# specific language governing permissions and limitations
# under the License.
+import os
from dataclasses import dataclass
from pathlib import Path
-from typing import Union, List
+from typing import Union, List, Iterator, Optional, Dict
-from python.hudi._internal import BindingHudiTable
+import pyarrow
+
+from hudi._internal import BindingHudiTable, HudiFileSlice
+from hudi._utils import split_list
@dataclass(init=False)
class HudiTable:
- def __init__(self, table_uri: Union[str, Path, ""]):
- self._table = BindingHudiTable(str(table_uri))
+ def __init__(
+ self,
+ table_uri: Union[str, Path, "os.PathLike[str]"],
+ storage_options: Optional[Dict[str, str]] = None,
+ ):
+ self._table = BindingHudiTable(str(table_uri), storage_options)
+
+ def schema(self) -> "pyarrow.Schema":
+ return self._table.schema()
+
+ def split_latest_file_slices(self, n) -> Iterator[List[HudiFileSlice]]:
+ file_slices = self.get_latest_file_slices()
+ for split in split_list(file_slices, n):
+ yield split
+
+ def get_latest_file_slices(self) -> List[HudiFileSlice]:
+ return self._table.get_latest_file_slices()
- def get_latest_file_paths(self) -> List[str]:
- return self._table.get_latest_file_paths()
+ def read_file_slice(self, relative_path) -> List["pyarrow.RecordBatch"]:
+ return self._table.read_file_slice(relative_path)
diff --git a/python/pyproject.toml b/python/pyproject.toml
index 1abc6c1..175773c 100644
--- a/python/pyproject.toml
+++ b/python/pyproject.toml
@@ -22,6 +22,7 @@ build-backend = "maturin"
[project]
name = "hudi"
description = "Native Hudi Python binding based on hudi-rs"
+urls = { repository = "https://github.com/apache/hudi-rs/tree/main/python/" }
readme = "README.md"
requires-python = ">=3.8"
license = "Apache License 2.0"
@@ -37,8 +38,22 @@ dependencies = [
"pyarrow>=8"
]
+optional-dependencies = { devel = [
+ "pytest"
+] }
+
dynamic = ["version"]
[tool.maturin]
module-name = "hudi._internal"
features = ["pyo3/extension-module"]
+
+[tool.mypy]
+files = "hudi/*.py"
+exclude = "^tests"
+
+[tool.pytest.ini_options]
+testpaths = [
+ "tests",
+ "hudi",
+]
diff --git a/python/src/lib.rs b/python/src/lib.rs
index 8ac33f4..07c81b0 100644
--- a/python/src/lib.rs
+++ b/python/src/lib.rs
@@ -17,35 +17,91 @@
* under the License.
*/
+use std::collections::HashMap;
+use std::path::PathBuf;
+
+use arrow::pyarrow::ToPyArrow;
use pyo3::prelude::*;
-use hudi::table::Table;
+use hudi::file_group::FileSlice;
+use hudi::HudiTable;
+
+#[pyclass]
+struct HudiFileSlice {
+ #[pyo3(get)]
+ file_group_id: String,
+ #[pyo3(get)]
+ partition_path: String,
+ #[pyo3(get)]
+ commit_time: String,
+ #[pyo3(get)]
+ base_file_name: String,
+ #[pyo3(get)]
+ base_file_path: String,
+ #[pyo3(get)]
+ base_file_size: usize,
+ #[pyo3(get)]
+ num_records: i64,
+}
+
+impl HudiFileSlice {
+ pub fn from_file_slice(f: FileSlice) -> Self {
+        let partition_path = f.partition_path.clone().unwrap_or("".to_string());
+ let mut p = PathBuf::from(&partition_path);
+ p.push(f.base_file.info.name.clone());
+ let base_file_path = p.to_str().unwrap().to_string();
+ Self {
+ file_group_id: f.file_group_id().to_string(),
+ partition_path,
+ commit_time: f.base_file.commit_time,
+ base_file_name: f.base_file.info.name,
+ base_file_path,
+ base_file_size: f.base_file.info.size,
+ num_records: f.base_file.stats.unwrap().num_records,
+ }
+ }
+}
#[pyclass]
struct BindingHudiTable {
- _table: hudi::HudiTable,
+ _table: HudiTable,
}
#[pymethods]
impl BindingHudiTable {
#[new]
- #[pyo3(signature = (table_uri))]
- fn new(py: Python, table_uri: &str) -> PyResult<Self> {
+ #[pyo3(signature = (table_uri, storage_options = None))]
+ fn new(
+ py: Python,
+ table_uri: &str,
+ storage_options: Option<HashMap<String, String>>,
+ ) -> PyResult<Self> {
py.allow_threads(|| {
Ok(BindingHudiTable {
- _table: Table::new(table_uri),
+                _table: HudiTable::new(table_uri, storage_options.unwrap_or_default()),
})
})
}
- pub fn get_latest_file_paths(&self) -> PyResult<Vec<String>> {
- match self._table.get_latest_file_paths() {
- Ok(paths) => Ok(paths),
+ pub fn schema(&self, py: Python) -> PyResult<PyObject> {
+ self._table.get_latest_schema().to_pyarrow(py)
+ }
+
+ pub fn get_latest_file_slices(&mut self) -> PyResult<Vec<HudiFileSlice>> {
+ match self._table.get_latest_file_slices() {
+ Ok(file_slices) => Ok(file_slices
+ .into_iter()
+ .map(HudiFileSlice::from_file_slice)
+ .collect()),
Err(_e) => {
- panic!("Failed to retrieve the latest file paths.")
+ panic!("Failed to retrieve the latest file slices.")
}
}
}
+
+    pub fn read_file_slice(&mut self, relative_path: &str, py: Python) -> PyResult<PyObject> {
+ self._table.read_file_slice(relative_path).to_pyarrow(py)
+ }
}
#[pyfunction]
@@ -58,6 +114,7 @@ fn _internal(_py: Python, m: &PyModule) -> PyResult<()> {
m.add("__version__", env!("CARGO_PKG_VERSION"))?;
m.add_function(wrap_pyfunction!(rust_core_version, m)?)?;
+ m.add_class::<HudiFileSlice>()?;
m.add_class::<BindingHudiTable>()?;
Ok(())
}
diff --git a/python/hudi/__init__.py b/python/tests/__init__.py
similarity index 83%
copy from python/hudi/__init__.py
copy to python/tests/__init__.py
index fa70485..a67d5ea 100644
--- a/python/hudi/__init__.py
+++ b/python/tests/__init__.py
@@ -14,7 +14,3 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-
-from ._internal import __version__ as __version__
-from ._internal import rust_core_version as rust_core_version
-from .table import HudiTable as HudiTable
diff --git a/python/hudi/table.py b/python/tests/conftest.py
similarity index 58%
copy from python/hudi/table.py
copy to python/tests/conftest.py
index a8cfbcb..2dcfdeb 100644
--- a/python/hudi/table.py
+++ b/python/tests/conftest.py
@@ -15,18 +15,26 @@
# specific language governing permissions and limitations
# under the License.
-from dataclasses import dataclass
+import os
+import zipfile
from pathlib import Path
-from typing import Union, List
-from python.hudi._internal import BindingHudiTable
+import pytest
-@dataclass(init=False)
-class HudiTable:
+def _extract_testing_table(zip_file_path, target_path) -> str:
+ with zipfile.ZipFile(zip_file_path, "r") as zip_ref:
+ zip_ref.extractall(target_path)
+ return os.path.join(target_path, "trips_table")
- def __init__(self, table_uri: Union[str, Path, ""]):
- self._table = BindingHudiTable(str(table_uri))
- def get_latest_file_paths(self) -> List[str]:
- return self._table.get_latest_file_paths()
+@pytest.fixture(
+ params=[
+ "0.x_cow_partitioned",
+ ]
+)
+def get_sample_table(request, tmp_path) -> str:
+ fixture_path = "../crates/core/fixtures/table"
+ table_name = request.param
+ zip_file_path = Path(fixture_path).joinpath(f"{table_name}.zip")
+ return _extract_testing_table(zip_file_path, tmp_path)
diff --git a/python/tests/test_table_read.py b/python/tests/test_table_read.py
new file mode 100644
index 0000000..20ce42c
--- /dev/null
+++ b/python/tests/test_table_read.py
@@ -0,0 +1,53 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pyarrow as pa
+import pytest
+
+from hudi import HudiTable
+
+PYARROW_LE_8_0_0 = tuple(int(s) for s in pa.__version__.split(".") if s.isnumeric()) < (8, 0, 0)
+pytestmark = pytest.mark.skipif(PYARROW_LE_8_0_0, reason="hudi only supported if pyarrow >= 8.0.0")
+
+
+def test_sample_table(get_sample_table):
+ table_path = get_sample_table
+ table = HudiTable(table_path, {})
+
+    assert table.schema().names == ['_hoodie_commit_time', '_hoodie_commit_seqno', '_hoodie_record_key',
+                                    '_hoodie_partition_path', '_hoodie_file_name', 'ts', 'uuid', 'rider', 'driver',
+ 'fare', 'city']
+
+ file_slices = table.get_latest_file_slices()
+ assert len(file_slices) == 5
+    assert set(f.commit_time for f in file_slices) == {'20240402123035233', '20240402144910683'}
+ assert all(f.num_records == 1 for f in file_slices)
+ file_slice_paths = [f.base_file_path for f in file_slices]
+    assert set(file_slice_paths) == {'chennai/68d3c349-f621-4cd8-9e8b-c6dd8eb20d08-0_4-12-0_20240402123035233.parquet',
+                                     'san_francisco/d9082ffd-2eb1-4394-aefc-deb4a61ecc57-0_1-9-0_20240402123035233.parquet',
+                                     'san_francisco/780b8586-3ad0-48ef-a6a1-d2217845ce4a-0_0-8-0_20240402123035233.parquet',
+                                     'san_francisco/5a226868-2934-4f84-a16f-55124630c68d-0_0-7-24_20240402144910683.parquet',
+                                     'sao_paulo/ee915c68-d7f8-44f6-9759-e691add290d8-0_3-11-0_20240402123035233.parquet'}
+
+ batches = table.read_file_slice(file_slice_paths[0])
+ t = pa.Table.from_batches(batches)
+ assert t.num_rows == 1
+ assert t.num_columns == 11
+
+ file_slices_gen = table.split_latest_file_slices(2)
+ assert len(next(file_slices_gen)) == 3
+ assert len(next(file_slices_gen)) == 2