This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git
The following commit(s) were added to refs/heads/main by this push:
new b6c2e5d refactor: adjust table APIs to skip passing options (#56)
b6c2e5d is described below
commit b6c2e5d592b249504ff366079a163190bb4ec53d
Author: Shiyan Xu <[email protected]>
AuthorDate: Mon Jul 8 01:00:58 2024 -0500
refactor: adjust table APIs to skip passing options (#56)
- support `new()` and `new_with_options()` api for creating table
- use generics to make options argument more flexible
- simplify python binding classes
---
Cargo.toml | 2 +-
crates/core/src/lib.rs | 11 +-
crates/core/src/storage/utils.rs | 4 +
crates/core/src/table/mod.rs | 94 +++++++--------
crates/datafusion/Cargo.toml | 1 +
crates/datafusion/src/lib.rs | 50 +++++---
.../table_props_valid/.hoodie/hoodie.properties} | 48 ++++----
python/Cargo.toml | 2 +-
python/hudi/__init__.py | 4 +-
python/hudi/_internal.pyi | 15 +--
python/hudi/table.py | 53 ---------
python/src/{lib.rs => internal.rs} | 54 +++------
python/src/lib.rs | 131 +--------------------
python/tests/test_table_read.py | 2 +-
14 files changed, 139 insertions(+), 332 deletions(-)
diff --git a/Cargo.toml b/Cargo.toml
index 0086ca8..24412e8 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -68,4 +68,4 @@ async-recursion = { version = "1.1.1" }
async-trait = { version = "0.1" }
dashmap = { version = "6.0.1" }
futures = { version = "0.3" }
-tokio = { version = "1" }
+tokio = { version = "1", features = ["rt-multi-thread"] }
diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs
index 80d778f..9b492e9 100644
--- a/crates/core/src/lib.rs
+++ b/crates/core/src/lib.rs
@@ -17,14 +17,7 @@
* under the License.
*/
-use crate::table::Table;
-
-pub mod file_group;
-pub mod table;
-pub type HudiTable = Table;
pub mod config;
+pub mod file_group;
pub mod storage;
-
-pub fn crate_version() -> &'static str {
- env!("CARGO_PKG_VERSION")
-}
+pub mod table;
diff --git a/crates/core/src/storage/utils.rs b/crates/core/src/storage/utils.rs
index 053366e..a38f813 100644
--- a/crates/core/src/storage/utils.rs
+++ b/crates/core/src/storage/utils.rs
@@ -76,6 +76,10 @@ pub fn join_url_segments(base_url: &Url, segments: &[&str]) -> Result<Url> {
Ok(url)
}
+pub fn empty_options<'a>() -> std::iter::Empty<(&'a str, &'a str)> {
+ std::iter::empty::<(&str, &str)>()
+}
+
#[cfg(test)]
mod tests {
use std::str::FromStr;
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index 8454a1a..c08105d 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -39,7 +39,7 @@ use crate::config::read::HudiReadConfig::AsOfTimestamp;
use crate::config::table::{HudiTableConfig, TableTypeValue};
use crate::config::HudiConfigs;
use crate::file_group::FileSlice;
-use crate::storage::utils::parse_uri;
+use crate::storage::utils::{empty_options, parse_uri};
use crate::storage::Storage;
use crate::table::fs_view::FileSystemView;
use crate::table::timeline::Timeline;
@@ -57,10 +57,19 @@ pub struct Table {
}
impl Table {
- pub async fn new(base_uri: &str, all_options: HashMap<String, String>) -> Result<Self> {
+ pub async fn new(base_uri: &str) -> Result<Self> {
+ Self::new_with_options(base_uri, empty_options()).await
+ }
+
+ pub async fn new_with_options<I, K, V>(base_uri: &str, all_options: I) -> Result<Self>
+ where
+ I: IntoIterator<Item = (K, V)>,
+ K: AsRef<str>,
+ V: Into<String>,
+ {
let base_url = Arc::new(parse_uri(base_uri)?);
- let (configs, extra_options) = Self::load_configs(base_url.clone(), &all_options)
+ let (configs, extra_options) = Self::load_configs(base_url.clone(), all_options)
.await
.context("Failed to load table properties")?;
let configs = Arc::new(configs);
@@ -84,10 +93,15 @@ impl Table {
})
}
- async fn load_configs(
+ async fn load_configs<I, K, V>(
base_url: Arc<Url>,
- all_options: &HashMap<String, String>,
- ) -> Result<(HudiConfigs, HashMap<String, String>)> {
+ all_options: I,
+ ) -> Result<(HudiConfigs, HashMap<String, String>)>
+ where
+ I: IntoIterator<Item = (K, V)>,
+ K: AsRef<str>,
+ V: Into<String>,
+ {
// TODO: load hudi global config
let mut hudi_options = HashMap::new();
let mut extra_options = HashMap::new();
@@ -95,10 +109,10 @@ impl Table {
Self::imbue_cloud_env_vars(&mut extra_options);
for (k, v) in all_options {
- if k.starts_with("hoodie.") {
- hudi_options.insert(k.clone(), v.clone());
+ if k.as_ref().starts_with("hoodie.") {
+ hudi_options.insert(k.as_ref().to_string(), v.into());
} else {
- extra_options.insert(k.clone(), v.clone());
+ extra_options.insert(k.as_ref().to_string(), v.into());
}
}
let storage = Storage::new(base_url, &extra_options)?;
@@ -249,7 +263,7 @@ impl Table {
#[cfg(test)]
mod tests {
- use std::collections::{HashMap, HashSet};
+ use std::collections::HashSet;
use std::fs::canonicalize;
use std::panic;
use std::path::Path;
@@ -271,7 +285,7 @@ mod tests {
#[tokio::test]
async fn hudi_table_get_schema() {
let base_url = TestTable::V6Nonpartitioned.url();
- let hudi_table = Table::new(base_url.path(), HashMap::new()).await.unwrap();
+ let hudi_table = Table::new(base_url.path()).await.unwrap();
let fields: Vec<String> = hudi_table
.get_schema()
.await
@@ -324,7 +338,7 @@ mod tests {
#[tokio::test]
async fn hudi_table_read_file_slice() {
let base_url = TestTable::V6Nonpartitioned.url();
- let hudi_table = Table::new(base_url.path(), HashMap::new()).await.unwrap();
+ let hudi_table = Table::new(base_url.path()).await.unwrap();
let batches = hudi_table
.read_file_slice_by_path(
"a079bdb3-731c-4894-b855-abfcd6921007-0_0-203-274_20240418173551906.parquet",
@@ -338,7 +352,7 @@ mod tests {
#[tokio::test]
async fn hudi_table_get_file_paths() {
let base_url = TestTable::V6ComplexkeygenHivestyle.url();
- let hudi_table = Table::new(base_url.path(), HashMap::new()).await.unwrap();
+ let hudi_table = Table::new(base_url.path()).await.unwrap();
assert_eq!(hudi_table.timeline.instants.len(), 2);
let actual: HashSet<String> =
HashSet::from_iter(hudi_table.get_file_paths().await.unwrap());
@@ -356,7 +370,7 @@ mod tests {
async fn hudi_table_get_file_slices_as_of_timestamps() {
let base_url = TestTable::V6Nonpartitioned.url();
- let hudi_table = Table::new(base_url.path(), HashMap::new()).await.unwrap();
+ let hudi_table = Table::new(base_url.path()).await.unwrap();
let file_slices = hudi_table.get_file_slices().await.unwrap();
assert_eq!(
file_slices
@@ -367,11 +381,10 @@ mod tests {
);
// as of the latest timestamp
- let opts = HashMap::from_iter(vec![(
- AsOfTimestamp.as_ref().to_string(),
- "20240418173551906".to_string(),
- )]);
- let hudi_table = Table::new(base_url.path(), opts).await.unwrap();
+ let opts = [(AsOfTimestamp.as_ref(), "20240418173551906")];
+ let hudi_table = Table::new_with_options(base_url.path(), opts)
+ .await
+ .unwrap();
let file_slices = hudi_table.get_file_slices().await.unwrap();
assert_eq!(
file_slices
@@ -382,11 +395,10 @@ mod tests {
);
// as of just smaller than the latest timestamp
- let opts = HashMap::from_iter(vec![(
- AsOfTimestamp.as_ref().to_string(),
- "20240418173551905".to_string(),
- )]);
- let hudi_table = Table::new(base_url.path(), opts).await.unwrap();
+ let opts = [(AsOfTimestamp.as_ref(), "20240418173551905")];
+ let hudi_table = Table::new_with_options(base_url.path(), opts)
+ .await
+ .unwrap();
let file_slices = hudi_table.get_file_slices().await.unwrap();
assert_eq!(
file_slices
@@ -397,8 +409,10 @@ mod tests {
);
// as of non-exist old timestamp
- let opts = HashMap::from_iter(vec![(AsOfTimestamp.as_ref().to_string(), "0".to_string())]);
- let hudi_table = Table::new(base_url.path(), opts).await.unwrap();
+ let opts = [(AsOfTimestamp.as_ref(), "0")];
+ let hudi_table = Table::new_with_options(base_url.path(), opts)
+ .await
+ .unwrap();
let file_slices = hudi_table.get_file_slices().await.unwrap();
assert_eq!(
file_slices
@@ -414,12 +428,9 @@ mod tests {
let base_url =
Url::from_file_path(canonicalize(Path::new("tests/data/table_props_invalid")).unwrap())
.unwrap();
- let table = Table::new(
+ let table = Table::new_with_options(
base_url.as_str(),
- HashMap::from_iter(vec![(
- "hoodie.internal.skip.config.validation".to_string(),
- "true".to_string(),
- )]),
+ [("hoodie.internal.skip.config.validation", "true")],
)
.await
.unwrap();
@@ -473,12 +484,9 @@ mod tests {
let base_url =
Url::from_file_path(canonicalize(Path::new("tests/data/table_props_invalid")).unwrap())
.unwrap();
- let table = Table::new(
+ let table = Table::new_with_options(
base_url.as_str(),
- HashMap::from_iter(vec![(
- "hoodie.internal.skip.config.validation".to_string(),
- "true".to_string(),
- )]),
+ [("hoodie.internal.skip.config.validation", "true")],
)
.await
.unwrap();
@@ -505,12 +513,9 @@ mod tests {
let base_url =
Url::from_file_path(canonicalize(Path::new("tests/data/table_props_invalid")).unwrap())
.unwrap();
- let table = Table::new(
+ let table = Table::new_with_options(
base_url.as_str(),
- HashMap::from_iter(vec![(
- "hoodie.internal.skip.config.validation".to_string(),
- "true".to_string(),
- )]),
+ [("hoodie.internal.skip.config.validation", "true")],
)
.await
.unwrap();
@@ -543,12 +548,9 @@ mod tests {
let base_url =
Url::from_file_path(canonicalize(Path::new("tests/data/table_props_valid")).unwrap())
.unwrap();
- let table = Table::new(
+ let table = Table::new_with_options(
base_url.as_str(),
- HashMap::from_iter(vec![(
- "hoodie.internal.skip.config.validation".to_string(),
- "true".to_string(),
- )]),
+ [("hoodie.internal.skip.config.validation", "true")],
)
.await
.unwrap();
diff --git a/crates/datafusion/Cargo.toml b/crates/datafusion/Cargo.toml
index 3ea5e25..94e1ae7 100644
--- a/crates/datafusion/Cargo.toml
+++ b/crates/datafusion/Cargo.toml
@@ -38,6 +38,7 @@ datafusion-physical-expr = { workspace = true }
# runtime / async
async-trait = { workspace = true }
tokio = { workspace = true }
+url = { workspace = true }
[dev-dependencies]
hudi-tests = { path = "../tests" }
diff --git a/crates/datafusion/src/lib.rs b/crates/datafusion/src/lib.rs
index fe23015..5f46b23 100644
--- a/crates/datafusion/src/lib.rs
+++ b/crates/datafusion/src/lib.rs
@@ -18,7 +18,6 @@
*/
use std::any::Any;
-use std::collections::HashMap;
use std::fmt::Debug;
use std::sync::Arc;
use std::thread;
@@ -39,8 +38,8 @@ use datafusion_physical_expr::create_physical_expr;
use DataFusionError::Execution;
use hudi_core::config::read::HudiReadConfig::InputPartitions;
-use hudi_core::storage::utils::{get_scheme_authority, parse_uri};
-use hudi_core::HudiTable;
+use hudi_core::storage::utils::{empty_options, get_scheme_authority, parse_uri};
+use hudi_core::table::Table as HudiTable;
#[derive(Clone, Debug)]
pub struct HudiDataSource {
@@ -48,8 +47,17 @@ pub struct HudiDataSource {
}
impl HudiDataSource {
- pub async fn new(base_uri: &str, options: HashMap<String, String>) -> Result<Self> {
- match HudiTable::new(base_uri, options).await {
+ pub async fn new(base_uri: &str) -> Result<Self> {
+ Self::new_with_options(base_uri, empty_options()).await
+ }
+
+ pub async fn new_with_options<I, K, V>(base_uri: &str, options: I) -> Result<Self>
+ where
+ I: IntoIterator<Item = (K, V)>,
+ K: AsRef<str>,
+ V: Into<String>,
+ {
+ match HudiTable::new_with_options(base_uri, options).await {
Ok(t) => Ok(Self { table: Arc::new(t) }),
Err(e) => Err(Execution(format!("Failed to create Hudi table: {}", e))),
}
@@ -129,11 +137,13 @@ impl TableProvider for HudiDataSource {
#[cfg(test)]
mod tests {
- use std::collections::HashMap;
+ use std::fs::canonicalize;
+ use std::path::Path;
use std::sync::Arc;
use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion_common::ScalarValue;
+ use url::Url;
use hudi_core::config::read::HudiReadConfig::InputPartitions;
use hudi_tests::TestTable::{
@@ -146,9 +156,18 @@ mod tests {
use crate::HudiDataSource;
+ #[tokio::test]
+ async fn get_default_input_partitions() {
+ let base_url =
+ Url::from_file_path(canonicalize(Path::new("tests/data/table_props_valid")).unwrap())
+ .unwrap();
+ let hudi = HudiDataSource::new(base_url.as_str()).await.unwrap();
+ assert_eq!(hudi.get_input_partitions(), 0)
+ }
+
async fn prepare_session_context(
test_table: &TestTable,
- options: &[(String, String)],
+ options: Vec<(&str, &str)>,
) -> SessionContext {
let config = SessionConfig::new().set(
"datafusion.sql_parser.enable_ident_normalization",
@@ -156,8 +175,7 @@ mod tests {
);
let ctx = SessionContext::new_with_config(config);
let base_url = test_table.url();
- let options = HashMap::from_iter(options.iter().cloned().collect::<HashMap<_, _>>());
- let hudi = HudiDataSource::new(base_url.as_str(), options)
+ let hudi = HudiDataSource::new_with_options(base_url.as_str(), options)
.await
.unwrap();
ctx.register_table(test_table.as_ref(), Arc::new(hudi))
@@ -211,11 +229,8 @@ mod tests {
(V6TimebasedkeygenNonhivestyle, 2),
] {
println!(">>> testing for {}", test_table.as_ref());
- let ctx = prepare_session_context(
- test_table,
- &[(InputPartitions.as_ref().to_string(), "2".to_string())],
- )
- .await;
+ let options = vec![(InputPartitions.as_ref(), "2")];
+ let ctx = prepare_session_context(test_table, options).await;
let sql = format!(
r#"
@@ -249,11 +264,8 @@ mod tests {
&[(V6SimplekeygenNonhivestyleOverwritetable, 1)]
{
println!(">>> testing for {}", test_table.as_ref());
- let ctx = prepare_session_context(
- test_table,
- &[(InputPartitions.as_ref().to_string(), "2".to_string())],
- )
- .await;
+ let ctx =
+ prepare_session_context(test_table, vec![(InputPartitions.as_ref(), "2")]).await;
let sql = format!(
r#"
diff --git a/crates/datafusion/Cargo.toml b/crates/datafusion/tests/data/table_props_valid/.hoodie/hoodie.properties
similarity index 50%
copy from crates/datafusion/Cargo.toml
copy to crates/datafusion/tests/data/table_props_valid/.hoodie/hoodie.properties
index 3ea5e25..d1e5ac6 100644
--- a/crates/datafusion/Cargo.toml
+++ b/crates/datafusion/tests/data/table_props_valid/.hoodie/hoodie.properties
@@ -1,3 +1,4 @@
+#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -14,30 +15,25 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+#
-[package]
-name = "hudi-datafusion"
-version.workspace = true
-edition.workspace = true
-license.workspace = true
-rust-version.workspace = true
-
-[dependencies]
-hudi-core = { path = "../core" }
-# arrow
-arrow-schema = { workspace = true }
-
-# datafusion
-datafusion = { workspace = true }
-datafusion-expr = { workspace = true }
-datafusion-common = { workspace = true }
-datafusion-proto = { workspace = true }
-datafusion-sql = { workspace = true }
-datafusion-physical-expr = { workspace = true }
-
-# runtime / async
-async-trait = { workspace = true }
-tokio = { workspace = true }
-
-[dev-dependencies]
-hudi-tests = { path = "../tests" }
+hoodie.table.type=copy_on_write
+hoodie.table.metadata.partitions=files
+hoodie.table.precombine.field=ts
+hoodie.table.partition.fields=city
+hoodie.archivelog.folder=archived
+hoodie.table.cdc.enabled=false
+hoodie.timeline.layout.version=1
+hoodie.table.checksum=3761586722
+hoodie.datasource.write.drop.partition.columns=false
+hoodie.table.recordkey.fields=uuid
+hoodie.table.name=trips
+hoodie.database.name=db
+hoodie.partition.metafile.use.base.format=false
+hoodie.datasource.write.hive_style_partitioning=false
+hoodie.table.metadata.partitions.inflight=
+hoodie.populate.meta.fields=true
+hoodie.table.keygenerator.class=org.apache.hudi.keygen.SimpleKeyGenerator
+hoodie.table.base.file.format=PARQUET
+hoodie.datasource.write.partitionpath.urlencode=false
+hoodie.table.version=6
diff --git a/python/Cargo.toml b/python/Cargo.toml
index d3bcf70..77f16fa 100644
--- a/python/Cargo.toml
+++ b/python/Cargo.toml
@@ -36,7 +36,7 @@ anyhow = { workspace = true }
# runtime / async
futures = { workspace = true }
-tokio = { workspace = true, features = ["rt-multi-thread"] }
+tokio = { workspace = true }
[dependencies.pyo3]
version = "0.21.2"
diff --git a/python/hudi/__init__.py b/python/hudi/__init__.py
index 8054368..1dee57b 100644
--- a/python/hudi/__init__.py
+++ b/python/hudi/__init__.py
@@ -15,7 +15,5 @@
# specific language governing permissions and limitations
# under the License.
-from ._internal import __version__ as __version__
-from ._internal import rust_core_version as rust_core_version
from ._internal import HudiFileSlice as HudiFileSlice
-from .table import HudiTable as HudiTable
+from ._internal import HudiTable as HudiTable
diff --git a/python/hudi/_internal.pyi b/python/hudi/_internal.pyi
index 3485316..421a80c 100644
--- a/python/hudi/_internal.pyi
+++ b/python/hudi/_internal.pyi
@@ -14,17 +14,13 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-
-from typing import List, Dict, Optional
+from dataclasses import dataclass
+from typing import Optional, Dict, List
import pyarrow
-__version__: str
-
-
-def rust_core_version() -> str: ...
-
+@dataclass(init=False)
class HudiFileSlice:
file_group_id: str
partition_path: str
@@ -36,12 +32,13 @@ class HudiFileSlice:
def base_file_relative_path(self) -> str: ...
-class BindingHudiTable:
+@dataclass(init=False)
+class HudiTable:
def __init__(
self,
table_uri: str,
- storage_options: Optional[Dict[str, str]] = None,
+ options: Optional[Dict[str, str]] = None,
): ...
def get_schema(self) -> "pyarrow.Schema": ...
diff --git a/python/hudi/table.py b/python/hudi/table.py
deleted file mode 100644
index 107b665..0000000
--- a/python/hudi/table.py
+++ /dev/null
@@ -1,53 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-import os
-from dataclasses import dataclass
-from pathlib import Path
-from typing import Union, List, Optional, Dict
-
-import pyarrow
-from hudi._internal import BindingHudiTable, HudiFileSlice
-
-
-@dataclass(init=False)
-class HudiTable:
-
- def __init__(
- self,
- table_uri: Union[str, Path, "os.PathLike[str]"],
- storage_options: Optional[Dict[str, str]] = None,
- ):
- self._table = BindingHudiTable(str(table_uri), storage_options)
-
- def get_schema(self) -> "pyarrow.Schema":
- return self._table.get_schema()
-
- def split_file_slices(self, n: int) -> List[List[HudiFileSlice]]:
- return self._table.split_file_slices(n)
-
- def get_file_slices(self) -> List[HudiFileSlice]:
- return self._table.get_file_slices()
-
- def read_file_slice(self, base_file_relative_path: str) -> "pyarrow.RecordBatch":
- return self._table.read_file_slice(base_file_relative_path)
-
- def read_snapshot(self) -> List["pyarrow.RecordBatch"]:
- return self._table.read_snapshot()
-
- def read_snapshot_as_of(self, timestamp: str) -> List["pyarrow.RecordBatch"]:
- return self._table.read_snapshot_as_of(timestamp)
diff --git a/python/src/lib.rs b/python/src/internal.rs
similarity index 71%
copy from python/src/lib.rs
copy to python/src/internal.rs
index 745b457..141a74e 100644
--- a/python/src/lib.rs
+++ b/python/src/internal.rs
@@ -16,23 +16,22 @@
* specific language governing permissions and limitations
* under the License.
*/
-
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::OnceLock;
use anyhow::Context;
use arrow::pyarrow::ToPyArrow;
-use pyo3::prelude::*;
+use pyo3::{pyclass, pymethods, PyErr, PyObject, PyResult, Python};
use tokio::runtime::Runtime;
use hudi::file_group::FileSlice;
-use hudi::HudiTable;
+use hudi::table::Table;
#[cfg(not(tarpaulin))]
#[derive(Clone, Debug)]
#[pyclass]
-struct HudiFileSlice {
+pub struct HudiFileSlice {
#[pyo3(get)]
file_group_id: String,
#[pyo3(get)]
@@ -50,7 +49,7 @@ struct HudiFileSlice {
#[cfg(not(tarpaulin))]
#[pymethods]
impl HudiFileSlice {
- pub fn base_file_relative_path(&self) -> PyResult<String> {
+ fn base_file_relative_path(&self) -> PyResult<String> {
PathBuf::from(&self.partition_path)
.join(&self.base_file_name)
.to_str()
@@ -83,28 +82,28 @@ fn convert_file_slice(f: &FileSlice) -> HudiFileSlice {
#[cfg(not(tarpaulin))]
#[pyclass]
-struct BindingHudiTable {
- _table: HudiTable,
+pub struct HudiTable {
+ _table: Table,
}
#[cfg(not(tarpaulin))]
#[pymethods]
-impl BindingHudiTable {
+impl HudiTable {
#[new]
- #[pyo3(signature = (table_uri, storage_options = None))]
- fn new(table_uri: &str, storage_options: Option<HashMap<String, String>>) -> PyResult<Self> {
- let _table = rt().block_on(HudiTable::new(
+ #[pyo3(signature = (table_uri, options = None))]
+ fn new(table_uri: &str, options: Option<HashMap<String, String>>) -> PyResult<Self> {
+ let _table = rt().block_on(Table::new_with_options(
table_uri,
- storage_options.unwrap_or_default(),
+ options.unwrap_or_default(),
))?;
- Ok(BindingHudiTable { _table })
+ Ok(HudiTable { _table })
}
- pub fn get_schema(&self, py: Python) -> PyResult<PyObject> {
+ fn get_schema(&self, py: Python) -> PyResult<PyObject> {
rt().block_on(self._table.get_schema())?.to_pyarrow(py)
}
- pub fn split_file_slices(&self, n: usize, py: Python) -> PyResult<Vec<Vec<HudiFileSlice>>> {
+ fn split_file_slices(&self, n: usize, py: Python) -> PyResult<Vec<Vec<HudiFileSlice>>> {
py.allow_threads(|| {
let file_slices = rt().block_on(self._table.split_file_slices(n))?;
Ok(file_slices
@@ -114,42 +113,25 @@ impl BindingHudiTable {
})
}
- pub fn get_file_slices(&self, py: Python) -> PyResult<Vec<HudiFileSlice>> {
+ fn get_file_slices(&self, py: Python) -> PyResult<Vec<HudiFileSlice>> {
py.allow_threads(|| {
let file_slices = rt().block_on(self._table.get_file_slices())?;
Ok(file_slices.iter().map(convert_file_slice).collect())
})
}
- pub fn read_file_slice(&self, relative_path: &str, py: Python) -> PyResult<PyObject> {
+ fn read_file_slice(&self, relative_path: &str, py: Python) -> PyResult<PyObject> {
rt().block_on(self._table.read_file_slice_by_path(relative_path))?
.to_pyarrow(py)
}
- pub fn read_snapshot(&self, py: Python) -> PyResult<PyObject> {
+ fn read_snapshot(&self, py: Python) -> PyResult<PyObject> {
rt().block_on(self._table.read_snapshot())?.to_pyarrow(py)
}
}
#[cfg(not(tarpaulin))]
-#[pyfunction]
-fn rust_core_version() -> &'static str {
- hudi::crate_version()
-}
-
-#[cfg(not(tarpaulin))]
-#[pymodule]
-fn _internal(_py: Python, m: &Bound<'_, PyModule>) -> PyResult<()> {
- m.add("__version__", env!("CARGO_PKG_VERSION"))?;
- m.add_function(wrap_pyfunction!(rust_core_version, m)?)?;
-
- m.add_class::<HudiFileSlice>()?;
- m.add_class::<BindingHudiTable>()?;
- Ok(())
-}
-
-#[cfg(not(tarpaulin))]
-pub fn rt() -> &'static Runtime {
+fn rt() -> &'static Runtime {
static TOKIO_RT: OnceLock<Runtime> = OnceLock::new();
TOKIO_RT.get_or_init(|| Runtime::new().expect("Failed to create a tokio runtime."))
}
diff --git a/python/src/lib.rs b/python/src/lib.rs
index 745b457..99b7ef9 100644
--- a/python/src/lib.rs
+++ b/python/src/lib.rs
@@ -16,140 +16,15 @@
* specific language governing permissions and limitations
* under the License.
*/
-
-use std::collections::HashMap;
-use std::path::PathBuf;
-use std::sync::OnceLock;
-
-use anyhow::Context;
-use arrow::pyarrow::ToPyArrow;
use pyo3::prelude::*;
-use tokio::runtime::Runtime;
-
-use hudi::file_group::FileSlice;
-use hudi::HudiTable;
-
-#[cfg(not(tarpaulin))]
-#[derive(Clone, Debug)]
-#[pyclass]
-struct HudiFileSlice {
- #[pyo3(get)]
- file_group_id: String,
- #[pyo3(get)]
- partition_path: String,
- #[pyo3(get)]
- commit_time: String,
- #[pyo3(get)]
- base_file_name: String,
- #[pyo3(get)]
- base_file_size: usize,
- #[pyo3(get)]
- num_records: i64,
-}
-#[cfg(not(tarpaulin))]
-#[pymethods]
-impl HudiFileSlice {
- pub fn base_file_relative_path(&self) -> PyResult<String> {
- PathBuf::from(&self.partition_path)
- .join(&self.base_file_name)
- .to_str()
- .map(String::from)
- .context(format!(
- "Failed to get base file relative path for file slice: {:?}",
- self
- ))
- .map_err(PyErr::from)
- }
-}
-
-#[cfg(not(tarpaulin))]
-fn convert_file_slice(f: &FileSlice) -> HudiFileSlice {
- let file_group_id = f.file_group_id().to_string();
- let partition_path = f.partition_path.as_deref().unwrap_or_default().to_string();
- let commit_time = f.base_file.commit_time.to_string();
- let base_file_name = f.base_file.info.name.clone();
- let base_file_size = f.base_file.info.size;
- let num_records = f.base_file.stats.clone().unwrap_or_default().num_records;
- HudiFileSlice {
- file_group_id,
- partition_path,
- commit_time,
- base_file_name,
- base_file_size,
- num_records,
- }
-}
-
-#[cfg(not(tarpaulin))]
-#[pyclass]
-struct BindingHudiTable {
- _table: HudiTable,
-}
-
-#[cfg(not(tarpaulin))]
-#[pymethods]
-impl BindingHudiTable {
- #[new]
- #[pyo3(signature = (table_uri, storage_options = None))]
- fn new(table_uri: &str, storage_options: Option<HashMap<String, String>>) -> PyResult<Self> {
- let _table = rt().block_on(HudiTable::new(
- table_uri,
- storage_options.unwrap_or_default(),
- ))?;
- Ok(BindingHudiTable { _table })
- }
-
- pub fn get_schema(&self, py: Python) -> PyResult<PyObject> {
- rt().block_on(self._table.get_schema())?.to_pyarrow(py)
- }
-
- pub fn split_file_slices(&self, n: usize, py: Python) -> PyResult<Vec<Vec<HudiFileSlice>>> {
- py.allow_threads(|| {
- let file_slices = rt().block_on(self._table.split_file_slices(n))?;
- Ok(file_slices
- .iter()
- .map(|inner_vec| inner_vec.iter().map(convert_file_slice).collect())
- .collect())
- })
- }
-
- pub fn get_file_slices(&self, py: Python) -> PyResult<Vec<HudiFileSlice>> {
- py.allow_threads(|| {
- let file_slices = rt().block_on(self._table.get_file_slices())?;
- Ok(file_slices.iter().map(convert_file_slice).collect())
- })
- }
-
- pub fn read_file_slice(&self, relative_path: &str, py: Python) -> PyResult<PyObject> {
- rt().block_on(self._table.read_file_slice_by_path(relative_path))?
- .to_pyarrow(py)
- }
-
- pub fn read_snapshot(&self, py: Python) -> PyResult<PyObject> {
- rt().block_on(self._table.read_snapshot())?.to_pyarrow(py)
- }
-}
-
-#[cfg(not(tarpaulin))]
-#[pyfunction]
-fn rust_core_version() -> &'static str {
- hudi::crate_version()
-}
+mod internal;
#[cfg(not(tarpaulin))]
#[pymodule]
fn _internal(_py: Python, m: &Bound<'_, PyModule>) -> PyResult<()> {
- m.add("__version__", env!("CARGO_PKG_VERSION"))?;
- m.add_function(wrap_pyfunction!(rust_core_version, m)?)?;
-
+ use internal::{HudiFileSlice, HudiTable};
m.add_class::<HudiFileSlice>()?;
- m.add_class::<BindingHudiTable>()?;
+ m.add_class::<HudiTable>()?;
Ok(())
}
-
-#[cfg(not(tarpaulin))]
-pub fn rt() -> &'static Runtime {
- static TOKIO_RT: OnceLock<Runtime> = OnceLock::new();
- TOKIO_RT.get_or_init(|| Runtime::new().expect("Failed to create a tokio runtime."))
-}
diff --git a/python/tests/test_table_read.py b/python/tests/test_table_read.py
index 61195fb..e56463c 100644
--- a/python/tests/test_table_read.py
+++ b/python/tests/test_table_read.py
@@ -26,7 +26,7 @@ pytestmark = pytest.mark.skipif(PYARROW_LE_8_0_0, reason="hudi only supported if
def test_sample_table(get_sample_table):
table_path = get_sample_table
- table = HudiTable(table_path, {})
+ table = HudiTable(table_path)
assert table.get_schema().names == ['_hoodie_commit_time',
'_hoodie_commit_seqno', '_hoodie_record_key',
'_hoodie_partition_path',
'_hoodie_file_name', 'ts', 'uuid', 'rider', 'driver',