This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 0236839 feat: initial rust implementation to integrate with
datafusion (#1)
0236839 is described below
commit 0236839b87f9679f546402a67535d689634d546b
Author: Shiyan Xu <[email protected]>
AuthorDate: Fri May 3 18:24:28 2024 -0500
feat: initial rust implementation to integrate with datafusion (#1)
Implement basic core Hudi models and `HudiDataSource` for
https://github.com/apache/datafusion
---
.gitignore | 15 ++
Cargo.toml | 54 ++++++++
README.md | 23 +++
crates/core/Cargo.toml | 51 +++++++
crates/core/fixtures/table/0.x_cow_partitioned.zip | Bin 0 -> 46757 bytes
.../commits_stub/.hoodie/20240402123035233.commit | 0
.../.hoodie/20240402123035233.commit.requested | 0
.../.hoodie/20240402123035233.inflight | 0
.../commits_stub/.hoodie/20240402144910683.commit | 0
.../.hoodie/20240402144910683.commit.requested | 0
.../.hoodie/20240402144910683.inflight | 0
.../commits_stub/.hoodie/hoodie.properties | 0
crates/core/src/error.rs | 35 +++++
crates/core/src/file_group/mod.rs | 152 ++++++++++++++++++++
crates/core/src/lib.rs | 13 ++
crates/core/src/table/file_system_view.rs | 79 +++++++++++
crates/core/src/table/meta_client.rs | 133 ++++++++++++++++++
crates/core/src/table/mod.rs | 59 ++++++++
crates/core/src/timeline/mod.rs | 146 +++++++++++++++++++
crates/datafusion/Cargo.toml | 56 ++++++++
crates/datafusion/src/bin/main.rs | 16 +++
crates/datafusion/src/lib.rs | 154 +++++++++++++++++++++
crates/fs/Cargo.toml | 50 +++++++
crates/fs/fixtures/a.parquet | Bin 0 -> 866 bytes
crates/fs/src/file_systems.rs | 38 +++++
crates/fs/src/lib.rs | 30 ++++
crates/fs/src/test_utils.rs | 12 ++
crates/hudi/Cargo.toml | 11 ++
crates/hudi/src/lib.rs | 1 +
29 files changed, 1128 insertions(+)
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..040a12d
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,15 @@
+/Cargo.lock
+/target
+**/target
+
+/.idea
+.vscode
+
+# python
+.env
+venv
+**/.python-version
+__pycache__
+
+# macOS
+**/.DS_Store
diff --git a/Cargo.toml b/Cargo.toml
new file mode 100644
index 0000000..e4c9752
--- /dev/null
+++ b/Cargo.toml
@@ -0,0 +1,54 @@
+[workspace]
+members = [
+ "crates/*",
+]
+resolver = "2"
+
+[workspace.package]
+version = "0.2.0"
+edition = "2021"
+license = "Apache-2.0"
+rust-version = "1.75.0"
+
+[workspace.dependencies]
+# arrow
+arrow = { version = "50" }
+arrow-arith = { version = "50" }
+arrow-array = { version = "50", features = ["chrono-tz"]}
+arrow-buffer = { version = "50" }
+arrow-cast = { version = "50" }
+arrow-ipc = { version = "50" }
+arrow-json = { version = "50" }
+arrow-ord = { version = "50" }
+arrow-row = { version = "50" }
+arrow-schema = { version = "50" }
+arrow-select = { version = "50" }
+object_store = { version = "0.9" }
+parquet = { version = "50" }
+
+# datafusion
+datafusion = { version = "35" }
+datafusion-expr = { version = "35" }
+datafusion-common = { version = "35" }
+datafusion-proto = { version = "35" }
+datafusion-sql = { version = "35" }
+datafusion-physical-expr = { version = "35" }
+
+# serde
+serde = { version = "1.0.194", features = ["derive"] }
+serde_json = "1"
+
+# "stdlib"
+bytes = { version = "1" }
+chrono = { version = "=0.4.34", default-features = false, features = ["clock"]
}
+tracing = { version = "0.1", features = ["log"] }
+regex = { version = "1" }
+thiserror = { version = "1" }
+url = { version = "2" }
+uuid = { version = "1" }
+
+# runtime / async
+async-trait = { version = "0.1" }
+futures = { version = "0.3" }
+tokio = { version = "1" }
+num_cpus = { version = "1" }
\ No newline at end of file
diff --git a/README.md b/README.md
index 426b3de..a7f40d5 100644
--- a/README.md
+++ b/README.md
@@ -1 +1,24 @@
# hudi-rs
+
+## Quick Start
+
+### Apache DataFusion
+
+```rust
+use std::sync::Arc;
+
+use datafusion::error::Result;
+use datafusion::prelude::{DataFrame, SessionContext};
+
+use hudi_datafusion::HudiDataSource;
+
+#[tokio::main]
+async fn main() -> Result<()> {
+ let ctx = SessionContext::new();
+ let hudi = HudiDataSource::new("/tmp/trips_table");
+ ctx.register_table("trips_table", Arc::new(hudi))?;
+ let df: DataFrame = ctx.sql("SELECT * from trips_table where fare >
20.0").await?;
+ df.show().await?;
+ Ok(())
+}
+```
diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml
new file mode 100644
index 0000000..60d2101
--- /dev/null
+++ b/crates/core/Cargo.toml
@@ -0,0 +1,51 @@
+[package]
+name = "hudi-core"
+version = "0.1.0"
+edition.workspace = true
+license.workspace = true
+rust-version.workspace = true
+
+[dependencies]
+hudi-fs = { path = "../fs"}
+# arrow
+arrow = { workspace = true }
+arrow-arith = { workspace = true }
+arrow-array = { workspace = true , features = ["chrono-tz"]}
+arrow-buffer = { workspace = true }
+arrow-cast = { workspace = true }
+arrow-ipc = { workspace = true }
+arrow-json = { workspace = true }
+arrow-ord = { workspace = true }
+arrow-row = { workspace = true }
+arrow-schema = { workspace = true, features = ["serde"] }
+arrow-select = { workspace = true }
+parquet = { workspace = true, features = [
+ "async",
+ "object_store",
+] }
+pin-project-lite = "^0.2.7"
+
+# datafusion
+datafusion = { workspace = true, optional = true }
+datafusion-expr = { workspace = true, optional = true }
+datafusion-common = { workspace = true, optional = true }
+datafusion-proto = { workspace = true, optional = true }
+datafusion-sql = { workspace = true, optional = true }
+datafusion-physical-expr = { workspace = true, optional = true }
+
+# serde
+serde = { workspace = true, features = ["derive"] }
+serde_json = { workspace = true }
+
+# "stdlib"
+bytes = { workspace = true }
+chrono = { workspace = true, default-features = false, features = ["clock"] }
+hashbrown = "0.14.3"
+regex = { workspace = true }
+thiserror = { workspace = true }
+uuid = { workspace = true, features = ["serde", "v4"] }
+url = { workspace = true }
+
+# test
+tempfile = "3.10.1"
+zip-extract = "0.1.3"
diff --git a/crates/core/fixtures/table/0.x_cow_partitioned.zip
b/crates/core/fixtures/table/0.x_cow_partitioned.zip
new file mode 100644
index 0000000..9f78c06
Binary files /dev/null and b/crates/core/fixtures/table/0.x_cow_partitioned.zip
differ
diff --git
a/crates/core/fixtures/timeline/commits_stub/.hoodie/20240402123035233.commit
b/crates/core/fixtures/timeline/commits_stub/.hoodie/20240402123035233.commit
new file mode 100644
index 0000000..e69de29
diff --git
a/crates/core/fixtures/timeline/commits_stub/.hoodie/20240402123035233.commit.requested
b/crates/core/fixtures/timeline/commits_stub/.hoodie/20240402123035233.commit.requested
new file mode 100644
index 0000000..e69de29
diff --git
a/crates/core/fixtures/timeline/commits_stub/.hoodie/20240402123035233.inflight
b/crates/core/fixtures/timeline/commits_stub/.hoodie/20240402123035233.inflight
new file mode 100644
index 0000000..e69de29
diff --git
a/crates/core/fixtures/timeline/commits_stub/.hoodie/20240402144910683.commit
b/crates/core/fixtures/timeline/commits_stub/.hoodie/20240402144910683.commit
new file mode 100644
index 0000000..e69de29
diff --git
a/crates/core/fixtures/timeline/commits_stub/.hoodie/20240402144910683.commit.requested
b/crates/core/fixtures/timeline/commits_stub/.hoodie/20240402144910683.commit.requested
new file mode 100644
index 0000000..e69de29
diff --git
a/crates/core/fixtures/timeline/commits_stub/.hoodie/20240402144910683.inflight
b/crates/core/fixtures/timeline/commits_stub/.hoodie/20240402144910683.inflight
new file mode 100644
index 0000000..e69de29
diff --git
a/crates/core/fixtures/timeline/commits_stub/.hoodie/hoodie.properties
b/crates/core/fixtures/timeline/commits_stub/.hoodie/hoodie.properties
new file mode 100644
index 0000000..e69de29
diff --git a/crates/core/src/error.rs b/crates/core/src/error.rs
new file mode 100644
index 0000000..9a97643
--- /dev/null
+++ b/crates/core/src/error.rs
@@ -0,0 +1,35 @@
+use std::error::Error;
+use std::fmt::Debug;
+use std::io;
+
+use thiserror::Error;
+
+#[derive(Debug, Error)]
+pub enum HudiFileGroupError {
+ #[error("Base File {0} has unsupported format: {1}")]
+ UnsupportedBaseFileFormat(String, String),
+ #[error("Commit time {0} is already present in File Group {1}")]
+ CommitTimeAlreadyExists(String, String),
+}
+
+#[derive(Debug, Error)]
+pub enum HudiTimelineError {
+ #[error("Error in reading commit metadata: {0}")]
+ FailToReadCommitMetadata(io::Error),
+}
+
+#[derive(Debug, Error)]
+pub enum HudiFileSystemViewError {
+ #[error("Error in loading partitions: {0}")]
+ FailToLoadPartitions(Box<dyn Error>),
+}
+
+#[derive(Debug, Error)]
+pub enum HudiCoreError {
+ #[error("Failed to load file group")]
+ FailToLoadFileGroup(#[from] HudiFileGroupError),
+ #[error("Failed to init timeline")]
+ FailToInitTimeline(#[from] HudiTimelineError),
+ #[error("Failed to build file system view")]
+ FailToBuildFileSystemView(#[from] HudiFileSystemViewError),
+}
diff --git a/crates/core/src/file_group/mod.rs
b/crates/core/src/file_group/mod.rs
new file mode 100644
index 0000000..67af036
--- /dev/null
+++ b/crates/core/src/file_group/mod.rs
@@ -0,0 +1,152 @@
+use std::collections::BTreeMap;
+use std::fmt;
+use std::fmt::Formatter;
+
+use hudi_fs::file_systems::FileMetadata;
+
+use crate::error::HudiFileGroupError;
+use crate::error::HudiFileGroupError::CommitTimeAlreadyExists;
+
+#[derive(Debug)]
+pub struct BaseFile {
+ pub file_group_id: String,
+ pub commit_time: String,
+ pub metadata: Option<FileMetadata>,
+}
+
+impl BaseFile {
+ pub fn new(file_name: &str) -> Self {
+ let (name, _) = file_name.rsplit_once('.').unwrap();
+ let parts: Vec<&str> = name.split('_').collect();
+ let file_group_id = parts[0].to_owned();
+ let commit_time = parts[2].to_owned();
+ Self {
+ file_group_id,
+ commit_time,
+ metadata: None,
+ }
+ }
+
+ pub fn from_file_metadata(file_metadata: FileMetadata) -> Self {
+ let mut base_file = Self::new(file_metadata.name.as_str());
+ base_file.metadata = Some(file_metadata);
+ base_file
+ }
+}
+
+#[derive(Debug)]
+pub struct FileSlice {
+ pub base_file: BaseFile,
+ pub partition_path: Option<String>,
+}
+
+impl FileSlice {
+ pub fn file_path(&self) -> Option<&str> {
+ match &self.base_file.metadata {
+ None => None,
+ Some(file_metadata) => Some(file_metadata.path.as_str()),
+ }
+ }
+
+ pub fn file_group_id(&self) -> &str {
+ &self.base_file.file_group_id
+ }
+
+ pub fn base_instant_time(&self) -> &str {
+ &self.base_file.commit_time
+ }
+
+ pub fn set_base_file(&mut self, base_file: BaseFile) {
+ self.base_file = base_file
+ }
+}
+
+#[derive(Debug)]
+pub struct FileGroup {
+ pub id: String,
+ pub partition_path: Option<String>,
+ pub file_slices: BTreeMap<String, FileSlice>,
+}
+
+impl fmt::Display for FileGroup {
+ fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
+ f.write_str(
+ format!(
+ "File Group: partition {:?} id {}",
+ &self.partition_path, &self.id
+ )
+ .as_str(),
+ )
+ }
+}
+
+impl FileGroup {
+ pub fn new(id: String, partition_path: Option<String>) -> Self {
+ Self {
+ id,
+ partition_path,
+ file_slices: BTreeMap::new(),
+ }
+ }
+
+ pub fn add_base_file_from_name(
+ &mut self,
+ file_name: &str,
+ ) -> Result<&Self, HudiFileGroupError> {
+ let base_file = BaseFile::new(file_name);
+ self.add_base_file(base_file)
+ }
+
+ pub fn add_base_file(&mut self, base_file: BaseFile) -> Result<&Self,
HudiFileGroupError> {
+ let commit_time = base_file.commit_time.as_str();
+ if self.file_slices.contains_key(commit_time) {
+ Err(CommitTimeAlreadyExists(
+ commit_time.to_owned(),
+ self.to_string(),
+ ))
+ } else {
+ self.file_slices.insert(
+ commit_time.to_owned(),
+ FileSlice {
+ partition_path: self.partition_path.clone(),
+ base_file,
+ },
+ );
+ Ok(self)
+ }
+ }
+
+ pub fn get_latest_file_slice(&self) -> Option<&FileSlice> {
+ return self.file_slices.values().next_back();
+ }
+}
+
+#[test]
+fn create_a_base_file_successfully() {
+ let base_file =
+
BaseFile::new("5a226868-2934-4f84-a16f-55124630c68d-0_0-7-24_20240402144910683.parquet");
+ assert_eq!(
+ base_file.file_group_id,
+ "5a226868-2934-4f84-a16f-55124630c68d-0"
+ );
+ assert_eq!(base_file.commit_time, "20240402144910683");
+}
+
+#[test]
+fn load_a_valid_file_group() {
+ let mut fg =
FileGroup::new("5a226868-2934-4f84-a16f-55124630c68d-0".to_owned(), None);
+ let _ = fg.add_base_file_from_name(
+
"5a226868-2934-4f84-a16f-55124630c68d-0_0-7-24_20240402144910683.parquet",
+ );
+ let _ = fg.add_base_file_from_name(
+
"5a226868-2934-4f84-a16f-55124630c68d-0_2-10-0_20240402123035233.parquet",
+ );
+ assert_eq!(fg.file_slices.len(), 2);
+ assert!(fg.partition_path.is_none());
+ let commit_times: Vec<&str> = fg.file_slices.keys().map(|k|
k.as_str()).collect();
+ assert_eq!(commit_times, vec!["20240402123035233", "20240402144910683"]);
+ assert_eq!(
+ fg.get_latest_file_slice().unwrap().base_file.commit_time,
+ "20240402144910683"
+ )
+}
diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs
new file mode 100644
index 0000000..7a1b963
--- /dev/null
+++ b/crates/core/src/lib.rs
@@ -0,0 +1,13 @@
+use crate::table::Table;
+
+mod error;
+mod file_group;
+pub mod table;
+pub type HudiTable = Table;
+mod timeline;
+
+pub const BASE_FILE_EXTENSIONS: [&str; 1] = ["parquet"];
+
+pub fn is_base_file_format_supported(ext: &str) -> bool {
+ BASE_FILE_EXTENSIONS.contains(&ext)
+}
diff --git a/crates/core/src/table/file_system_view.rs
b/crates/core/src/table/file_system_view.rs
new file mode 100644
index 0000000..d0142d1
--- /dev/null
+++ b/crates/core/src/table/file_system_view.rs
@@ -0,0 +1,79 @@
+use crate::error::HudiFileSystemViewError;
+use crate::error::HudiFileSystemViewError::FailToLoadPartitions;
+use crate::file_group::{FileGroup, FileSlice};
+use crate::table::meta_client::MetaClient;
+use hashbrown::HashMap;
+use hudi_fs::test_utils::extract_test_table;
+use std::collections::HashSet;
+use std::path::Path;
+
+pub struct FileSystemView {
+ meta_client: MetaClient,
+ partition_to_file_groups: HashMap<String, Vec<FileGroup>>,
+}
+
+impl FileSystemView {
+ pub fn init(meta_client: MetaClient) -> Result<Self,
HudiFileSystemViewError> {
+ let mut fs_view = FileSystemView {
+ meta_client,
+ partition_to_file_groups: HashMap::new(),
+ };
+ fs_view.load_partitions()?;
+ Ok(fs_view)
+ }
+
+ fn load_partitions(&mut self) -> Result<(), HudiFileSystemViewError> {
+ match self.meta_client.get_partition_paths() {
+ Ok(partition_paths) => {
+ for p in partition_paths {
+ match self.meta_client.get_file_groups(p.as_str()) {
+ Ok(file_groups) => {
+ self.partition_to_file_groups.insert(p,
file_groups);
+ }
+ Err(e) => return Err(FailToLoadPartitions(e)),
+ }
+ }
+ }
+ Err(e) => return Err(FailToLoadPartitions(Box::new(e))),
+ }
+ Ok(())
+ }
+
+ pub fn get_latest_file_slices(&self) -> Vec<&FileSlice> {
+ let mut file_slices = Vec::new();
+ for fgs in self.partition_to_file_groups.values() {
+ for fg in fgs {
+ match fg.get_latest_file_slice() {
+ Some(file_slice) => file_slices.push(file_slice.clone()),
+ None => (),
+ }
+ }
+ }
+ file_slices
+ }
+}
+#[test]
+fn meta_client_get_file_groups() {
+ let fixture_path = Path::new("fixtures/table/0.x_cow_partitioned.zip");
+ let target_table_path = extract_test_table(fixture_path);
+ let meta_client = MetaClient::new(&target_table_path);
+ let fs_view = FileSystemView::init(meta_client).unwrap();
+ let file_slices = fs_view.get_latest_file_slices();
+ assert_eq!(file_slices.len(), 5);
+ let mut fg_ids = Vec::new();
+ for f in file_slices {
+ let fp = f.file_group_id();
+ fg_ids.push(fp);
+ }
+ let actual: HashSet<&str> = fg_ids.into_iter().collect();
+ assert_eq!(
+ actual,
+ HashSet::from_iter(vec![
+ "780b8586-3ad0-48ef-a6a1-d2217845ce4a-0",
+ "d9082ffd-2eb1-4394-aefc-deb4a61ecc57-0",
+ "ee915c68-d7f8-44f6-9759-e691add290d8-0",
+ "68d3c349-f621-4cd8-9e8b-c6dd8eb20d08-0",
+ "5a226868-2934-4f84-a16f-55124630c68d-0"
+ ])
+ );
+}
diff --git a/crates/core/src/table/meta_client.rs
b/crates/core/src/table/meta_client.rs
new file mode 100644
index 0000000..c48f662
--- /dev/null
+++ b/crates/core/src/table/meta_client.rs
@@ -0,0 +1,133 @@
+use std::collections::HashSet;
+use std::error::Error;
+use std::path::{Path, PathBuf};
+use std::{fs, io};
+
+use hashbrown::HashMap;
+
+use hudi_fs::file_systems::FileMetadata;
+use hudi_fs::test_utils::extract_test_table;
+
+use crate::file_group::{BaseFile, FileGroup};
+use crate::timeline::Timeline;
+
+#[derive(Debug, Clone)]
+pub struct MetaClient {
+ pub base_path: PathBuf,
+ pub timeline: Timeline,
+}
+
+impl MetaClient {
+ pub fn new(base_path: &Path) -> Self {
+ match Timeline::init(base_path) {
+ Ok(timeline) => Self {
+ base_path: base_path.to_path_buf(),
+ timeline,
+ },
+ Err(e) => panic!("Failed to instantiate meta client: {}", e),
+ }
+ }
+
+ fn get_leaf_dirs(path: &Path) -> Result<Vec<PathBuf>, io::Error> {
+ let mut leaf_dirs = Vec::new();
+ let mut is_leaf_dir = true;
+ for entry in fs::read_dir(path)? {
+ let entry = entry?;
+ if entry.path().is_dir() {
+ is_leaf_dir = false;
+ let curr_sub_dir = entry.path();
+ let curr = Self::get_leaf_dirs(&curr_sub_dir)?;
+ leaf_dirs.extend(curr);
+ }
+ }
+ if is_leaf_dir {
+ leaf_dirs.push(path.to_path_buf())
+ }
+ Ok(leaf_dirs)
+ }
+
+ pub fn get_partition_paths(&self) -> Result<Vec<String>, io::Error> {
+ let mut first_level_partition_paths: Vec<PathBuf> = Vec::new();
+ for entry in fs::read_dir(self.base_path.as_path())? {
+ let p = entry?.path();
+ if p.is_dir() && p.file_name().and_then(|e| e.to_str()) !=
Some(".hoodie") {
+ first_level_partition_paths.push(p);
+ }
+ }
+ let mut full_partition_paths: Vec<PathBuf> = Vec::new();
+ for p in first_level_partition_paths {
+ full_partition_paths.extend(Self::get_leaf_dirs(p.as_path())?)
+ }
+ let common_prefix_len = self.base_path.to_str().unwrap().len() + 1;
+ let mut partition_paths = Vec::new();
+ for p in full_partition_paths {
+ let full_partition_path = p.to_str().unwrap();
+
partition_paths.push(full_partition_path[common_prefix_len..].to_owned())
+ }
+ Ok(partition_paths)
+ }
+
+ pub fn get_file_groups(&self, partition_path: &str) ->
Result<Vec<FileGroup>, Box<dyn Error>> {
+ let mut part_path = self.base_path.to_path_buf();
+ part_path.push(partition_path);
+ let mut fg_id_to_base_files: HashMap<String, Vec<BaseFile>> =
HashMap::new();
+ for entry in fs::read_dir(part_path)? {
+ let p = entry?.path();
+ if p.is_file() && p.extension().and_then(|e| e.to_str()) ==
Some("parquet") {
+ let file_metadata = FileMetadata::from_path(p.as_path())?;
+ let base_file = BaseFile::from_file_metadata(file_metadata);
+ let fg_id = &base_file.file_group_id;
+ fg_id_to_base_files
+ .entry(fg_id.to_owned())
+ .or_insert_with(Vec::new)
+ .push(base_file);
+ }
+ }
+ let mut file_groups: Vec<FileGroup> = Vec::new();
+ for (fg_id, base_files) in fg_id_to_base_files.into_iter() {
+ let mut fg = FileGroup::new(fg_id.to_owned(),
Some(partition_path.to_owned()));
+ for bf in base_files {
+ fg.add_base_file(bf)?;
+ }
+ file_groups.push(fg);
+ }
+ Ok(file_groups)
+ }
+}
+
+#[test]
+fn meta_client_get_partition_paths() {
+ let fixture_path = Path::new("fixtures/table/0.x_cow_partitioned.zip");
+ let target_table_path = extract_test_table(fixture_path);
+ let meta_client = MetaClient::new(&target_table_path);
+ let partition_paths = meta_client.get_partition_paths().unwrap();
+ assert_eq!(
+ partition_paths,
+ vec!["chennai", "sao_paulo", "san_francisco"]
+ )
+}
+
+#[test]
+fn meta_client_get_file_groups() {
+ let fixture_path = Path::new("fixtures/table/0.x_cow_partitioned.zip");
+ let target_table_path = extract_test_table(fixture_path);
+ let meta_client = MetaClient::new(&target_table_path);
+ let file_groups = meta_client.get_file_groups("san_francisco").unwrap();
+ assert_eq!(file_groups.len(), 3);
+ let fg_ids: HashSet<&str> = HashSet::from_iter(file_groups.iter().map(|fg|
fg.id.as_str()));
+ assert_eq!(
+ fg_ids,
+ HashSet::from_iter(vec![
+ "5a226868-2934-4f84-a16f-55124630c68d-0",
+ "780b8586-3ad0-48ef-a6a1-d2217845ce4a-0",
+ "d9082ffd-2eb1-4394-aefc-deb4a61ecc57-0"
+ ])
+ );
+}
+#[test]
+fn meta_client_active_timeline_init_as_expected() {
+ let fixture_path = Path::new("fixtures/table/0.x_cow_partitioned.zip");
+ let target_table_path = extract_test_table(fixture_path);
+ let meta_client = MetaClient::new(&target_table_path);
+ assert_eq!(meta_client.timeline.instants.len(), 2)
+}
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
new file mode 100644
index 0000000..e3d7bab
--- /dev/null
+++ b/crates/core/src/table/mod.rs
@@ -0,0 +1,59 @@
+use std::error::Error;
+use std::path::{Path, PathBuf};
+
+use arrow_schema::SchemaRef;
+
+use hudi_fs::test_utils::extract_test_table;
+
+use crate::table::file_system_view::FileSystemView;
+use crate::table::meta_client::MetaClient;
+
+mod file_system_view;
+mod meta_client;
+
+#[derive(Debug, Clone)]
+pub struct Table {
+ pub base_path: PathBuf,
+ meta_client: MetaClient,
+}
+
+impl Table {
+ pub fn new(base_path: &str) -> Self {
+ let p = PathBuf::from(base_path);
+ let meta_client = MetaClient::new(p.as_path());
+ Self {
+ base_path: p,
+ meta_client,
+ }
+ }
+
+ pub fn schema(&self) -> SchemaRef {
+ match self.meta_client.timeline.get_latest_schema() {
+ Ok(table_schema) => SchemaRef::from(table_schema),
+ Err(e) => {
+ panic!("Failed to resolve table schema: {}", e)
+ }
+ }
+ }
+
+ pub fn get_snapshot_file_paths(&self) -> Result<Vec<String>, Box<dyn
Error>> {
+ let meta_client = MetaClient::new(&self.base_path);
+ let fs_view = FileSystemView::init(meta_client)?;
+ let mut file_paths = Vec::new();
+ for f in fs_view.get_latest_file_slices() {
+ if let Some(f) = f.file_path() {
+ file_paths.push(f.to_string());
+ }
+ }
+ Ok(file_paths)
+ }
+}
+
+#[test]
+fn load_snapshot_file_paths() {
+ let fixture_path = Path::new("fixtures/table/0.x_cow_partitioned.zip");
+ let target_table_path = extract_test_table(fixture_path);
+ let hudi_table = Table::new(target_table_path.as_path().to_str().unwrap());
+ assert_eq!(hudi_table.get_snapshot_file_paths().unwrap().len(), 5);
+ println!("{}", hudi_table.schema().to_string());
+}
diff --git a/crates/core/src/timeline/mod.rs b/crates/core/src/timeline/mod.rs
new file mode 100644
index 0000000..86e9262
--- /dev/null
+++ b/crates/core/src/timeline/mod.rs
@@ -0,0 +1,146 @@
+use hudi_fs::file_name_without_ext;
+use std::collections::HashMap;
+
+use arrow_schema::SchemaRef;
+use hudi_fs::test_utils::extract_test_table;
+use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
+use parquet::file::serialized_reader::SerializedFileReader;
+use serde::de::Error;
+use serde_json::Value;
+use std::fs::File;
+use std::io::{ErrorKind, Read};
+use std::path::{Path, PathBuf};
+use std::{fs, io};
+
+use crate::error::HudiTimelineError;
+
+#[derive(Debug, Clone, PartialEq)]
+pub enum State {
+ REQUESTED,
+ INFLIGHT,
+ COMPLETED,
+}
+
+#[derive(Debug, Clone, PartialEq)]
+pub struct Instant {
+ state: State,
+ action: String,
+ timestamp: String,
+}
+
+impl Instant {
+ pub fn state_suffix(&self) -> String {
+ match self.state {
+ State::REQUESTED => ".requested".to_owned(),
+ State::INFLIGHT => ".inflight".to_owned(),
+ State::COMPLETED => "".to_owned(),
+ }
+ }
+
+ pub fn file_name(&self) -> String {
+ format!("{}.{}{}", self.timestamp, self.action, self.state_suffix())
+ }
+}
+
+#[derive(Debug, Clone)]
+pub struct Timeline {
+ pub base_path: PathBuf,
+ pub instants: Vec<Instant>,
+}
+
+impl Timeline {
+ pub fn init(base_path: &Path) -> Result<Self, io::Error> {
+ let instants = Self::load_completed_commit_instants(base_path)?;
+ Ok(Self {
+ base_path: base_path.to_path_buf(),
+ instants,
+ })
+ }
+
+ fn load_completed_commit_instants(base_path: &Path) ->
Result<Vec<Instant>, io::Error> {
+ let mut completed_commits = Vec::new();
+ let mut timeline_path = base_path.to_path_buf();
+ timeline_path.push(".hoodie");
+ for entry in fs::read_dir(timeline_path)? {
+ let p = entry?.path();
+ if p.is_file() && p.extension().and_then(|e| e.to_str()) ==
Some("commit") {
+ completed_commits.push(Instant {
+ state: State::COMPLETED,
+ timestamp: file_name_without_ext(p.file_name()),
+ action: "commit".to_owned(),
+ })
+ }
+ }
+ // TODO: encapsulate sorting within Instant
+ completed_commits.sort_by(|a, b| a.timestamp.cmp(&b.timestamp));
+ Ok(completed_commits)
+ }
+
+ pub fn get_latest_commit_metadata(&self) -> Result<HashMap<String, Value>,
io::Error> {
+ match self.instants.iter().next_back() {
+ Some(instant) => {
+ let mut latest_instant_file_path =
self.base_path.to_path_buf();
+ latest_instant_file_path.push(".hoodie");
+ latest_instant_file_path.push(instant.file_name());
+ let mut f = File::open(latest_instant_file_path)?;
+ let mut content = String::new();
+ f.read_to_string(&mut content)?;
+ let commit_metadata = serde_json::from_str(&content)?;
+ Ok(commit_metadata)
+ }
+ None => return Ok(HashMap::new()),
+ }
+ }
+
+ pub fn get_latest_schema(&self) -> Result<SchemaRef, io::Error> {
+ let commit_metadata = self.get_latest_commit_metadata()?;
+ if let Some(partitionToWriteStats) =
commit_metadata["partitionToWriteStats"].as_object() {
+ for (_key, value) in partitionToWriteStats {
+ if let Some(first_value) = value.as_array().and_then(|arr|
arr.get(0)) {
+ if let Some(path) = first_value["path"].as_str() {
+ let mut base_file_path =
PathBuf::from(&self.base_path);
+ base_file_path.push(path);
+ let file = File::open(base_file_path)?;
+ let builder =
ParquetRecordBatchReaderBuilder::try_new(file)?;
+ return Ok(builder.schema().to_owned());
+ }
+ }
+ break;
+ }
+ }
+ Err(io::Error::new(
+ ErrorKind::InvalidData,
+ "Failed to resolve schema.",
+ ))
+ }
+}
+
+#[test]
+fn init_commits_timeline() {
+ let fixture_path = Path::new("fixtures/timeline/commits_stub");
+ let timeline = Timeline::init(fixture_path).unwrap();
+ assert_eq!(
+ timeline.instants,
+ vec![
+ Instant {
+ state: State::COMPLETED,
+ action: "commit".to_owned(),
+ timestamp: "20240402123035233".to_owned(),
+ },
+ Instant {
+ state: State::COMPLETED,
+ action: "commit".to_owned(),
+ timestamp: "20240402144910683".to_owned(),
+ },
+ ]
+ )
+}
+
+#[test]
+fn read_latest_schema() {
+ let fixture_path = Path::new("fixtures/table/0.x_cow_partitioned.zip");
+ let target_table_path = extract_test_table(fixture_path);
+ let timeline = Timeline::init(target_table_path.as_path()).unwrap();
+ let table_schema = timeline.get_latest_schema().unwrap();
+ assert_eq!(table_schema.fields.len(), 11)
+}
diff --git a/crates/datafusion/Cargo.toml b/crates/datafusion/Cargo.toml
new file mode 100644
index 0000000..ac15c78
--- /dev/null
+++ b/crates/datafusion/Cargo.toml
@@ -0,0 +1,56 @@
+[package]
+name = "hudi-datafusion"
+version = "0.1.0"
+edition.workspace = true
+license.workspace = true
+rust-version.workspace = true
+
+[dependencies]
+hudi-core = { path = "../core"}
+hudi-fs = { path = "../fs" }
+# arrow
+arrow = { workspace = true }
+arrow-arith = { workspace = true }
+arrow-array = { workspace = true , features = ["chrono-tz"]}
+arrow-buffer = { workspace = true }
+arrow-cast = { workspace = true }
+arrow-ipc = { workspace = true }
+arrow-json = { workspace = true }
+arrow-ord = { workspace = true }
+arrow-row = { workspace = true }
+arrow-schema = { workspace = true, features = ["serde"] }
+arrow-select = { workspace = true }
+parquet = { workspace = true, features = [
+ "async",
+ "object_store",
+] }
+pin-project-lite = "^0.2.7"
+
+# datafusion
+datafusion = { workspace = true }
+datafusion-expr = { workspace = true }
+datafusion-common = { workspace = true }
+datafusion-proto = { workspace = true }
+datafusion-sql = { workspace = true }
+datafusion-physical-expr = { workspace = true }
+
+# serde
+serde = { workspace = true, features = ["derive"] }
+serde_json = { workspace = true }
+
+# async
+tokio = { version = "1.37.0", features = ["rt", "rt-multi-thread", "macros"] }
+
+# "stdlib"
+bytes = { workspace = true }
+chrono = { workspace = true, default-features = false, features = ["clock"] }
+hashbrown = "0.14.3"
+regex = { workspace = true }
+thiserror = { workspace = true }
+uuid = { workspace = true, features = ["serde", "v4"] }
+url = { workspace = true }
+
+# test
+tempfile = "3.10.1"
+zip-extract = "0.1.3"
+async-trait = "0.1.79"
diff --git a/crates/datafusion/src/bin/main.rs
b/crates/datafusion/src/bin/main.rs
new file mode 100644
index 0000000..5ca51ed
--- /dev/null
+++ b/crates/datafusion/src/bin/main.rs
@@ -0,0 +1,16 @@
+use std::sync::Arc;
+
+use datafusion::error::Result;
+use datafusion::prelude::{DataFrame, SessionContext};
+
+use hudi_datafusion::HudiDataSource;
+
+#[tokio::main]
+async fn main() -> Result<()> {
+ let ctx = SessionContext::new();
+ let hudi = HudiDataSource::new("/tmp/trips_table");
+ ctx.register_table("trips_table", Arc::new(hudi))?;
+ let df: DataFrame = ctx.sql("SELECT * from trips_table where fare >
20.0").await?;
+ df.show().await?;
+ Ok(())
+}
diff --git a/crates/datafusion/src/lib.rs b/crates/datafusion/src/lib.rs
new file mode 100644
index 0000000..4b6e981
--- /dev/null
+++ b/crates/datafusion/src/lib.rs
@@ -0,0 +1,154 @@
+use arrow_array::RecordBatch;
+use std::any::Any;
+use std::fmt::{Debug, Formatter};
+use std::fs::File;
+use std::path::Path;
+use std::sync::{Arc, Mutex};
+
+use arrow_schema::SchemaRef;
+use async_trait::async_trait;
+use datafusion;
+use datafusion::datasource::TableProvider;
+use datafusion::execution::context::SessionState;
+use datafusion::execution::{SendableRecordBatchStream, TaskContext};
+use datafusion::physical_plan::memory::MemoryStream;
+use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan};
+use datafusion::prelude::{DataFrame, SessionContext};
+use datafusion_common;
+use datafusion_common::{project_schema, DataFusionError};
+use datafusion_expr;
+use datafusion_expr::{Expr, TableType};
+use datafusion_physical_expr;
+use datafusion_physical_expr::PhysicalSortExpr;
+use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
+
+use hudi_core::HudiTable;
+use hudi_fs::test_utils::extract_test_table;
+
+#[derive(Debug, Clone)]
+pub struct HudiDataSource {
+ table: HudiTable,
+}
+
+impl HudiDataSource {
+ pub fn new(base_path: &str) -> Self {
+ let table = HudiTable::new(base_path);
+ Self { table }
+ }
+ pub(crate) async fn create_physical_plan(
+ &self,
+ projections: Option<&Vec<usize>>,
+ schema: SchemaRef,
+ ) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
+ Ok(Arc::new(HudiExec::new(projections, schema, self.clone())))
+ }
+
+ fn get_record_batches(&self) ->
datafusion_common::Result<Vec<RecordBatch>> {
+ match self.table.get_snapshot_file_paths() {
+ Ok(file_paths) => {
+ let mut record_batches = Vec::new();
+ for f in file_paths {
+ let file = File::open(f)?;
+ let builder =
ParquetRecordBatchReaderBuilder::try_new(file)?;
+ let mut reader = builder.build()?;
+ if let Ok(Some(result)) = reader.next().transpose() {
+ record_batches.push(result)
+ }
+ }
+ Ok(record_batches)
+ }
+ Err(e) => Err(DataFusionError::Execution(
+ "Failed to read records from table.".to_owned(),
+ )),
+ }
+ }
+}
+
+#[async_trait]
+impl TableProvider for HudiDataSource {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn schema(&self) -> SchemaRef {
+ self.table.schema()
+ }
+
+ fn table_type(&self) -> TableType {
+ TableType::Base
+ }
+
+ async fn scan(
+ &self,
+ state: &SessionState,
+ projection: Option<&Vec<usize>>,
+ filters: &[Expr],
+ limit: Option<usize>,
+ ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
+ return self.create_physical_plan(projection, self.schema()).await;
+ }
+}
+
+#[derive(Debug, Clone)]
+pub struct HudiExec {
+ data_source: HudiDataSource,
+ projected_schema: SchemaRef,
+}
+
+impl HudiExec {
+ fn new(
+ projections: Option<&Vec<usize>>,
+ schema: SchemaRef,
+ data_source: HudiDataSource,
+ ) -> Self {
+ let projected_schema = project_schema(&schema, projections).unwrap();
+ Self {
+ data_source,
+ projected_schema,
+ }
+ }
+}
+
+impl DisplayAs for HudiExec {
+ fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) ->
std::fmt::Result {
+ write!(f, "HudiExec")
+ }
+}
+
+impl ExecutionPlan for HudiExec {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn schema(&self) -> SchemaRef {
+ self.projected_schema.clone()
+ }
+
+ fn output_partitioning(&self) -> datafusion::physical_plan::Partitioning {
+ datafusion::physical_plan::Partitioning::UnknownPartitioning(1)
+ }
+
+ fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
+ None
+ }
+
+ fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+ vec![]
+ }
+
+ fn with_new_children(
+ self: Arc<Self>,
+ children: Vec<Arc<dyn ExecutionPlan>>,
+ ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
+ Ok(self)
+ }
+
+ fn execute(
+ &self,
+ partition: usize,
+ context: Arc<TaskContext>,
+ ) -> datafusion_common::Result<SendableRecordBatchStream> {
+ let data = self.data_source.get_record_batches()?;
+ Ok(Box::pin(MemoryStream::try_new(data, self.schema(), None)?))
+ }
+}
diff --git a/crates/fs/Cargo.toml b/crates/fs/Cargo.toml
new file mode 100644
index 0000000..7c4121b
--- /dev/null
+++ b/crates/fs/Cargo.toml
@@ -0,0 +1,50 @@
+# hudi-fs: local file-system and parquet-metadata helpers shared by the
+# other hudi-rs crates.
+[package]
+name = "hudi-fs"
+version = "0.1.0"
+edition.workspace = true
+license.workspace = true
+rust-version.workspace = true
+
+[dependencies]
+# arrow
+arrow = { workspace = true }
+arrow-arith = { workspace = true }
+arrow-array = { workspace = true , features = ["chrono-tz"]}
+arrow-buffer = { workspace = true }
+arrow-cast = { workspace = true }
+arrow-ipc = { workspace = true }
+arrow-json = { workspace = true }
+arrow-ord = { workspace = true }
+arrow-row = { workspace = true }
+arrow-schema = { workspace = true, features = ["serde"] }
+arrow-select = { workspace = true }
+parquet = { workspace = true, features = [
+    "async",
+    "object_store",
+] }
+pin-project-lite = "^0.2.7"
+
+# datafusion
+# NOTE(review): these are optional but no [features] section references
+# them yet; cargo therefore creates an implicit feature per dependency.
+datafusion = { workspace = true, optional = true }
+datafusion-expr = { workspace = true, optional = true }
+datafusion-common = { workspace = true, optional = true }
+datafusion-proto = { workspace = true, optional = true }
+datafusion-sql = { workspace = true, optional = true }
+datafusion-physical-expr = { workspace = true, optional = true }
+
+# serde
+serde = { workspace = true, features = ["derive"] }
+serde_json = { workspace = true }
+
+# "stdlib"
+bytes = { workspace = true }
+chrono = { workspace = true, default-features = false, features = ["clock"] }
+hashbrown = "0.14.3"
+regex = { workspace = true }
+thiserror = { workspace = true }
+uuid = { workspace = true, features = ["serde", "v4"] }
+url = { workspace = true }
+
+# test
+# NOTE(review): test-only in spirit, but used by the public `test_utils`
+# module, so they must remain regular dependencies until that module is
+# gated behind cfg(test) or a feature; then move to [dev-dependencies].
+tempfile = "3.10.1"
+zip-extract = "0.1.3"
diff --git a/crates/fs/fixtures/a.parquet b/crates/fs/fixtures/a.parquet
new file mode 100644
index 0000000..d99765e
Binary files /dev/null and b/crates/fs/fixtures/a.parquet differ
diff --git a/crates/fs/src/file_systems.rs b/crates/fs/src/file_systems.rs
new file mode 100644
index 0000000..fa5a528
--- /dev/null
+++ b/crates/fs/src/file_systems.rs
@@ -0,0 +1,38 @@
+use std::error::Error;
+use std::{fs::File, path::Path};
+
+use parquet::file::reader::{FileReader, Length, SerializedFileReader};
+
+use crate::assert_approx_eq;
+
+/// Basic metadata about a parquet file on the local file system.
+#[derive(Debug)]
+pub struct FileMetadata {
+    // Full path exactly as passed to `from_path`.
+    pub path: String,
+    // File name component, including its extension.
+    pub name: String,
+    // Size on disk in bytes.
+    pub size: u64,
+    // Row count read from the parquet file footer.
+    pub num_records: i64,
+}
+
+impl FileMetadata {
+    /// Reads file-system and parquet-footer metadata for the file at `p`.
+    ///
+    /// # Errors
+    /// Returns an error when the file cannot be opened or stat-ed, when it
+    /// is not valid parquet, or when the path is not valid UTF-8. (The
+    /// original mixed `?` with `unwrap()`; all failures now propagate
+    /// through the already-declared `Result` instead of panicking.)
+    pub fn from_path(p: &Path) -> Result<Self, Box<dyn Error>> {
+        let file = File::open(p)?;
+        let reader = SerializedFileReader::new(file)?;
+        let num_records = reader.metadata().file_metadata().num_rows();
+        let path = p.to_str().ok_or("path is not valid UTF-8")?.to_string();
+        let name = p
+            .file_name()
+            .ok_or("path has no file name component")?
+            .to_string_lossy()
+            .into_owned();
+        // Stat the file for its on-disk size.
+        let size = p.metadata()?.len();
+        Ok(Self {
+            path,
+            name,
+            size,
+            num_records,
+        })
+    }
+}
+
+// Smoke test against the checked-in fixture; the size assertion is
+// approximate so minor re-encodings of the fixture don't break the test.
+#[test]
+fn read_file_metadata() {
+    let fixture_path = Path::new("fixtures/a.parquet");
+    let fm = FileMetadata::from_path(fixture_path).unwrap();
+    assert_eq!(fm.path, "fixtures/a.parquet");
+    assert_eq!(fm.name, "a.parquet");
+    // Allow +/-20 bytes of drift in the serialized file size.
+    assert_approx_eq!(fm.size, 866, 20);
+    assert_eq!(fm.num_records, 5);
+}
diff --git a/crates/fs/src/lib.rs b/crates/fs/src/lib.rs
new file mode 100644
index 0000000..c299855
--- /dev/null
+++ b/crates/fs/src/lib.rs
@@ -0,0 +1,30 @@
+use std::ffi::OsStr;
+
+pub mod file_systems;
+pub mod test_utils;
+
+/// Asserts that two values differ by at most `delta`.
+///
+/// The larger value is subtracted from the smaller, so this is safe for
+/// unsigned types where a plain `a - b` could underflow.
+#[macro_export]
+macro_rules! assert_approx_eq {
+    ($a:expr, $b:expr, $delta:expr) => {{
+        // Bind once so each argument expression is evaluated exactly once.
+        let a = $a;
+        let b = $b;
+        let delta = $delta;
+        // Absolute difference without underflow on unsigned types.
+        let diff = if a > b { a - b } else { b - a };
+
+        assert!(
+            diff <= delta,
+            "assertion failed: `(left ≈ right)`\n left: `{:?}`,\n right:
`{:?}`,\n delta: `{:?}`",
+            a, b, delta
+        );
+    }};
+}
+
+/// Returns the file name with its final extension removed
+/// (e.g. `20240402123035233.commit` -> `20240402123035233`).
+///
+/// # Panics
+/// Panics when `file_name` is `None`, is not valid UTF-8, or contains no
+/// `.` separator; callers are expected to pass real file names.
+pub fn file_name_without_ext(file_name: Option<&OsStr>) -> String {
+    // Tail expression instead of a redundant `return …;`; `expect` states
+    // the invariant where the original used bare `unwrap()`.
+    file_name
+        .and_then(|e| e.to_str())
+        .expect("file name must be present and valid UTF-8")
+        .rsplit_once('.')
+        .expect("file name must contain an extension separator")
+        .0
+        .to_owned()
+}
diff --git a/crates/fs/src/test_utils.rs b/crates/fs/src/test_utils.rs
new file mode 100644
index 0000000..8d43d67
--- /dev/null
+++ b/crates/fs/src/test_utils.rs
@@ -0,0 +1,12 @@
+use std::fs;
+use std::io::Cursor;
+use std::path::{Path, PathBuf};
+
+use tempfile::tempdir;
+
+/// Extracts the zipped test table at `fixture_path` into a fresh temporary
+/// directory and returns that directory's path.
+///
+/// The directory is deliberately persisted (not auto-deleted) so callers
+/// can read the extracted table after this function returns.
+pub fn extract_test_table(fixture_path: &Path) -> PathBuf {
+    // BUG FIX: `tempdir().unwrap().path().to_path_buf()` dropped the
+    // `TempDir` guard immediately, which deletes the directory on drop and
+    // left extraction racing on a removed path; `into_path()` persists the
+    // directory instead of relying on zip_extract to recreate it.
+    let target_dir = tempdir().unwrap().into_path();
+    let archive = fs::read(fixture_path).unwrap();
+    zip_extract::extract(Cursor::new(archive), &target_dir, true).unwrap();
+    target_dir
+}
diff --git a/crates/hudi/Cargo.toml b/crates/hudi/Cargo.toml
new file mode 100644
index 0000000..cef563c
--- /dev/null
+++ b/crates/hudi/Cargo.toml
@@ -0,0 +1,11 @@
+[package]
+name = "hudi"
+version.workspace = true
+edition.workspace = true
+license.workspace = true
+rust-version.workspace = true
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+hudi-core = { path = "../core"}
diff --git a/crates/hudi/src/lib.rs b/crates/hudi/src/lib.rs
new file mode 100644
index 0000000..5b71c7e
--- /dev/null
+++ b/crates/hudi/src/lib.rs
@@ -0,0 +1 @@
+pub use hudi_core::*;