This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 0236839  feat: initial rust implementation to integrate with 
datafusion (#1)
0236839 is described below

commit 0236839b87f9679f546402a67535d689634d546b
Author: Shiyan Xu <[email protected]>
AuthorDate: Fri May 3 18:24:28 2024 -0500

    feat: initial rust implementation to integrate with datafusion (#1)
    
    Implement basic core Hudi models and `HudiDataSource` for 
https://github.com/apache/datafusion
---
 .gitignore                                         |  15 ++
 Cargo.toml                                         |  54 ++++++++
 README.md                                          |  23 +++
 crates/core/Cargo.toml                             |  51 +++++++
 crates/core/fixtures/table/0.x_cow_partitioned.zip | Bin 0 -> 46757 bytes
 .../commits_stub/.hoodie/20240402123035233.commit  |   0
 .../.hoodie/20240402123035233.commit.requested     |   0
 .../.hoodie/20240402123035233.inflight             |   0
 .../commits_stub/.hoodie/20240402144910683.commit  |   0
 .../.hoodie/20240402144910683.commit.requested     |   0
 .../.hoodie/20240402144910683.inflight             |   0
 .../commits_stub/.hoodie/hoodie.properties         |   0
 crates/core/src/error.rs                           |  35 +++++
 crates/core/src/file_group/mod.rs                  | 152 ++++++++++++++++++++
 crates/core/src/lib.rs                             |  13 ++
 crates/core/src/table/file_system_view.rs          |  79 +++++++++++
 crates/core/src/table/meta_client.rs               | 133 ++++++++++++++++++
 crates/core/src/table/mod.rs                       |  59 ++++++++
 crates/core/src/timeline/mod.rs                    | 146 +++++++++++++++++++
 crates/datafusion/Cargo.toml                       |  56 ++++++++
 crates/datafusion/src/bin/main.rs                  |  16 +++
 crates/datafusion/src/lib.rs                       | 154 +++++++++++++++++++++
 crates/fs/Cargo.toml                               |  50 +++++++
 crates/fs/fixtures/a.parquet                       | Bin 0 -> 866 bytes
 crates/fs/src/file_systems.rs                      |  38 +++++
 crates/fs/src/lib.rs                               |  30 ++++
 crates/fs/src/test_utils.rs                        |  12 ++
 crates/hudi/Cargo.toml                             |  11 ++
 crates/hudi/src/lib.rs                             |   1 +
 29 files changed, 1128 insertions(+)

diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..040a12d
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,15 @@
+/Cargo.lock
+/target
+**/target
+
+/.idea
+.vscode
+
+# python
+.env
+venv
+**/.python-version
+__pycache__
+
+# macOS
+**/.DS_Store
diff --git a/Cargo.toml b/Cargo.toml
new file mode 100644
index 0000000..e4c9752
--- /dev/null
+++ b/Cargo.toml
@@ -0,0 +1,54 @@
+[workspace]
+members = [
+    "crates/*",
+]
+resolver = "2"
+
+[workspace.package]
+version = "0.2.0"
+edition = "2021"
+license = "Apache-2.0"
+rust-version = "1.75.0"
+
+[workspace.dependencies]
+# arrow
+arrow = { version = "50" }
+arrow-arith = { version = "50" }
+arrow-array = { version = "50", features = ["chrono-tz"]}
+arrow-buffer = { version = "50" }
+arrow-cast = { version = "50" }
+arrow-ipc = { version = "50" }
+arrow-json = { version = "50" }
+arrow-ord = { version = "50" }
+arrow-row = { version = "50" }
+arrow-schema = { version = "50" }
+arrow-select = { version = "50" }
+object_store = { version = "0.9" }
+parquet = { version = "50" }
+
+# datafusion
+datafusion = { version = "35" }
+datafusion-expr = { version = "35" }
+datafusion-common = { version = "35" }
+datafusion-proto = { version = "35" }
+datafusion-sql = { version = "35" }
+datafusion-physical-expr = { version = "35" }
+
+# serde
+serde = { version = "1.0.194", features = ["derive"] }
+serde_json = "1"
+
+# "stdlib"
+bytes = { version = "1" }
+chrono = { version = "=0.4.34", default-features = false, features = ["clock"] 
}
+tracing = { version = "0.1", features = ["log"] }
+regex = { version = "1" }
+thiserror = { version = "1" }
+url = { version = "2" }
+uuid = { version = "1" }
+
+# runtime / async
+async-trait = { version = "0.1" }
+futures = { version = "0.3" }
+tokio = { version = "1" }
+num_cpus = { version = "1" }
\ No newline at end of file
diff --git a/README.md b/README.md
index 426b3de..a7f40d5 100644
--- a/README.md
+++ b/README.md
@@ -1 +1,24 @@
 # hudi-rs
+
+## Quick Start
+
+### Apache DataFusion
+
+```rust
+use std::sync::Arc;
+
+use datafusion::error::Result;
+use datafusion::prelude::{DataFrame, SessionContext};
+
+use hudi_datafusion::HudiDataSource;
+
+#[tokio::main]
+async fn main() -> Result<()> {
+    let ctx = SessionContext::new();
+    let hudi = HudiDataSource::new("/tmp/trips_table");
+    ctx.register_table("trips_table", Arc::new(hudi))?;
+    let df: DataFrame = ctx.sql("SELECT * from trips_table where fare > 20.0").await?;
+    df.show().await?;
+    Ok(())
+}
+```
diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml
new file mode 100644
index 0000000..60d2101
--- /dev/null
+++ b/crates/core/Cargo.toml
@@ -0,0 +1,51 @@
+[package]
+name = "hudi-core"
+version = "0.1.0"
+edition.workspace = true
+license.workspace = true
+rust-version.workspace = true
+
+[dependencies]
+hudi-fs = { path = "../fs"}
+# arrow
+arrow = { workspace = true }
+arrow-arith = { workspace = true }
+arrow-array = { workspace = true , features = ["chrono-tz"]}
+arrow-buffer = { workspace = true }
+arrow-cast = { workspace = true }
+arrow-ipc = { workspace = true }
+arrow-json = { workspace = true }
+arrow-ord = { workspace = true }
+arrow-row = { workspace = true }
+arrow-schema = { workspace = true, features = ["serde"] }
+arrow-select = { workspace = true }
+parquet = { workspace = true, features = [
+    "async",
+    "object_store",
+] }
+pin-project-lite = "^0.2.7"
+
+# datafusion
+datafusion = { workspace = true, optional = true }
+datafusion-expr = { workspace = true, optional = true }
+datafusion-common = { workspace = true, optional = true }
+datafusion-proto = { workspace = true, optional = true }
+datafusion-sql = { workspace = true, optional = true }
+datafusion-physical-expr = { workspace = true, optional = true }
+
+# serde
+serde = { workspace = true, features = ["derive"] }
+serde_json = { workspace = true }
+
+# "stdlib"
+bytes = { workspace = true }
+chrono = { workspace = true, default-features = false, features = ["clock"] }
+hashbrown = "0.14.3"
+regex = { workspace = true }
+thiserror = { workspace = true }
+uuid = { workspace = true, features = ["serde", "v4"] }
+url = { workspace = true }
+
+# test
+tempfile = "3.10.1"
+zip-extract = "0.1.3"
diff --git a/crates/core/fixtures/table/0.x_cow_partitioned.zip 
b/crates/core/fixtures/table/0.x_cow_partitioned.zip
new file mode 100644
index 0000000..9f78c06
Binary files /dev/null and b/crates/core/fixtures/table/0.x_cow_partitioned.zip 
differ
diff --git 
a/crates/core/fixtures/timeline/commits_stub/.hoodie/20240402123035233.commit 
b/crates/core/fixtures/timeline/commits_stub/.hoodie/20240402123035233.commit
new file mode 100644
index 0000000..e69de29
diff --git 
a/crates/core/fixtures/timeline/commits_stub/.hoodie/20240402123035233.commit.requested
 
b/crates/core/fixtures/timeline/commits_stub/.hoodie/20240402123035233.commit.requested
new file mode 100644
index 0000000..e69de29
diff --git 
a/crates/core/fixtures/timeline/commits_stub/.hoodie/20240402123035233.inflight 
b/crates/core/fixtures/timeline/commits_stub/.hoodie/20240402123035233.inflight
new file mode 100644
index 0000000..e69de29
diff --git 
a/crates/core/fixtures/timeline/commits_stub/.hoodie/20240402144910683.commit 
b/crates/core/fixtures/timeline/commits_stub/.hoodie/20240402144910683.commit
new file mode 100644
index 0000000..e69de29
diff --git 
a/crates/core/fixtures/timeline/commits_stub/.hoodie/20240402144910683.commit.requested
 
b/crates/core/fixtures/timeline/commits_stub/.hoodie/20240402144910683.commit.requested
new file mode 100644
index 0000000..e69de29
diff --git 
a/crates/core/fixtures/timeline/commits_stub/.hoodie/20240402144910683.inflight 
b/crates/core/fixtures/timeline/commits_stub/.hoodie/20240402144910683.inflight
new file mode 100644
index 0000000..e69de29
diff --git 
a/crates/core/fixtures/timeline/commits_stub/.hoodie/hoodie.properties 
b/crates/core/fixtures/timeline/commits_stub/.hoodie/hoodie.properties
new file mode 100644
index 0000000..e69de29
diff --git a/crates/core/src/error.rs b/crates/core/src/error.rs
new file mode 100644
index 0000000..9a97643
--- /dev/null
+++ b/crates/core/src/error.rs
@@ -0,0 +1,35 @@
+use std::error::Error;
+use std::fmt::Debug;
+use std::io;
+
+use thiserror::Error;
+
+/// Errors raised while assembling a file group from its base files.
+#[derive(Error, Debug)]
+pub enum HudiFileGroupError {
+    /// A base file's format is not supported; holds the file name and the format.
+    #[error("Base File {0} has unsupported format: {1}")]
+    UnsupportedBaseFileFormat(String, String),
+    /// A slice with this commit time already exists; holds the commit time and
+    /// the file group's display string.
+    #[error("Commit time {0} is already present in File Group {1}")]
+    CommitTimeAlreadyExists(String, String),
+}
+
+/// Errors raised while working with the table timeline.
+#[derive(Error, Debug)]
+pub enum HudiTimelineError {
+    /// Reading commit metadata failed; wraps the underlying I/O error.
+    #[error("Error in reading commit metadata: {0}")]
+    FailToReadCommitMetadata(io::Error),
+}
+
+/// Errors raised while building a file system view of the table.
+#[derive(Error, Debug)]
+pub enum HudiFileSystemViewError {
+    /// Listing partitions or their file groups failed; wraps the underlying error.
+    #[error("Error in loading partitions: {0}")]
+    FailToLoadPartitions(Box<dyn Error>),
+}
+
+#[derive(Debug, Error)]
+pub enum HudiCoreError {
+    #[error("Failed to load file group")]
+    FailToLoadFileGroup(#[from] HudiFileGroupError),
+    #[error("Failed to init timeline")]
+    FailToInitTimeline(#[from] HudiTimelineError),
+    #[error("Failed to build file system view")]
+    FailToBuildFileSystemView(#[from] HudiFileSystemViewError),
+}
diff --git a/crates/core/src/file_group/mod.rs 
b/crates/core/src/file_group/mod.rs
new file mode 100644
index 0000000..67af036
--- /dev/null
+++ b/crates/core/src/file_group/mod.rs
@@ -0,0 +1,152 @@
+use std::collections::BTreeMap;
+use std::fmt;
+use std::fmt::Formatter;
+
+use hudi_fs::file_systems::FileMetadata;
+
+use crate::error::HudiFileGroupError;
+use crate::error::HudiFileGroupError::CommitTimeAlreadyExists;
+
+/// A base file, identified by the file group id and commit time encoded in its name.
+#[derive(Debug)]
+pub struct BaseFile {
+    /// First `_`-separated part of the file name.
+    pub file_group_id: String,
+    /// Third `_`-separated part of the file name (before the extension).
+    pub commit_time: String,
+    /// File-system metadata; set when built via [`BaseFile::from_file_metadata`].
+    pub metadata: Option<FileMetadata>,
+}
+
+impl BaseFile {
+    /// Parses a base file name of the shape
+    /// `<file_group_id>_<middle part>_<commit_time>.<extension>`.
+    ///
+    /// Only the first and third `_`-separated parts of the stem are kept; the
+    /// extension and any parts after the third are ignored.
+    ///
+    /// # Panics
+    ///
+    /// Panics if `file_name` contains no `.`, or if its stem has fewer than three
+    /// `_`-separated parts.
+    pub fn new(file_name: &str) -> Self {
+        let (stem, _extension) = file_name
+            .rsplit_once('.')
+            .unwrap_or_else(|| panic!("Base file name {} has no extension", file_name));
+        let mut parts = stem.split('_');
+        let file_group_id = parts.next();
+        // Skip the middle part and take the third one.
+        let commit_time = parts.nth(1);
+        match (file_group_id, commit_time) {
+            (Some(file_group_id), Some(commit_time)) => Self {
+                file_group_id: file_group_id.to_owned(),
+                commit_time: commit_time.to_owned(),
+                metadata: None,
+            },
+            _ => panic!(
+                "Base file name {} is not of the form <file_group_id>_<..>_<commit_time>.<ext>",
+                file_name
+            ),
+        }
+    }
+
+    /// Builds a base file by parsing `file_metadata.name`, keeping the metadata.
+    ///
+    /// # Panics
+    ///
+    /// Panics under the same conditions as [`BaseFile::new`].
+    pub fn from_file_metadata(file_metadata: FileMetadata) -> Self {
+        let mut base_file = Self::new(file_metadata.name.as_str());
+        base_file.metadata = Some(file_metadata);
+        base_file
+    }
+}
+
+/// A base file together with the partition it lives in.
+#[derive(Debug)]
+pub struct FileSlice {
+    /// The slice's base file.
+    pub base_file: BaseFile,
+    /// Partition path copied from the owning file group when the slice is added.
+    pub partition_path: Option<String>,
+}
+
+impl FileSlice {
+    /// Path of the base file; `None` unless file metadata was attached.
+    pub fn file_path(&self) -> Option<&str> {
+        self.base_file
+            .metadata
+            .as_ref()
+            .map(|file_metadata| file_metadata.path.as_str())
+    }
+
+    /// Id of the file group the base file belongs to.
+    pub fn file_group_id(&self) -> &str {
+        self.base_file.file_group_id.as_str()
+    }
+
+    /// Commit time of the slice's base file.
+    pub fn base_instant_time(&self) -> &str {
+        self.base_file.commit_time.as_str()
+    }
+
+    /// Replaces the slice's base file.
+    pub fn set_base_file(&mut self, base_file: BaseFile) {
+        self.base_file = base_file;
+    }
+}
+
+/// All file slices of a single file group.
+#[derive(Debug)]
+pub struct FileGroup {
+    /// File group id shared by every base file in the group.
+    pub id: String,
+    /// Partition holding the group; `None` when no partition path was given.
+    pub partition_path: Option<String>,
+    /// File slices keyed by commit time, iterated in ascending key order.
+    pub file_slices: BTreeMap<String, FileSlice>,
+}
+
+impl fmt::Display for FileGroup {
+    /// Formats as `File Group: partition <partition_path:?> id <id>`.
+    ///
+    /// Writes straight into the formatter instead of building an intermediate
+    /// `String` with `format!` first.
+    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
+        write!(
+            f,
+            "File Group: partition {:?} id {}",
+            self.partition_path, self.id
+        )
+    }
+}
+
+impl FileGroup {
+    /// Creates an empty file group with the given id and optional partition path.
+    pub fn new(id: String, partition_path: Option<String>) -> Self {
+        Self {
+            id,
+            partition_path,
+            file_slices: BTreeMap::new(),
+        }
+    }
+
+    /// Parses `file_name` with [`BaseFile::new`] and adds it via [`Self::add_base_file`].
+    ///
+    /// # Panics
+    ///
+    /// Panics if [`BaseFile::new`] cannot parse `file_name`.
+    ///
+    /// # Errors
+    ///
+    /// Same as [`Self::add_base_file`].
+    pub fn add_base_file_from_name(
+        &mut self,
+        file_name: &str,
+    ) -> Result<&Self, HudiFileGroupError> {
+        self.add_base_file(BaseFile::new(file_name))
+    }
+
+    /// Adds `base_file` as a new file slice keyed by its commit time.
+    ///
+    /// # Errors
+    ///
+    /// Returns `CommitTimeAlreadyExists` if a slice with the same commit time is
+    /// already present; the existing slice is left untouched.
+    pub fn add_base_file(&mut self, base_file: BaseFile) -> Result<&Self, HudiFileGroupError> {
+        if self.file_slices.contains_key(&base_file.commit_time) {
+            return Err(CommitTimeAlreadyExists(
+                base_file.commit_time,
+                self.to_string(),
+            ));
+        }
+        let commit_time = base_file.commit_time.clone();
+        let file_slice = FileSlice {
+            partition_path: self.partition_path.clone(),
+            base_file,
+        };
+        self.file_slices.insert(commit_time, file_slice);
+        Ok(self)
+    }
+
+    /// Returns the slice with the lexicographically greatest commit-time key, or
+    /// `None` when the group is empty.
+    pub fn get_latest_file_slice(&self) -> Option<&FileSlice> {
+        self.file_slices.values().next_back()
+    }
+}
+
+#[test]
+fn create_a_base_file_successfully() {
+    let file_name = "5a226868-2934-4f84-a16f-55124630c68d-0_0-7-24_20240402144910683.parquet";
+    let base_file = BaseFile::new(file_name);
+    assert_eq!(base_file.file_group_id, "5a226868-2934-4f84-a16f-55124630c68d-0");
+    assert_eq!(base_file.commit_time, "20240402144910683");
+}
+
+#[test]
+fn load_a_valid_file_group() {
+    let mut fg = FileGroup::new("5a226868-2934-4f84-a16f-55124630c68d-0".to_owned(), None);
+    // Unwrap so an unexpected insertion failure fails the test instead of being
+    // silently discarded.
+    fg.add_base_file_from_name(
+        "5a226868-2934-4f84-a16f-55124630c68d-0_0-7-24_20240402144910683.parquet",
+    )
+    .unwrap();
+    fg.add_base_file_from_name(
+        "5a226868-2934-4f84-a16f-55124630c68d-0_2-10-0_20240402123035233.parquet",
+    )
+    .unwrap();
+    // A second base file with an already-present commit time must be rejected.
+    assert!(fg
+        .add_base_file_from_name(
+            "5a226868-2934-4f84-a16f-55124630c68d-0_2-10-0_20240402123035233.parquet",
+        )
+        .is_err());
+    assert_eq!(fg.file_slices.len(), 2);
+    assert!(fg.partition_path.is_none());
+    let commit_times: Vec<&str> = fg.file_slices.keys().map(String::as_str).collect();
+    assert_eq!(commit_times, vec!["20240402123035233", "20240402144910683"]);
+    assert_eq!(
+        fg.get_latest_file_slice().unwrap().base_file.commit_time,
+        "20240402144910683"
+    )
+}
diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs
new file mode 100644
index 0000000..7a1b963
--- /dev/null
+++ b/crates/core/src/lib.rs
@@ -0,0 +1,13 @@
+use crate::table::Table;
+
+// Error types used by the core modules.
+mod error;
+// Base file, file slice and file group models.
+mod file_group;
+// Table, meta client and file system view.
+pub mod table;
+// Crate-root alias for `table::Table`.
+pub type HudiTable = Table;
+// Timeline of instants read from the table's `.hoodie` directory.
+mod timeline;
+
+/// File extensions accepted for base files.
+pub const BASE_FILE_EXTENSIONS: [&str; 1] = ["parquet"];
+
+/// Returns `true` when `ext` exactly matches one of [`BASE_FILE_EXTENSIONS`].
+pub fn is_base_file_format_supported(ext: &str) -> bool {
+    BASE_FILE_EXTENSIONS
+        .iter()
+        .any(|supported| *supported == ext)
+}
diff --git a/crates/core/src/table/file_system_view.rs 
b/crates/core/src/table/file_system_view.rs
new file mode 100644
index 0000000..d0142d1
--- /dev/null
+++ b/crates/core/src/table/file_system_view.rs
@@ -0,0 +1,79 @@
+use crate::error::HudiFileSystemViewError;
+use crate::error::HudiFileSystemViewError::FailToLoadPartitions;
+use crate::file_group::{FileGroup, FileSlice};
+use crate::table::meta_client::MetaClient;
+use hashbrown::HashMap;
+use hudi_fs::test_utils::extract_test_table;
+use std::collections::HashSet;
+use std::path::Path;
+
+/// In-memory view of a table's partitions and the file groups inside them.
+pub struct FileSystemView {
+    /// Client used to list partitions and file groups.
+    meta_client: MetaClient,
+    /// Partition path (relative to the table base path) to its file groups.
+    partition_to_file_groups: HashMap<String, Vec<FileGroup>>,
+}
+
+impl FileSystemView {
+    /// Builds a view by listing every partition of the table and its file groups.
+    ///
+    /// # Errors
+    ///
+    /// Returns `FailToLoadPartitions` if partitions or file groups cannot be listed.
+    pub fn init(meta_client: MetaClient) -> Result<Self, HudiFileSystemViewError> {
+        let mut fs_view = FileSystemView {
+            meta_client,
+            partition_to_file_groups: HashMap::new(),
+        };
+        fs_view.load_partitions()?;
+        Ok(fs_view)
+    }
+
+    /// Fills `partition_to_file_groups`, stopping at the first partition that
+    /// fails to load.
+    fn load_partitions(&mut self) -> Result<(), HudiFileSystemViewError> {
+        let partition_paths = self
+            .meta_client
+            .get_partition_paths()
+            .map_err(|e| FailToLoadPartitions(Box::new(e)))?;
+        for partition_path in partition_paths {
+            let file_groups = self
+                .meta_client
+                .get_file_groups(&partition_path)
+                .map_err(FailToLoadPartitions)?;
+            self.partition_to_file_groups
+                .insert(partition_path, file_groups);
+        }
+        Ok(())
+    }
+
+    /// Returns the latest file slice of every file group across all partitions.
+    ///
+    /// File groups without any slice are skipped. The order follows the hash
+    /// map's iteration order and is unspecified.
+    pub fn get_latest_file_slices(&self) -> Vec<&FileSlice> {
+        self.partition_to_file_groups
+            .values()
+            .flatten()
+            .filter_map(|file_group| file_group.get_latest_file_slice())
+            .collect()
+    }
+}
+#[test]
+fn meta_client_get_file_groups() {
+    // NOTE(review): despite its name, this test exercises
+    // `FileSystemView::get_latest_file_slices` over every partition; a name such
+    // as `fs_view_get_latest_file_slices` would describe it better.
+    let fixture_path = Path::new("fixtures/table/0.x_cow_partitioned.zip");
+    let target_table_path = extract_test_table(fixture_path);
+    let fs_view = FileSystemView::init(MetaClient::new(&target_table_path)).unwrap();
+    let file_slices = fs_view.get_latest_file_slices();
+    assert_eq!(file_slices.len(), 5);
+    let actual: HashSet<&str> = file_slices.iter().map(|f| f.file_group_id()).collect();
+    let expected: HashSet<&str> = [
+        "780b8586-3ad0-48ef-a6a1-d2217845ce4a-0",
+        "d9082ffd-2eb1-4394-aefc-deb4a61ecc57-0",
+        "ee915c68-d7f8-44f6-9759-e691add290d8-0",
+        "68d3c349-f621-4cd8-9e8b-c6dd8eb20d08-0",
+        "5a226868-2934-4f84-a16f-55124630c68d-0",
+    ]
+    .into_iter()
+    .collect();
+    assert_eq!(actual, expected);
+}
diff --git a/crates/core/src/table/meta_client.rs 
b/crates/core/src/table/meta_client.rs
new file mode 100644
index 0000000..c48f662
--- /dev/null
+++ b/crates/core/src/table/meta_client.rs
@@ -0,0 +1,133 @@
+use std::collections::HashSet;
+use std::error::Error;
+use std::path::{Path, PathBuf};
+use std::{fs, io};
+
+use hashbrown::HashMap;
+
+use hudi_fs::file_systems::FileMetadata;
+use hudi_fs::test_utils::extract_test_table;
+
+use crate::file_group::{BaseFile, FileGroup};
+use crate::timeline::Timeline;
+
+/// Access point to a table's on-disk layout: its base path and loaded timeline.
+#[derive(Clone, Debug)]
+pub struct MetaClient {
+    /// Root directory of the table.
+    pub base_path: PathBuf,
+    /// Completed-commit timeline loaded at construction time.
+    pub timeline: Timeline,
+}
+
+impl MetaClient {
+    /// Creates a meta client for the table rooted at `base_path` and loads its
+    /// timeline.
+    ///
+    /// # Panics
+    ///
+    /// Panics if the timeline cannot be initialized.
+    pub fn new(base_path: &Path) -> Self {
+        match Timeline::init(base_path) {
+            Ok(timeline) => Self {
+                base_path: base_path.to_path_buf(),
+                timeline,
+            },
+            Err(e) => panic!("Failed to instantiate meta client: {}", e),
+        }
+    }
+
+    /// Recursively collects the directories under `path` (including `path`
+    /// itself) that contain no sub-directories.
+    fn get_leaf_dirs(path: &Path) -> Result<Vec<PathBuf>, io::Error> {
+        let mut leaf_dirs = Vec::new();
+        let mut is_leaf_dir = true;
+        for entry in fs::read_dir(path)? {
+            let entry_path = entry?.path();
+            if entry_path.is_dir() {
+                is_leaf_dir = false;
+                leaf_dirs.extend(Self::get_leaf_dirs(&entry_path)?);
+            }
+        }
+        if is_leaf_dir {
+            leaf_dirs.push(path.to_path_buf());
+        }
+        Ok(leaf_dirs)
+    }
+
+    /// Lists partition paths relative to the table base path.
+    ///
+    /// Every leaf directory below a top-level directory other than `.hoodie`
+    /// counts as a partition. The order follows `fs::read_dir` and is
+    /// platform-dependent.
+    ///
+    /// # Errors
+    ///
+    /// Returns an I/O error if a directory cannot be read. Returns `InvalidData`
+    /// if a partition path is not valid UTF-8.
+    pub fn get_partition_paths(&self) -> Result<Vec<String>, io::Error> {
+        let mut first_level_partition_paths: Vec<PathBuf> = Vec::new();
+        for entry in fs::read_dir(&self.base_path)? {
+            let p = entry?.path();
+            if p.is_dir() && p.file_name().and_then(|e| e.to_str()) != Some(".hoodie") {
+                first_level_partition_paths.push(p);
+            }
+        }
+        let mut partition_paths = Vec::new();
+        for first_level_path in first_level_partition_paths {
+            for leaf_dir in Self::get_leaf_dirs(&first_level_path)? {
+                // `strip_prefix` compares path components. Slicing by the base
+                // path's byte length dropped a character when the base path had a
+                // trailing separator.
+                let relative_path = leaf_dir
+                    .strip_prefix(&self.base_path)
+                    .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
+                let relative_path = relative_path.to_str().ok_or_else(|| {
+                    io::Error::new(
+                        io::ErrorKind::InvalidData,
+                        format!("Partition path {} is not valid UTF-8", leaf_dir.display()),
+                    )
+                })?;
+                partition_paths.push(relative_path.to_owned());
+            }
+        }
+        Ok(partition_paths)
+    }
+
+    /// Groups the base files directly inside `partition_path` by file group id.
+    ///
+    /// Only regular files whose extension passes
+    /// [`crate::is_base_file_format_supported`] are considered.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error in any of these cases:
+    /// - the partition cannot be listed;
+    /// - file metadata cannot be read;
+    /// - two base files of one file group share a commit time.
+    pub fn get_file_groups(&self, partition_path: &str) -> Result<Vec<FileGroup>, Box<dyn Error>> {
+        let part_path = self.base_path.join(partition_path);
+        let mut fg_id_to_base_files: HashMap<String, Vec<BaseFile>> = HashMap::new();
+        for entry in fs::read_dir(part_path)? {
+            let p = entry?.path();
+            let is_base_file = p.is_file()
+                && p
+                    .extension()
+                    .and_then(|e| e.to_str())
+                    .is_some_and(crate::is_base_file_format_supported);
+            if is_base_file {
+                let file_metadata = FileMetadata::from_path(p.as_path())?;
+                let base_file = BaseFile::from_file_metadata(file_metadata);
+                fg_id_to_base_files
+                    .entry(base_file.file_group_id.clone())
+                    .or_default()
+                    .push(base_file);
+            }
+        }
+        let mut file_groups = Vec::with_capacity(fg_id_to_base_files.len());
+        for (fg_id, base_files) in fg_id_to_base_files {
+            let mut fg = FileGroup::new(fg_id, Some(partition_path.to_owned()));
+            for bf in base_files {
+                fg.add_base_file(bf)?;
+            }
+            file_groups.push(fg);
+        }
+        Ok(file_groups)
+    }
+}
+
+#[test]
+fn meta_client_get_partition_paths() {
+    let fixture_path = Path::new("fixtures/table/0.x_cow_partitioned.zip");
+    let target_table_path = extract_test_table(fixture_path);
+    let meta_client = MetaClient::new(&target_table_path);
+    let mut partition_paths = meta_client.get_partition_paths().unwrap();
+    // `fs::read_dir` yields entries in a platform-dependent order, so compare
+    // the sorted result.
+    partition_paths.sort();
+    assert_eq!(
+        partition_paths,
+        vec!["chennai", "san_francisco", "sao_paulo"]
+    )
+}
+
+#[test]
+fn meta_client_get_file_groups() {
+    let fixture_path = Path::new("fixtures/table/0.x_cow_partitioned.zip");
+    let target_table_path = extract_test_table(fixture_path);
+    let file_groups = MetaClient::new(&target_table_path)
+        .get_file_groups("san_francisco")
+        .unwrap();
+    assert_eq!(file_groups.len(), 3);
+    let actual: HashSet<&str> = file_groups.iter().map(|fg| fg.id.as_str()).collect();
+    let expected: HashSet<&str> = [
+        "5a226868-2934-4f84-a16f-55124630c68d-0",
+        "780b8586-3ad0-48ef-a6a1-d2217845ce4a-0",
+        "d9082ffd-2eb1-4394-aefc-deb4a61ecc57-0",
+    ]
+    .into_iter()
+    .collect();
+    assert_eq!(actual, expected);
+}
+#[test]
+fn meta_client_active_timeline_init_as_expected() {
+    let target_table_path =
+        extract_test_table(Path::new("fixtures/table/0.x_cow_partitioned.zip"));
+    let instant_count = MetaClient::new(&target_table_path).timeline.instants.len();
+    assert_eq!(instant_count, 2)
+}
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
new file mode 100644
index 0000000..e3d7bab
--- /dev/null
+++ b/crates/core/src/table/mod.rs
@@ -0,0 +1,59 @@
+use std::error::Error;
+use std::path::{Path, PathBuf};
+
+use arrow_schema::SchemaRef;
+
+use hudi_fs::test_utils::extract_test_table;
+
+use crate::table::file_system_view::FileSystemView;
+use crate::table::meta_client::MetaClient;
+
+mod file_system_view;
+mod meta_client;
+
+/// A Hudi table rooted at a local directory.
+#[derive(Clone, Debug)]
+pub struct Table {
+    /// Root directory of the table.
+    pub base_path: PathBuf,
+    /// Meta client created for `base_path` when the table is opened.
+    meta_client: MetaClient,
+}
+
+impl Table {
+    /// Opens the table rooted at `base_path` and loads its timeline.
+    ///
+    /// # Panics
+    ///
+    /// Panics if the timeline cannot be loaded (see `MetaClient::new`).
+    pub fn new(base_path: &str) -> Self {
+        let p = PathBuf::from(base_path);
+        let meta_client = MetaClient::new(p.as_path());
+        Self {
+            base_path: p,
+            meta_client,
+        }
+    }
+
+    /// Returns the Arrow schema resolved from the latest completed commit.
+    ///
+    /// # Panics
+    ///
+    /// Panics if the schema cannot be resolved.
+    pub fn schema(&self) -> SchemaRef {
+        self.meta_client
+            .timeline
+            .get_latest_schema()
+            .unwrap_or_else(|e| panic!("Failed to resolve table schema: {}", e))
+    }
+
+    /// Lists the base file paths of the latest file slice of every file group.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if partitions or file groups cannot be loaded.
+    pub fn get_snapshot_file_paths(&self) -> Result<Vec<String>, Box<dyn Error>> {
+        // Reuse the meta client built in `new`. Constructing a fresh one re-read
+        // the timeline on every call and could panic.
+        let fs_view = FileSystemView::init(self.meta_client.clone())?;
+        let file_paths: Vec<String> = fs_view
+            .get_latest_file_slices()
+            .into_iter()
+            .filter_map(|file_slice| file_slice.file_path())
+            .map(str::to_owned)
+            .collect();
+        Ok(file_paths)
+    }
+}
+
+#[test]
+fn load_snapshot_file_paths() {
+    let fixture_path = Path::new("fixtures/table/0.x_cow_partitioned.zip");
+    let target_table_path = extract_test_table(fixture_path);
+    let hudi_table = Table::new(target_table_path.as_path().to_str().unwrap());
+    assert_eq!(hudi_table.get_snapshot_file_paths().unwrap().len(), 5);
+    // Assert on the resolved schema instead of only printing it. The fixture's
+    // latest commit resolves to an 11-field schema.
+    assert_eq!(hudi_table.schema().fields.len(), 11);
+}
diff --git a/crates/core/src/timeline/mod.rs b/crates/core/src/timeline/mod.rs
new file mode 100644
index 0000000..86e9262
--- /dev/null
+++ b/crates/core/src/timeline/mod.rs
@@ -0,0 +1,146 @@
+use hudi_fs::file_name_without_ext;
+use std::collections::HashMap;
+
+use arrow_schema::SchemaRef;
+use hudi_fs::test_utils::extract_test_table;
+use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
+use parquet::file::serialized_reader::SerializedFileReader;
+use serde::de::Error;
+use serde_json::Value;
+use std::fs::File;
+use std::io::{ErrorKind, Read};
+use std::path::{Path, PathBuf};
+use std::{fs, io};
+
+use crate::error::HudiTimelineError;
+
+/// State of a timeline instant; determines the instant file's name suffix.
+#[derive(Clone, Debug, PartialEq)]
+pub enum State {
+    /// Instant file ends in `.requested`.
+    REQUESTED,
+    /// Instant file ends in `.inflight`.
+    INFLIGHT,
+    /// Instant file carries no state suffix.
+    COMPLETED,
+}
+
+/// A timeline entry: an action at a timestamp, in a given state.
+#[derive(Clone, Debug, PartialEq)]
+pub struct Instant {
+    /// State of the instant.
+    state: State,
+    /// Action name, e.g. `commit`.
+    action: String,
+    /// Instant time taken from the timeline file name.
+    timestamp: String,
+}
+
+impl Instant {
+    /// File-name suffix for the instant's state; empty for completed instants.
+    pub fn state_suffix(&self) -> String {
+        let suffix = match self.state {
+            State::REQUESTED => ".requested",
+            State::INFLIGHT => ".inflight",
+            State::COMPLETED => "",
+        };
+        suffix.to_owned()
+    }
+
+    /// Timeline file name: `<timestamp>.<action><state suffix>`.
+    pub fn file_name(&self) -> String {
+        let mut name = format!("{}.{}", self.timestamp, self.action);
+        name.push_str(&self.state_suffix());
+        name
+    }
+}
+
+/// Completed commit instants of a table, read from its `.hoodie` directory.
+#[derive(Clone, Debug)]
+pub struct Timeline {
+    /// Root directory of the table.
+    pub base_path: PathBuf,
+    /// Completed commits, sorted by timestamp string.
+    pub instants: Vec<Instant>,
+}
+
+impl Timeline {
+    /// Loads the completed commit instants found under `<base_path>/.hoodie`.
+    ///
+    /// # Errors
+    ///
+    /// Returns an I/O error if the `.hoodie` directory cannot be listed.
+    pub fn init(base_path: &Path) -> Result<Self, io::Error> {
+        let instants = Self::load_completed_commit_instants(base_path)?;
+        Ok(Self {
+            base_path: base_path.to_path_buf(),
+            instants,
+        })
+    }
+
+    /// Collects `*.commit` files under `.hoodie` as completed commit instants,
+    /// sorted by timestamp.
+    fn load_completed_commit_instants(base_path: &Path) -> Result<Vec<Instant>, io::Error> {
+        let mut completed_commits = Vec::new();
+        for entry in fs::read_dir(base_path.join(".hoodie"))? {
+            let p = entry?.path();
+            if p.is_file() && p.extension().and_then(|e| e.to_str()) == Some("commit") {
+                completed_commits.push(Instant {
+                    state: State::COMPLETED,
+                    timestamp: file_name_without_ext(p.file_name()),
+                    action: "commit".to_owned(),
+                })
+            }
+        }
+        // TODO: encapsulate sorting within Instant
+        completed_commits.sort_by(|a, b| a.timestamp.cmp(&b.timestamp));
+        Ok(completed_commits)
+    }
+
+    /// Reads and parses the JSON metadata of the latest completed commit.
+    ///
+    /// Returns an empty map when the timeline has no instants.
+    ///
+    /// # Errors
+    ///
+    /// Returns an I/O error in either case:
+    /// - the commit file cannot be read;
+    /// - the file does not parse as a JSON object.
+    pub fn get_latest_commit_metadata(&self) -> Result<HashMap<String, Value>, io::Error> {
+        match self.instants.last() {
+            Some(instant) => {
+                let instant_file_path = self.base_path.join(".hoodie").join(instant.file_name());
+                let mut content = String::new();
+                File::open(instant_file_path)?.read_to_string(&mut content)?;
+                Ok(serde_json::from_str(&content)?)
+            }
+            None => Ok(HashMap::new()),
+        }
+    }
+
+    /// Resolves the table schema from the Parquet file named by the first write
+    /// stat of the first `partitionToWriteStats` entry in the latest commit.
+    ///
+    /// # Errors
+    ///
+    /// Returns `ErrorKind::InvalidData` ("Failed to resolve schema.") when there is
+    /// no commit or the metadata has no usable base file path. Propagates I/O and
+    /// Parquet errors from opening that file.
+    pub fn get_latest_schema(&self) -> Result<SchemaRef, io::Error> {
+        let commit_metadata = self.get_latest_commit_metadata()?;
+        // Use `get` rather than `[]`: indexing a `HashMap` with a missing key
+        // panics. For example, an empty timeline yields an empty metadata map.
+        let relative_base_file_path = commit_metadata
+            .get("partitionToWriteStats")
+            .and_then(Value::as_object)
+            .and_then(|partition_to_write_stats| partition_to_write_stats.values().next())
+            .and_then(Value::as_array)
+            .and_then(|write_stats| write_stats.first())
+            .and_then(|write_stat| write_stat.get("path"))
+            .and_then(Value::as_str)
+            .ok_or_else(|| io::Error::new(ErrorKind::InvalidData, "Failed to resolve schema."))?;
+        let file = File::open(self.base_path.join(relative_base_file_path))?;
+        let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
+        Ok(builder.schema().to_owned())
+    }
+}
+
+#[test]
+fn init_commits_timeline() {
+    let timeline = Timeline::init(Path::new("fixtures/timeline/commits_stub")).unwrap();
+    let expected: Vec<Instant> = ["20240402123035233", "20240402144910683"]
+        .iter()
+        .map(|ts| Instant {
+            state: State::COMPLETED,
+            action: "commit".to_owned(),
+            timestamp: (*ts).to_owned(),
+        })
+        .collect();
+    assert_eq!(timeline.instants, expected)
+}
+
+#[test]
+fn read_latest_schema() {
+    let target_table_path =
+        extract_test_table(Path::new("fixtures/table/0.x_cow_partitioned.zip"));
+    let table_schema = Timeline::init(target_table_path.as_path())
+        .and_then(|timeline| timeline.get_latest_schema())
+        .unwrap();
+    assert_eq!(table_schema.fields.len(), 11)
+}
diff --git a/crates/datafusion/Cargo.toml b/crates/datafusion/Cargo.toml
new file mode 100644
index 0000000..ac15c78
--- /dev/null
+++ b/crates/datafusion/Cargo.toml
@@ -0,0 +1,56 @@
+[package]
+name = "hudi-datafusion"
+version = "0.1.0"
+edition.workspace = true
+license.workspace = true
+rust-version.workspace = true
+
+[dependencies]
+hudi-core = { path = "../core"}
+hudi-fs = { path = "../fs" }
+# arrow
+arrow = { workspace = true }
+arrow-arith = { workspace = true }
+arrow-array = { workspace = true , features = ["chrono-tz"]}
+arrow-buffer = { workspace = true }
+arrow-cast = { workspace = true }
+arrow-ipc = { workspace = true }
+arrow-json = { workspace = true }
+arrow-ord = { workspace = true }
+arrow-row = { workspace = true }
+arrow-schema = { workspace = true, features = ["serde"] }
+arrow-select = { workspace = true }
+parquet = { workspace = true, features = [
+    "async",
+    "object_store",
+] }
+pin-project-lite = "^0.2.7"
+
+# datafusion
+datafusion = { workspace = true }
+datafusion-expr = { workspace = true }
+datafusion-common = { workspace = true }
+datafusion-proto = { workspace = true }
+datafusion-sql = { workspace = true }
+datafusion-physical-expr = { workspace = true }
+
+# serde
+serde = { workspace = true, features = ["derive"] }
+serde_json = { workspace = true }
+
+# async
+tokio = { version = "1.37.0", features = ["rt", "rt-multi-thread", "macros"] }
+
+# "stdlib"
+bytes = { workspace = true }
+chrono = { workspace = true, default-features = false, features = ["clock"] }
+hashbrown = "0.14.3"
+regex = { workspace = true }
+thiserror = { workspace = true }
+uuid = { workspace = true, features = ["serde", "v4"] }
+url = { workspace = true }
+
+# test
+tempfile = "3.10.1"
+zip-extract = "0.1.3"
+async-trait = "0.1.79"
diff --git a/crates/datafusion/src/bin/main.rs 
b/crates/datafusion/src/bin/main.rs
new file mode 100644
index 0000000..5ca51ed
--- /dev/null
+++ b/crates/datafusion/src/bin/main.rs
@@ -0,0 +1,16 @@
+use std::sync::Arc;
+
+use datafusion::error::Result;
+use datafusion::prelude::{DataFrame, SessionContext};
+
+use hudi_datafusion::HudiDataSource;
+
+/// Demo entry point: registers a Hudi table with DataFusion and prints the rows of a
+/// filtered `SELECT`.
+///
+/// The table base path may be passed as the first command-line argument; when it is
+/// omitted, `/tmp/trips_table` is used.
+#[tokio::main]
+async fn main() -> Result<()> {
+    let base_path = std::env::args()
+        .nth(1)
+        .unwrap_or_else(|| "/tmp/trips_table".to_string());
+
+    let ctx = SessionContext::new();
+    let hudi = HudiDataSource::new(&base_path);
+    ctx.register_table("trips_table", Arc::new(hudi))?;
+
+    let df: DataFrame = ctx
+        .sql("SELECT * from trips_table where fare > 20.0")
+        .await?;
+    df.show().await?;
+    Ok(())
+}
diff --git a/crates/datafusion/src/lib.rs b/crates/datafusion/src/lib.rs
new file mode 100644
index 0000000..4b6e981
--- /dev/null
+++ b/crates/datafusion/src/lib.rs
@@ -0,0 +1,154 @@
+use arrow_array::RecordBatch;
+use std::any::Any;
+use std::fmt::{Debug, Formatter};
+use std::fs::File;
+use std::path::Path;
+use std::sync::{Arc, Mutex};
+
+use arrow_schema::SchemaRef;
+use async_trait::async_trait;
+use datafusion;
+use datafusion::datasource::TableProvider;
+use datafusion::execution::context::SessionState;
+use datafusion::execution::{SendableRecordBatchStream, TaskContext};
+use datafusion::physical_plan::memory::MemoryStream;
+use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan};
+use datafusion::prelude::{DataFrame, SessionContext};
+use datafusion_common;
+use datafusion_common::{project_schema, DataFusionError};
+use datafusion_expr;
+use datafusion_expr::{Expr, TableType};
+use datafusion_physical_expr;
+use datafusion_physical_expr::PhysicalSortExpr;
+use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
+
+use hudi_core::HudiTable;
+use hudi_fs::test_utils::extract_test_table;
+
+/// DataFusion table provider backed by a Hudi table on the local file system.
+#[derive(Debug, Clone)]
+pub struct HudiDataSource {
+    table: HudiTable,
+}
+
+impl HudiDataSource {
+    /// Creates a data source for the Hudi table rooted at `base_path`.
+    pub fn new(base_path: &str) -> Self {
+        let table = HudiTable::new(base_path);
+        Self { table }
+    }
+
+    /// Wraps a clone of this data source in a [`HudiExec`] plan over `schema`,
+    /// restricted to the column indices in `projections` when given.
+    pub(crate) async fn create_physical_plan(
+        &self,
+        projections: Option<&Vec<usize>>,
+        schema: SchemaRef,
+    ) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
+        Ok(Arc::new(HudiExec::new(projections, schema, self.clone())))
+    }
+
+    /// Reads every record batch from every parquet file in the table's snapshot.
+    ///
+    /// Files are opened and decoded synchronously with `std::fs::File`.
+    ///
+    /// # Errors
+    /// Returns `DataFusionError::Execution` (including the underlying cause) when the
+    /// snapshot file list cannot be obtained, and propagates I/O, parquet and arrow
+    /// errors raised while opening or decoding any file.
+    fn get_record_batches(&self) -> datafusion_common::Result<Vec<RecordBatch>> {
+        let file_paths = self.table.get_snapshot_file_paths().map_err(|e| {
+            DataFusionError::Execution(format!("Failed to read records from table: {e}"))
+        })?;
+        let mut record_batches = Vec::new();
+        for f in file_paths {
+            let file = File::open(f)?;
+            let reader = ParquetRecordBatchReaderBuilder::try_new(file)?.build()?;
+            // The reader caps rows per batch, so one file can yield many batches: drain it
+            // fully, and surface decode errors instead of discarding them.
+            for batch in reader {
+                record_batches.push(batch?);
+            }
+        }
+        Ok(record_batches)
+    }
+}
+
+#[async_trait]
+impl TableProvider for HudiDataSource {
+    /// Returns `self` so DataFusion can downcast the provider.
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    /// Full (unprojected) schema, delegated to the underlying Hudi table.
+    fn schema(&self) -> SchemaRef {
+        self.table.schema()
+    }
+
+    /// Hudi tables are exposed as ordinary base tables.
+    fn table_type(&self) -> TableType {
+        TableType::Base
+    }
+
+    /// Plans a scan over the table's full schema, honoring only `projection`.
+    ///
+    /// `filters` and `limit` are not used here. NOTE(review): this provider does not
+    /// override filter push-down support, so DataFusion is expected to evaluate filters and
+    /// limits above this scan — confirm against the DataFusion version in use.
+    async fn scan(
+        &self,
+        _state: &SessionState,
+        projection: Option<&Vec<usize>>,
+        _filters: &[Expr],
+        _limit: Option<usize>,
+    ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
+        let full_schema = self.schema();
+        self.create_physical_plan(projection, full_schema).await
+    }
+}
+
+/// Single-partition leaf plan that reads the table snapshot eagerly and streams it from
+/// memory.
+#[derive(Debug, Clone)]
+pub struct HudiExec {
+    data_source: HudiDataSource,
+    /// Table schema after applying `projection`.
+    projected_schema: SchemaRef,
+    /// Column indices (into the table schema) to emit; `None` emits every column.
+    projection: Option<Vec<usize>>,
+}
+
+impl HudiExec {
+    /// Builds the plan for `data_source`, projecting `schema` onto `projections`.
+    ///
+    /// # Panics
+    /// Panics if a projection index is out of bounds for `schema`.
+    fn new(
+        projections: Option<&Vec<usize>>,
+        schema: SchemaRef,
+        data_source: HudiDataSource,
+    ) -> Self {
+        let projected_schema = project_schema(&schema, projections)
+            .expect("projection indices must be valid for the table schema");
+        Self {
+            data_source,
+            projected_schema,
+            projection: projections.cloned(),
+        }
+    }
+}
+
+impl DisplayAs for HudiExec {
+    fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        write!(f, "HudiExec")
+    }
+}
+
+impl ExecutionPlan for HudiExec {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    /// Schema of the emitted batches (the projected table schema).
+    fn schema(&self) -> SchemaRef {
+        self.projected_schema.clone()
+    }
+
+    fn output_partitioning(&self) -> datafusion::physical_plan::Partitioning {
+        datafusion::physical_plan::Partitioning::UnknownPartitioning(1)
+    }
+
+    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
+        None
+    }
+
+    /// Leaf node: no inputs.
+    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+        vec![]
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        _children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
+        Ok(self)
+    }
+
+    /// Reads all snapshot batches and streams them, projected to `self.schema()`.
+    fn execute(
+        &self,
+        _partition: usize,
+        _context: Arc<TaskContext>,
+    ) -> datafusion_common::Result<SendableRecordBatchStream> {
+        let data = self.data_source.get_record_batches()?;
+        // The batches are read without column selection, so they carry every file column
+        // (assumed to match the table schema). Hand the projection to MemoryStream so each
+        // emitted batch matches the projected schema advertised above.
+        Ok(Box::pin(MemoryStream::try_new(
+            data,
+            self.schema(),
+            self.projection.clone(),
+        )?))
+    }
+}
diff --git a/crates/fs/Cargo.toml b/crates/fs/Cargo.toml
new file mode 100644
index 0000000..7c4121b
--- /dev/null
+++ b/crates/fs/Cargo.toml
@@ -0,0 +1,50 @@
+[package]
+name = "hudi-fs"
+version = "0.1.0"
+edition.workspace = true
+license.workspace = true
+rust-version.workspace = true
+
+[dependencies]
+# arrow
+arrow = { workspace = true }
+arrow-arith = { workspace = true }
+arrow-array = { workspace = true , features = ["chrono-tz"]}
+arrow-buffer = { workspace = true }
+arrow-cast = { workspace = true }
+arrow-ipc = { workspace = true }
+arrow-json = { workspace = true }
+arrow-ord = { workspace = true }
+arrow-row = { workspace = true }
+arrow-schema = { workspace = true, features = ["serde"] }
+arrow-select = { workspace = true }
+parquet = { workspace = true, features = [
+    "async",
+    "object_store",
+] }
+pin-project-lite = "^0.2.7"
+
+# datafusion
+datafusion = { workspace = true, optional = true }
+datafusion-expr = { workspace = true, optional = true }
+datafusion-common = { workspace = true, optional = true }
+datafusion-proto = { workspace = true, optional = true }
+datafusion-sql = { workspace = true, optional = true }
+datafusion-physical-expr = { workspace = true, optional = true }
+
+# serde
+serde = { workspace = true, features = ["derive"] }
+serde_json = { workspace = true }
+
+# "stdlib"
+bytes = { workspace = true }
+chrono = { workspace = true, default-features = false, features = ["clock"] }
+hashbrown = "0.14.3"
+regex = { workspace = true }
+thiserror = { workspace = true }
+uuid = { workspace = true, features = ["serde", "v4"] }
+url = { workspace = true }
+
+# test
+tempfile = "3.10.1"
+zip-extract = "0.1.3"
diff --git a/crates/fs/fixtures/a.parquet b/crates/fs/fixtures/a.parquet
new file mode 100644
index 0000000..d99765e
Binary files /dev/null and b/crates/fs/fixtures/a.parquet differ
diff --git a/crates/fs/src/file_systems.rs b/crates/fs/src/file_systems.rs
new file mode 100644
index 0000000..fa5a528
--- /dev/null
+++ b/crates/fs/src/file_systems.rs
@@ -0,0 +1,38 @@
+use std::error::Error;
+use std::{fs::File, path::Path};
+
+use parquet::file::reader::{FileReader, Length, SerializedFileReader};
+
+use crate::assert_approx_eq;
+
+/// Basic metadata about a single parquet file on the local file system.
+#[derive(Debug)]
+pub struct FileMetadata {
+    /// The path as given to `from_path`, converted to a `String`.
+    pub path: String,
+    /// Final path component (file name including extension).
+    pub name: String,
+    /// File size in bytes.
+    pub size: u64,
+    /// Row count read from the parquet file metadata.
+    pub num_records: i64,
+}
+
+impl FileMetadata {
+    /// Collects path, name, size and row count of the parquet file at `p`.
+    ///
+    /// # Errors
+    /// Returns an error if `p` is not valid UTF-8 or has no file name, if the file cannot
+    /// be opened or stat'ed, or if it is not a readable parquet file.
+    pub fn from_path(p: &Path) -> Result<Self, Box<dyn Error>> {
+        let path = p.to_str().ok_or("file path is not valid UTF-8")?.to_string();
+        let name = p
+            .file_name()
+            .and_then(|n| n.to_str())
+            .ok_or("file path has no valid UTF-8 file name")?
+            .to_string();
+        let file = File::open(p)?;
+        // Stat the already-open handle rather than re-resolving the path.
+        let size = file.metadata()?.len();
+        let reader = SerializedFileReader::new(file)?;
+        let num_records = reader.metadata().file_metadata().num_rows();
+        Ok(Self {
+            path,
+            name,
+            size,
+            num_records,
+        })
+    }
+}
+
+/// Checks `FileMetadata::from_path` against the bundled `fixtures/a.parquet` file.
+#[test]
+fn read_file_metadata() {
+    // Relative path: resolved against the working directory the test runs in.
+    let metadata = FileMetadata::from_path(Path::new("fixtures/a.parquet")).unwrap();
+
+    assert_eq!(metadata.path, "fixtures/a.parquet");
+    assert_eq!(metadata.name, "a.parquet");
+    // Size is checked with a ±20-byte tolerance rather than exactly.
+    assert_approx_eq!(metadata.size, 866, 20);
+    assert_eq!(metadata.num_records, 5);
+}
diff --git a/crates/fs/src/lib.rs b/crates/fs/src/lib.rs
new file mode 100644
index 0000000..c299855
--- /dev/null
+++ b/crates/fs/src/lib.rs
@@ -0,0 +1,30 @@
+use std::ffi::OsStr;
+
+pub mod file_systems;
+pub mod test_utils;
+
+/// Asserts that `a` and `b` differ by at most `delta`.
+///
+/// The difference is computed as `larger - smaller`, so the macro also works for unsigned
+/// integers. Each argument is evaluated exactly once. A trailing comma is accepted.
+///
+/// # Panics
+/// Panics, printing all three values, when `|a - b| > delta`.
+#[macro_export]
+macro_rules! assert_approx_eq {
+    ($a:expr, $b:expr, $delta:expr $(,)?) => {{
+        let a = $a;
+        let b = $b;
+        let delta = $delta;
+        let diff = if a > b { a - b } else { b - a };
+
+        assert!(
+            diff <= delta,
+            "assertion failed: `(left ≈ right)`\n  left: `{:?}`,\n right: `{:?}`,\n delta: `{:?}`",
+            a, b, delta
+        );
+    }};
+}
+
+/// Returns `file_name` with its last extension (the text after the final `.`) removed.
+///
+/// Examples: `"a.parquet"` → `"a"`, `"a.b.parquet"` → `"a.b"`. A name with no `.` is
+/// returned unchanged (`"README"` → `"README"`).
+///
+/// # Panics
+/// Panics if `file_name` is `None` or is not valid UTF-8.
+pub fn file_name_without_ext(file_name: Option<&OsStr>) -> String {
+    let name = file_name
+        .and_then(|n| n.to_str())
+        .expect("file name must be present and valid UTF-8");
+    name.rsplit_once('.')
+        .map_or(name, |(stem, _ext)| stem)
+        .to_owned()
+}
diff --git a/crates/fs/src/test_utils.rs b/crates/fs/src/test_utils.rs
new file mode 100644
index 0000000..8d43d67
--- /dev/null
+++ b/crates/fs/src/test_utils.rs
@@ -0,0 +1,12 @@
+use std::fs;
+use std::io::Cursor;
+use std::path::{Path, PathBuf};
+
+use tempfile::tempdir;
+
+/// Extracts the zipped test table at `fixture_path` into a new temporary directory and
+/// returns that directory's path.
+///
+/// The directory is kept on disk (this function never removes it), so the returned path
+/// stays valid for the caller.
+///
+/// # Panics
+/// Panics if the temporary directory cannot be created, the fixture cannot be read, or the
+/// archive cannot be extracted.
+pub fn extract_test_table(fixture_path: &Path) -> PathBuf {
+    // `into_path` detaches the directory from the `TempDir` guard. Taking `.path()` from a
+    // temporary guard instead deletes the directory as soon as the guard drops, leaving a
+    // dangling path whose name could be reused before extraction recreates it.
+    let target_dir = tempdir()
+        .expect("failed to create temporary directory")
+        .into_path();
+    let archive = fs::read(fixture_path).expect("failed to read test table fixture");
+    zip_extract::extract(Cursor::new(archive), &target_dir, true)
+        .expect("failed to extract test table fixture");
+    target_dir
+}
diff --git a/crates/hudi/Cargo.toml b/crates/hudi/Cargo.toml
new file mode 100644
index 0000000..cef563c
--- /dev/null
+++ b/crates/hudi/Cargo.toml
@@ -0,0 +1,11 @@
+[package]
+name = "hudi"
+version.workspace = true
+edition.workspace = true
+license.workspace = true
+rust-version.workspace = true
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+hudi-core = { path = "../core"}
diff --git a/crates/hudi/src/lib.rs b/crates/hudi/src/lib.rs
new file mode 100644
index 0000000..5b71c7e
--- /dev/null
+++ b/crates/hudi/src/lib.rs
@@ -0,0 +1 @@
+pub use hudi_core::*;


Reply via email to