This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new b6c2e5d  refactor: adjust table APIs to skip passing options (#56)
b6c2e5d is described below

commit b6c2e5d592b249504ff366079a163190bb4ec53d
Author: Shiyan Xu <[email protected]>
AuthorDate: Mon Jul 8 01:00:58 2024 -0500

    refactor: adjust table APIs to skip passing options (#56)
    
    - support `new()` and `new_with_options()` api for creating table
    - use generics to make options argument more flexible
    - simplify python binding classes
---
 Cargo.toml                                         |   2 +-
 crates/core/src/lib.rs                             |  11 +-
 crates/core/src/storage/utils.rs                   |   4 +
 crates/core/src/table/mod.rs                       |  94 +++++++--------
 crates/datafusion/Cargo.toml                       |   1 +
 crates/datafusion/src/lib.rs                       |  50 +++++---
 .../table_props_valid/.hoodie/hoodie.properties}   |  48 ++++----
 python/Cargo.toml                                  |   2 +-
 python/hudi/__init__.py                            |   4 +-
 python/hudi/_internal.pyi                          |  15 +--
 python/hudi/table.py                               |  53 ---------
 python/src/{lib.rs => internal.rs}                 |  54 +++------
 python/src/lib.rs                                  | 131 +--------------------
 python/tests/test_table_read.py                    |   2 +-
 14 files changed, 139 insertions(+), 332 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 0086ca8..24412e8 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -68,4 +68,4 @@ async-recursion = { version = "1.1.1" }
 async-trait = { version = "0.1" }
 dashmap = { version = "6.0.1" }
 futures = { version = "0.3" }
-tokio = { version = "1" }
+tokio = { version = "1", features = ["rt-multi-thread"] }
diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs
index 80d778f..9b492e9 100644
--- a/crates/core/src/lib.rs
+++ b/crates/core/src/lib.rs
@@ -17,14 +17,7 @@
  * under the License.
  */
 
-use crate::table::Table;
-
-pub mod file_group;
-pub mod table;
-pub type HudiTable = Table;
 pub mod config;
+pub mod file_group;
 pub mod storage;
-
-pub fn crate_version() -> &'static str {
-    env!("CARGO_PKG_VERSION")
-}
+pub mod table;
diff --git a/crates/core/src/storage/utils.rs b/crates/core/src/storage/utils.rs
index 053366e..a38f813 100644
--- a/crates/core/src/storage/utils.rs
+++ b/crates/core/src/storage/utils.rs
@@ -76,6 +76,10 @@ pub fn join_url_segments(base_url: &Url, segments: &[&str]) -> Result<Url> {
     Ok(url)
 }
 
