This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 0d71bd7  feat: implement `HudiTable` as python API (#23)
0d71bd7 is described below

commit 0d71bd7a0c3ec8d2aae631f5425b97166de8a76d
Author: Shiyan Xu <[email protected]>
AuthorDate: Wed Jun 19 15:39:40 2024 -0500

    feat: implement `HudiTable` as python API (#23)
---
 Cargo.toml                                         |   2 +-
 .../.hoodie/hoodie.properties                      |  44 ++---
 crates/core/src/error.rs                           |   2 +
 crates/core/src/file_group/mod.rs                  |   9 +-
 crates/core/src/lib.rs                             |   7 +-
 crates/core/src/table/config.rs                    | 147 ++++++++++++++
 crates/core/src/table/file_system_view.rs          | 104 ----------
 .../core/src/table/{meta_client.rs => fs_view.rs}  | 112 ++++++-----
 crates/core/src/table/metadata.rs                  |  55 ++++++
 crates/core/src/table/mod.rs                       | 216 ++++++++++++++++++---
 crates/core/src/timeline/mod.rs                    |   6 +-
 python/src/lib.rs => crates/core/src/utils.rs      |  28 ++-
 crates/datafusion/src/lib.rs                       |   7 +-
 python/Cargo.toml                                  |   6 +-
 python/Makefile                                    |   2 +-
 python/hudi/__init__.py                            |   4 +
 python/hudi/_internal.pyi                          |  10 +
 python/hudi/{_internal.pyi => table.py}            |  16 +-
 python/src/lib.rs                                  |  32 +++
 19 files changed, 572 insertions(+), 237 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index e24d132..29294d8 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -41,7 +41,7 @@ arrow-ord = { version = "50" }
 arrow-row = { version = "50" }
 arrow-schema = { version = "50" }
 arrow-select = { version = "50" }
-object_store = { version = "0.9" }
+object_store = { version = "0.10.1" }
 parquet = { version = "50" }
 
 # datafusion
diff --git a/python/Cargo.toml 
b/crates/core/fixtures/table_metadata/sample_table_properties/.hoodie/hoodie.properties
similarity index 51%
copy from python/Cargo.toml
copy to 
crates/core/fixtures/table_metadata/sample_table_properties/.hoodie/hoodie.properties
index 0a6c666..9c0c7e2 100644
--- a/python/Cargo.toml
+++ 
b/crates/core/fixtures/table_metadata/sample_table_properties/.hoodie/hoodie.properties
@@ -1,3 +1,4 @@
+#
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -14,27 +15,24 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+#
 
-[package]
-name = "hudi-python"
-version = "0.1.0"
-edition = "2021"
-
-# See more keys and their definitions at 
https://doc.rust-lang.org/cargo/reference/manifest.html
-[lib]
-name = "hudi"
-crate-type = ["cdylib"]
-doc = false
-
-[dependencies]
-# runtime
-futures = { workspace = true }
-num_cpus = { workspace = true }
-tokio = { workspace = true, features = ["rt-multi-thread"] }
-
-[dependencies.pyo3]
-version = "0.21.2"
-features = ["extension-module", "abi3", "abi3-py38", "gil-refs"]
-
-[dependencies.hudi]
-path = "../crates/hudi"
+hoodie.table.type=COPY_ON_WRITE
+hoodie.table.metadata.partitions=files
+hoodie.table.precombine.field=ts
+hoodie.table.partition.fields=city
+hoodie.archivelog.folder=archived
+hoodie.table.cdc.enabled=false
+hoodie.timeline.layout.version=1
+hoodie.table.checksum=3761586722
+hoodie.datasource.write.drop.partition.columns=false
+hoodie.table.recordkey.fields=uuid
+hoodie.table.name=trips
+hoodie.partition.metafile.use.base.format=false
+hoodie.datasource.write.hive_style_partitioning=false
+hoodie.table.metadata.partitions.inflight=
+hoodie.populate.meta.fields=true
+hoodie.table.keygenerator.class=org.apache.hudi.keygen.SimpleKeyGenerator
+hoodie.table.base.file.format=PARQUET
+hoodie.datasource.write.partitionpath.urlencode=false
+hoodie.table.version=6
diff --git a/crates/core/src/error.rs b/crates/core/src/error.rs
index f7f0bd7..4a2f0f2 100644
--- a/crates/core/src/error.rs
+++ b/crates/core/src/error.rs
@@ -40,4 +40,6 @@ pub enum HudiCoreError {
     FailToLoadFileGroup(#[from] HudiFileGroupError),
     #[error("Failed to build file system view")]
     FailToBuildFileSystemView(#[from] HudiFileSystemViewError),
+    #[error("Failed to load table properties")]
+    LoadTablePropertiesError,
 }
diff --git a/crates/core/src/file_group/mod.rs 
b/crates/core/src/file_group/mod.rs
index 609b6e6..29761bd 100644
--- a/crates/core/src/file_group/mod.rs
+++ b/crates/core/src/file_group/mod.rs
@@ -59,9 +59,8 @@ pub struct FileSlice {
     pub partition_path: Option<String>,
 }
 
-#[allow(dead_code)]
 impl FileSlice {
-    pub fn file_path(&self) -> Option<&str> {
+    pub fn base_file_path(&self) -> Option<&str> {
         match &self.base_file.metadata {
             None => None,
             Some(file_metadata) => Some(file_metadata.path.as_str()),
@@ -72,16 +71,12 @@ impl FileSlice {
         &self.base_file.file_group_id
     }
 
-    pub fn base_instant_time(&self) -> &str {
-        &self.base_file.commit_time
-    }
-
     pub fn set_base_file(&mut self, base_file: BaseFile) {
         self.base_file = base_file
     }
 }
 
-#[derive(Debug)]
+#[derive(Clone, Debug)]
 pub struct FileGroup {
     pub id: String,
     pub partition_path: Option<String>,
diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs
index abf9a66..86a191f 100644
--- a/crates/core/src/lib.rs
+++ b/crates/core/src/lib.rs
@@ -24,12 +24,7 @@ mod file_group;
 pub mod table;
 pub type HudiTable = Table;
 mod timeline;
-
-pub const BASE_FILE_EXTENSIONS: [&str; 1] = ["parquet"];
-
-pub fn is_base_file_format_supported(ext: &str) -> bool {
-    BASE_FILE_EXTENSIONS.contains(&ext)
-}
+mod utils;
 
 pub fn crate_version() -> &'static str {
     env!("CARGO_PKG_VERSION")
diff --git a/crates/core/src/table/config.rs b/crates/core/src/table/config.rs
new file mode 100644
index 0000000..05a75db
--- /dev/null
+++ b/crates/core/src/table/config.rs
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+use crate::error::HudiCoreError;
+use std::str::FromStr;
+
+pub enum ConfigKey {
+    BaseFileFormat,
+    Checksum,
+    DatabaseName,
+    DropsPartitionFields,
+    IsHiveStylePartitioning,
+    IsPartitionPathUrlencoded,
+    KeyGeneratorClass,
+    PartitionFields,
+    PrecombineField,
+    PopulatesMetaFields,
+    RecordKeyFields,
+    TableName,
+    TableType,
+    TableVersion,
+    TimelineLayoutVersion,
+}
+
+impl AsRef<str> for ConfigKey {
+    fn as_ref(&self) -> &str {
+        match self {
+            Self::BaseFileFormat => "hoodie.table.base.file.format",
+            Self::Checksum => "hoodie.table.checksum",
+            Self::DatabaseName => "hoodie.database.name",
+            Self::DropsPartitionFields => 
"hoodie.datasource.write.drop.partition.columns",
+            Self::IsHiveStylePartitioning => 
"hoodie.datasource.write.hive_style_partitioning",
+            Self::IsPartitionPathUrlencoded => 
"hoodie.datasource.write.partitionpath.urlencode",
+            Self::KeyGeneratorClass => "hoodie.table.keygenerator.class",
+            Self::PartitionFields => "hoodie.table.partition.fields",
+            Self::PrecombineField => "hoodie.table.precombine.field",
+            Self::PopulatesMetaFields => "hoodie.populate.meta.fields",
+            Self::RecordKeyFields => "hoodie.table.recordkey.fields",
+            Self::TableName => "hoodie.table.name",
+            Self::TableType => "hoodie.table.type",
+            Self::TableVersion => "hoodie.table.version",
+            Self::TimelineLayoutVersion => "hoodie.timeline.layout.version",
+        }
+    }
+}
+
+#[derive(Debug, PartialEq)]
+pub enum TableType {
+    CopyOnWrite,
+    MergeOnRead,
+}
+
+impl FromStr for TableType {
+    type Err = HudiCoreError;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        match s.to_ascii_lowercase().as_str() {
+            "copy_on_write" => Ok(Self::CopyOnWrite),
+            "copy-on-write" => Ok(Self::CopyOnWrite),
+            "cow" => Ok(Self::CopyOnWrite),
+            "merge_on_read" => Ok(Self::MergeOnRead),
+            "merge-on-read" => Ok(Self::MergeOnRead),
+            "mor" => Ok(Self::MergeOnRead),
+            _ => Err(HudiCoreError::LoadTablePropertiesError),
+        }
+    }
+}
+
+#[derive(Debug, PartialEq)]
+pub enum BaseFileFormat {
+    Parquet,
+}
+
+impl FromStr for BaseFileFormat {
+    type Err = HudiCoreError;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        match s.to_ascii_lowercase().as_str() {
+            "parquet" => Ok(Self::Parquet),
+            _ => Err(HudiCoreError::LoadTablePropertiesError),
+        }
+    }
+}
+#[cfg(test)]
+mod tests {
+    use crate::table::config::{BaseFileFormat, TableType};
+    use std::str::FromStr;
+
+    #[test]
+    fn create_table_type() {
+        assert_eq!(TableType::from_str("cow").unwrap(), 
TableType::CopyOnWrite);
+        assert_eq!(
+            TableType::from_str("copy_on_write").unwrap(),
+            TableType::CopyOnWrite
+        );
+        assert_eq!(
+            TableType::from_str("COPY-ON-WRITE").unwrap(),
+            TableType::CopyOnWrite
+        );
+        assert_eq!(TableType::from_str("MOR").unwrap(), 
TableType::MergeOnRead);
+        assert_eq!(
+            TableType::from_str("Merge_on_read").unwrap(),
+            TableType::MergeOnRead
+        );
+        assert_eq!(
+            TableType::from_str("Merge-on-read").unwrap(),
+            TableType::MergeOnRead
+        );
+        assert!(TableType::from_str("").is_err());
+        assert!(TableType::from_str("copyonwrite").is_err());
+        assert!(TableType::from_str("MERGEONREAD").is_err());
+        assert!(TableType::from_str("foo").is_err());
+    }
+
+    #[test]
+    fn create_base_file_format() {
+        assert_eq!(
+            BaseFileFormat::from_str("parquet").unwrap(),
+            BaseFileFormat::Parquet
+        );
+        assert_eq!(
+            BaseFileFormat::from_str("PArquet").unwrap(),
+            BaseFileFormat::Parquet
+        );
+        assert!(TableType::from_str("").is_err());
+        assert!(
+            TableType::from_str("orc").is_err(),
+            "orc is not yet supported."
+        );
+    }
+}
diff --git a/crates/core/src/table/file_system_view.rs 
b/crates/core/src/table/file_system_view.rs
deleted file mode 100644
index 7714a8c..0000000
--- a/crates/core/src/table/file_system_view.rs
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-use crate::error::HudiFileSystemViewError;
-use crate::error::HudiFileSystemViewError::FailToLoadPartitions;
-use crate::file_group::{FileGroup, FileSlice};
-use crate::table::meta_client::MetaClient;
-use hashbrown::HashMap;
-
-pub struct FileSystemView {
-    meta_client: MetaClient,
-    partition_to_file_groups: HashMap<String, Vec<FileGroup>>,
-}
-
-impl FileSystemView {
-    pub fn init(meta_client: MetaClient) -> Result<Self, 
HudiFileSystemViewError> {
-        let mut fs_view = FileSystemView {
-            meta_client,
-            partition_to_file_groups: HashMap::new(),
-        };
-        fs_view.load_partitions()?;
-        Ok(fs_view)
-    }
-
-    fn load_partitions(&mut self) -> Result<(), HudiFileSystemViewError> {
-        match self.meta_client.get_partition_paths() {
-            Ok(partition_paths) => {
-                for p in partition_paths {
-                    match self.meta_client.get_file_groups(p.as_str()) {
-                        Ok(file_groups) => {
-                            self.partition_to_file_groups.insert(p, 
file_groups);
-                        }
-                        Err(e) => return Err(FailToLoadPartitions(e)),
-                    }
-                }
-            }
-            Err(e) => return Err(FailToLoadPartitions(Box::new(e))),
-        }
-        Ok(())
-    }
-
-    pub fn get_latest_file_slices(&self) -> Vec<&FileSlice> {
-        let mut file_slices = Vec::new();
-        for fgs in self.partition_to_file_groups.values() {
-            for fg in fgs {
-                if let Some(file_slice) = fg.get_latest_file_slice() {
-                    file_slices.push(file_slice)
-                }
-            }
-        }
-        file_slices
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use crate::table::file_system_view::FileSystemView;
-    use crate::table::meta_client::MetaClient;
-    use hudi_fs::test_utils::extract_test_table;
-    use std::collections::HashSet;
-    use std::path::Path;
-
-    #[test]
-    fn meta_client_get_file_groups() {
-        let fixture_path = Path::new("fixtures/table/0.x_cow_partitioned.zip");
-        let target_table_path = extract_test_table(fixture_path);
-        let meta_client = MetaClient::new(&target_table_path);
-        let fs_view = FileSystemView::init(meta_client).unwrap();
-        let file_slices = fs_view.get_latest_file_slices();
-        assert_eq!(file_slices.len(), 5);
-        let mut fg_ids = Vec::new();
-        for f in file_slices {
-            let fp = f.file_group_id();
-            fg_ids.push(fp);
-        }
-        let actual: HashSet<&str> = fg_ids.into_iter().collect();
-        assert_eq!(
-            actual,
-            HashSet::from_iter(vec![
-                "780b8586-3ad0-48ef-a6a1-d2217845ce4a-0",
-                "d9082ffd-2eb1-4394-aefc-deb4a61ecc57-0",
-                "ee915c68-d7f8-44f6-9759-e691add290d8-0",
-                "68d3c349-f621-4cd8-9e8b-c6dd8eb20d08-0",
-                "5a226868-2934-4f84-a16f-55124630c68d-0"
-            ])
-        );
-    }
-}
diff --git a/crates/core/src/table/meta_client.rs 
b/crates/core/src/table/fs_view.rs
similarity index 61%
rename from crates/core/src/table/meta_client.rs
rename to crates/core/src/table/fs_view.rs
index 15f8495..f672dfc 100644
--- a/crates/core/src/table/meta_client.rs
+++ b/crates/core/src/table/fs_view.rs
@@ -25,42 +25,42 @@ use hashbrown::HashMap;
 
 use hudi_fs::file_systems::FileMetadata;
 
-use crate::file_group::{BaseFile, FileGroup};
-use crate::timeline::Timeline;
+use crate::error::HudiFileSystemViewError;
+use crate::error::HudiFileSystemViewError::FailToLoadPartitions;
+use crate::file_group::{BaseFile, FileGroup, FileSlice};
+use crate::utils::get_leaf_dirs;
 
-#[derive(Debug, Clone)]
-pub struct MetaClient {
+#[derive(Clone, Debug)]
+pub struct FileSystemView {
     pub base_path: PathBuf,
-    pub timeline: Timeline,
+    partition_to_file_groups: HashMap<String, Vec<FileGroup>>,
 }
 
-impl MetaClient {
-    pub fn new(base_path: &Path) -> Self {
-        match Timeline::init(base_path) {
-            Ok(timeline) => Self {
-                base_path: base_path.to_path_buf(),
-                timeline,
-            },
-            Err(e) => panic!("Failed to instantiate meta client: {}", e),
-        }
+impl FileSystemView {
+    pub fn new(base_path: &Path) -> Result<Self, HudiFileSystemViewError> {
+        let mut fs_view = FileSystemView {
+            base_path: base_path.to_path_buf(),
+            partition_to_file_groups: HashMap::new(),
+        };
+        fs_view.load_partitions()?;
+        Ok(fs_view)
     }
 
-    fn get_leaf_dirs(path: &Path) -> Result<Vec<PathBuf>, io::Error> {
-        let mut leaf_dirs = Vec::new();
-        let mut is_leaf_dir = true;
-        for entry in fs::read_dir(path)? {
-            let entry = entry?;
-            if entry.path().is_dir() {
-                is_leaf_dir = false;
-                let curr_sub_dir = entry.path();
-                let curr = Self::get_leaf_dirs(&curr_sub_dir)?;
-                leaf_dirs.extend(curr);
+    fn load_partitions(&mut self) -> Result<(), HudiFileSystemViewError> {
+        match self.get_partition_paths() {
+            Ok(partition_paths) => {
+                for p in partition_paths {
+                    match self.get_file_groups(p.as_str()) {
+                        Ok(file_groups) => {
+                            self.partition_to_file_groups.insert(p, 
file_groups);
+                        }
+                        Err(e) => return Err(FailToLoadPartitions(e)),
+                    }
+                }
             }
+            Err(e) => return Err(FailToLoadPartitions(Box::new(e))),
         }
-        if is_leaf_dir {
-            leaf_dirs.push(path.to_path_buf())
-        }
-        Ok(leaf_dirs)
+        Ok(())
     }
 
     pub fn get_partition_paths(&self) -> Result<Vec<String>, io::Error> {
@@ -73,7 +73,7 @@ impl MetaClient {
         }
         let mut full_partition_paths: Vec<PathBuf> = Vec::new();
         for p in first_level_partition_paths {
-            full_partition_paths.extend(Self::get_leaf_dirs(p.as_path())?)
+            full_partition_paths.extend(get_leaf_dirs(p.as_path())?)
         }
         let common_prefix_len = self.base_path.to_str().unwrap().len() + 1;
         let mut partition_paths = Vec::new();
@@ -110,21 +110,35 @@ impl MetaClient {
         }
         Ok(file_groups)
     }
+
+    pub fn get_latest_file_slices(&self) -> Vec<&FileSlice> {
+        let mut file_slices = Vec::new();
+        for fgs in self.partition_to_file_groups.values() {
+            for fg in fgs {
+                if let Some(file_slice) = fg.get_latest_file_slice() {
+                    file_slices.push(file_slice)
+                }
+            }
+        }
+        file_slices
+    }
 }
 
 #[cfg(test)]
 mod tests {
-    use crate::table::meta_client::MetaClient;
-    use hudi_fs::test_utils::extract_test_table;
     use std::collections::HashSet;
     use std::path::Path;
 
+    use hudi_fs::test_utils::extract_test_table;
+
+    use crate::table::fs_view::FileSystemView;
+
     #[test]
-    fn meta_client_get_partition_paths() {
+    fn get_partition_paths() {
         let fixture_path = Path::new("fixtures/table/0.x_cow_partitioned.zip");
         let target_table_path = extract_test_table(fixture_path);
-        let meta_client = MetaClient::new(&target_table_path);
-        let partition_paths = meta_client.get_partition_paths().unwrap();
+        let fs_view = FileSystemView::new(&target_table_path).unwrap();
+        let partition_paths = fs_view.get_partition_paths().unwrap();
         let partition_path_set: HashSet<&str> =
             HashSet::from_iter(partition_paths.iter().map(|p| p.as_str()));
         assert_eq!(
@@ -134,27 +148,27 @@ mod tests {
     }
 
     #[test]
-    fn meta_client_get_file_groups() {
+    fn get_latest_file_slices() {
         let fixture_path = Path::new("fixtures/table/0.x_cow_partitioned.zip");
         let target_table_path = extract_test_table(fixture_path);
-        let meta_client = MetaClient::new(&target_table_path);
-        let file_groups = 
meta_client.get_file_groups("san_francisco").unwrap();
-        assert_eq!(file_groups.len(), 3);
-        let fg_ids: HashSet<&str> = 
HashSet::from_iter(file_groups.iter().map(|fg| fg.id.as_str()));
+        let fs_view = FileSystemView::new(&target_table_path).unwrap();
+        let file_slices = fs_view.get_latest_file_slices();
+        assert_eq!(file_slices.len(), 5);
+        let mut fg_ids = Vec::new();
+        for f in file_slices {
+            let fp = f.file_group_id();
+            fg_ids.push(fp);
+        }
+        let actual: HashSet<&str> = fg_ids.into_iter().collect();
         assert_eq!(
-            fg_ids,
+            actual,
             HashSet::from_iter(vec![
-                "5a226868-2934-4f84-a16f-55124630c68d-0",
                 "780b8586-3ad0-48ef-a6a1-d2217845ce4a-0",
-                "d9082ffd-2eb1-4394-aefc-deb4a61ecc57-0"
+                "d9082ffd-2eb1-4394-aefc-deb4a61ecc57-0",
+                "ee915c68-d7f8-44f6-9759-e691add290d8-0",
+                "68d3c349-f621-4cd8-9e8b-c6dd8eb20d08-0",
+                "5a226868-2934-4f84-a16f-55124630c68d-0"
             ])
         );
     }
-    #[test]
-    fn meta_client_active_timeline_init_as_expected() {
-        let fixture_path = Path::new("fixtures/table/0.x_cow_partitioned.zip");
-        let target_table_path = extract_test_table(fixture_path);
-        let meta_client = MetaClient::new(&target_table_path);
-        assert_eq!(meta_client.timeline.instants.len(), 2)
-    }
 }
diff --git a/crates/core/src/table/metadata.rs 
b/crates/core/src/table/metadata.rs
new file mode 100644
index 0000000..fb2d41b
--- /dev/null
+++ b/crates/core/src/table/metadata.rs
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+use crate::table::config::{BaseFileFormat, TableType};
+
+pub trait ProvidesTableMetadata {
+    fn base_file_format(&self) -> BaseFileFormat;
+
+    fn checksum(&self) -> i64;
+
+    fn database_name(&self) -> String;
+
+    fn drops_partition_fields(&self) -> bool;
+
+    fn is_hive_style_partitioning(&self) -> bool;
+
+    fn is_partition_path_urlencoded(&self) -> bool;
+
+    fn is_partitioned(&self) -> bool;
+
+    fn key_generator_class(&self) -> String;
+
+    fn location(&self) -> String;
+
+    fn partition_fields(&self) -> Vec<String>;
+
+    fn precombine_field(&self) -> String;
+
+    fn populates_meta_fields(&self) -> bool;
+
+    fn record_key_fields(&self) -> Vec<String>;
+
+    fn table_name(&self) -> String;
+
+    fn table_type(&self) -> TableType;
+
+    fn table_version(&self) -> u32;
+
+    fn timeline_layout_version(&self) -> u32;
+}
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index d1c7dc7..2548554 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -17,48 +17,101 @@
  * under the License.
  */
 
+use std::collections::HashMap;
 use std::error::Error;
-use std::path::PathBuf;
+use std::fs::File;
+use std::io::{BufRead, BufReader};
+use std::path::{Path, PathBuf};
+use std::str::FromStr;
 
 use arrow_schema::SchemaRef;
 
-use crate::table::file_system_view::FileSystemView;
-use crate::table::meta_client::MetaClient;
+use crate::file_group::FileSlice;
+use crate::table::config::BaseFileFormat;
+use crate::table::config::{ConfigKey, TableType};
+use crate::table::fs_view::FileSystemView;
+use crate::table::metadata::ProvidesTableMetadata;
+use crate::timeline::Timeline;
 
-mod file_system_view;
-mod meta_client;
+mod config;
+mod fs_view;
+mod metadata;
 
 #[derive(Debug, Clone)]
 pub struct Table {
     pub base_path: PathBuf,
-    meta_client: MetaClient,
+    pub props: HashMap<String, String>,
 }
 
 impl Table {
-    pub fn new(base_path: &str) -> Self {
-        let p = PathBuf::from(base_path);
-        let meta_client = MetaClient::new(p.as_path());
-        Self {
-            base_path: p,
-            meta_client,
+    pub fn new(table_base_path: &str) -> Self {
+        let base_path = PathBuf::from(table_base_path);
+        let props_path = base_path.join(".hoodie").join("hoodie.properties");
+        match Self::load_properties(props_path.as_path()) {
+            Ok(props) => Self { base_path, props },
+            Err(e) => {
+                panic!("Failed to load table properties: {}", e)
+            }
+        }
+    }
+
+    fn load_properties(path: &Path) -> Result<HashMap<String, String>, 
std::io::Error> {
+        let file = File::open(path)?;
+        let reader = BufReader::new(file);
+        let lines = reader.lines();
+        let mut properties: HashMap<String, String> = HashMap::new();
+        for line in lines {
+            let line = line?;
+            let trimmed_line = line.trim();
+            if trimmed_line.is_empty() || trimmed_line.starts_with('#') {
+                continue;
+            }
+            let mut parts = trimmed_line.splitn(2, '=');
+            let key = parts.next().unwrap().to_owned();
+            let value = parts.next().unwrap_or("").to_owned();
+            properties.insert(key, value);
         }
+        Ok(properties)
+    }
+
+    pub fn get_property(&self, key: &str) -> &str {
+        match self.props.get(key) {
+            Some(value) => value,
+            None => panic!("Failed to retrieve property {}", key),
+        }
+    }
+
+    pub fn get_timeline(&self) -> Result<Timeline, std::io::Error> {
+        Timeline::new(self.base_path.as_path())
     }
 
     pub fn schema(&self) -> SchemaRef {
-        match self.meta_client.timeline.get_latest_schema() {
-            Ok(table_schema) => SchemaRef::from(table_schema),
+        match Timeline::new(self.base_path.as_path()) {
+            Ok(timeline) => match timeline.get_latest_schema() {
+                Ok(schema) => SchemaRef::from(schema),
+                Err(e) => {
+                    panic!("Failed to resolve table schema: {}", e)
+                }
+            },
             Err(e) => {
                 panic!("Failed to resolve table schema: {}", e)
             }
         }
     }
 
-    pub fn get_snapshot_file_paths(&self) -> Result<Vec<String>, Box<dyn 
Error>> {
-        let meta_client = MetaClient::new(&self.base_path);
-        let fs_view = FileSystemView::init(meta_client)?;
-        let mut file_paths = Vec::new();
+    pub fn get_latest_file_slices(&self) -> Result<Vec<FileSlice>, Box<dyn 
Error>> {
+        let mut file_slices = Vec::new();
+        let fs_view = FileSystemView::new(self.base_path.as_path())?;
         for f in fs_view.get_latest_file_slices() {
-            if let Some(f) = f.file_path() {
+            file_slices.push(f.clone());
+        }
+        Ok(file_slices)
+    }
+
+    pub fn get_latest_file_paths(&self) -> Result<Vec<String>, Box<dyn Error>> 
{
+        let mut file_paths = Vec::new();
+        for f in self.get_latest_file_slices()? {
+            if let Some(f) = f.base_file_path() {
                 file_paths.push(f.to_string());
             }
         }
@@ -66,18 +119,135 @@ impl Table {
     }
 }
 
+impl ProvidesTableMetadata for Table {
+    fn base_file_format(&self) -> BaseFileFormat {
+        
BaseFileFormat::from_str(self.get_property(ConfigKey::BaseFileFormat.as_ref())).unwrap()
+    }
+
+    fn checksum(&self) -> i64 {
+        i64::from_str(self.get_property(ConfigKey::Checksum.as_ref())).unwrap()
+    }
+
+    fn database_name(&self) -> String {
+        match self.props.get(ConfigKey::DatabaseName.as_ref()) {
+            Some(value) => value.to_string(),
+            None => "default".to_string(),
+        }
+    }
+
+    fn drops_partition_fields(&self) -> bool {
+        
bool::from_str(self.get_property(ConfigKey::DropsPartitionFields.as_ref())).unwrap()
+    }
+
+    fn is_hive_style_partitioning(&self) -> bool {
+        
bool::from_str(self.get_property(ConfigKey::IsHiveStylePartitioning.as_ref())).unwrap()
+    }
+
+    fn is_partition_path_urlencoded(&self) -> bool {
+        
bool::from_str(self.get_property(ConfigKey::IsPartitionPathUrlencoded.as_ref())).unwrap()
+    }
+
+    fn is_partitioned(&self) -> bool {
+        !self
+            .key_generator_class()
+            .ends_with("NonpartitionedKeyGenerator")
+    }
+
+    fn key_generator_class(&self) -> String {
+        self.get_property(ConfigKey::KeyGeneratorClass.as_ref())
+            .to_string()
+    }
+
+    fn location(&self) -> String {
+        self.base_path.to_str().unwrap().to_string()
+    }
+
+    fn partition_fields(&self) -> Vec<String> {
+        self.get_property(ConfigKey::PartitionFields.as_ref())
+            .split(',')
+            .map(str::to_string)
+            .collect()
+    }
+
+    fn precombine_field(&self) -> String {
+        self.get_property(ConfigKey::PrecombineField.as_ref())
+            .to_string()
+    }
+
+    fn populates_meta_fields(&self) -> bool {
+        
bool::from_str(self.get_property(ConfigKey::PopulatesMetaFields.as_ref())).unwrap()
+    }
+
+    fn record_key_fields(&self) -> Vec<String> {
+        self.get_property(ConfigKey::RecordKeyFields.as_ref())
+            .split(',')
+            .map(str::to_string)
+            .collect()
+    }
+
+    fn table_name(&self) -> String {
+        self.get_property(ConfigKey::TableName.as_ref()).to_string()
+    }
+
+    fn table_type(&self) -> TableType {
+        
TableType::from_str(self.get_property(ConfigKey::TableType.as_ref())).unwrap()
+    }
+
+    fn table_version(&self) -> u32 {
+        
u32::from_str(self.get_property(ConfigKey::TableVersion.as_ref())).unwrap()
+    }
+
+    fn timeline_layout_version(&self) -> u32 {
+        
u32::from_str(self.get_property(ConfigKey::TimelineLayoutVersion.as_ref())).unwrap()
+    }
+}
+
 #[cfg(test)]
 mod tests {
+    use std::path::Path;
+
+    use crate::table::config::BaseFileFormat::Parquet;
+    use crate::table::config::TableType::CopyOnWrite;
+    use crate::table::metadata::ProvidesTableMetadata;
     use crate::table::Table;
     use hudi_fs::test_utils::extract_test_table;
-    use std::path::Path;
 
     #[test]
-    fn load_snapshot_file_paths() {
+    fn hudi_table_get_latest_file_paths() {
         let fixture_path = Path::new("fixtures/table/0.x_cow_partitioned.zip");
         let target_table_path = extract_test_table(fixture_path);
-        let hudi_table = 
Table::new(target_table_path.as_path().to_str().unwrap());
-        assert_eq!(hudi_table.get_snapshot_file_paths().unwrap().len(), 5);
+        let hudi_table = Table::new(target_table_path.to_str().unwrap());
+        assert_eq!(hudi_table.get_timeline().unwrap().instants.len(), 2);
+        assert_eq!(hudi_table.get_latest_file_paths().unwrap().len(), 5);
         println!("{}", hudi_table.schema());
     }
+
+    #[test]
+    fn hudi_table_get_table_metadata() {
+        let fixture_path = 
Path::new("fixtures/table_metadata/sample_table_properties");
+        let table = Table::new(fixture_path.to_str().unwrap());
+        assert_eq!(table.base_file_format(), Parquet);
+        assert_eq!(table.checksum(), 3761586722);
+        assert_eq!(table.database_name(), "default");
+        assert!(!table.drops_partition_fields());
+        assert!(!table.is_hive_style_partitioning());
+        assert!(!table.is_partition_path_urlencoded());
+        assert!(table.is_partitioned());
+        assert_eq!(
+            table.key_generator_class(),
+            "org.apache.hudi.keygen.SimpleKeyGenerator"
+        );
+        assert_eq!(
+            table.location(),
+            "fixtures/table_metadata/sample_table_properties"
+        );
+        assert_eq!(table.partition_fields(), vec!["city"]);
+        assert_eq!(table.precombine_field(), "ts");
+        assert!(table.populates_meta_fields());
+        assert_eq!(table.record_key_fields(), vec!["uuid"]);
+        assert_eq!(table.table_name(), "trips");
+        assert_eq!(table.table_type(), CopyOnWrite);
+        assert_eq!(table.table_version(), 6);
+        assert_eq!(table.timeline_layout_version(), 1);
+    }
 }
diff --git a/crates/core/src/timeline/mod.rs b/crates/core/src/timeline/mod.rs
index 75d89d5..1cd9e13 100644
--- a/crates/core/src/timeline/mod.rs
+++ b/crates/core/src/timeline/mod.rs
@@ -64,7 +64,7 @@ pub struct Timeline {
 }
 
 impl Timeline {
-    pub fn init(base_path: &Path) -> Result<Self, io::Error> {
+    pub fn new(base_path: &Path) -> Result<Self, io::Error> {
         let instants = Self::load_completed_commit_instants(base_path)?;
         Ok(Self {
             base_path: base_path.to_path_buf(),
@@ -140,7 +140,7 @@ mod tests {
     fn read_latest_schema() {
         let fixture_path = Path::new("fixtures/table/0.x_cow_partitioned.zip");
         let target_table_path = extract_test_table(fixture_path);
-        let timeline = Timeline::init(target_table_path.as_path()).unwrap();
+        let timeline = Timeline::new(target_table_path.as_path()).unwrap();
         let table_schema = timeline.get_latest_schema().unwrap();
         assert_eq!(table_schema.fields.len(), 11)
     }
@@ -148,7 +148,7 @@ mod tests {
     #[test]
     fn init_commits_timeline() {
         let fixture_path = Path::new("fixtures/timeline/commits_stub");
-        let timeline = Timeline::init(fixture_path).unwrap();
+        let timeline = Timeline::new(fixture_path).unwrap();
         assert_eq!(
             timeline.instants,
             vec![
diff --git a/python/src/lib.rs b/crates/core/src/utils.rs
similarity index 59%
copy from python/src/lib.rs
copy to crates/core/src/utils.rs
index ad03f49..c4189c5 100644
--- a/python/src/lib.rs
+++ b/crates/core/src/utils.rs
@@ -17,15 +17,23 @@
  * under the License.
  */
 
-use pyo3::prelude::*;
+use std::path::{Path, PathBuf};
+use std::{fs, io};
 
-#[pyfunction]
-fn rust_core_version() -> &'static str {
-    hudi::crate_version()
-}
-
-#[pymodule]
-fn _internal(_py: Python, m: &PyModule) -> PyResult<()> {
-    m.add_function(wrap_pyfunction!(rust_core_version, m)?)?;
-    Ok(())
+pub fn get_leaf_dirs(path: &Path) -> Result<Vec<PathBuf>, io::Error> {
+    let mut leaf_dirs = Vec::new();
+    let mut is_leaf_dir = true;
+    for entry in fs::read_dir(path)? {
+        let entry = entry?;
+        if entry.path().is_dir() {
+            is_leaf_dir = false;
+            let curr_sub_dir = entry.path();
+            let curr = get_leaf_dirs(&curr_sub_dir)?;
+            leaf_dirs.extend(curr);
+        }
+    }
+    if is_leaf_dir {
+        leaf_dirs.push(path.to_path_buf())
+    }
+    Ok(leaf_dirs)
 }
diff --git a/crates/datafusion/src/lib.rs b/crates/datafusion/src/lib.rs
index ff603de..56aae9d 100644
--- a/crates/datafusion/src/lib.rs
+++ b/crates/datafusion/src/lib.rs
@@ -44,8 +44,9 @@ pub struct HudiDataSource {
 
 impl HudiDataSource {
     pub fn new(base_path: &str) -> Self {
-        let table = HudiTable::new(base_path);
-        Self { table }
+        Self {
+            table: HudiTable::new(base_path),
+        }
     }
     pub(crate) async fn create_physical_plan(
         &self,
@@ -56,7 +57,7 @@ impl HudiDataSource {
     }
 
     fn get_record_batches(&self) -> datafusion_common::Result<Vec<RecordBatch>> {
-        match self.table.get_snapshot_file_paths() {
+        match self.table.get_latest_file_paths() {
             Ok(file_paths) => {
                 let mut record_batches = Vec::new();
                 for f in file_paths {
diff --git a/python/Cargo.toml b/python/Cargo.toml
index 0a6c666..6c5702b 100644
--- a/python/Cargo.toml
+++ b/python/Cargo.toml
@@ -20,17 +20,13 @@ name = "hudi-python"
 version = "0.1.0"
 edition = "2021"
 
-# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
 [lib]
 name = "hudi"
 crate-type = ["cdylib"]
 doc = false
 
 [dependencies]
-# runtime
-futures = { workspace = true }
-num_cpus = { workspace = true }
-tokio = { workspace = true, features = ["rt-multi-thread"] }
+object_store = { workspace = true }
 
 [dependencies.pyo3]
 version = "0.21.2"
diff --git a/python/Makefile b/python/Makefile
index c73bacc..8017d2f 100644
--- a/python/Makefile
+++ b/python/Makefile
@@ -17,7 +17,7 @@
 
 .DEFAULT_GOAL := help
 
-VENV := .venv
+VENV := venv
 MATURIN_VERSION := $(shell grep 'requires =' pyproject.toml | cut -d= -f2- | tr -d '[ "]')
 PACKAGE_VERSION := $(shell grep version Cargo.toml | head -n 1 | awk '{print $$3}' | tr -d '"' )
 
diff --git a/python/hudi/__init__.py b/python/hudi/__init__.py
index a67d5ea..fa70485 100644
--- a/python/hudi/__init__.py
+++ b/python/hudi/__init__.py
@@ -14,3 +14,7 @@
 #  KIND, either express or implied.  See the License for the
 #  specific language governing permissions and limitations
 #  under the License.
+
+from ._internal import __version__ as __version__
+from ._internal import rust_core_version as rust_core_version
+from .table import HudiTable as HudiTable
diff --git a/python/hudi/_internal.pyi b/python/hudi/_internal.pyi
index 75aaa86..8759c2e 100644
--- a/python/hudi/_internal.pyi
+++ b/python/hudi/_internal.pyi
@@ -15,6 +15,16 @@
 #  specific language governing permissions and limitations
 #  under the License.
 
+from typing import List
+
 __version__: str
 
+
 def rust_core_version() -> str: ...
+
+
+class BindingHudiTable:
+
+    def __init__(self, table_uri: str): ...
+
+    def get_latest_file_paths(self) -> List[str]: ...
diff --git a/python/hudi/_internal.pyi b/python/hudi/table.py
similarity index 66%
copy from python/hudi/_internal.pyi
copy to python/hudi/table.py
index 75aaa86..a8cfbcb 100644
--- a/python/hudi/_internal.pyi
+++ b/python/hudi/table.py
@@ -15,6 +15,18 @@
 #  specific language governing permissions and limitations
 #  under the License.
 
-__version__: str
+from dataclasses import dataclass
+from pathlib import Path
+from typing import Union, List
 
-def rust_core_version() -> str: ...
+from hudi._internal import BindingHudiTable
+
+
+@dataclass(init=False)
+class HudiTable:
+
+    def __init__(self, table_uri: Union[str, Path]):
+        self._table = BindingHudiTable(str(table_uri))
+
+    def get_latest_file_paths(self) -> List[str]:
+        return self._table.get_latest_file_paths()
diff --git a/python/src/lib.rs b/python/src/lib.rs
index ad03f49..8ac33f4 100644
--- a/python/src/lib.rs
+++ b/python/src/lib.rs
@@ -19,6 +19,35 @@
 
 use pyo3::prelude::*;
 
+use hudi::table::Table;
+
+#[pyclass]
+struct BindingHudiTable {
+    _table: Table,
+}
+
+#[pymethods]
+impl BindingHudiTable {
+    #[new]
+    #[pyo3(signature = (table_uri))]
+    fn new(py: Python, table_uri: &str) -> PyResult<Self> {
+        py.allow_threads(|| {
+            Ok(BindingHudiTable {
+                _table: Table::new(table_uri),
+            })
+        })
+    }
+
+    pub fn get_latest_file_paths(&self) -> PyResult<Vec<String>> {
+        self._table
+            .get_latest_file_paths()
+            .map_err(|e| PyErr::new::<pyo3::exceptions::PyException, _>(e.to_string()))
+    }
+}
+
 #[pyfunction]
 fn rust_core_version() -> &'static str {
     hudi::crate_version()
@@ -26,6 +55,9 @@ fn rust_core_version() -> &'static str {
 
 #[pymodule]
 fn _internal(_py: Python, m: &PyModule) -> PyResult<()> {
+    m.add("__version__", env!("CARGO_PKG_VERSION"))?;
     m.add_function(wrap_pyfunction!(rust_core_version, m)?)?;
+
+    m.add_class::<BindingHudiTable>()?;
     Ok(())
 }


Reply via email to