+pub fn empty_options<'a>() -> std::iter::Empty<(&'a str, &'a str)> {
+    std::iter::empty::<(&str, &str)>()
+}
+
 #[cfg(test)]
 mod tests {
     use std::str::FromStr;
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index 8454a1a..c08105d 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -39,7 +39,7 @@ use crate::config::read::HudiReadConfig::AsOfTimestamp;
 use crate::config::table::{HudiTableConfig, TableTypeValue};
 use crate::config::HudiConfigs;
 use crate::file_group::FileSlice;
-use crate::storage::utils::parse_uri;
+use crate::storage::utils::{empty_options, parse_uri};
 use crate::storage::Storage;
 use crate::table::fs_view::FileSystemView;
 use crate::table::timeline::Timeline;
@@ -57,10 +57,19 @@ pub struct Table {
 }
 
 impl Table {
-    pub async fn new(base_uri: &str, all_options: HashMap<String, String>) -> Result<Self> {
+    pub async fn new(base_uri: &str) -> Result<Self> {
+        Self::new_with_options(base_uri, empty_options()).await
+    }
+
+    pub async fn new_with_options<I, K, V>(base_uri: &str, all_options: I) -> Result<Self>
+    where
+        I: IntoIterator<Item = (K, V)>,
+        K: AsRef<str>,
+        V: Into<String>,
+    {
         let base_url = Arc::new(parse_uri(base_uri)?);
 
-        let (configs, extra_options) = Self::load_configs(base_url.clone(), &all_options)
+        let (configs, extra_options) = Self::load_configs(base_url.clone(), all_options)
             .await
             .context("Failed to load table properties")?;
         let configs = Arc::new(configs);
@@ -84,10 +93,15 @@ impl Table {
         })
     }
 
-    async fn load_configs(
+    async fn load_configs<I, K, V>(
         base_url: Arc<Url>,
-        all_options: &HashMap<String, String>,
-    ) -> Result<(HudiConfigs, HashMap<String, String>)> {
+        all_options: I,
+    ) -> Result<(HudiConfigs, HashMap<String, String>)>
+    where
+        I: IntoIterator<Item = (K, V)>,
+        K: AsRef<str>,
+        V: Into<String>,
+    {
         // TODO: load hudi global config
         let mut hudi_options = HashMap::new();
         let mut extra_options = HashMap::new();
@@ -95,10 +109,10 @@ impl Table {
         Self::imbue_cloud_env_vars(&mut extra_options);
 
         for (k, v) in all_options {
-            if k.starts_with("hoodie.") {
-                hudi_options.insert(k.clone(), v.clone());
+            if k.as_ref().starts_with("hoodie.") {
+                hudi_options.insert(k.as_ref().to_string(), v.into());
             } else {
-                extra_options.insert(k.clone(), v.clone());
+                extra_options.insert(k.as_ref().to_string(), v.into());
             }
         }
         let storage = Storage::new(base_url, &extra_options)?;
@@ -249,7 +263,7 @@ impl Table {
 
 #[cfg(test)]
 mod tests {
-    use std::collections::{HashMap, HashSet};
+    use std::collections::HashSet;
     use std::fs::canonicalize;
     use std::panic;
     use std::path::Path;
@@ -271,7 +285,7 @@ mod tests {
     #[tokio::test]
     async fn hudi_table_get_schema() {
         let base_url = TestTable::V6Nonpartitioned.url();
-        let hudi_table = Table::new(base_url.path(), HashMap::new()).await.unwrap();
+        let hudi_table = Table::new(base_url.path()).await.unwrap();
         let fields: Vec<String> = hudi_table
             .get_schema()
             .await
@@ -324,7 +338,7 @@ mod tests {
     #[tokio::test]
     async fn hudi_table_read_file_slice() {
         let base_url = TestTable::V6Nonpartitioned.url();
-        let hudi_table = Table::new(base_url.path(), HashMap::new()).await.unwrap();
+        let hudi_table = Table::new(base_url.path()).await.unwrap();
         let batches = hudi_table
             .read_file_slice_by_path(
                 "a079bdb3-731c-4894-b855-abfcd6921007-0_0-203-274_20240418173551906.parquet",
@@ -338,7 +352,7 @@ mod tests {
     #[tokio::test]
     async fn hudi_table_get_file_paths() {
         let base_url = TestTable::V6ComplexkeygenHivestyle.url();
-        let hudi_table = Table::new(base_url.path(), HashMap::new()).await.unwrap();
+        let hudi_table = Table::new(base_url.path()).await.unwrap();
         assert_eq!(hudi_table.timeline.instants.len(), 2);
         let actual: HashSet<String> =
             HashSet::from_iter(hudi_table.get_file_paths().await.unwrap());
@@ -356,7 +370,7 @@ mod tests {
     async fn hudi_table_get_file_slices_as_of_timestamps() {
         let base_url = TestTable::V6Nonpartitioned.url();
 
-        let hudi_table = Table::new(base_url.path(), HashMap::new()).await.unwrap();
+        let hudi_table = Table::new(base_url.path()).await.unwrap();
         let file_slices = hudi_table.get_file_slices().await.unwrap();
         assert_eq!(
             file_slices
@@ -367,11 +381,10 @@ mod tests {
         );
 
         // as of the latest timestamp
-        let opts = HashMap::from_iter(vec![(
-            AsOfTimestamp.as_ref().to_string(),
-            "20240418173551906".to_string(),
-        )]);
-        let hudi_table = Table::new(base_url.path(), opts).await.unwrap();
+        let opts = [(AsOfTimestamp.as_ref(), "20240418173551906")];
+        let hudi_table = Table::new_with_options(base_url.path(), opts)
+            .await
+            .unwrap();
         let file_slices = hudi_table.get_file_slices().await.unwrap();
         assert_eq!(
             file_slices
@@ -382,11 +395,10 @@ mod tests {
         );
 
         // as of just smaller than the latest timestamp
-        let opts = HashMap::from_iter(vec![(
-            AsOfTimestamp.as_ref().to_string(),
-            "20240418173551905".to_string(),
-        )]);
-        let hudi_table = Table::new(base_url.path(), opts).await.unwrap();
+        let opts = [(AsOfTimestamp.as_ref(), "20240418173551905")];
+        let hudi_table = Table::new_with_options(base_url.path(), opts)
+            .await
+            .unwrap();
         let file_slices = hudi_table.get_file_slices().await.unwrap();
         assert_eq!(
             file_slices
@@ -397,8 +409,10 @@ mod tests {
         );
 
         // as of non-exist old timestamp
-        let opts = HashMap::from_iter(vec![(AsOfTimestamp.as_ref().to_string(), "0".to_string())]);
-        let hudi_table = Table::new(base_url.path(), opts).await.unwrap();
+        let opts = [(AsOfTimestamp.as_ref(), "0")];
+        let hudi_table = Table::new_with_options(base_url.path(), opts)
+            .await
+            .unwrap();
         let file_slices = hudi_table.get_file_slices().await.unwrap();
         assert_eq!(
             file_slices
@@ -414,12 +428,9 @@ mod tests {
         let base_url =
             Url::from_file_path(canonicalize(Path::new("tests/data/table_props_invalid")).unwrap())
                 .unwrap();
-        let table = Table::new(
+        let table = Table::new_with_options(
             base_url.as_str(),
-            HashMap::from_iter(vec![(
-                "hoodie.internal.skip.config.validation".to_string(),
-                "true".to_string(),
-            )]),
+            [("hoodie.internal.skip.config.validation", "true")],
         )
         .await
         .unwrap();
@@ -473,12 +484,9 @@ mod tests {
         let base_url =
             Url::from_file_path(canonicalize(Path::new("tests/data/table_props_invalid")).unwrap())
                 .unwrap();
-        let table = Table::new(
+        let table = Table::new_with_options(
             base_url.as_str(),
-            HashMap::from_iter(vec![(
-                "hoodie.internal.skip.config.validation".to_string(),
-                "true".to_string(),
-            )]),
+            [("hoodie.internal.skip.config.validation", "true")],
         )
         .await
         .unwrap();
@@ -505,12 +513,9 @@ mod tests {
         let base_url =
             Url::from_file_path(canonicalize(Path::new("tests/data/table_props_invalid")).unwrap())
                 .unwrap();
-        let table = Table::new(
+        let table = Table::new_with_options(
             base_url.as_str(),
-            HashMap::from_iter(vec![(
-                "hoodie.internal.skip.config.validation".to_string(),
-                "true".to_string(),
-            )]),
+            [("hoodie.internal.skip.config.validation", "true")],
         )
         .await
         .unwrap();
@@ -543,12 +548,9 @@ mod tests {
         let base_url =
             Url::from_file_path(canonicalize(Path::new("tests/data/table_props_valid")).unwrap())
                 .unwrap();
-        let table = Table::new(
+        let table = Table::new_with_options(
             base_url.as_str(),
-            HashMap::from_iter(vec![(
-                "hoodie.internal.skip.config.validation".to_string(),
-                "true".to_string(),
-            )]),
+            [("hoodie.internal.skip.config.validation", "true")],
         )
         .await
         .unwrap();
diff --git a/crates/datafusion/Cargo.toml b/crates/datafusion/Cargo.toml
index 3ea5e25..94e1ae7 100644
--- a/crates/datafusion/Cargo.toml
+++ b/crates/datafusion/Cargo.toml
@@ -38,6 +38,7 @@ datafusion-physical-expr = { workspace = true }
 # runtime / async
 async-trait = { workspace = true }
 tokio = { workspace = true }
+url = { workspace = true }
 
 [dev-dependencies]
 hudi-tests = { path = "../tests" }
diff --git a/crates/datafusion/src/lib.rs b/crates/datafusion/src/lib.rs
index fe23015..5f46b23 100644
--- a/crates/datafusion/src/lib.rs
+++ b/crates/datafusion/src/lib.rs
@@ -18,7 +18,6 @@
  */
 
 use std::any::Any;
-use std::collections::HashMap;
 use std::fmt::Debug;
 use std::sync::Arc;
 use std::thread;
@@ -39,8 +38,8 @@ use datafusion_physical_expr::create_physical_expr;
 use DataFusionError::Execution;
 
 use hudi_core::config::read::HudiReadConfig::InputPartitions;
-use hudi_core::storage::utils::{get_scheme_authority, parse_uri};
-use hudi_core::HudiTable;
+use hudi_core::storage::utils::{empty_options, get_scheme_authority, parse_uri};
+use hudi_core::table::Table as HudiTable;
 
 #[derive(Clone, Debug)]
 pub struct HudiDataSource {
@@ -48,8 +47,17 @@ pub struct HudiDataSource {
 }
 
 impl HudiDataSource {
-    pub async fn new(base_uri: &str, options: HashMap<String, String>) -> Result<Self> {
-        match HudiTable::new(base_uri, options).await {
+    pub async fn new(base_uri: &str) -> Result<Self> {
+        Self::new_with_options(base_uri, empty_options()).await
+    }
+
+    pub async fn new_with_options<I, K, V>(base_uri: &str, options: I) -> Result<Self>
+    where
+        I: IntoIterator<Item = (K, V)>,
+        K: AsRef<str>,
+        V: Into<String>,
+    {
+        match HudiTable::new_with_options(base_uri, options).await {
             Ok(t) => Ok(Self { table: Arc::new(t) }),
             Err(e) => Err(Execution(format!("Failed to create Hudi table: {}", e))),
         }
@@ -129,11 +137,13 @@ impl TableProvider for HudiDataSource {
 
 #[cfg(test)]
 mod tests {
-    use std::collections::HashMap;
+    use std::fs::canonicalize;
+    use std::path::Path;
     use std::sync::Arc;
 
     use datafusion::prelude::{SessionConfig, SessionContext};
     use datafusion_common::ScalarValue;
+    use url::Url;
 
     use hudi_core::config::read::HudiReadConfig::InputPartitions;
     use hudi_tests::TestTable::{
@@ -146,9 +156,18 @@ mod tests {
 
     use crate::HudiDataSource;
 
+    #[tokio::test]
+    async fn get_default_input_partitions() {
+        let base_url =
+            Url::from_file_path(canonicalize(Path::new("tests/data/table_props_valid")).unwrap())
+                .unwrap();
+        let hudi = HudiDataSource::new(base_url.as_str()).await.unwrap();
+        assert_eq!(hudi.get_input_partitions(), 0)
+    }
+
     async fn prepare_session_context(
         test_table: &TestTable,
-        options: &[(String, String)],
+        options: Vec<(&str, &str)>,
     ) -> SessionContext {
         let config = SessionConfig::new().set(
             "datafusion.sql_parser.enable_ident_normalization",
@@ -156,8 +175,7 @@ mod tests {
         );
         let ctx = SessionContext::new_with_config(config);
         let base_url = test_table.url();
-        let options = HashMap::from_iter(options.iter().cloned().collect::<HashMap<_, _>>());
-        let hudi = HudiDataSource::new(base_url.as_str(), options)
+        let hudi = HudiDataSource::new_with_options(base_url.as_str(), options)
             .await
             .unwrap();
         ctx.register_table(test_table.as_ref(), Arc::new(hudi))
@@ -211,11 +229,8 @@ mod tests {
             (V6TimebasedkeygenNonhivestyle, 2),
         ] {
             println!(">>> testing for {}", test_table.as_ref());
-            let ctx = prepare_session_context(
-                test_table,
-                &[(InputPartitions.as_ref().to_string(), "2".to_string())],
-            )
-            .await;
+            let options = vec![(InputPartitions.as_ref(), "2")];
+            let ctx = prepare_session_context(test_table, options).await;
 
             let sql = format!(
                 r#"
@@ -249,11 +264,8 @@ mod tests {
             &[(V6SimplekeygenNonhivestyleOverwritetable, 1)]
         {
             println!(">>> testing for {}", test_table.as_ref());
-            let ctx = prepare_session_context(
-                test_table,
-                &[(InputPartitions.as_ref().to_string(), "2".to_string())],
-            )
-            .await;
+            let ctx =
+                prepare_session_context(test_table, vec![(InputPartitions.as_ref(), "2")]).await;
 
             let sql = format!(
                 r#"
diff --git a/crates/datafusion/Cargo.toml b/crates/datafusion/tests/data/table_props_valid/.hoodie/hoodie.properties
similarity index 50%
copy from crates/datafusion/Cargo.toml
copy to crates/datafusion/tests/data/table_props_valid/.hoodie/hoodie.properties
index 3ea5e25..d1e5ac6 100644
--- a/crates/datafusion/Cargo.toml
+++ b/crates/datafusion/tests/data/table_props_valid/.hoodie/hoodie.properties
@@ -1,3 +1,4 @@
+#
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -14,30 +15,25 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+#
 
-[package]
-name = "hudi-datafusion"
-version.workspace = true
-edition.workspace = true
-license.workspace = true
-rust-version.workspace = true
-
-[dependencies]
-hudi-core = { path = "../core" }
-# arrow
-arrow-schema = { workspace = true }
-
-# datafusion
-datafusion = { workspace = true }
-datafusion-expr = { workspace = true }
-datafusion-common = { workspace = true }
-datafusion-proto = { workspace = true }
-datafusion-sql = { workspace = true }
-datafusion-physical-expr = { workspace = true }
-
-# runtime / async
-async-trait = { workspace = true }
-tokio = { workspace = true }
-
-[dev-dependencies]
-hudi-tests = { path = "../tests" }
+hoodie.table.type=copy_on_write
+hoodie.table.metadata.partitions=files
+hoodie.table.precombine.field=ts
+hoodie.table.partition.fields=city
+hoodie.archivelog.folder=archived
+hoodie.table.cdc.enabled=false
+hoodie.timeline.layout.version=1
+hoodie.table.checksum=3761586722
+hoodie.datasource.write.drop.partition.columns=false
+hoodie.table.recordkey.fields=uuid
+hoodie.table.name=trips
+hoodie.database.name=db
+hoodie.partition.metafile.use.base.format=false
+hoodie.datasource.write.hive_style_partitioning=false
+hoodie.table.metadata.partitions.inflight=
+hoodie.populate.meta.fields=true
+hoodie.table.keygenerator.class=org.apache.hudi.keygen.SimpleKeyGenerator
+hoodie.table.base.file.format=PARQUET
+hoodie.datasource.write.partitionpath.urlencode=false
+hoodie.table.version=6
diff --git a/python/Cargo.toml b/python/Cargo.toml
index d3bcf70..77f16fa 100644
--- a/python/Cargo.toml
+++ b/python/Cargo.toml
@@ -36,7 +36,7 @@ anyhow = { workspace = true }
 
 # runtime / async
 futures = { workspace = true }
-tokio = { workspace = true, features = ["rt-multi-thread"] }
+tokio = { workspace = true }
 
 [dependencies.pyo3]
 version = "0.21.2"
diff --git a/python/hudi/__init__.py b/python/hudi/__init__.py
index 8054368..1dee57b 100644
--- a/python/hudi/__init__.py
+++ b/python/hudi/__init__.py
@@ -15,7 +15,5 @@
 #  specific language governing permissions and limitations
 #  under the License.
 
-from ._internal import __version__ as __version__
-from ._internal import rust_core_version as rust_core_version
 from ._internal import HudiFileSlice as HudiFileSlice
-from .table import HudiTable as HudiTable
+from ._internal import HudiTable as HudiTable
diff --git a/python/hudi/_internal.pyi b/python/hudi/_internal.pyi
index 3485316..421a80c 100644
--- a/python/hudi/_internal.pyi
+++ b/python/hudi/_internal.pyi
@@ -14,17 +14,13 @@
 #  KIND, either express or implied.  See the License for the
 #  specific language governing permissions and limitations
 #  under the License.
-
-from typing import List, Dict, Optional
+from dataclasses import dataclass
+from typing import Optional, Dict, List
 
 import pyarrow
 
-__version__: str
-
-
-def rust_core_version() -> str: ...
-
 
+@dataclass(init=False)
 class HudiFileSlice:
     file_group_id: str
     partition_path: str
@@ -36,12 +32,13 @@ class HudiFileSlice:
     def base_file_relative_path(self) -> str: ...
 
 
-class BindingHudiTable:
+@dataclass(init=False)
+class HudiTable:
 
     def __init__(
             self,
             table_uri: str,
-            storage_options: Optional[Dict[str, str]] = None,
+            options: Optional[Dict[str, str]] = None,
     ): ...
 
     def get_schema(self) -> "pyarrow.Schema": ...
diff --git a/python/hudi/table.py b/python/hudi/table.py
deleted file mode 100644
index 107b665..0000000
--- a/python/hudi/table.py
+++ /dev/null
@@ -1,53 +0,0 @@
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing,
-#  software distributed under the License is distributed on an
-#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-#  KIND, either express or implied.  See the License for the
-#  specific language governing permissions and limitations
-#  under the License.
-
-import os
-from dataclasses import dataclass
-from pathlib import Path
-from typing import Union, List, Optional, Dict
-
-import pyarrow
-from hudi._internal import BindingHudiTable, HudiFileSlice
-
-
-@dataclass(init=False)
-class HudiTable:
-
-    def __init__(
-            self,
-            table_uri: Union[str, Path, "os.PathLike[str]"],
-            storage_options: Optional[Dict[str, str]] = None,
-    ):
-        self._table = BindingHudiTable(str(table_uri), storage_options)
-
-    def get_schema(self) -> "pyarrow.Schema":
-        return self._table.get_schema()
-
-    def split_file_slices(self, n: int) -> List[List[HudiFileSlice]]:
-        return self._table.split_file_slices(n)
-
-    def get_file_slices(self) -> List[HudiFileSlice]:
-        return self._table.get_file_slices()
-
-    def read_file_slice(self, base_file_relative_path: str) -> "pyarrow.RecordBatch":
-        return self._table.read_file_slice(base_file_relative_path)
-
-    def read_snapshot(self) -> List["pyarrow.RecordBatch"]:
-        return self._table.read_snapshot()
-
-    def read_snapshot_as_of(self, timestamp: str) -> List["pyarrow.RecordBatch"]:
-        return self._table.read_snapshot_as_of(timestamp)
diff --git a/python/src/lib.rs b/python/src/internal.rs
similarity index 71%
copy from python/src/lib.rs
copy to python/src/internal.rs
index 745b457..141a74e 100644
--- a/python/src/lib.rs
+++ b/python/src/internal.rs
@@ -16,23 +16,22 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
 use std::collections::HashMap;
 use std::path::PathBuf;
 use std::sync::OnceLock;
 
 use anyhow::Context;
 use arrow::pyarrow::ToPyArrow;
-use pyo3::prelude::*;
+use pyo3::{pyclass, pymethods, PyErr, PyObject, PyResult, Python};
 use tokio::runtime::Runtime;
 
 use hudi::file_group::FileSlice;
-use hudi::HudiTable;
+use hudi::table::Table;
 
 #[cfg(not(tarpaulin))]
 #[derive(Clone, Debug)]
 #[pyclass]
-struct HudiFileSlice {
+pub struct HudiFileSlice {
     #[pyo3(get)]
     file_group_id: String,
     #[pyo3(get)]
@@ -50,7 +49,7 @@ struct HudiFileSlice {
 #[cfg(not(tarpaulin))]
 #[pymethods]
 impl HudiFileSlice {
-    pub fn base_file_relative_path(&self) -> PyResult<String> {
+    fn base_file_relative_path(&self) -> PyResult<String> {
         PathBuf::from(&self.partition_path)
             .join(&self.base_file_name)
             .to_str()
@@ -83,28 +82,28 @@ fn convert_file_slice(f: &FileSlice) -> HudiFileSlice {
 
 #[cfg(not(tarpaulin))]
 #[pyclass]
-struct BindingHudiTable {
-    _table: HudiTable,
+pub struct HudiTable {
+    _table: Table,
 }
 
 #[cfg(not(tarpaulin))]
 #[pymethods]
-impl BindingHudiTable {
+impl HudiTable {
     #[new]
-    #[pyo3(signature = (table_uri, storage_options = None))]
-    fn new(table_uri: &str, storage_options: Option<HashMap<String, String>>) -> PyResult<Self> {
-        let _table = rt().block_on(HudiTable::new(
+    #[pyo3(signature = (table_uri, options = None))]
+    fn new(table_uri: &str, options: Option<HashMap<String, String>>) -> PyResult<Self> {
+        let _table = rt().block_on(Table::new_with_options(
             table_uri,
-            storage_options.unwrap_or_default(),
+            options.unwrap_or_default(),
         ))?;
-        Ok(BindingHudiTable { _table })
+        Ok(HudiTable { _table })
     }
 
-    pub fn get_schema(&self, py: Python) -> PyResult<PyObject> {
+    fn get_schema(&self, py: Python) -> PyResult<PyObject> {
         rt().block_on(self._table.get_schema())?.to_pyarrow(py)
     }
 
-    pub fn split_file_slices(&self, n: usize, py: Python) -> PyResult<Vec<Vec<HudiFileSlice>>> {
+    fn split_file_slices(&self, n: usize, py: Python) -> PyResult<Vec<Vec<HudiFileSlice>>> {
         py.allow_threads(|| {
             let file_slices = rt().block_on(self._table.split_file_slices(n))?;
             Ok(file_slices
@@ -114,42 +113,25 @@ impl BindingHudiTable {
         })
     }
 
-    pub fn get_file_slices(&self, py: Python) -> PyResult<Vec<HudiFileSlice>> {
+    fn get_file_slices(&self, py: Python) -> PyResult<Vec<HudiFileSlice>> {
         py.allow_threads(|| {
             let file_slices = rt().block_on(self._table.get_file_slices())?;
             Ok(file_slices.iter().map(convert_file_slice).collect())
         })
     }
 
-    pub fn read_file_slice(&self, relative_path: &str, py: Python) -> PyResult<PyObject> {
+    fn read_file_slice(&self, relative_path: &str, py: Python) -> PyResult<PyObject> {
         rt().block_on(self._table.read_file_slice_by_path(relative_path))?
             .to_pyarrow(py)
     }
 
-    pub fn read_snapshot(&self, py: Python) -> PyResult<PyObject> {
+    fn read_snapshot(&self, py: Python) -> PyResult<PyObject> {
         rt().block_on(self._table.read_snapshot())?.to_pyarrow(py)
     }
 }
 
 #[cfg(not(tarpaulin))]
-#[pyfunction]
-fn rust_core_version() -> &'static str {
-    hudi::crate_version()
-}
-
-#[cfg(not(tarpaulin))]
-#[pymodule]
-fn _internal(_py: Python, m: &Bound<'_, PyModule>) -> PyResult<()> {
-    m.add("__version__", env!("CARGO_PKG_VERSION"))?;
-    m.add_function(wrap_pyfunction!(rust_core_version, m)?)?;
-
-    m.add_class::<HudiFileSlice>()?;
-    m.add_class::<BindingHudiTable>()?;
-    Ok(())
-}
-
-#[cfg(not(tarpaulin))]
-pub fn rt() -> &'static Runtime {
+fn rt() -> &'static Runtime {
     static TOKIO_RT: OnceLock<Runtime> = OnceLock::new();
     TOKIO_RT.get_or_init(|| Runtime::new().expect("Failed to create a tokio runtime."))
 }
diff --git a/python/src/lib.rs b/python/src/lib.rs
index 745b457..99b7ef9 100644
--- a/python/src/lib.rs
+++ b/python/src/lib.rs
@@ -16,140 +16,15 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
-use std::collections::HashMap;
-use std::path::PathBuf;
-use std::sync::OnceLock;
-
-use anyhow::Context;
-use arrow::pyarrow::ToPyArrow;
 use pyo3::prelude::*;
-use tokio::runtime::Runtime;
-
-use hudi::file_group::FileSlice;
-use hudi::HudiTable;
-
-#[cfg(not(tarpaulin))]
-#[derive(Clone, Debug)]
-#[pyclass]
-struct HudiFileSlice {
-    #[pyo3(get)]
-    file_group_id: String,
-    #[pyo3(get)]
-    partition_path: String,
-    #[pyo3(get)]
-    commit_time: String,
-    #[pyo3(get)]
-    base_file_name: String,
-    #[pyo3(get)]
-    base_file_size: usize,
-    #[pyo3(get)]
-    num_records: i64,
-}
 
-#[cfg(not(tarpaulin))]
-#[pymethods]
-impl HudiFileSlice {
-    pub fn base_file_relative_path(&self) -> PyResult<String> {
-        PathBuf::from(&self.partition_path)
-            .join(&self.base_file_name)
-            .to_str()
-            .map(String::from)
-            .context(format!(
-                "Failed to get base file relative path for file slice: {:?}",
-                self
-            ))
-            .map_err(PyErr::from)
-    }
-}
-
-#[cfg(not(tarpaulin))]
-fn convert_file_slice(f: &FileSlice) -> HudiFileSlice {
-    let file_group_id = f.file_group_id().to_string();
-    let partition_path = f.partition_path.as_deref().unwrap_or_default().to_string();
-    let commit_time = f.base_file.commit_time.to_string();
-    let base_file_name = f.base_file.info.name.clone();
-    let base_file_size = f.base_file.info.size;
-    let num_records = f.base_file.stats.clone().unwrap_or_default().num_records;
-    HudiFileSlice {
-        file_group_id,
-        partition_path,
-        commit_time,
-        base_file_name,
-        base_file_size,
-        num_records,
-    }
-}
-
-#[cfg(not(tarpaulin))]
-#[pyclass]
-struct BindingHudiTable {
-    _table: HudiTable,
-}
-
-#[cfg(not(tarpaulin))]
-#[pymethods]
-impl BindingHudiTable {
-    #[new]
-    #[pyo3(signature = (table_uri, storage_options = None))]
-    fn new(table_uri: &str, storage_options: Option<HashMap<String, String>>) -> PyResult<Self> {
-        let _table = rt().block_on(HudiTable::new(
-            table_uri,
-            storage_options.unwrap_or_default(),
-        ))?;
-        Ok(BindingHudiTable { _table })
-    }
-
-    pub fn get_schema(&self, py: Python) -> PyResult<PyObject> {
-        rt().block_on(self._table.get_schema())?.to_pyarrow(py)
-    }
-
-    pub fn split_file_slices(&self, n: usize, py: Python) -> PyResult<Vec<Vec<HudiFileSlice>>> {
-        py.allow_threads(|| {
-            let file_slices = rt().block_on(self._table.split_file_slices(n))?;
-            Ok(file_slices
-                .iter()
-                .map(|inner_vec| inner_vec.iter().map(convert_file_slice).collect())
-                .collect())
-        })
-    }
-
-    pub fn get_file_slices(&self, py: Python) -> PyResult<Vec<HudiFileSlice>> {
-        py.allow_threads(|| {
-            let file_slices = rt().block_on(self._table.get_file_slices())?;
-            Ok(file_slices.iter().map(convert_file_slice).collect())
-        })
-    }
-
-    pub fn read_file_slice(&self, relative_path: &str, py: Python) -> PyResult<PyObject> {
-        rt().block_on(self._table.read_file_slice_by_path(relative_path))?
-            .to_pyarrow(py)
-    }
-
-    pub fn read_snapshot(&self, py: Python) -> PyResult<PyObject> {
-        rt().block_on(self._table.read_snapshot())?.to_pyarrow(py)
-    }
-}
-
-#[cfg(not(tarpaulin))]
-#[pyfunction]
-fn rust_core_version() -> &'static str {
-    hudi::crate_version()
-}
+mod internal;
 
 #[cfg(not(tarpaulin))]
 #[pymodule]
 fn _internal(_py: Python, m: &Bound<'_, PyModule>) -> PyResult<()> {
-    m.add("__version__", env!("CARGO_PKG_VERSION"))?;
-    m.add_function(wrap_pyfunction!(rust_core_version, m)?)?;
-
+    use internal::{HudiFileSlice, HudiTable};
     m.add_class::<HudiFileSlice>()?;
-    m.add_class::<BindingHudiTable>()?;
+    m.add_class::<HudiTable>()?;
     Ok(())
 }
-
-#[cfg(not(tarpaulin))]
-pub fn rt() -> &'static Runtime {
-    static TOKIO_RT: OnceLock<Runtime> = OnceLock::new();
-    TOKIO_RT.get_or_init(|| Runtime::new().expect("Failed to create a tokio runtime."))
-}
diff --git a/python/tests/test_table_read.py b/python/tests/test_table_read.py
index 61195fb..e56463c 100644
--- a/python/tests/test_table_read.py
+++ b/python/tests/test_table_read.py
@@ -26,7 +26,7 @@ pytestmark = pytest.mark.skipif(PYARROW_LE_8_0_0, reason="hudi only supported if
 
 def test_sample_table(get_sample_table):
     table_path = get_sample_table
-    table = HudiTable(table_path, {})
+    table = HudiTable(table_path)
 
     assert table.get_schema().names == ['_hoodie_commit_time', '_hoodie_commit_seqno', '_hoodie_record_key',
                                         '_hoodie_partition_path', '_hoodie_file_name', 'ts', 'uuid', 'rider', 'driver',

Reply via email to