This is an automated email from the ASF dual-hosted git repository.
zhangyue19921010 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 99fb1dc feat: modernize Table API with async-only design (#537)
99fb1dc is described below
commit 99fb1dc45c7590d1ea50e1480c989ab91fe89757
Author: Shiyan Xu <[email protected]>
AuthorDate: Sat Mar 21 19:23:21 2026 -0500
feat: modernize Table API with async-only design (#537)
---
.github/copilot-instructions.md | 84 +---
.github/instructions/code-review.instructions.md | 126 +-----
.github/instructions/python.instructions.md | 19 -
.github/instructions/rust.instructions.md | 178 +-------
.licenserc.yaml | 2 +
README.md | 6 +-
cpp/Cargo.toml | 1 +
cpp/src/lib.rs | 81 ++--
crates/core/src/config/internal.rs | 5 +-
crates/core/src/config/read.rs | 5 +-
crates/core/src/file_group/reader.rs | 152 +++----
crates/core/src/lib.rs | 5 +-
crates/core/src/metadata/table/mod.rs | 76 ++--
crates/core/src/schema/mod.rs | 75 ++++
crates/core/src/schema/resolver.rs | 91 ++--
crates/core/src/table/mod.rs | 539 +++++++++--------------
crates/core/src/timeline/mod.rs | 17 +-
crates/core/tests/table_read_tests.rs | 191 ++++----
crates/datafusion/src/lib.rs | 22 +-
python/hudi/_internal.pyi | 22 +-
python/src/internal.rs | 145 +++---
python/tests/test_table_read.py | 2 +-
22 files changed, 773 insertions(+), 1071 deletions(-)
diff --git a/.github/copilot-instructions.md b/.github/copilot-instructions.md
index 3bb04ab..b8f43ad 100644
--- a/.github/copilot-instructions.md
+++ b/.github/copilot-instructions.md
@@ -1,22 +1,3 @@
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one
- ~ or more contributor license agreements. See the NOTICE file
- ~ distributed with this work for additional information
- ~ regarding copyright ownership. The ASF licenses this file
- ~ to you under the Apache License, Version 2.0 (the
- ~ "License"); you may not use this file except in compliance
- ~ with the License. You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing,
- ~ software distributed under the License is distributed on an
- ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- ~ KIND, either express or implied. See the License for the
- ~ specific language governing permissions and limitations
- ~ under the License.
--->
-
# GitHub Copilot Instructions for Apache Hudi-rs
## Project Overview
@@ -74,71 +55,10 @@ make format check
make test
```
-## Code Review Guidelines
-
-### Multi-Round Review Behavior
-
-When reviewing subsequent commits on a PR:
-
-1. **Auto-resolve addressed comments**: If a new commit addresses a previous review comment, acknowledge it is resolved rather than re-raising the same issue.
-2. **Focus on new changes**: Prioritize reviewing newly added or modified code in the latest commits.
-3. **Track incremental progress**: Recognize when feedback has been incorporated and provide confirmation.
-4. **Avoid duplicate comments**: Do not repeat comments that have been addressed in subsequent commits.
-
-### Review Priority (High to Low)
-
-1. **Correctness**: Logic errors, data races, memory safety
-2. **API Design**: Public API consistency, breaking changes
-3. **Error Handling**: Proper use of `Result`, meaningful error messages
-4. **Performance**: Unnecessary allocations, inefficient patterns
-5. **Testing**: Test coverage, edge cases
-6. **Documentation**: Public API docs, complex logic explanation
-7. **Style**: Idiomatic Rust, consistency with codebase
-
-## What to Flag in Reviews
-
-### Critical Issues (Must Fix)
-
-- Panics in library code (use `Result` instead)
-- Unhandled errors (`.unwrap()` / `.expect()` in non-test code)
-- Data races or unsafe code without justification
-- Breaking public API changes without deprecation
-- Missing tests for new functionality
-
-### Important Issues (Should Fix)
-
-- Inefficient patterns (unnecessary `.clone()`, allocations in hot paths)
-- Missing documentation on public APIs
-- Large functions that should be split
-- Missing `#[must_use]` on functions returning important values
-- Blocking calls in async context
-
-### Suggestions (Nice to Have)
-
-- More idiomatic Rust patterns
-- Additional test cases for edge cases
-- Performance optimizations
-- Code organization improvements
-
-## Rust Conventions
-
-### Error Handling
-
-- Use `Result<T, E>` for fallible operations, never panic in library code
-- Propagate errors with `?` operator
-- Add context to errors when crossing module boundaries
-- Use `thiserror` for defining error types in library crates
-
-### Async Patterns
-
-- Use `tokio` as the async runtime
-- Prefer `async fn` over returning `impl Future`
-- Use `#[tokio::test]` for async tests
-- Avoid blocking calls (`std::fs`, `std::thread::sleep`) in async code
-
-### Testing
+## Testing
- Unit tests in the same file as the code (`#[cfg(test)]` module)
- Integration tests in dedicated test modules or test crate
- Test both success and error paths
- Use descriptive test names: `test_<function>_<scenario>_<expected_behavior>`
+- Use `#[tokio::test]` for async tests
diff --git a/.github/instructions/code-review.instructions.md b/.github/instructions/code-review.instructions.md
index 54e7780..9927205 100644
--- a/.github/instructions/code-review.instructions.md
+++ b/.github/instructions/code-review.instructions.md
@@ -3,56 +3,18 @@ applyTo: "**/*"
excludeAgent: "coding-agent"
---
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one
- ~ or more contributor license agreements. See the NOTICE file
- ~ distributed with this work for additional information
- ~ regarding copyright ownership. The ASF licenses this file
- ~ to you under the Apache License, Version 2.0 (the
- ~ "License"); you may not use this file except in compliance
- ~ with the License. You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing,
- ~ software distributed under the License is distributed on an
- ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- ~ KIND, either express or implied. See the License for the
- ~ specific language governing permissions and limitations
- ~ under the License.
--->
-
# Code Review Instructions for Apache Hudi-rs
## Multi-Round Review Behavior
-### Handling Subsequent Commits
-
-When reviewing a PR that has been updated with new commits after a previous
review:
-
-1. **Check if previous comments are addressed**:
- - Compare the current code state against previous review comments
- - If a comment has been addressed by new changes, acknowledge resolution
- - Do NOT re-raise issues that have been fixed
-
-2. **Focus on incremental changes**:
- - Prioritize reviewing code added or modified in the latest commits
- - Identify any new issues introduced by the fixes
- - Check if fixes introduced regressions elsewhere
-
-3. **Comment resolution signals**:
- - If code now follows the suggested pattern → Comment is resolved
- - If test coverage has been added as requested → Comment is resolved
- - If documentation has been added → Comment is resolved
- - If error handling has been improved → Comment is resolved
-
-4. **Acknowledge progress**:
- - When significant improvements have been made, acknowledge them
- - Be encouraging about progress while maintaining standards
+When reviewing updated PRs:
-### Review Comment Format
+- Do NOT re-raise issues addressed by new commits
+- Focus on code added or modified in latest commits
+- Check if fixes introduced regressions elsewhere
+- Acknowledge progress while maintaining standards
-When leaving comments, use this severity classification:
+### Severity Classification
- **🔴 Critical**: Must fix before merge (correctness, safety, breaking changes)
- **🟠 Important**: Should fix before merge (error handling, testing, docs)
@@ -116,63 +78,19 @@ When leaving comments, use this severity classification:
## Patterns to Flag
-### Always Flag (Critical)
-
-```rust
-// Unwrap in library code
-let value = result.unwrap(); // 🔴 Use ? or proper error handling
-
-// Panic in library code
-panic!("unexpected state"); // 🔴 Return Result with error
-
-// Blocking in async
-std::thread::sleep(duration); // 🔴 Use tokio::time::sleep
-
-// Hardcoded credentials
-let key = "AKIAXXXXXXXX"; // 🔴 Security issue
-```
-
-### Usually Flag (Important)
-
-```rust
-// Missing error context
-.map_err(HudiError::from)? // 🟠 Add context about what failed
-
-// Clone when borrow would work
-fn process(data: Vec<u8>) { } // 🟠 Consider &[u8] if not consuming
-
-// Large type on stack
-let buffer: [u8; 1_000_000]; // 🟠 Use Vec or Box for large allocations
-
-// Missing docs on pub items
-pub fn important_function() { } // 🟠 Add doc comment
-```
-
-### Sometimes Flag (Suggestion)
-
-```rust
-// Could use iterator methods
-let mut result = Vec::new();
-for item in items {
- if predicate(&item) {
- result.push(transform(item));
- }
-}
-// 🟡 Consider: items.into_iter().filter(predicate).map(transform).collect()
-
-// Nested Result handling
-match result {
- Ok(inner) => match inner { ... } // 🟡 Consider using and_then or ?
- Err(e) => ...
-}
-```
-
-## Cross-File Impact Assessment
-
-When changes touch these areas, expand review scope:
-
-- **Core table implementation**: Check impacts on Python bindings
-- **Public API surface (`hudi` crate)**: Check for breaking API changes
-- **DataFusion integration**: Verify DataFusion compatibility
-- **Schema types or conversions**: Check all serialization/deserialization
paths
-- **Configuration structs**: Verify backward compatibility
+- 🔴 `.unwrap()` / `.expect()` / `panic!()` in non-test code
+- 🔴 Blocking calls (`std::thread::sleep`, `std::fs`) in async context
+- 🔴 Hardcoded credentials or secrets
+- 🟠 Missing error context (bare `.map_err()` without message)
+- 🟠 Unnecessary `.clone()`, taking ownership when borrow suffices
+- 🟠 Missing doc comments on public items
+- 🟡 Imperative loops replaceable with iterator chains
+- 🟡 Nested `match` on `Result` replaceable with `and_then` or `?`
+
+## Cross-File Impact
+
+- **Core table changes** → check Python/C++ bindings
+- **Public API (`hudi` crate)** → check for breaking changes
+- **DataFusion integration** → verify compatibility
+- **Schema/type conversions** → check serialization paths
+- **Configuration structs** → verify backward compatibility
diff --git a/.github/instructions/python.instructions.md b/.github/instructions/python.instructions.md
index d787c1c..3657784 100644
--- a/.github/instructions/python.instructions.md
+++ b/.github/instructions/python.instructions.md
@@ -2,25 +2,6 @@
applyTo: "python/**"
---
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one
- ~ or more contributor license agreements. See the NOTICE file
- ~ distributed with this work for additional information
- ~ regarding copyright ownership. The ASF licenses this file
- ~ to you under the Apache License, Version 2.0 (the
- ~ "License"); you may not use this file except in compliance
- ~ with the License. You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing,
- ~ software distributed under the License is distributed on an
- ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- ~ KIND, either express or implied. See the License for the
- ~ specific language governing permissions and limitations
- ~ under the License.
--->
-
# Python Bindings Instructions
## PyO3 Patterns
diff --git a/.github/instructions/rust.instructions.md b/.github/instructions/rust.instructions.md
index fb0ddff..6d357e7 100644
--- a/.github/instructions/rust.instructions.md
+++ b/.github/instructions/rust.instructions.md
@@ -2,129 +2,39 @@
applyTo: "**/*.rs"
---
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one
- ~ or more contributor license agreements. See the NOTICE file
- ~ distributed with this work for additional information
- ~ regarding copyright ownership. The ASF licenses this file
- ~ to you under the Apache License, Version 2.0 (the
- ~ "License"); you may not use this file except in compliance
- ~ with the License. You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing,
- ~ software distributed under the License is distributed on an
- ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- ~ KIND, either express or implied. See the License for the
- ~ specific language governing permissions and limitations
- ~ under the License.
--->
-
# Rust Instructions for Apache Hudi-rs
-## Error Handling Patterns (Critical)
-
-### Must Use Result Over Panic
-
-- Flag any `.unwrap()` or `.expect()` in non-test code as **Critical**
-- Flag any `panic!()` macro in library code as **Critical**
-- Exception: `unreachable!()` for truly impossible states is acceptable with a
comment explaining why
-
-```rust
-// GOOD
-fn parse_config(input: &str) -> Result<Config, HudiError> {
- serde_json::from_str(input).map_err(HudiError::from)
-}
-
-// BAD - Will panic on invalid input
-fn parse_config(input: &str) -> Config {
- serde_json::from_str(input).unwrap()
-}
-```
-
-### Error Context
+## Error Handling (Critical)
+- Flag `.unwrap()` / `.expect()` in non-test code as **Critical**
+- Flag `panic!()` in library code as **Critical**
+- Exception: `unreachable!()` is acceptable with a comment explaining why
- Prefer `anyhow::Context` or custom error types with context over bare
`.map_err()`
- Error messages should be actionable and include relevant values
```rust
-// GOOD
+// GOOD - with context
let file = File::open(&path)
.with_context(|| format!("Failed to open table metadata at {}",
path.display()))?;
-// LESS GOOD
+// BAD - no context
let file = File::open(&path).map_err(|e| HudiError::Io(e))?;
```
## Memory and Performance
-### Avoid Unnecessary Allocations
-
-- Flag unnecessary `.clone()` calls, especially on large types like
`Vec<RecordBatch>`
-- Prefer `&str` over `String` in function parameters when ownership isn't
needed
+- Flag unnecessary `.clone()`, especially on large types like
`Vec<RecordBatch>`
+- Prefer `&str` / `&[T]` over owned types in parameters when ownership isn't
needed
- Use `Cow<'_, str>` when a function might or might not need to allocate
-
-```rust
-// GOOD - Takes reference
-fn filter_partitions(partitions: &[String], predicate: &str) -> Vec<&String> {
... }
-
-// LESS EFFICIENT - Clones unnecessarily
-fn filter_partitions(partitions: Vec<String>, predicate: String) ->
Vec<String> { ... }
-```
-
-### Arrow Memory Patterns
-
-- Prefer Arrow compute kernels over manual iteration
+- Prefer Arrow compute kernels over manual iteration on arrays
- Use `arrow::compute::concat_batches` for combining batches
-- Be mindful of memory when working with large datasets
-
-```rust
-// GOOD - Uses Arrow compute kernel
-use arrow::compute::filter;
-let filtered = filter(&batch, &predicate)?;
-
-// AVOID - Manual iteration on Arrow arrays
-let mut result = Vec::new();
-for i in 0..array.len() {
- if predicate.value(i) {
- result.push(array.value(i));
- }
-}
-```
## Async Code Patterns
-### Future Send Bounds
-
-- Ensure async functions return `Send` futures for compatibility with
multi-threaded executors
-- Flag holding non-Send types (like `Rc`, `RefCell`) across await points
-
-```rust
-// GOOD - Future is Send
-pub async fn read_files(paths: Vec<PathBuf>) -> Result<Vec<RecordBatch>> {
- let futures = paths.into_iter().map(|p| async move {
- read_file(&p).await
- });
- futures::future::try_join_all(futures).await
-}
-```
-
-### Avoid Blocking in Async Context
-
-- Flag blocking I/O operations in async functions
+- Ensure async functions return `Send` futures (no `Rc`, `RefCell` across
await points)
+- Flag blocking I/O (`std::fs`, `std::thread::sleep`) in async functions
- Use `tokio::task::spawn_blocking` for CPU-intensive work
-```rust
-// GOOD
-let result = tokio::task::spawn_blocking(move || {
- expensive_computation(&data)
-}).await?;
-
-// BAD - Blocks the async runtime
-let result = expensive_computation(&data); // In async function
-```
-
## API Design
### Builder Pattern
@@ -133,7 +43,6 @@ let result = expensive_computation(&data); // In async
function
- Builders should consume `self` and return `Self` for chaining
```rust
-// GOOD - Follows project convention
pub struct TableBuilder {
base_uri: String,
options: HashMap<String, String>,
@@ -141,12 +50,7 @@ pub struct TableBuilder {
impl TableBuilder {
pub fn from_base_uri(uri: impl Into<String>) -> Self { ... }
-
- pub fn with_option(mut self, key: impl Into<String>, value: impl
Into<String>) -> Self {
- self.options.insert(key.into(), value.into());
- self
- }
-
+ pub fn with_option(mut self, key: impl Into<String>, value: impl
Into<String>) -> Self { ... }
pub async fn build(self) -> Result<Table> { ... }
}
```
@@ -156,61 +60,3 @@ impl TableBuilder {
- All public items must have doc comments
- Include examples in doc comments for complex APIs
- Document panics, errors, and safety requirements
-
-```rust
-/// Reads a snapshot of the Hudi table.
-///
-/// # Arguments
-///
-/// * `filters` - Optional partition filters in the format `(column, op,
value)`
-///
-/// # Errors
-///
-/// Returns an error if:
-/// - The table path is invalid
-/// - The table metadata cannot be read
-/// - Schema parsing fails
-///
-/// # Examples
-///
-/// ```rust
-/// let batches = table.read_snapshot(&[("city", "=",
"san_francisco")]).await?;
-/// ```
-pub async fn read_snapshot(&self, filters: &[(&str, &str, &str)]) ->
Result<Vec<RecordBatch>> {
- // ...
-}
-```
-
-## Testing
-
-### Test Organization
-
-- Unit tests in `#[cfg(test)] mod tests` at bottom of file
-- Integration tests in dedicated test modules
-- Use `#[tokio::test]` for async tests
-
-```rust
-#[cfg(test)]
-mod tests {
- use super::*;
-
- #[tokio::test]
- async fn test_read_snapshot_with_filters() {
- let table = create_test_table().await;
- let batches = table.read_snapshot(&[("city", "=",
"test")]).await.unwrap();
- assert!(!batches.is_empty());
- }
-
- #[test]
- fn test_parse_config_invalid_json() {
- let result = parse_config("not json");
- assert!(result.is_err());
- }
-}
-```
-
-### Test Both Paths
-
-- Test success cases AND error cases
-- Test edge cases (empty input, boundary values)
-- Test that errors contain useful information
diff --git a/.licenserc.yaml b/.licenserc.yaml
index 57569a6..cb2eb60 100644
--- a/.licenserc.yaml
+++ b/.licenserc.yaml
@@ -25,6 +25,8 @@ header:
- 'NOTICE'
- '**/data/**'
- '.github/PULL_REQUEST_TEMPLATE.md'
+ - '.github/copilot-instructions.md'
+ - '.github/instructions/**'
- 'crates/core/schemas/**'
- 'benchmark/tpch/queries/**'
diff --git a/README.md b/README.md
index 7412299..4cdb8e4 100644
--- a/README.md
+++ b/README.md
@@ -206,11 +206,12 @@ record_batch =
reader.read_file_slice_by_base_file_path("relative/path.parquet")
#### Rust
-```rust
+```rust,ignore
use hudi::file_group::reader::FileGroupReader;
+// Inside an async context
let reader = FileGroupReader::new_with_options(
- "/table/base/path", [("hoodie.read.file_group.start_timestamp", "0")])?;
+ "/table/base/path", [("hoodie.read.file_group.start_timestamp", "0")]).await?;
// Returns an Arrow RecordBatch
let record_batch =
reader.read_file_slice_by_base_file_path("relative/path.parquet").await?;
@@ -223,6 +224,7 @@ let record_batch =
reader.read_file_slice_by_base_file_path("relative/path.parqu
#include "src/lib.rs.h"
#include "arrow/c/abi.h"
+// Functions may throw rust::Error on failure
auto reader = new_file_group_reader_with_options(
"/table/base/path", {"hoodie.read.file_group.start_timestamp=0"});
diff --git a/cpp/Cargo.toml b/cpp/Cargo.toml
index 7501a6b..00af632 100644
--- a/cpp/Cargo.toml
+++ b/cpp/Cargo.toml
@@ -37,6 +37,7 @@ hudi-test = { path = "../crates/test", version = "0.5.0-dev" }
cxx = { version = "1.0" }
arrow = { workspace = true , features = ["ffi"]}
arrow-array = { workspace = true , features = ["ffi"]}
+tokio = { workspace = true }
[build-dependencies]
cxx-build = { version = "1.0" }
diff --git a/cpp/src/lib.rs b/cpp/src/lib.rs
index 68eb579..eddf677 100644
--- a/cpp/src/lib.rs
+++ b/cpp/src/lib.rs
@@ -37,81 +37,95 @@ mod ffi {
fn new_file_group_reader_with_options(
base_uri: &CxxString,
options: &CxxVector<CxxString>,
- ) -> Box<HudiFileGroupReader>;
+ ) -> Result<Box<HudiFileGroupReader>>;
type HudiFileSlice;
fn new_file_slice_from_file_names(
partition_path: &CxxString,
base_file_name: &CxxString,
log_file_names: &CxxVector<CxxString>,
- ) -> Box<HudiFileSlice>;
+ ) -> Result<Box<HudiFileSlice>>;
fn read_file_slice_by_base_file_path(
self: &HudiFileGroupReader,
relative_path: &CxxString,
- ) -> *mut ArrowArrayStream;
+ ) -> Result<*mut ArrowArrayStream>;
fn read_file_slice(
self: &HudiFileGroupReader,
file_slice: &HudiFileSlice,
- ) -> *mut ArrowArrayStream;
+ ) -> Result<*mut ArrowArrayStream>;
}
}
pub struct HudiFileGroupReader {
inner: FileGroupReader,
+ rt: tokio::runtime::Runtime,
}
pub fn new_file_group_reader_with_options(
base_uri: &CxxString,
options: &CxxVector<CxxString>,
-) -> Box<HudiFileGroupReader> {
+) -> std::result::Result<Box<HudiFileGroupReader>, String> {
let base_uri = base_uri
.to_str()
- .expect("Failed to convert CxxString to str: Invalid UTF-8 sequence");
+ .map_err(|e| format!("Failed to convert CxxString to str: {e}"))?;
let mut opt_vec = Vec::new();
for opt in options.iter() {
let opt_str = opt
.to_str()
- .expect("Failed to convert CxxString to str: Invalid UTF-8 sequence");
+ .map_err(|e| format!("Failed to convert CxxString to str: {e}"))?;
if let Some((key, value)) = opt_str.split_once('=') {
opt_vec.push((key, value))
}
}
- let reader = FileGroupReader::new_with_options(base_uri, opt_vec)
- .expect("Failed to create FileGroupReader with options");
- let reader_wrapper = HudiFileGroupReader { inner: reader };
- Box::new(reader_wrapper)
+ let rt = tokio::runtime::Builder::new_current_thread()
+ .enable_all()
+ .build()
+ .map_err(|e| format!("Failed to create tokio runtime: {e}"))?;
+ let reader = rt
+ .block_on(FileGroupReader::new_with_options(base_uri, opt_vec))
+ .map_err(|e| format!("Failed to create FileGroupReader: {e}"))?;
+ Ok(Box::new(HudiFileGroupReader { inner: reader, rt }))
}
impl HudiFileGroupReader {
pub fn read_file_slice_by_base_file_path(
&self,
relative_path: &CxxString,
- ) -> *mut ffi::ArrowArrayStream {
+ ) -> std::result::Result<*mut ffi::ArrowArrayStream, String> {
let relative_path = relative_path
.to_str()
- .expect("Failed to convert CxxString to str: Invalid UTF-8 sequence");
+ .map_err(|e| format!("Failed to convert CxxString to str: {e}"))?;
let record_batch = self
- .inner
- .read_file_slice_by_base_file_path_blocking(relative_path)
- .expect("Failed to read file batch");
+ .rt
+ .block_on(self.inner.read_file_slice_by_base_file_path(relative_path))
+ .map_err(|e| format!("Failed to read file batch: {e}"))?;
let schema = record_batch.schema();
- create_raw_pointer_for_record_batches(vec![record_batch], schema)
+ Ok(create_raw_pointer_for_record_batches(
+ vec![record_batch],
+ schema,
+ ))
}
- pub fn read_file_slice(&self, file_slice: &HudiFileSlice) -> *mut ffi::ArrowArrayStream {
+ pub fn read_file_slice(
+ &self,
+ file_slice: &HudiFileSlice,
+ ) -> std::result::Result<*mut ffi::ArrowArrayStream, String> {
let record_batch = self
- .inner
- .read_file_slice_blocking(&file_slice.inner)
- .expect("Failed to read file slice");
+ .rt
+ .block_on(self.inner.read_file_slice(&file_slice.inner))
+ .map_err(|e| format!("Failed to read file slice: {e}"))?;
let schema = record_batch.schema();
- create_raw_pointer_for_record_batches(vec![record_batch], schema)
+ Ok(create_raw_pointer_for_record_batches(
+ vec![record_batch],
+ schema,
+ ))
}
}
@@ -123,38 +137,35 @@ pub fn new_file_slice_from_file_names(
partition_path: &CxxString,
base_file_name: &CxxString,
log_file_names: &CxxVector<CxxString>,
-) -> Box<HudiFileSlice> {
+) -> std::result::Result<Box<HudiFileSlice>, String> {
let partition_path = partition_path
.to_str()
- .expect("Failed to convert CxxString to str: Invalid UTF-8 sequence");
+ .map_err(|e| format!("Failed to convert CxxString to str: {e}"))?;
let base_file_name = base_file_name
.to_str()
- .expect("Failed to convert CxxString to str: Invalid UTF-8 sequence");
+ .map_err(|e| format!("Failed to convert CxxString to str: {e}"))?;
let log_file_names = log_file_names
.iter()
.map(|name| {
name.to_str()
- .expect("Failed to convert CxxString to str: Invalid UTF-8 sequence")
+ .map_err(|e| format!("Failed to convert CxxString to str: {e}"))
})
- .collect::<Vec<_>>();
+ .collect::<std::result::Result<Vec<_>, _>>()?;
let mut file_group = FileGroup::new_with_base_file_name(base_file_name,
partition_path)
- .expect("Failed to create FileGroup");
+ .map_err(|e| format!("Failed to create FileGroup: {e}"))?;
file_group
.add_log_files_from_names(&log_file_names)
- .expect("Failed to add files to FileGroup");
+ .map_err(|e| format!("Failed to add files to FileGroup: {e}"))?;
let (_, file_slice) = file_group
.file_slices
.iter()
.next()
- .expect("Failed to get file slice from FileGroup");
+ .ok_or_else(|| format!("Failed to get file slice from FileGroup: {file_group:?}"))?;
- // todo: add api to create file slice from names to avoid cloning
- let file_slice_wrapper = HudiFileSlice {
+ Ok(Box::new(HudiFileSlice {
inner: file_slice.clone(),
- };
-
- Box::new(file_slice_wrapper)
+ }))
}
diff --git a/crates/core/src/config/internal.rs b/crates/core/src/config/internal.rs
index 965df95..9f1b7cb 100644
--- a/crates/core/src/config/internal.rs
+++ b/crates/core/src/config/internal.rs
@@ -36,8 +36,11 @@ use crate::config::{ConfigParser, HudiConfigValue};
/// use hudi_core::config::internal::HudiInternalConfig::SkipConfigValidation;
/// use hudi_core::table::Table as HudiTable;
///
+/// # #[tokio::main]
+/// # async fn main() {
/// let options = [(SkipConfigValidation, "true")];
-/// HudiTable::new_with_options_blocking("/tmp/hudi_data", options);
+/// HudiTable::new_with_options("/tmp/hudi_data", options).await;
+/// # }
/// ```
///
#[derive(Clone, Debug, PartialEq, Eq, Hash, EnumIter)]
diff --git a/crates/core/src/config/read.rs b/crates/core/src/config/read.rs
index a5acbc5..9352702 100644
--- a/crates/core/src/config/read.rs
+++ b/crates/core/src/config/read.rs
@@ -36,8 +36,11 @@ use crate::config::{ConfigParser, HudiConfigValue};
/// use hudi_core::config::read::HudiReadConfig::InputPartitions;
/// use hudi_core::table::Table as HudiTable;
///
+/// # #[tokio::main]
+/// # async fn main() {
/// let options = [(InputPartitions, "2")];
-/// HudiTable::new_with_options_blocking("/tmp/hudi_data", options);
+/// HudiTable::new_with_options("/tmp/hudi_data", options).await;
+/// # }
/// ```
///
diff --git a/crates/core/src/file_group/reader.rs b/crates/core/src/file_group/reader.rs
index 8de9d86..1031a39 100644
--- a/crates/core/src/file_group/reader.rs
+++ b/crates/core/src/file_group/reader.rs
@@ -87,27 +87,21 @@ impl FileGroupReader {
///
/// # Notes
/// This API uses [`OptionResolver`] that loads table properties from
storage to resolve options.
- pub fn new_with_options<I, K, V>(base_uri: &str, options: I) ->
Result<Self>
+ pub async fn new_with_options<I, K, V>(base_uri: &str, options: I) ->
Result<Self>
where
I: IntoIterator<Item = (K, V)>,
K: AsRef<str>,
V: Into<String>,
{
- tokio::runtime::Builder::new_current_thread()
- .enable_all()
- .build()?
- .block_on(async {
- let mut resolver = OptionResolver::new_with_options(base_uri,
options);
- resolver.resolve_options().await?;
- let hudi_configs =
Arc::new(HudiConfigs::new(resolver.hudi_options));
- let storage =
- Storage::new(Arc::new(resolver.storage_options),
hudi_configs.clone())?;
-
- Ok(Self {
- hudi_configs,
- storage,
- })
- })
+ let mut resolver = OptionResolver::new_with_options(base_uri, options);
+ resolver.resolve_options().await?;
+ let hudi_configs = Arc::new(HudiConfigs::new(resolver.hudi_options));
+ let storage = Storage::new(Arc::new(resolver.storage_options),
hudi_configs.clone())?;
+
+ Ok(Self {
+ hudi_configs,
+ storage,
+ })
}
/// Reads the data from the base file at the given relative path.
@@ -130,17 +124,6 @@ impl FileGroupReader {
apply_commit_time_filter(&self.hudi_configs, records)
}
- /// Same as [FileGroupReader::read_file_slice_by_base_file_path], but
blocking.
- pub fn read_file_slice_by_base_file_path_blocking(
- &self,
- relative_path: &str,
- ) -> Result<RecordBatch> {
- tokio::runtime::Builder::new_current_thread()
- .enable_all()
- .build()?
- .block_on(self.read_file_slice_by_base_file_path(relative_path))
- }
-
fn create_instant_range_for_log_file_scan(&self) -> InstantRange {
let timezone = self
.hudi_configs
@@ -179,14 +162,6 @@ impl FileGroupReader {
.await
}
- /// Same as [FileGroupReader::read_file_slice], but blocking.
- pub fn read_file_slice_blocking(&self, file_slice: &FileSlice) ->
Result<RecordBatch> {
- tokio::runtime::Builder::new_current_thread()
- .enable_all()
- .build()?
- .block_on(self.read_file_slice(file_slice))
- }
-
/// Reads a file slice from a base file and a list of log files.
///
/// # Arguments
@@ -248,22 +223,6 @@ impl FileGroupReader {
}
}
- /// Same as [FileGroupReader::read_file_slice_from_paths], but blocking.
- pub fn read_file_slice_from_paths_blocking<I, S>(
- &self,
- base_file_path: &str,
- log_file_paths: I,
- ) -> Result<RecordBatch>
- where
- I: IntoIterator<Item = S>,
- S: AsRef<str>,
- {
- tokio::runtime::Builder::new_current_thread()
- .enable_all()
- .build()?
- .block_on(self.read_file_slice_from_paths(base_file_path,
log_file_paths))
- }
-
// =========================================================================
// Streaming Read APIs
// =========================================================================
@@ -694,11 +653,13 @@ mod tests {
url.as_ref().to_string()
}
- #[test]
- fn test_new_with_options() {
+ #[tokio::test]
+ async fn test_new_with_options() {
let options = vec![("key1", "value1"), ("key2", "value2")];
let base_uri = get_base_uri_with_valid_props();
- let reader = FileGroupReader::new_with_options(&base_uri,
options).unwrap();
+ let reader = FileGroupReader::new_with_options(&base_uri, options)
+ .await
+ .unwrap();
assert!(!reader.storage.options.is_empty());
assert!(
reader
@@ -708,14 +669,14 @@ mod tests {
);
}
- #[test]
- fn test_new_with_options_invalid_base_uri_or_invalid_props() {
+ #[tokio::test]
+ async fn test_new_with_options_invalid_base_uri_or_invalid_props() {
let base_uri = get_non_existent_base_uri();
- let result = FileGroupReader::new_with_options(&base_uri,
empty_options());
+ let result = FileGroupReader::new_with_options(&base_uri,
empty_options()).await;
assert!(result.is_err());
let base_uri = get_base_uri_with_invalid_props();
- let result = FileGroupReader::new_with_options(&base_uri,
empty_options());
+ let result = FileGroupReader::new_with_options(&base_uri,
empty_options()).await;
assert!(result.is_err())
}
@@ -736,8 +697,8 @@ mod tests {
RecordBatch::try_new(schema, vec![commit_times, names,
ages]).map_err(CoreError::ArrowError)
}
- #[test]
- fn test_create_commit_time_filter_mask() -> Result<()> {
+ #[tokio::test]
+ async fn test_create_commit_time_filter_mask() -> Result<()> {
let base_uri = get_base_uri_with_valid_props_minimum();
let records = create_test_record_batch()?;
@@ -748,12 +709,13 @@ mod tests {
(HudiTableConfig::PopulatesMetaFields.as_ref(), "false"),
(HudiReadConfig::FileGroupStartTimestamp.as_ref(), "2"),
],
- )?;
+ )
+ .await?;
let mask = create_commit_time_filter_mask(&reader.hudi_configs,
&records)?;
assert_eq!(mask, None, "Commit time filtering should not be needed");
// Test case 2: No commit time filtering options
- let reader = FileGroupReader::new_with_options(&base_uri,
empty_options())?;
+ let reader = FileGroupReader::new_with_options(&base_uri,
empty_options()).await?;
let mask = create_commit_time_filter_mask(&reader.hudi_configs,
&records)?;
assert_eq!(mask, None);
@@ -761,7 +723,8 @@ mod tests {
let reader = FileGroupReader::new_with_options(
&base_uri,
[(HudiReadConfig::FileGroupStartTimestamp, "2")],
- )?;
+ )
+ .await?;
let mask = create_commit_time_filter_mask(&reader.hudi_configs,
&records)?;
assert_eq!(
mask,
@@ -773,7 +736,8 @@ mod tests {
let reader = FileGroupReader::new_with_options(
&base_uri,
[(HudiReadConfig::FileGroupEndTimestamp, "4")],
- )?;
+ )
+ .await?;
let mask = create_commit_time_filter_mask(&reader.hudi_configs,
&records)?;
assert_eq!(mask, None, "Commit time filtering should not be needed");
@@ -784,7 +748,8 @@ mod tests {
(HudiReadConfig::FileGroupStartTimestamp, "2"),
(HudiReadConfig::FileGroupEndTimestamp, "4"),
],
- )?;
+ )
+ .await?;
let mask = create_commit_time_filter_mask(&reader.hudi_configs,
&records)?;
assert_eq!(
mask,
@@ -795,16 +760,18 @@ mod tests {
Ok(())
}
- #[test]
- fn test_read_file_slice_from_paths_with_base_file_only() -> Result<()> {
+ #[tokio::test]
+ async fn test_read_file_slice_from_paths_with_base_file_only() ->
Result<()> {
let base_uri = get_base_uri_with_valid_props_minimum();
- let reader = FileGroupReader::new_with_options(&base_uri,
empty_options())?;
+ let reader = FileGroupReader::new_with_options(&base_uri,
empty_options()).await?;
// Test with actual test files and empty log files - should trigger
base_file_only logic
let base_file_path = TEST_SAMPLE_BASE_FILE;
let log_file_paths: Vec<&str> = vec![];
- let result =
reader.read_file_slice_from_paths_blocking(base_file_path, log_file_paths);
+ let result = reader
+ .read_file_slice_from_paths(base_file_path, log_file_paths)
+ .await;
match result {
Ok(batch) => {
@@ -821,18 +788,21 @@ mod tests {
Ok(())
}
- #[test]
- fn test_read_file_slice_from_paths_read_optimized_mode() -> Result<()> {
+ #[tokio::test]
+ async fn test_read_file_slice_from_paths_read_optimized_mode() ->
Result<()> {
let base_uri = get_base_uri_with_valid_props_minimum();
let reader = FileGroupReader::new_with_options(
&base_uri,
[(HudiReadConfig::UseReadOptimizedMode.as_ref(), "true")],
- )?;
+ )
+ .await?;
let base_file_path = TEST_SAMPLE_BASE_FILE;
let log_file_paths = vec![TEST_SAMPLE_LOG_FILE.to_string()];
- let result =
reader.read_file_slice_from_paths_blocking(base_file_path, log_file_paths);
+ let result = reader
+ .read_file_slice_from_paths(base_file_path, log_file_paths)
+ .await;
// In read-optimized mode, log files should be ignored
// This should behave the same as read_file_slice_by_base_file_path
@@ -853,15 +823,17 @@ mod tests {
Ok(())
}
- #[test]
- fn test_read_file_slice_from_paths_with_log_files() -> Result<()> {
+ #[tokio::test]
+ async fn test_read_file_slice_from_paths_with_log_files() -> Result<()> {
let base_uri = get_base_uri_with_valid_props_minimum();
- let reader = FileGroupReader::new_with_options(&base_uri,
empty_options())?;
+ let reader = FileGroupReader::new_with_options(&base_uri,
empty_options()).await?;
let base_file_path = TEST_SAMPLE_BASE_FILE;
let log_file_paths = vec![TEST_SAMPLE_LOG_FILE.to_string()];
- let result =
reader.read_file_slice_from_paths_blocking(base_file_path, log_file_paths);
+ let result = reader
+ .read_file_slice_from_paths(base_file_path, log_file_paths)
+ .await;
// The actual file reading might fail due to missing test data, which
is expected
match result {
@@ -881,16 +853,18 @@ mod tests {
Ok(())
}
- #[test]
- fn test_read_file_slice_from_paths_error_handling() -> Result<()> {
+ #[tokio::test]
+ async fn test_read_file_slice_from_paths_error_handling() -> Result<()> {
let base_uri = get_base_uri_with_valid_props_minimum();
- let reader = FileGroupReader::new_with_options(&base_uri,
empty_options())?;
+ let reader = FileGroupReader::new_with_options(&base_uri,
empty_options()).await?;
// Test with non-existent base file
let base_file_path = "non_existent_file.parquet";
let log_file_paths: Vec<&str> = vec![];
- let result =
reader.read_file_slice_from_paths_blocking(base_file_path, log_file_paths);
+ let result = reader
+ .read_file_slice_from_paths(base_file_path, log_file_paths)
+ .await;
assert!(result.is_err(), "Should return error for non-existent file");
@@ -903,17 +877,17 @@ mod tests {
Ok(())
}
- #[test]
- fn test_read_file_slice_blocking() -> Result<()> {
+ #[tokio::test]
+ async fn test_read_file_slice() -> Result<()> {
let base_uri = get_base_uri_with_valid_props_minimum();
- let reader = FileGroupReader::new_with_options(&base_uri,
empty_options())?;
+ let reader = FileGroupReader::new_with_options(&base_uri,
empty_options()).await?;
// Create a FileSlice from the test sample base file
let base_file = BaseFile::from_str(TEST_SAMPLE_BASE_FILE)?;
let file_slice = FileSlice::new(base_file, String::new()); // empty
partition path
- // Call read_file_slice_blocking
- let result = reader.read_file_slice_blocking(&file_slice);
+ // Call read_file_slice
+ let result = reader.read_file_slice(&file_slice).await;
match result {
Ok(batch) => {
@@ -1279,11 +1253,11 @@ mod tests {
FileGroupReader::new_with_configs_and_overwriting_options(hudi_configs,
empty_options())
}
- #[test]
- fn test_is_metadata_table_detection() -> Result<()> {
+ #[tokio::test]
+ async fn test_is_metadata_table_detection() -> Result<()> {
// Regular table should return false
let base_uri = get_base_uri_with_valid_props();
- let reader = FileGroupReader::new_with_options(&base_uri,
empty_options())?;
+ let reader = FileGroupReader::new_with_options(&base_uri,
empty_options()).await?;
assert!(!reader.is_metadata_table());
// Metadata table should return true
diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs
index 24deae4..fbbe9d9 100644
--- a/crates/core/src/lib.rs
+++ b/crates/core/src/lib.rs
@@ -26,8 +26,11 @@
//! use hudi_core::config::read::HudiReadConfig::InputPartitions;
//! use hudi_core::table::Table as HudiTable;
//!
+//! # #[tokio::main]
+//! # async fn main() {
//! let options = [(InputPartitions, "2")];
-//! HudiTable::new_with_options_blocking("/tmp/hudi_data", options);
+//! HudiTable::new_with_options("/tmp/hudi_data", options).await;
+//! # }
//! ```
//!
//! # The [table] module is responsible for managing Hudi tables.
diff --git a/crates/core/src/metadata/table/mod.rs
b/crates/core/src/metadata/table/mod.rs
index e5e211c..7a2d3e6 100644
--- a/crates/core/src/metadata/table/mod.rs
+++ b/crates/core/src/metadata/table/mod.rs
@@ -140,14 +140,6 @@ impl Table {
.await
}
- /// Same as [Table::new_metadata_table], but blocking.
- pub fn new_metadata_table_blocking(&self) -> Result<Table> {
- tokio::runtime::Builder::new_current_thread()
- .enable_all()
- .build()?
- .block_on(async { self.new_metadata_table().await })
- }
-
/// Fetch records from the `files` partition of metadata table
/// with optional data table partition pruning.
///
@@ -167,17 +159,6 @@ impl Table {
.await
}
- /// Same as [Table::read_metadata_table_files_partition], but blocking.
- pub fn read_metadata_table_files_partition_blocking(
- &self,
- partition_pruner: &PartitionPruner,
- ) -> Result<HashMap<String, FilesPartitionRecord>> {
- tokio::runtime::Builder::new_current_thread()
- .enable_all()
- .build()?
- .block_on(self.read_metadata_table_files_partition(partition_pruner))
- }
-
/// Fetch records from the `files` partition with optional partition
pruning.
///
/// For non-partitioned tables, directly fetches the "." record.
@@ -297,20 +278,21 @@ mod tests {
use records::{FilesPartitionRecord, MetadataRecordType};
use std::collections::HashSet;
- fn get_data_table() -> Table {
+ async fn get_data_table() -> Table {
let table_path =
QuickstartTripsTable::V8Trips8I3U1D.path_to_mor_avro();
- Table::new_blocking(&table_path).unwrap()
+ Table::new(&table_path).await.unwrap()
}
- #[test]
- fn hudi_table_read_metadata_table_files_partition() {
- let data_table = get_data_table();
- let partition_schema =
data_table.get_partition_schema_blocking().unwrap();
+ #[tokio::test]
+ async fn hudi_table_read_metadata_table_files_partition() {
+ let data_table = get_data_table().await;
+ let partition_schema =
data_table.get_partition_schema().await.unwrap();
let partition_pruner =
PartitionPruner::new(&[], &partition_schema,
data_table.hudi_configs.as_ref()).unwrap();
let records = data_table
- .read_metadata_table_files_partition_blocking(&partition_pruner)
+ .read_metadata_table_files_partition(&partition_pruner)
+ .await
.unwrap();
// Should have 4 records: __all_partitions__ + 3 city partitions
@@ -346,9 +328,9 @@ mod tests {
assert!(chennai.total_size() > 0);
}
- #[test]
- fn hudi_table_get_metadata_table_partitions() {
- let data_table = get_data_table();
+ #[tokio::test]
+ async fn hudi_table_get_metadata_table_partitions() {
+ let data_table = get_data_table().await;
// Verify we can get the metadata table partitions from the data table
let partitions = data_table.get_metadata_table_partitions();
@@ -376,11 +358,11 @@ mod tests {
}
}
- #[test]
- fn hudi_table_is_metadata_table_enabled() {
+ #[tokio::test]
+ async fn hudi_table_is_metadata_table_enabled() {
// V8 table with files partition configured should enable metadata
table
// even without explicit hoodie.metadata.enable=true
- let data_table = get_data_table();
+ let data_table = get_data_table().await;
// Verify it's a v8 table
let table_version: isize = data_table
@@ -404,11 +386,11 @@ mod tests {
);
}
- #[test]
- fn hudi_table_v6_metadata_table_not_enabled() {
+ #[tokio::test]
+ async fn hudi_table_v6_metadata_table_not_enabled() {
// V6 tables should NOT have metadata table enabled, even with
explicit setting
let base_url = SampleTable::V6Nonpartitioned.url_to_cow();
- let hudi_table = Table::new_blocking(base_url.path()).unwrap();
+ let hudi_table = Table::new(base_url.path()).await.unwrap();
// Verify it's a v6 table
let table_version: isize = hudi_table
@@ -425,35 +407,35 @@ mod tests {
);
}
- #[test]
- fn hudi_table_is_not_metadata_table() {
+ #[tokio::test]
+ async fn hudi_table_is_not_metadata_table() {
// A regular data table should not be a metadata table
let base_url = SampleTable::V6Nonpartitioned.url_to_cow();
- let hudi_table = Table::new_blocking(base_url.path()).unwrap();
+ let hudi_table = Table::new(base_url.path()).await.unwrap();
assert!(
!hudi_table.is_metadata_table(),
"Regular data table should not be a metadata table"
);
}
- #[test]
- fn hudi_table_metadata_table_is_metadata_table() {
+ #[tokio::test]
+ async fn hudi_table_metadata_table_is_metadata_table() {
// Create a metadata table and verify it's recognized as such
- let data_table = get_data_table();
- let metadata_table = data_table.new_metadata_table_blocking().unwrap();
+ let data_table = get_data_table().await;
+ let metadata_table = data_table.new_metadata_table().await.unwrap();
assert!(
metadata_table.is_metadata_table(),
"Metadata table should be recognized as a metadata table"
);
}
- #[test]
- fn hudi_table_new_metadata_table_from_metadata_table_errors() {
+ #[tokio::test]
+ async fn hudi_table_new_metadata_table_from_metadata_table_errors() {
// Trying to create a metadata table from a metadata table should fail
- let data_table = get_data_table();
- let metadata_table = data_table.new_metadata_table_blocking().unwrap();
+ let data_table = get_data_table().await;
+ let metadata_table = data_table.new_metadata_table().await.unwrap();
- let result = metadata_table.new_metadata_table_blocking();
+ let result = metadata_table.new_metadata_table().await;
assert!(result.is_err());
let err = result.unwrap_err();
assert!(
diff --git a/crates/core/src/schema/mod.rs b/crates/core/src/schema/mod.rs
index 52ce19a..e351581 100644
--- a/crates/core/src/schema/mod.rs
+++ b/crates/core/src/schema/mod.rs
@@ -18,7 +18,9 @@
*/
use crate::error::{CoreError, Result};
use crate::metadata::meta_field::MetaField;
+use crate::schema::resolver::sanitize_avro_schema_str;
use arrow_schema::{Schema, SchemaRef};
+use serde_json::Value;
pub mod delete;
pub mod resolver;
@@ -36,6 +38,48 @@ pub fn prepend_meta_fields_with_operation(schema: SchemaRef)
-> Result<Schema> {
.map_err(CoreError::ArrowError)
}
+pub fn prepend_meta_fields_to_avro_schema_str(avro_schema_str: &str) ->
Result<String> {
+ let mut schema: Value =
serde_json::from_str(&sanitize_avro_schema_str(avro_schema_str))
+ .map_err(|e| CoreError::Schema(format!("Failed to parse Avro schema
JSON: {e}")))?;
+
+ let fields = schema
+ .get_mut("fields")
+ .and_then(|f| f.as_array_mut())
+ .ok_or_else(|| CoreError::Schema("Avro schema has no 'fields'
array".to_string()))?;
+
+ let meta_field_defs: Vec<Value> = MetaField::field_names()
+ .iter()
+ .map(|name| {
+ serde_json::json!({
+ "name": name,
+ "type": ["null", "string"],
+ "default": null
+ })
+ })
+ .collect();
+
+ let existing_names: std::collections::HashSet<&str> = fields
+ .iter()
+ .filter_map(|f| f.get("name").and_then(|n| n.as_str()))
+ .collect();
+
+ let new_meta_fields: Vec<Value> = meta_field_defs
+ .into_iter()
+ .filter(|f| {
+ f.get("name")
+ .and_then(|n| n.as_str())
+ .is_none_or(|name| !existing_names.contains(name))
+ })
+ .collect();
+
+ let mut all_fields = new_meta_fields;
+ all_fields.append(fields);
+ *fields = all_fields;
+
+ serde_json::to_string(&schema)
+ .map_err(|e| CoreError::Schema(format!("Failed to serialize Avro
schema: {e}")))
+}
+
#[cfg(test)]
mod tests {
use super::*;
@@ -62,4 +106,35 @@ mod tests {
[MetaField::field_names_with_operation(), vec!["field1"]].concat()
)
}
+
+ #[test]
+ fn test_prepend_meta_fields_to_avro_schema_str() {
+ let avro_schema =
+ r#"{"type":"record","name":"TestRecord","fields":[{"name":"id","type":"int"}]}"#;
+ let result =
prepend_meta_fields_to_avro_schema_str(avro_schema).unwrap();
+ let parsed: Value = serde_json::from_str(&result).unwrap();
+ let fields = parsed["fields"].as_array().unwrap();
+ assert_eq!(fields.len(), 6, "Expected 5 meta fields + 1 data field");
+ assert_eq!(fields[0]["name"], "_hoodie_commit_time");
+ assert_eq!(fields[5]["name"], "id");
+ }
+
+ #[test]
+ fn test_prepend_meta_fields_to_avro_schema_str_dedup() {
+ let avro_schema =
r#"{"type":"record","name":"TestRecord","fields":[{"name":"_hoodie_commit_time","type":["null","string"],"default":null},{"name":"id","type":"int"}]}"#;
+ let result =
prepend_meta_fields_to_avro_schema_str(avro_schema).unwrap();
+ let parsed: Value = serde_json::from_str(&result).unwrap();
+ let fields = parsed["fields"].as_array().unwrap();
+ assert_eq!(
+ fields.len(),
+ 6,
+ "Expected 5 meta fields + 1 data field (deduped existing meta
field)"
+ );
+ // The existing _hoodie_commit_time should not be duplicated
+ let commit_time_count = fields
+ .iter()
+ .filter(|f| f["name"] == "_hoodie_commit_time")
+ .count();
+ assert_eq!(commit_time_count, 1);
+ }
}
diff --git a/crates/core/src/schema/resolver.rs
b/crates/core/src/schema/resolver.rs
index b1fcd2e..306acec 100644
--- a/crates/core/src/schema/resolver.rs
+++ b/crates/core/src/schema/resolver.rs
@@ -20,7 +20,7 @@ use crate::avro_to_arrow::to_arrow_schema;
use crate::config::table::HudiTableConfig;
use crate::error::{CoreError, Result};
use crate::metadata::commit::HoodieCommitMetadata;
-use crate::schema::prepend_meta_fields;
+use crate::schema::{prepend_meta_fields,
prepend_meta_fields_to_avro_schema_str};
use crate::storage::Storage;
use crate::table::Table;
use apache_avro::schema::Schema as AvroSchema;
@@ -30,62 +30,72 @@ use std::path::PathBuf;
use std::str::FromStr;
use std::sync::Arc;
-/// Resolves the [`arrow_schema::Schema`] for a given Hudi table.
+/// Resolves the data [`arrow_schema::Schema`] for a given Hudi table, without
Hudi meta fields.
///
/// The resolution process follows these steps:
-/// - If the timeline has commit metadata, read the schema field from it.
-/// - If the commit metadata has no schema, read the schema from the base
file pointed by the first entry in the write-status of the commit metadata.
-/// - If the timeline has no commit metadata, read
[`HudiTableConfig::CreateSchema`] from `hoodie.properties`.
-pub async fn resolve_schema(table: &Table) -> Result<Schema> {
- let timeline = table.get_timeline();
- match timeline.get_latest_commit_metadata().await {
- Ok(metadata) => {
- resolve_schema_from_commit_metadata(&metadata,
timeline.storage.clone()).await
- }
- Err(CoreError::TimelineNoCommit) => {
- if let Some(create_schema) =
table.hudi_configs.try_get(HudiTableConfig::CreateSchema) {
- let avro_schema_str: String = create_schema.into();
- let arrow_schema =
arrow_schema_from_avro_schema_str(&avro_schema_str)?;
- prepend_meta_fields(SchemaRef::new(arrow_schema))
- } else {
- Err(CoreError::SchemaNotFound(
- "No completed commit, and no create schema for the
table.".to_string(),
- ))
- }
- }
+/// 1. Try to get the schema from the timeline (commit metadata or base file).
+/// 2. Fall back to [`HudiTableConfig::CreateSchema`] from `hoodie.properties`.
+pub async fn resolve_data_schema(table: &Table) -> Result<Schema> {
+ match table.get_timeline().get_latest_schema().await {
+ Ok(schema) => Ok(schema),
+ Err(CoreError::TimelineNoCommit) =>
resolve_data_schema_from_create_schema(table),
Err(e) => Err(e),
}
}
+/// Resolves the [`arrow_schema::Schema`] for a given Hudi table, with Hudi
meta fields prepended.
+pub async fn resolve_schema(table: &Table) -> Result<Schema> {
+ let data_schema = resolve_data_schema(table).await?;
+ prepend_meta_fields(SchemaRef::new(data_schema))
+}
+
/// Resolves the [`apache_avro::schema::Schema`] as a [`String`] for a given
Hudi table.
///
/// The resolution process follows these steps:
-/// - If the timeline has commit metadata, read the schema field from it.
-/// - If the timeline has no commit metadata, read
[`HudiTableConfig::CreateSchema`] from `hoodie.properties`.
+/// 1. Try to get the Avro schema from the timeline (commit metadata).
+/// 2. Fall back to [`HudiTableConfig::CreateSchema`] from `hoodie.properties`.
///
/// ### Note
///
/// - For resolving Avro schema, we don't read the schema from a base file
like we do when resolving Arrow schema.
/// - Avro schema does not contain [`MetaField`]s.
pub async fn resolve_avro_schema(table: &Table) -> Result<String> {
- let timeline = table.get_timeline();
- match timeline.get_latest_commit_metadata().await {
- Ok(metadata) => resolve_avro_schema_from_commit_metadata(&metadata),
- Err(CoreError::TimelineNoCommit) => {
- if let Some(create_schema) =
table.hudi_configs.try_get(HudiTableConfig::CreateSchema) {
- let create_schema: String = create_schema.into();
- Ok(sanitize_avro_schema_str(&create_schema))
- } else {
- Err(CoreError::SchemaNotFound(
- "No completed commit, and no create schema for the
table.".to_string(),
- ))
- }
- }
+ match table.get_timeline().get_latest_avro_schema().await {
+ Ok(schema) => Ok(schema),
+ Err(CoreError::TimelineNoCommit) =>
resolve_avro_schema_from_create_schema(table),
Err(e) => Err(e),
}
}
-pub(crate) async fn resolve_schema_from_commit_metadata(
+/// Same as [`resolve_avro_schema`] but with Hudi meta fields prepended to the
schema.
+pub async fn resolve_avro_schema_with_meta_fields(table: &Table) ->
Result<String> {
+ let avro_schema_str = resolve_avro_schema(table).await?;
+ prepend_meta_fields_to_avro_schema_str(&avro_schema_str)
+}
+
+fn resolve_data_schema_from_create_schema(table: &Table) -> Result<Schema> {
+ if let Some(create_schema) =
table.hudi_configs.try_get(HudiTableConfig::CreateSchema) {
+ let avro_schema_str: String = create_schema.into();
+ arrow_schema_from_avro_schema_str(&avro_schema_str)
+ } else {
+ Err(CoreError::SchemaNotFound(
+ "No completed commit, and no create schema for the
table.".to_string(),
+ ))
+ }
+}
+
+fn resolve_avro_schema_from_create_schema(table: &Table) -> Result<String> {
+ if let Some(create_schema) =
table.hudi_configs.try_get(HudiTableConfig::CreateSchema) {
+ let create_schema: String = create_schema.into();
+ Ok(sanitize_avro_schema_str(&create_schema))
+ } else {
+ Err(CoreError::SchemaNotFound(
+ "No completed commit, and no create schema for the
table.".to_string(),
+ ))
+ }
+}
+
+pub(crate) async fn resolve_data_schema_from_commit_metadata(
commit_metadata: &Map<String, Value>,
storage: Arc<Storage>,
) -> Result<Schema> {
@@ -97,8 +107,7 @@ pub(crate) async fn resolve_schema_from_commit_metadata(
Err(e) => return Err(e),
};
- let arrow_schema = arrow_schema_from_avro_schema_str(&avro_schema_str)?;
- prepend_meta_fields(SchemaRef::new(arrow_schema))
+ arrow_schema_from_avro_schema_str(&avro_schema_str)
}
pub(crate) fn resolve_avro_schema_from_commit_metadata(
@@ -158,7 +167,7 @@ async fn resolve_schema_from_base_file(
))
}
-fn sanitize_avro_schema_str(avro_schema_str: &str) -> String {
+pub(crate) fn sanitize_avro_schema_str(avro_schema_str: &str) -> String {
avro_schema_str.trim().replace("\\:", ":")
}
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index c151a63..e861ed1 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -106,7 +106,9 @@ use crate::expr::filter::{Filter, from_str_tuples};
use crate::file_group::file_slice::FileSlice;
use crate::file_group::reader::FileGroupReader;
use crate::metadata::METADATA_TABLE_PARTITION_FIELD;
-use crate::schema::resolver::{resolve_avro_schema, resolve_schema};
+use crate::schema::resolver::{
+ resolve_avro_schema, resolve_avro_schema_with_meta_fields,
resolve_data_schema, resolve_schema,
+};
use crate::table::builder::TableBuilder;
use crate::table::file_pruner::FilePruner;
use crate::table::fs_view::FileSystemView;
@@ -135,14 +137,6 @@ impl Table {
TableBuilder::from_base_uri(base_uri).build().await
}
- /// Same as [Table::new], but blocking.
- pub fn new_blocking(base_uri: &str) -> Result<Self> {
- tokio::runtime::Builder::new_current_thread()
- .enable_all()
- .build()?
- .block_on(async { Table::new(base_uri).await })
- }
-
/// Create hudi table with options
pub async fn new_with_options<I, K, V>(base_uri: &str, options: I) ->
Result<Self>
where
@@ -156,19 +150,6 @@ impl Table {
.await
}
- /// Same as [Table::new_with_options], but blocking.
- pub fn new_with_options_blocking<I, K, V>(base_uri: &str, options: I) ->
Result<Self>
- where
- I: IntoIterator<Item = (K, V)>,
- K: AsRef<str>,
- V: Into<String>,
- {
- tokio::runtime::Builder::new_current_thread()
- .enable_all()
- .build()?
- .block_on(async { Table::new_with_options(base_uri, options).await
})
- }
-
pub fn hudi_options(&self) -> HashMap<String, String> {
self.hudi_configs.as_options()
}
@@ -225,44 +206,52 @@ impl Table {
.into()
}
- /// Get the latest Avro schema string of the table.
+ /// Get the latest Avro schema string of the table, without Hudi meta
fields (`_hoodie_*`).
///
/// The implementation looks for the schema in the following order:
/// 1. Timeline commit metadata.
/// 2. `hoodie.properties` file's [HudiTableConfig::CreateSchema].
- ///
- /// ### Note
- ///
- /// The schema returned does not contain Hudi's [MetaField]s,
- /// which is different from the one returned by [Table::get_schema].
- pub async fn get_avro_schema(&self) -> Result<String> {
- resolve_avro_schema(self).await
+ pub async fn get_schema_in_avro_str(&self) -> Result<String> {
+ self.get_schema_in_avro_str_internal(false).await
+ }
+
+ /// Get the latest Avro schema string of the table, with Hudi meta fields
(`_hoodie_*`)
+ /// prepended.
+ pub async fn get_schema_in_avro_str_with_meta_fields(&self) ->
Result<String> {
+ self.get_schema_in_avro_str_internal(true).await
}
- /// Same as [Table::get_avro_schema], but blocking.
- pub fn get_avro_schema_blocking(&self) -> Result<String> {
- tokio::runtime::Builder::new_current_thread()
- .enable_all()
- .build()?
- .block_on(async { self.get_avro_schema().await })
+ async fn get_schema_in_avro_str_internal(&self, includes_meta_fields:
bool) -> Result<String> {
+ if includes_meta_fields {
+ resolve_avro_schema_with_meta_fields(self).await
+ } else {
+ resolve_avro_schema(self).await
+ }
}
- /// Get the latest [arrow_schema::Schema] of the table.
+ /// Get the latest [arrow_schema::Schema] of the table, without Hudi meta
fields
+ /// (`_hoodie_*`).
///
/// The implementation looks for the schema in the following order:
/// 1. Timeline commit metadata.
/// 2. Base file schema.
/// 3. `hoodie.properties` file's [HudiTableConfig::CreateSchema].
pub async fn get_schema(&self) -> Result<Schema> {
- resolve_schema(self).await
+ self.get_schema_internal(false).await
+ }
+
+ /// Get the latest [arrow_schema::Schema] of the table, with Hudi meta
fields (`_hoodie_*`)
+ /// prepended.
+ pub async fn get_schema_with_meta_fields(&self) -> Result<Schema> {
+ self.get_schema_internal(true).await
}
- /// Same as [Table::get_schema], but blocking.
- pub fn get_schema_blocking(&self) -> Result<Schema> {
- tokio::runtime::Builder::new_current_thread()
- .enable_all()
- .build()?
- .block_on(async { self.get_schema().await })
+ async fn get_schema_internal(&self, includes_meta_fields: bool) ->
Result<Schema> {
+ if includes_meta_fields {
+ resolve_schema(self).await
+ } else {
+ resolve_data_schema(self).await
+ }
}
/// Get the latest partition [arrow_schema::Schema] of the table.
@@ -298,14 +287,6 @@ impl Table {
Ok(Schema::new(partition_fields))
}
- /// Same as [Table::get_partition_schema], but blocking.
- pub fn get_partition_schema_blocking(&self) -> Result<Schema> {
- tokio::runtime::Builder::new_current_thread()
- .enable_all()
- .build()?
- .block_on(async { self.get_partition_schema().await })
- }
-
/// Get the [Timeline] of the table.
pub fn get_timeline(&self) -> &Timeline {
&self.timeline
@@ -314,8 +295,8 @@ impl Table {
/// Get all the [FileSlice]s in splits from the table.
///
/// # Arguments
- /// * `num_splits` - The number of chunks to split the file slices
into.
- /// * `filters` - Partition filters to apply.
+ /// * `num_splits` - The number of chunks to split the file slices into.
+ /// * `filters` - Partition filters to apply.
pub async fn get_file_slices_splits<I, S>(
&self,
num_splits: usize,
@@ -334,28 +315,12 @@ impl Table {
}
}
- /// Same as [Table::get_file_slices_splits], but blocking.
- pub fn get_file_slices_splits_blocking<I, S>(
- &self,
- num_splits: usize,
- filters: I,
- ) -> Result<Vec<Vec<FileSlice>>>
- where
- I: IntoIterator<Item = (S, S, S)>,
- S: AsRef<str>,
- {
- tokio::runtime::Builder::new_current_thread()
- .enable_all()
- .build()?
- .block_on(async { self.get_file_slices_splits(num_splits,
filters).await })
- }
-
/// Get all the [FileSlice]s in splits from the table at a given timestamp.
///
/// # Arguments
- /// * `num_splits` - The number of chunks to split the file slices
into.
- /// * `timestamp` - The timestamp which file slices associated with.
- /// * `filters` - Partition filters to apply.
+ /// * `num_splits` - The number of chunks to split the file slices into.
+ /// * `timestamp` - The timestamp which file slices associated with.
+ /// * `filters` - Partition filters to apply.
pub async fn get_file_slices_splits_as_of<I, S>(
&self,
num_splits: usize,
@@ -372,26 +337,6 @@ impl Table {
.await
}
- /// Same as [Table::get_file_slices_splits_as_of], but blocking.
- pub fn get_file_slices_splits_as_of_blocking<I, S>(
- &self,
- num_splits: usize,
- timestamp: &str,
- filters: I,
- ) -> Result<Vec<Vec<FileSlice>>>
- where
- I: IntoIterator<Item = (S, S, S)>,
- S: AsRef<str>,
- {
- tokio::runtime::Builder::new_current_thread()
- .enable_all()
- .build()?
- .block_on(async {
- self.get_file_slices_splits_as_of(num_splits, timestamp,
filters)
- .await
- })
- }
-
async fn get_file_slices_splits_internal(
&self,
num_splits: usize,
@@ -405,10 +350,10 @@ impl Table {
/// Get all the [FileSlice]s in the table.
///
/// # Arguments
- /// * `filters` - Partition filters to apply.
+ /// * `filters` - Partition filters to apply.
///
/// # Notes
- /// * This API is useful for implementing snapshot query.
+ /// * This API is useful for implementing snapshot query.
pub async fn get_file_slices<I, S>(&self, filters: I) ->
Result<Vec<FileSlice>>
where
I: IntoIterator<Item = (S, S, S)>,
@@ -422,26 +367,14 @@ impl Table {
}
}
- /// Same as [Table::get_file_slices], but blocking.
- pub fn get_file_slices_blocking<I, S>(&self, filters: I) ->
Result<Vec<FileSlice>>
- where
- I: IntoIterator<Item = (S, S, S)>,
- S: AsRef<str>,
- {
- tokio::runtime::Builder::new_current_thread()
- .enable_all()
- .build()?
- .block_on(async { self.get_file_slices(filters).await })
- }
-
/// Get all the [FileSlice]s in the table at a given timestamp.
///
/// # Arguments
- /// * `timestamp` - The timestamp which file slices associated with.
- /// * `filters` - Partition filters to apply.
+ /// * `timestamp` - The timestamp which file slices associated with.
+ /// * `filters` - Partition filters to apply.
///
/// # Notes
- /// * This API is useful for implementing time travel query.
+ /// * This API is useful for implementing time travel query.
pub async fn get_file_slices_as_of<I, S>(
&self,
timestamp: &str,
@@ -456,22 +389,6 @@ impl Table {
self.get_file_slices_internal(&timestamp, &filters).await
}
- /// Same as [Table::get_file_slices_as_of], but blocking.
- pub fn get_file_slices_as_of_blocking<I, S>(
- &self,
- timestamp: &str,
- filters: I,
- ) -> Result<Vec<FileSlice>>
- where
- I: IntoIterator<Item = (S, S, S)>,
- S: AsRef<str>,
- {
- tokio::runtime::Builder::new_current_thread()
- .enable_all()
- .build()?
- .block_on(async { self.get_file_slices_as_of(timestamp,
filters).await })
- }
-
async fn get_file_slices_internal(
&self,
timestamp: &str,
@@ -519,11 +436,11 @@ impl Table {
/// Get all the changed [FileSlice]s in the table between the given
timestamps.
///
/// # Arguments
- /// * `start_timestamp` - If provided, only file slices that were
changed after this timestamp will be returned.
- /// * `end_timestamp` - If provided, only file slices that were
changed before or at this timestamp will be returned.
+ /// * `start_timestamp` - If provided, only file slices that were changed
after this timestamp will be returned.
+ /// * `end_timestamp` - If provided, only file slices that were changed
before or at this timestamp will be returned.
///
/// # Notes
- /// * This API is useful for implementing incremental query.
+ /// * This API is useful for implementing incremental query.
pub async fn get_file_slices_between(
&self,
start_timestamp: Option<&str>,
@@ -543,31 +460,16 @@ impl Table {
self.get_file_slices_between_internal(start, end).await
}
- /// Same as [Table::get_file_slices_between], but blocking.
- pub fn get_file_slices_between_blocking(
- &self,
- start_timestamp: Option<&str>,
- end_timestamp: Option<&str>,
- ) -> Result<Vec<FileSlice>> {
- tokio::runtime::Builder::new_current_thread()
- .enable_all()
- .build()?
- .block_on(async {
- self.get_file_slices_between(start_timestamp, end_timestamp)
- .await
- })
- }
-
/// Get all the changed [FileSlice]s in splits from the table between the
given timestamps.
///
/// # Arguments
- /// * `num_splits` - The number of chunks to split the file slices
into.
- /// * `start_timestamp` - If provided, only file slices that were
changed after this timestamp will be returned.
- /// * `end_timestamp` - If provided, only file slices that were
changed before or at this timestamp will be returned.
+ /// * `num_splits` - The number of chunks to split the file slices into.
+ /// * `start_timestamp` - If provided, only file slices that were changed
after this timestamp will be returned.
+ /// * `end_timestamp` - If provided, only file slices that were changed
before or at this timestamp will be returned.
///
/// # Notes
- /// * This API is useful for implementing incremental query with read
parallelism.
- /// * Uses the same splitting flow as the time-travel API to respect
read parallelism config.
+ /// * This API is useful for implementing incremental query with read
parallelism.
+ /// * Uses the same splitting flow as the time-travel API to respect read
parallelism config.
pub async fn get_file_slices_splits_between(
&self,
num_splits: usize,
@@ -589,22 +491,6 @@ impl Table {
.await
}
- /// Same as [Table::get_file_slices_splits_between], but blocking.
- pub fn get_file_slices_splits_between_blocking(
- &self,
- num_splits: usize,
- start_timestamp: Option<&str>,
- end_timestamp: Option<&str>,
- ) -> Result<Vec<Vec<FileSlice>>> {
- tokio::runtime::Builder::new_current_thread()
- .enable_all()
- .build()?
- .block_on(async {
- self.get_file_slices_splits_between(num_splits,
start_timestamp, end_timestamp)
- .await
- })
- }
-
async fn get_file_slices_splits_between_internal(
&self,
num_splits: usize,
@@ -662,7 +548,7 @@ impl Table {
/// Get all the latest records in the table.
///
/// # Arguments
- /// * `filters` - Partition filters to apply.
+ /// * `filters` - Partition filters to apply.
pub async fn read_snapshot<I, S>(&self, filters: I) ->
Result<Vec<RecordBatch>>
where
I: IntoIterator<Item = (S, S, S)>,
@@ -676,23 +562,11 @@ impl Table {
}
}
- /// Same as [Table::read_snapshot], but blocking.
- pub fn read_snapshot_blocking<I, S>(&self, filters: I) ->
Result<Vec<RecordBatch>>
- where
- I: IntoIterator<Item = (S, S, S)>,
- S: AsRef<str>,
- {
- tokio::runtime::Builder::new_current_thread()
- .enable_all()
- .build()?
- .block_on(async { self.read_snapshot(filters).await })
- }
-
/// Get all the records in the table at a given timestamp.
///
/// # Arguments
- /// * `timestamp` - The timestamp which records associated with.
- /// * `filters` - Partition filters to apply.
+ /// * `timestamp` - The timestamp which records associated with.
+ /// * `filters` - Partition filters to apply.
pub async fn read_snapshot_as_of<I, S>(
&self,
timestamp: &str,
@@ -707,22 +581,6 @@ impl Table {
self.read_snapshot_internal(&timestamp, &filters).await
}
- /// Same as [Table::read_snapshot_as_of], but blocking.
- pub fn read_snapshot_as_of_blocking<I, S>(
- &self,
- timestamp: &str,
- filters: I,
- ) -> Result<Vec<RecordBatch>>
- where
- I: IntoIterator<Item = (S, S, S)>,
- S: AsRef<str>,
- {
- tokio::runtime::Builder::new_current_thread()
- .enable_all()
- .build()?
- .block_on(async { self.read_snapshot_as_of(timestamp,
filters).await })
- }
-
async fn read_snapshot_internal(
&self,
timestamp: &str,
@@ -745,8 +603,8 @@ impl Table {
/// the time span being returned.
///
/// # Arguments
- /// * `start_timestamp` - Only records that were inserted or updated
after this timestamp will be returned.
- /// * `end_timestamp` - If provided, only records that were inserted
or updated before or at this timestamp will be returned.
+ /// * `start_timestamp` - Only records that were inserted or updated after
this timestamp will be returned.
+ /// * `end_timestamp` - If provided, only records that were inserted or
updated before or at this timestamp will be returned.
pub async fn read_incremental_records(
&self,
start_timestamp: &str,
@@ -780,21 +638,6 @@ impl Table {
Ok(batches)
}
- /// Same as [Table::read_incremental_records], but blocking.
- pub fn read_incremental_records_blocking(
- &self,
- start_timestamp: &str,
- end_timestamp: Option<&str>,
- ) -> Result<Vec<RecordBatch>> {
- tokio::runtime::Builder::new_current_thread()
- .enable_all()
- .build()?
- .block_on(async {
- self.read_incremental_records(start_timestamp, end_timestamp)
- .await
- })
- }
-
/// Get the change-data-capture (CDC) records between the given timestamps.
///
/// The CDC records should reflect the records that were inserted,
updated, and deleted
@@ -987,26 +830,27 @@ mod tests {
/// # Arguments
///
/// * `table_dir_name` - Name of the table root directory; all under
`crates/core/tests/data/`.
- fn get_test_table_without_validation(table_dir_name: &str) -> Table {
+ async fn get_test_table_without_validation(table_dir_name: &str) -> Table {
let base_url = Url::from_file_path(
canonicalize(PathBuf::from("tests").join("data").join(table_dir_name)).unwrap(),
)
.unwrap();
- Table::new_with_options_blocking(
+ Table::new_with_options(
base_url.as_str(),
[("hoodie.internal.skip.config.validation", "true")],
)
+ .await
.unwrap()
}
/// Test helper to get relative file paths from the table with filters.
- fn get_file_paths_with_filters(
+ async fn get_file_paths_with_filters(
table: &Table,
filters: &[(&str, &str, &str)],
) -> Result<Vec<String>> {
let mut file_paths = Vec::new();
let base_url = table.base_url();
- for f in table.get_file_slices_blocking(filters.to_vec())? {
+ for f in table.get_file_slices(filters.to_vec()).await? {
let relative_path = f.base_file_relative_path()?;
let file_url = join_url_segments(&base_url,
&[relative_path.as_str()])?;
file_paths.push(file_url.to_string());
@@ -1014,10 +858,10 @@ mod tests {
Ok(file_paths)
}
- #[test]
- fn test_hudi_table_get_hudi_options() {
+ #[tokio::test]
+ async fn test_hudi_table_get_hudi_options() {
let base_url = SampleTable::V6Nonpartitioned.url_to_cow();
- let hudi_table = Table::new_blocking(base_url.path()).unwrap();
+ let hudi_table = Table::new(base_url.path()).await.unwrap();
let hudi_options = hudi_table.hudi_options();
for (k, v) in hudi_options.iter() {
assert!(k.starts_with("hoodie."));
@@ -1025,10 +869,10 @@ mod tests {
}
}
- #[test]
- fn test_hudi_table_get_storage_options() {
+ #[tokio::test]
+ async fn test_hudi_table_get_storage_options() {
let base_url = SampleTable::V6Nonpartitioned.url_to_cow();
- let hudi_table = Table::new_blocking(base_url.path()).unwrap();
+ let hudi_table = Table::new(base_url.path()).await.unwrap();
let cloud_prefixes: HashSet<_> = Storage::CLOUD_STORAGE_PREFIXES
.iter()
@@ -1050,27 +894,33 @@ mod tests {
}
}
- #[test]
+ #[tokio::test]
#[serial(env_vars)]
- fn hudi_table_get_schema_from_empty_table_without_create_schema() {
- let table =
get_test_table_without_validation("table_props_no_create_schema");
+ async fn hudi_table_get_schema_from_empty_table_without_create_schema() {
+ let table =
get_test_table_without_validation("table_props_no_create_schema").await;
- let schema = table.get_schema_blocking();
+ let schema = table.get_schema().await;
assert!(schema.is_err());
assert!(matches!(schema.unwrap_err(), CoreError::SchemaNotFound(_)));
- let schema = table.get_avro_schema_blocking();
+ let schema = table.get_schema_in_avro_str().await;
assert!(schema.is_err());
assert!(matches!(schema.unwrap_err(), CoreError::SchemaNotFound(_)));
}
- #[test]
- fn
hudi_table_get_schema_from_empty_table_resolves_to_table_create_schema() {
+ #[tokio::test]
+ async fn
hudi_table_get_schema_from_empty_table_resolves_to_table_create_schema() {
for base_url in SampleTable::V6Empty.urls() {
- let hudi_table = Table::new_blocking(base_url.path()).unwrap();
+ let hudi_table = Table::new(base_url.path()).await.unwrap();
+
+ // Validate the Arrow schema without meta fields
+ let schema = hudi_table.get_schema().await;
+ assert!(schema.is_ok());
+ let schema = schema.unwrap();
+ assert_arrow_field_names_eq!(schema, ["id", "name", "isActive"]);
- // Validate the Arrow schema
- let schema = hudi_table.get_schema_blocking();
+ // Validate the Arrow schema with meta fields
+ let schema = hudi_table.get_schema_with_meta_fields().await;
assert!(schema.is_ok());
let schema = schema.unwrap();
assert_arrow_field_names_eq!(
@@ -1078,18 +928,31 @@ mod tests {
[MetaField::field_names(), vec!["id", "name",
"isActive"]].concat()
);
- // Validate the Avro schema
- let avro_schema = hudi_table.get_avro_schema_blocking();
+ // Validate the Avro schema without meta fields
+ let avro_schema = hudi_table.get_schema_in_avro_str().await;
assert!(avro_schema.is_ok());
let avro_schema = avro_schema.unwrap();
- assert_avro_field_names_eq!(&avro_schema, ["id", "name",
"isActive"])
+ assert_avro_field_names_eq!(&avro_schema, ["id", "name",
"isActive"]);
+
+ // Validate the Avro schema with meta fields
+ let avro_schema =
hudi_table.get_schema_in_avro_str_with_meta_fields().await;
+ assert!(avro_schema.is_ok());
+ let avro_schema = avro_schema.unwrap();
+ assert_avro_field_names_eq!(
+ &avro_schema,
+ [
+ MetaField::field_names().as_slice(),
+ &["id", "name", "isActive"]
+ ]
+ .concat()
+ );
}
}
- #[test]
- fn hudi_table_get_schema() {
+ #[tokio::test]
+ async fn hudi_table_get_schema() {
let base_url = SampleTable::V6Nonpartitioned.url_to_cow();
- let hudi_table = Table::new_blocking(base_url.path()).unwrap();
+ let hudi_table = Table::new(base_url.path()).await.unwrap();
let original_field_names = [
"id",
"name",
@@ -1109,8 +972,14 @@ mod tests {
"structField",
];
- // Check Arrow schema
- let arrow_schema = hudi_table.get_schema_blocking();
+ // Check Arrow schema without meta fields
+ let arrow_schema = hudi_table.get_schema().await;
+ assert!(arrow_schema.is_ok());
+ let arrow_schema = arrow_schema.unwrap();
+ assert_arrow_field_names_eq!(arrow_schema, original_field_names);
+
+ // Check Arrow schema with meta fields
+ let arrow_schema = hudi_table.get_schema_with_meta_fields().await;
assert!(arrow_schema.is_ok());
let arrow_schema = arrow_schema.unwrap();
assert_arrow_field_names_eq!(
@@ -1118,27 +987,27 @@ mod tests {
[MetaField::field_names(), original_field_names.to_vec()].concat()
);
- // Check Avro schema
- let avro_schema = hudi_table.get_avro_schema_blocking();
+ // Check Avro schema without meta fields
+ let avro_schema = hudi_table.get_schema_in_avro_str().await;
assert!(avro_schema.is_ok());
let avro_schema = avro_schema.unwrap();
assert_avro_field_names_eq!(&avro_schema, original_field_names);
}
- #[test]
- fn hudi_table_get_partition_schema() {
+ #[tokio::test]
+ async fn hudi_table_get_partition_schema() {
let base_url = SampleTable::V6TimebasedkeygenNonhivestyle.url_to_cow();
- let hudi_table = Table::new_blocking(base_url.path()).unwrap();
- let schema = hudi_table.get_partition_schema_blocking();
+ let hudi_table = Table::new(base_url.path()).await.unwrap();
+ let schema = hudi_table.get_partition_schema().await;
assert!(schema.is_ok());
let schema = schema.unwrap();
assert_arrow_field_names_eq!(schema, ["ts_str"]);
}
- #[test]
+ #[tokio::test]
#[serial(env_vars)]
- fn validate_invalid_table_props() {
- let table = get_test_table_without_validation("table_props_invalid");
+ async fn validate_invalid_table_props() {
+ let table =
get_test_table_without_validation("table_props_invalid").await;
let configs = table.hudi_configs;
assert!(
configs.validate(BaseFileFormat).is_err(),
@@ -1188,10 +1057,10 @@ mod tests {
);
}
- #[test]
+ #[tokio::test]
#[serial(env_vars)]
- fn get_invalid_table_props() {
- let table = get_test_table_without_validation("table_props_invalid");
+ async fn get_invalid_table_props() {
+ let table =
get_test_table_without_validation("table_props_invalid").await;
let configs = table.hudi_configs;
assert!(configs.get(BaseFileFormat).is_err());
assert!(configs.get(Checksum).is_err());
@@ -1211,10 +1080,10 @@ mod tests {
assert!(configs.get(TimelineTimezone).is_err());
}
- #[test]
+ #[tokio::test]
#[serial(env_vars)]
- fn get_default_for_invalid_table_props() {
- let table = get_test_table_without_validation("table_props_invalid");
+ async fn get_default_for_invalid_table_props() {
+ let table =
get_test_table_without_validation("table_props_invalid").await;
let configs = table.hudi_configs;
let actual: String = configs.get_or_default(BaseFileFormat).into();
assert_eq!(actual, "parquet");
@@ -1243,10 +1112,10 @@ mod tests {
assert_eq!(actual, "utc");
}
- #[test]
+ #[tokio::test]
#[serial(env_vars)]
- fn get_valid_table_props() {
- let table = get_test_table_without_validation("table_props_valid");
+ async fn get_valid_table_props() {
+ let table =
get_test_table_without_validation("table_props_valid").await;
let configs = table.hudi_configs;
let actual: String = configs.get(BaseFileFormat).unwrap().into();
assert_eq!(actual, "parquet");
@@ -1282,11 +1151,11 @@ mod tests {
assert_eq!(actual, "local");
}
- #[test]
+ #[tokio::test]
#[serial(env_vars)]
- fn get_global_table_props() {
+ async fn get_global_table_props() {
// Without the environment variable HUDI_CONF_DIR
- let table = get_test_table_without_validation("table_props_partial");
+ let table =
get_test_table_without_validation("table_props_partial").await;
let configs = table.hudi_configs;
assert!(configs.get(DatabaseName).is_err());
assert!(configs.get(TableType).is_err());
@@ -1299,7 +1168,7 @@ mod tests {
unsafe {
env::set_var(HUDI_CONF_DIR, hudi_conf_dir.as_os_str());
}
- let table = get_test_table_without_validation("table_props_partial");
+ let table =
get_test_table_without_validation("table_props_partial").await;
let configs = table.hudi_configs;
assert!(configs.get(DatabaseName).is_err());
assert!(configs.get(TableType).is_err());
@@ -1312,7 +1181,7 @@ mod tests {
unsafe {
env::set_var(HUDI_CONF_DIR, hudi_conf_dir.as_os_str());
}
- let table = get_test_table_without_validation("table_props_partial");
+ let table =
get_test_table_without_validation("table_props_partial").await;
let configs = table.hudi_configs;
let actual: String = configs.get(DatabaseName).unwrap().into();
assert_eq!(actual, "tmpdb");
@@ -1325,54 +1194,58 @@ mod tests {
}
}
- #[test]
- fn hudi_table_read_file_slice() {
+ #[tokio::test]
+ async fn hudi_table_read_file_slice() {
let base_url = SampleTable::V6Nonpartitioned.url_to_cow();
- let hudi_table = Table::new_blocking(base_url.path()).unwrap();
+ let hudi_table = Table::new(base_url.path()).await.unwrap();
let batches = hudi_table
.create_file_group_reader_with_options(empty_options())
.unwrap()
- .read_file_slice_by_base_file_path_blocking(
+ .read_file_slice_by_base_file_path(
"a079bdb3-731c-4894-b855-abfcd6921007-0_0-203-274_20240418173551906.parquet",
)
+ .await
.unwrap();
assert_eq!(batches.num_rows(), 4);
assert_eq!(batches.num_columns(), 21);
}
- #[test]
- fn empty_hudi_table_get_file_slices_splits() {
+ #[tokio::test]
+ async fn empty_hudi_table_get_file_slices_splits() {
let base_url = SampleTable::V6Empty.url_to_cow();
- let hudi_table = Table::new_blocking(base_url.path()).unwrap();
+ let hudi_table = Table::new(base_url.path()).await.unwrap();
let file_slices_splits = hudi_table
- .get_file_slices_splits_blocking(2, empty_filters())
+ .get_file_slices_splits(2, empty_filters())
+ .await
.unwrap();
assert!(file_slices_splits.is_empty());
}
- #[test]
- fn hudi_table_get_file_slices_splits() {
+ #[tokio::test]
+ async fn hudi_table_get_file_slices_splits() {
let base_url = SampleTable::V6SimplekeygenNonhivestyle.url_to_cow();
- let hudi_table = Table::new_blocking(base_url.path()).unwrap();
+ let hudi_table = Table::new(base_url.path()).await.unwrap();
let file_slices_splits = hudi_table
- .get_file_slices_splits_blocking(2, empty_filters())
+ .get_file_slices_splits(2, empty_filters())
+ .await
.unwrap();
assert_eq!(file_slices_splits.len(), 2);
assert_eq!(file_slices_splits[0].len(), 2);
assert_eq!(file_slices_splits[1].len(), 1);
}
- #[test]
- fn hudi_table_get_file_slices_splits_as_of_timestamps() {
+ #[tokio::test]
+ async fn hudi_table_get_file_slices_splits_as_of_timestamps() {
let base_url =
SampleTable::V6SimplekeygenNonhivestyleOverwritetable.url_to_mor_parquet();
- let hudi_table = Table::new_blocking(base_url.path()).unwrap();
+ let hudi_table = Table::new(base_url.path()).await.unwrap();
// before replacecommit (insert overwrite table)
let second_latest_timestamp = "20250121000656060";
let file_slices_splits = hudi_table
- .get_file_slices_splits_as_of_blocking(2, second_latest_timestamp,
empty_filters())
+ .get_file_slices_splits_as_of(2, second_latest_timestamp,
empty_filters())
+ .await
.unwrap();
assert_eq!(file_slices_splits.len(), 2);
assert_eq!(file_slices_splits[0].len(), 2);
@@ -1405,20 +1278,19 @@ mod tests {
// as of replacecommit (insert overwrite table)
let latest_timestamp = "20250121000702475";
let file_slices_splits = hudi_table
- .get_file_slices_splits_as_of_blocking(2, latest_timestamp,
empty_filters())
+ .get_file_slices_splits_as_of(2, latest_timestamp, empty_filters())
+ .await
.unwrap();
assert_eq!(file_slices_splits.len(), 1);
assert_eq!(file_slices_splits[0].len(), 1);
}
- #[test]
- fn hudi_table_get_file_slices_as_of_timestamps() {
+ #[tokio::test]
+ async fn hudi_table_get_file_slices_as_of_timestamps() {
let base_url = SampleTable::V6Nonpartitioned.url_to_cow();
- let hudi_table = Table::new_blocking(base_url.path()).unwrap();
- let file_slices = hudi_table
- .get_file_slices_blocking(empty_filters())
- .unwrap();
+ let hudi_table = Table::new(base_url.path()).await.unwrap();
+ let file_slices =
hudi_table.get_file_slices(empty_filters()).await.unwrap();
assert_eq!(
file_slices
.iter()
@@ -1428,9 +1300,10 @@ mod tests {
);
// as of the latest timestamp
- let hudi_table = Table::new_blocking(base_url.path()).unwrap();
+ let hudi_table = Table::new(base_url.path()).await.unwrap();
let file_slices = hudi_table
- .get_file_slices_as_of_blocking("20240418173551906",
empty_filters())
+ .get_file_slices_as_of("20240418173551906", empty_filters())
+ .await
.unwrap();
assert_eq!(
file_slices
@@ -1441,10 +1314,12 @@ mod tests {
);
// as of just smaller than the latest timestamp
- let hudi_table =
- Table::new_with_options_blocking(base_url.path(),
empty_options()).unwrap();
+ let hudi_table = Table::new_with_options(base_url.path(),
empty_options())
+ .await
+ .unwrap();
let file_slices = hudi_table
- .get_file_slices_as_of_blocking("20240418173551905",
empty_filters())
+ .get_file_slices_as_of("20240418173551905", empty_filters())
+ .await
.unwrap();
assert_eq!(
file_slices
@@ -1455,10 +1330,12 @@ mod tests {
);
// as of non-exist old timestamp
- let hudi_table =
- Table::new_with_options_blocking(base_url.path(),
empty_options()).unwrap();
+ let hudi_table = Table::new_with_options(base_url.path(),
empty_options())
+ .await
+ .unwrap();
let file_slices = hudi_table
- .get_file_slices_as_of_blocking("19700101000000", empty_filters())
+ .get_file_slices_as_of("19700101000000", empty_filters())
+ .await
.unwrap();
assert_eq!(
file_slices
@@ -1469,22 +1346,24 @@ mod tests {
);
}
- #[test]
- fn empty_hudi_table_get_file_slices_between_timestamps() {
+ #[tokio::test]
+ async fn empty_hudi_table_get_file_slices_between_timestamps() {
let base_url = SampleTable::V6Empty.url_to_cow();
- let hudi_table = Table::new_blocking(base_url.path()).unwrap();
+ let hudi_table = Table::new(base_url.path()).await.unwrap();
let file_slices = hudi_table
- .get_file_slices_between_blocking(Some(EARLIEST_START_TIMESTAMP),
None)
+ .get_file_slices_between(Some(EARLIEST_START_TIMESTAMP), None)
+ .await
.unwrap();
assert!(file_slices.is_empty())
}
- #[test]
- fn hudi_table_get_file_slices_between_timestamps() {
+ #[tokio::test]
+ async fn hudi_table_get_file_slices_between_timestamps() {
let base_url =
SampleTable::V6SimplekeygenNonhivestyleOverwritetable.url_to_mor_parquet();
- let hudi_table = Table::new_blocking(base_url.path()).unwrap();
+ let hudi_table = Table::new(base_url.path()).await.unwrap();
let mut file_slices = hudi_table
- .get_file_slices_between_blocking(None, Some("20250121000656060"))
+ .get_file_slices_between(None, Some("20250121000656060"))
+ .await
.unwrap();
assert_eq!(file_slices.len(), 3);
@@ -1515,22 +1394,24 @@ mod tests {
assert!(file_slice_2.log_files.is_empty());
}
- #[test]
- fn empty_hudi_table_get_file_slices_splits_between() {
+ #[tokio::test]
+ async fn empty_hudi_table_get_file_slices_splits_between() {
let base_url = SampleTable::V6Empty.url_to_cow();
- let hudi_table = Table::new_blocking(base_url.path()).unwrap();
+ let hudi_table = Table::new(base_url.path()).await.unwrap();
let file_slices_splits = hudi_table
- .get_file_slices_splits_between_blocking(2,
Some(EARLIEST_START_TIMESTAMP), None)
+ .get_file_slices_splits_between(2, Some(EARLIEST_START_TIMESTAMP),
None)
+ .await
.unwrap();
assert!(file_slices_splits.is_empty())
}
- #[test]
- fn hudi_table_get_file_slices_splits_between() {
+ #[tokio::test]
+ async fn hudi_table_get_file_slices_splits_between() {
let base_url =
SampleTable::V6SimplekeygenNonhivestyleOverwritetable.url_to_mor_parquet();
- let hudi_table = Table::new_blocking(base_url.path()).unwrap();
+ let hudi_table = Table::new(base_url.path()).await.unwrap();
let file_slices_splits = hudi_table
- .get_file_slices_splits_between_blocking(2, None,
Some("20250121000656060"))
+ .get_file_slices_splits_between(2, None, Some("20250121000656060"))
+ .await
.unwrap();
assert_eq!(file_slices_splits.len(), 2);
@@ -1540,12 +1421,13 @@ mod tests {
assert_eq!(file_slices_splits[1].len(), 1);
}
- #[test]
- fn hudi_table_get_file_slices_splits_between_with_single_split() {
+ #[tokio::test]
+ async fn hudi_table_get_file_slices_splits_between_with_single_split() {
let base_url =
SampleTable::V6SimplekeygenNonhivestyleOverwritetable.url_to_mor_parquet();
- let hudi_table = Table::new_blocking(base_url.path()).unwrap();
+ let hudi_table = Table::new(base_url.path()).await.unwrap();
let file_slices_splits = hudi_table
- .get_file_slices_splits_between_blocking(1, None,
Some("20250121000656060"))
+ .get_file_slices_splits_between(1, None, Some("20250121000656060"))
+ .await
.unwrap();
// Should have 1 split with all 3 file slices
@@ -1553,12 +1435,13 @@ mod tests {
assert_eq!(file_slices_splits[0].len(), 3);
}
- #[test]
- fn hudi_table_get_file_slices_splits_between_with_many_splits() {
+ #[tokio::test]
+ async fn hudi_table_get_file_slices_splits_between_with_many_splits() {
let base_url =
SampleTable::V6SimplekeygenNonhivestyleOverwritetable.url_to_mor_parquet();
- let hudi_table = Table::new_blocking(base_url.path()).unwrap();
+ let hudi_table = Table::new(base_url.path()).await.unwrap();
let file_slices_splits = hudi_table
- .get_file_slices_splits_between_blocking(10, None,
Some("20250121000656060"))
+ .get_file_slices_splits_between(10, None,
Some("20250121000656060"))
+ .await
.unwrap();
assert_eq!(file_slices_splits.len(), 3);
@@ -1567,14 +1450,15 @@ mod tests {
}
}
- #[test]
- fn hudi_table_get_file_paths_for_simple_keygen_non_hive_style() {
+ #[tokio::test]
+ async fn hudi_table_get_file_paths_for_simple_keygen_non_hive_style() {
let base_url = SampleTable::V6SimplekeygenNonhivestyle.url_to_cow();
- let hudi_table = Table::new_blocking(base_url.path()).unwrap();
+ let hudi_table = Table::new(base_url.path()).await.unwrap();
assert_eq!(hudi_table.timeline.completed_commits.len(), 2);
let partition_filters = &[];
let actual = get_file_paths_with_filters(&hudi_table,
partition_filters)
+ .await
.unwrap()
.into_iter()
.collect::<HashSet<_>>();
@@ -1590,6 +1474,7 @@ mod tests {
let filters = [("byteField", ">=", "10"), ("byteField", "<", "30")];
let actual = get_file_paths_with_filters(&hudi_table, &filters)
+ .await
.unwrap()
.into_iter()
.collect::<HashSet<_>>();
@@ -1603,6 +1488,7 @@ mod tests {
assert_eq!(actual, expected);
let actual = get_file_paths_with_filters(&hudi_table, &[("byteField",
">", "30")])
+ .await
.unwrap()
.into_iter()
.collect::<HashSet<_>>();
@@ -1610,14 +1496,15 @@ mod tests {
assert_eq!(actual, expected);
}
- #[test]
- fn hudi_table_get_file_paths_for_complex_keygen_hive_style() {
+ #[tokio::test]
+ async fn hudi_table_get_file_paths_for_complex_keygen_hive_style() {
let base_url = SampleTable::V6ComplexkeygenHivestyle.url_to_cow();
- let hudi_table = Table::new_blocking(base_url.path()).unwrap();
+ let hudi_table = Table::new(base_url.path()).await.unwrap();
assert_eq!(hudi_table.timeline.completed_commits.len(), 2);
let partition_filters = &[];
let actual = get_file_paths_with_filters(&hudi_table,
partition_filters)
+ .await
.unwrap()
.into_iter()
.collect::<HashSet<_>>();
@@ -1637,6 +1524,7 @@ mod tests {
("shortField", "!=", "100"),
];
let actual = get_file_paths_with_filters(&hudi_table, &filters)
+ .await
.unwrap()
.into_iter()
.collect::<HashSet<_>>();
@@ -1650,6 +1538,7 @@ mod tests {
let filters = [("byteField", ">=", "20"), ("shortField", "=", "300")];
let actual = get_file_paths_with_filters(&hudi_table, &filters)
+ .await
.unwrap()
.into_iter()
.collect::<HashSet<_>>();
diff --git a/crates/core/src/timeline/mod.rs b/crates/core/src/timeline/mod.rs
index 9b7b846..76ced9b 100644
--- a/crates/core/src/timeline/mod.rs
+++ b/crates/core/src/timeline/mod.rs
@@ -31,7 +31,7 @@ use crate::error::CoreError;
use crate::file_group::FileGroup;
use crate::file_group::builder::replaced_file_groups_from_replace_commit;
use crate::schema::resolver::{
- resolve_avro_schema_from_commit_metadata,
resolve_schema_from_commit_metadata,
+ resolve_avro_schema_from_commit_metadata,
resolve_data_schema_from_commit_metadata,
};
use crate::storage::Storage;
use crate::timeline::builder::TimelineBuilder;
@@ -270,14 +270,14 @@ impl Timeline {
resolve_avro_schema_from_commit_metadata(&commit_metadata)
}
- /// Get the latest [arrow_schema::Schema] from the [Timeline].
+ /// Get the latest data [arrow_schema::Schema] from the [Timeline],
without Hudi meta fields.
///
/// ### Note
/// This API behaves differently from [crate::table::Table::get_schema],
/// which additionally looks for [HudiTableConfig::CreateSchema] in the
table config.
pub async fn get_latest_schema(&self) -> Result<Schema> {
let commit_metadata = self.get_latest_commit_metadata().await?;
- resolve_schema_from_commit_metadata(&commit_metadata,
self.storage.clone()).await
+ resolve_data_schema_from_commit_metadata(&commit_metadata,
self.storage.clone()).await
}
pub(crate) async fn get_replaced_file_groups_as_of(
@@ -485,7 +485,8 @@ mod tests {
let base_url = SampleTable::V6Nonpartitioned.url_to_cow();
let timeline = create_test_timeline(base_url).await;
let table_schema = timeline.get_latest_schema().await.unwrap();
- assert_eq!(table_schema.fields.len(), 21)
+ // get_latest_schema returns data schema without meta fields
+ assert_eq!(table_schema.fields.len(), 16)
}
#[tokio::test]
@@ -594,17 +595,13 @@ mod tests {
.unwrap();
let timeline = create_test_timeline(base_url).await;
- // Check Arrow schema
+ // Check Arrow schema — get_latest_schema returns data schema without
meta fields
let arrow_schema = timeline.get_latest_schema().await;
assert!(arrow_schema.is_ok());
let arrow_schema = arrow_schema.unwrap();
assert_arrow_field_names_eq!(
arrow_schema,
- [
- MetaField::field_names(),
- vec!["ts", "uuid", "rider", "driver", "fare", "city"]
- ]
- .concat()
+ vec!["ts", "uuid", "rider", "driver", "fare", "city"]
);
// Check Avro schema
diff --git a/crates/core/tests/table_read_tests.rs
b/crates/core/tests/table_read_tests.rs
index 89596ee..e3d424f 100644
--- a/crates/core/tests/table_read_tests.rs
+++ b/crates/core/tests/table_read_tests.rs
@@ -35,21 +35,21 @@ mod v6_tables {
mod snapshot_queries {
use super::*;
- #[test]
- fn test_empty_table() -> Result<()> {
+ #[tokio::test]
+ async fn test_empty_table() -> Result<()> {
for base_url in SampleTable::V6Empty.urls() {
- let hudi_table = Table::new_blocking(base_url.path())?;
- let records =
hudi_table.read_snapshot_blocking(empty_filters())?;
+ let hudi_table = Table::new(base_url.path()).await?;
+ let records = hudi_table.read_snapshot(empty_filters()).await?;
assert!(records.is_empty());
}
Ok(())
}
- #[test]
- fn test_non_partitioned() -> Result<()> {
+ #[tokio::test]
+ async fn test_non_partitioned() -> Result<()> {
for base_url in SampleTable::V6Nonpartitioned.urls() {
- let hudi_table = Table::new_blocking(base_url.path())?;
- let records =
hudi_table.read_snapshot_blocking(empty_filters())?;
+ let hudi_table = Table::new(base_url.path()).await?;
+ let records = hudi_table.read_snapshot(empty_filters()).await?;
let schema = &records[0].schema();
let records = concat_batches(schema, &records)?;
@@ -67,13 +67,14 @@ mod v6_tables {
Ok(())
}
- #[test]
- fn test_non_partitioned_read_optimized() -> Result<()> {
+ #[tokio::test]
+ async fn test_non_partitioned_read_optimized() -> Result<()> {
let base_url = SampleTable::V6Nonpartitioned.url_to_mor_parquet();
- let hudi_table = Table::new_with_options_blocking(
+ let hudi_table = Table::new_with_options(
base_url.path(),
[(HudiReadConfig::UseReadOptimizedMode.as_ref(), "true")],
- )?;
+ )
+ .await?;
let commit_timestamps = hudi_table
.timeline
.completed_commits
@@ -81,8 +82,9 @@ mod v6_tables {
.map(|i| i.timestamp.as_str())
.collect::<Vec<_>>();
let latest_commit = commit_timestamps.last().unwrap();
- let records =
- hudi_table.read_snapshot_as_of_blocking(latest_commit,
empty_filters())?;
+ let records = hudi_table
+ .read_snapshot_as_of(latest_commit, empty_filters())
+ .await?;
let schema = &records[0].schema();
let records = concat_batches(schema, &records)?;
@@ -99,11 +101,11 @@ mod v6_tables {
Ok(())
}
- #[test]
- fn test_non_partitioned_rollback() -> Result<()> {
+ #[tokio::test]
+ async fn test_non_partitioned_rollback() -> Result<()> {
let base_url =
SampleTable::V6NonpartitionedRollback.url_to_mor_parquet();
- let hudi_table = Table::new_blocking(base_url.path())?;
- let records = hudi_table.read_snapshot_blocking(empty_filters())?;
+ let hudi_table = Table::new(base_url.path()).await?;
+ let records = hudi_table.read_snapshot(empty_filters()).await?;
let schema = &records[0].schema();
let records = concat_batches(schema, &records)?;
@@ -119,17 +121,17 @@ mod v6_tables {
Ok(())
}
- #[test]
- fn test_complex_keygen_hive_style_with_filters() -> Result<()> {
+ #[tokio::test]
+ async fn test_complex_keygen_hive_style_with_filters() -> Result<()> {
for base_url in SampleTable::V6ComplexkeygenHivestyle.urls() {
- let hudi_table = Table::new_blocking(base_url.path())?;
+ let hudi_table = Table::new(base_url.path()).await?;
let filters = vec![
("byteField", ">=", "10"),
("byteField", "<", "20"),
("shortField", "!=", "100"),
];
- let records = hudi_table.read_snapshot_blocking(filters)?;
+ let records = hudi_table.read_snapshot(filters).await?;
let schema = &records[0].schema();
let records = concat_batches(schema, &records)?;
@@ -139,11 +141,11 @@ mod v6_tables {
Ok(())
}
- #[test]
- fn test_simple_keygen_hivestyle_no_metafields() -> Result<()> {
+ #[tokio::test]
+ async fn test_simple_keygen_hivestyle_no_metafields() -> Result<()> {
for base_url in
SampleTable::V6SimplekeygenHivestyleNoMetafields.urls() {
- let hudi_table = Table::new_blocking(base_url.path())?;
- let records =
hudi_table.read_snapshot_blocking(empty_filters())?;
+ let hudi_table = Table::new(base_url.path()).await?;
+ let records = hudi_table.read_snapshot(empty_filters()).await?;
let schema = &records[0].schema();
let records = concat_batches(schema, &records)?;
@@ -165,10 +167,10 @@ mod v6_tables {
mod time_travel_queries {
use super::*;
- #[test]
- fn test_simple_keygen_nonhivestyle_time_travel() -> Result<()> {
+ #[tokio::test]
+ async fn test_simple_keygen_nonhivestyle_time_travel() -> Result<()> {
for base_url in SampleTable::V6SimplekeygenNonhivestyle.urls() {
- let hudi_table = Table::new_blocking(base_url.path())?;
+ let hudi_table = Table::new(base_url.path()).await?;
let commit_timestamps = hudi_table
.timeline
.completed_commits
@@ -176,8 +178,9 @@ mod v6_tables {
.map(|i| i.timestamp.as_str())
.collect::<Vec<_>>();
let first_commit = commit_timestamps[0];
- let records =
- hudi_table.read_snapshot_as_of_blocking(first_commit,
empty_filters())?;
+ let records = hudi_table
+ .read_snapshot_as_of(first_commit, empty_filters())
+ .await?;
let schema = &records[0].schema();
let records = concat_batches(schema, &records)?;
@@ -194,15 +197,15 @@ mod v6_tables {
mod mor_log_file_queries {
use super::*;
- #[test]
- fn test_quickstart_trips_inserts_updates() -> Result<()> {
+ #[tokio::test]
+ async fn test_quickstart_trips_inserts_updates() -> Result<()> {
let base_url = QuickstartTripsTable::V6Trips8I1U.url_to_mor_avro();
- let hudi_table = Table::new_blocking(base_url.path())?;
+ let hudi_table = Table::new(base_url.path()).await?;
let updated_rider = "rider-D";
// verify updated record as of the latest commit
- let records = hudi_table.read_snapshot_blocking(empty_filters())?;
+ let records = hudi_table.read_snapshot(empty_filters()).await?;
let schema = &records[0].schema();
let records = concat_batches(schema, &records)?;
let uuid_rider_and_fare =
QuickstartTripsTable::uuid_rider_and_fare(&records)
@@ -224,7 +227,9 @@ mod v6_tables {
.map(|i| i.timestamp.as_str())
.collect::<Vec<_>>();
let first_commit = commit_timestamps[0];
- let records =
hudi_table.read_snapshot_as_of_blocking(first_commit, empty_filters())?;
+ let records = hudi_table
+ .read_snapshot_as_of(first_commit, empty_filters())
+ .await?;
let schema = &records[0].schema();
let records = concat_batches(schema, &records)?;
let uuid_rider_and_fare =
QuickstartTripsTable::uuid_rider_and_fare(&records)
@@ -241,15 +246,15 @@ mod v6_tables {
Ok(())
}
- #[test]
- fn test_quickstart_trips_inserts_deletes() -> Result<()> {
+ #[tokio::test]
+ async fn test_quickstart_trips_inserts_deletes() -> Result<()> {
let base_url = QuickstartTripsTable::V6Trips8I3D.url_to_mor_avro();
- let hudi_table = Table::new_blocking(base_url.path())?;
+ let hudi_table = Table::new(base_url.path()).await?;
let deleted_riders = ["rider-A", "rider-C", "rider-D"];
// verify deleted record as of the latest commit
- let records = hudi_table.read_snapshot_blocking(empty_filters())?;
+ let records = hudi_table.read_snapshot(empty_filters()).await?;
let schema = &records[0].schema();
let records = concat_batches(schema, &records)?;
let riders = QuickstartTripsTable::uuid_rider_and_fare(&records)
@@ -270,7 +275,9 @@ mod v6_tables {
.map(|i| i.timestamp.as_str())
.collect::<Vec<_>>();
let first_commit = commit_timestamps[0];
- let records =
hudi_table.read_snapshot_as_of_blocking(first_commit, empty_filters())?;
+ let records = hudi_table
+ .read_snapshot_as_of(first_commit, empty_filters())
+ .await?;
let schema = &records[0].schema();
let records = concat_batches(schema, &records)?;
let mut uuid_rider_and_fare =
QuickstartTripsTable::uuid_rider_and_fare(&records)
@@ -293,20 +300,20 @@ mod v6_tables {
mod incremental_queries {
use super::*;
- #[test]
- fn test_empty_table() -> Result<()> {
+ #[tokio::test]
+ async fn test_empty_table() -> Result<()> {
for base_url in SampleTable::V6Empty.urls() {
- let hudi_table = Table::new_blocking(base_url.path())?;
- let records =
hudi_table.read_incremental_records_blocking("0", None)?;
+ let hudi_table = Table::new(base_url.path()).await?;
+ let records = hudi_table.read_incremental_records("0",
None).await?;
assert!(records.is_empty())
}
Ok(())
}
- #[test]
- fn test_simplekeygen_nonhivestyle_overwritetable() -> Result<()> {
+ #[tokio::test]
+ async fn test_simplekeygen_nonhivestyle_overwritetable() -> Result<()>
{
for base_url in
SampleTable::V6SimplekeygenNonhivestyleOverwritetable.urls() {
- let hudi_table = Table::new_blocking(base_url.path())?;
+ let hudi_table = Table::new(base_url.path()).await?;
let commit_timestamps = hudi_table
.timeline
.completed_commits
@@ -320,7 +327,8 @@ mod v6_tables {
// read records changed from the beginning to the 1st commit
let records = hudi_table
- .read_incremental_records_blocking("19700101000000",
Some(first_commit))?;
+ .read_incremental_records("19700101000000",
Some(first_commit))
+ .await?;
let schema = &records[0].schema();
let records = concat_batches(schema, &records)?;
let sample_data =
SampleTable::sample_data_order_by_id(&records);
@@ -332,7 +340,8 @@ mod v6_tables {
// read records changed from the 1st to the 2nd commit
let records = hudi_table
- .read_incremental_records_blocking(first_commit,
Some(second_commit))?;
+ .read_incremental_records(first_commit,
Some(second_commit))
+ .await?;
let schema = &records[0].schema();
let records = concat_batches(schema, &records)?;
let sample_data =
SampleTable::sample_data_order_by_id(&records);
@@ -344,7 +353,8 @@ mod v6_tables {
// read records changed from the 2nd to the 3rd commit
let records = hudi_table
- .read_incremental_records_blocking(second_commit,
Some(third_commit))?;
+ .read_incremental_records(second_commit,
Some(third_commit))
+ .await?;
let schema = &records[0].schema();
let records = concat_batches(schema, &records)?;
let sample_data =
SampleTable::sample_data_order_by_id(&records);
@@ -355,7 +365,9 @@ mod v6_tables {
);
// read records changed from the 1st commit
- let records =
hudi_table.read_incremental_records_blocking(first_commit, None)?;
+ let records = hudi_table
+ .read_incremental_records(first_commit, None)
+ .await?;
let schema = &records[0].schema();
let records = concat_batches(schema, &records)?;
let sample_data =
SampleTable::sample_data_order_by_id(&records);
@@ -366,7 +378,9 @@ mod v6_tables {
);
// read records changed from the 3rd commit
- let records =
hudi_table.read_incremental_records_blocking(third_commit, None)?;
+ let records = hudi_table
+ .read_incremental_records(third_commit, None)
+ .await?;
assert!(
records.is_empty(),
"Should return 0 record as it's the latest commit"
@@ -384,20 +398,20 @@ mod v8_tables {
mod snapshot_queries {
use super::*;
- #[test]
- fn test_empty_table() -> Result<()> {
+ #[tokio::test]
+ async fn test_empty_table() -> Result<()> {
let base_url = SampleTable::V8Empty.url_to_cow();
- let hudi_table = Table::new_blocking(base_url.path())?;
- let records = hudi_table.read_snapshot_blocking(empty_filters())?;
+ let hudi_table = Table::new(base_url.path()).await?;
+ let records = hudi_table.read_snapshot(empty_filters()).await?;
assert!(records.is_empty());
Ok(())
}
- #[test]
- fn test_non_partitioned() -> Result<()> {
+ #[tokio::test]
+ async fn test_non_partitioned() -> Result<()> {
let base_url = SampleTable::V8Nonpartitioned.url_to_cow();
- let hudi_table = Table::new_blocking(base_url.path())?;
- let records = hudi_table.read_snapshot_blocking(empty_filters())?;
+ let hudi_table = Table::new(base_url.path()).await?;
+ let records = hudi_table.read_snapshot(empty_filters()).await?;
let schema = &records[0].schema();
let records = concat_batches(schema, &records)?;
@@ -414,12 +428,12 @@ mod v8_tables {
Ok(())
}
- #[test]
- fn test_complex_keygen_hive_style() -> Result<()> {
+ #[tokio::test]
+ async fn test_complex_keygen_hive_style() -> Result<()> {
let base_url = SampleTable::V8ComplexkeygenHivestyle.url_to_cow();
- let hudi_table = Table::new_blocking(base_url.path())?;
+ let hudi_table = Table::new(base_url.path()).await?;
- let records = hudi_table.read_snapshot_blocking(empty_filters())?;
+ let records = hudi_table.read_snapshot(empty_filters()).await?;
let schema = &records[0].schema();
let records = concat_batches(schema, &records)?;
@@ -436,12 +450,12 @@ mod v8_tables {
Ok(())
}
- #[test]
- fn test_simple_keygen_nonhivestyle() -> Result<()> {
+ #[tokio::test]
+ async fn test_simple_keygen_nonhivestyle() -> Result<()> {
let base_url =
SampleTable::V8SimplekeygenNonhivestyle.url_to_cow();
- let hudi_table = Table::new_blocking(base_url.path())?;
+ let hudi_table = Table::new(base_url.path()).await?;
- let records = hudi_table.read_snapshot_blocking(empty_filters())?;
+ let records = hudi_table.read_snapshot(empty_filters()).await?;
let schema = &records[0].schema();
let records = concat_batches(schema, &records)?;
@@ -458,12 +472,12 @@ mod v8_tables {
Ok(())
}
- #[test]
- fn test_simple_keygen_hivestyle_no_metafields() -> Result<()> {
+ #[tokio::test]
+ async fn test_simple_keygen_hivestyle_no_metafields() -> Result<()> {
let base_url =
SampleTable::V8SimplekeygenHivestyleNoMetafields.url_to_cow();
- let hudi_table = Table::new_blocking(base_url.path())?;
+ let hudi_table = Table::new(base_url.path()).await?;
- let records = hudi_table.read_snapshot_blocking(empty_filters())?;
+ let records = hudi_table.read_snapshot(empty_filters()).await?;
let schema = &records[0].schema();
let records = concat_batches(schema, &records)?;
@@ -485,16 +499,16 @@ mod v8_tables {
mod mor_log_file_queries {
use super::*;
- #[test]
- fn test_quickstart_trips_inserts_updates_deletes() -> Result<()> {
+ #[tokio::test]
+ async fn test_quickstart_trips_inserts_updates_deletes() -> Result<()>
{
// V8Trips8I3U1D: 8 inserts, 3 updates (A, J, G fare=0), 2 deletes
(F, J)
let base_url =
QuickstartTripsTable::V8Trips8I3U1D.url_to_mor_avro();
- let hudi_table = Table::new_blocking(base_url.path())?;
+ let hudi_table = Table::new(base_url.path()).await?;
let deleted_riders = ["rider-F", "rider-J"];
// verify deleted records are not present in latest snapshot
- let records = hudi_table.read_snapshot_blocking(empty_filters())?;
+ let records = hudi_table.read_snapshot(empty_filters()).await?;
let schema = &records[0].schema();
let records = concat_batches(schema, &records)?;
let uuid_rider_and_fare =
QuickstartTripsTable::uuid_rider_and_fare(&records);
@@ -534,7 +548,9 @@ mod v8_tables {
.map(|i| i.timestamp.as_str())
.collect::<Vec<_>>();
let first_commit = commit_timestamps[0];
- let records =
hudi_table.read_snapshot_as_of_blocking(first_commit, empty_filters())?;
+ let records = hudi_table
+ .read_snapshot_as_of(first_commit, empty_filters())
+ .await?;
let schema = &records[0].schema();
let records = concat_batches(schema, &records)?;
let mut uuid_rider_and_fare =
QuickstartTripsTable::uuid_rider_and_fare(&records)
@@ -1154,10 +1170,10 @@ mod mdt_enabled_tables {
/// 1. Table can be read correctly via MDT file listing
/// 2. MDT partition key normalization ("." -> "") works correctly
/// 3. File slices are retrieved correctly from MDT
- #[test]
- fn test_v9_nonpartitioned_with_mdt() -> Result<()> {
+ #[tokio::test]
+ async fn test_v9_nonpartitioned_with_mdt() -> Result<()> {
let base_url = SampleTable::V9TxnsNonpartMeta.url_to_mor_avro();
- let hudi_table = Table::new_blocking(base_url.path())?;
+ let hudi_table = Table::new(base_url.path()).await?;
// Verify MDT is enabled
assert!(
@@ -1166,7 +1182,7 @@ mod mdt_enabled_tables {
);
// Get file slices - this uses MDT file listing
- let file_slices =
hudi_table.get_file_slices_blocking(empty_filters())?;
+ let file_slices =
hudi_table.get_file_slices(empty_filters()).await?;
// Should have file slices for the non-partitioned table
assert!(
@@ -1189,15 +1205,16 @@ mod mdt_enabled_tables {
/// The metadata table stores "." as partition key, but external API
should see "".
/// For non-partitioned tables, we use a fast path that directly
fetches "." without
/// going through __all_partitions__ lookup.
- #[test]
- fn test_v9_nonpartitioned_mdt_partition_normalization() -> Result<()> {
+ #[tokio::test]
+ async fn test_v9_nonpartitioned_mdt_partition_normalization() ->
Result<()> {
let base_url = SampleTable::V9TxnsNonpartMeta.url_to_mor_avro();
- let hudi_table = Table::new_blocking(base_url.path())?;
+ let hudi_table = Table::new(base_url.path()).await?;
// Read MDT files partition records
let partition_pruner = PartitionPruner::empty();
- let records =
-
hudi_table.read_metadata_table_files_partition_blocking(&partition_pruner)?;
+ let records = hudi_table
+ .read_metadata_table_files_partition(&partition_pruner)
+ .await?;
// For non-partitioned tables, the fast path only fetches the
files record.
// __all_partitions__ is not fetched to avoid redundant HFile
lookup.
diff --git a/crates/datafusion/src/lib.rs b/crates/datafusion/src/lib.rs
index 0674cd3..b7e3200 100644
--- a/crates/datafusion/src/lib.rs
+++ b/crates/datafusion/src/lib.rs
@@ -23,7 +23,6 @@ use std::any::Any;
use std::collections::HashMap;
use std::fmt::Debug;
use std::sync::Arc;
-use std::thread;
use arrow_schema::{Schema, SchemaRef};
use async_trait::async_trait;
@@ -84,6 +83,8 @@ use hudi_core::table::Table as HudiTable;
#[derive(Clone)]
pub struct HudiDataSource {
table: Arc<HudiTable>,
+ /// Cached table schema (with meta fields) for synchronous access in
`TableProvider::schema()`.
+ schema: SchemaRef,
/// Cached partition schema for determining partition columns.
/// This is cached at construction since partition schema rarely changes
/// and is needed synchronously in `supports_filters_pushdown`.
@@ -122,6 +123,16 @@ impl HudiDataSource {
.await
.map_err(|e| Execution(format!("Failed to create Hudi table:
{e}")))?;
+ // Cache schema with meta fields at construction for synchronous access
+ let schema = table
+ .get_schema_with_meta_fields()
+ .await
+ .map(SchemaRef::from)
+ .unwrap_or_else(|e| {
+ warn!("Failed to get table schema, using empty schema: {e}");
+ SchemaRef::from(Schema::empty())
+ });
+
// Cache partition schema at construction for use in
supports_filters_pushdown
let partition_schema = match table.get_partition_schema().await {
Ok(s) => s,
@@ -133,6 +144,7 @@ impl HudiDataSource {
Ok(Self {
table: Arc::new(table),
+ schema,
partition_schema,
})
}
@@ -266,13 +278,7 @@ impl TableProvider for HudiDataSource {
}
fn schema(&self) -> SchemaRef {
- let table = self.table.clone();
- let handle = thread::spawn(move || {
- let rt = tokio::runtime::Runtime::new().unwrap();
- rt.block_on(async { table.get_schema().await })
- });
- let result = handle.join().unwrap().unwrap_or_else(|_|
Schema::empty());
- SchemaRef::from(result)
+ self.schema.clone()
}
fn table_type(&self) -> TableType {
diff --git a/python/hudi/_internal.pyi b/python/hudi/_internal.pyi
index 89c72b9..502d425 100644
--- a/python/hudi/_internal.pyi
+++ b/python/hudi/_internal.pyi
@@ -208,9 +208,17 @@ class HudiTable:
str: The timezone of the table.
"""
...
- def get_avro_schema(self) -> str:
+ def get_schema_in_avro_str(self) -> str:
"""
- Returns the Avro schema of the Hudi table.
+ Returns the Avro schema of the Hudi table, without meta fields.
+
+ Returns:
+ str: The Avro schema of the table.
+ """
+ ...
+ def get_schema_in_avro_str_with_meta_fields(self) -> str:
+ """
+ Returns the Avro schema of the Hudi table, with meta fields prepended.
Returns:
str: The Avro schema of the table.
@@ -218,7 +226,15 @@ class HudiTable:
...
def get_schema(self) -> "pyarrow.Schema":
"""
- Returns the schema of the Hudi table.
+ Returns the schema of the Hudi table, without meta fields.
+
+ Returns:
+ pyarrow.Schema: The schema of the table.
+ """
+ ...
+ def get_schema_with_meta_fields(self) -> "pyarrow.Schema":
+ """
+ Returns the schema of the Hudi table, with meta fields prepended.
Returns:
pyarrow.Schema: The schema of the table.
diff --git a/python/src/internal.rs b/python/src/internal.rs
index e6224d3..ca5081c 100644
--- a/python/src/internal.rs
+++ b/python/src/internal.rs
@@ -83,11 +83,17 @@ impl HudiFileGroupReader {
#[new]
#[pyo3(signature = (base_uri, options=None))]
fn new_with_options(
+ py: Python,
base_uri: &str,
options: Option<HashMap<String, String>>,
) -> PyResult<Self> {
- let inner = FileGroupReader::new_with_options(base_uri,
options.unwrap_or_default())
- .map_err(PythonError::from)?;
+ let inner = py.detach(|| {
+ rt().block_on(FileGroupReader::new_with_options(
+ base_uri,
+ options.unwrap_or_default(),
+ ))
+ .map_err(PythonError::from)
+ })?;
Ok(HudiFileGroupReader { inner })
}
@@ -96,11 +102,14 @@ impl HudiFileGroupReader {
relative_path: &str,
py: Python,
) -> PyResult<Py<PyAny>> {
-
rt().block_on(self.inner.read_file_slice_by_base_file_path(relative_path))
- .map_err(PythonError::from)?
- .to_pyarrow(py)
- .map(|b| b.unbind())
+ py.detach(|| {
+
rt().block_on(self.inner.read_file_slice_by_base_file_path(relative_path))
+ .map_err(PythonError::from)
+ })?
+ .to_pyarrow(py)
+ .map(|b| b.unbind())
}
+
fn read_file_slice(&self, file_slice: &HudiFileSlice, py: Python) ->
PyResult<Py<PyAny>> {
let mut file_group = FileGroup::new_with_base_file_name(
&file_slice.base_file_name,
@@ -121,10 +130,12 @@ impl HudiFileGroupReader {
))
})
.map_err(PythonError::from)?;
- rt().block_on(self.inner.read_file_slice(file_slice))
- .map_err(PythonError::from)?
- .to_pyarrow(py)
- .map(|b| b.unbind())
+ py.detach(|| {
+ rt().block_on(self.inner.read_file_slice(file_slice))
+ .map_err(PythonError::from)
+ })?
+ .to_pyarrow(py)
+ .map(|b| b.unbind())
}
fn read_file_slice_from_paths(
@@ -133,11 +144,13 @@ impl HudiFileGroupReader {
log_file_paths: Vec<String>,
py: Python,
) -> PyResult<Py<PyAny>> {
- rt().block_on(
- self.inner
- .read_file_slice_from_paths(base_file_path, log_file_paths),
- )
- .map_err(PythonError::from)?
+ py.detach(|| {
+ rt().block_on(
+ self.inner
+ .read_file_slice_from_paths(base_file_path,
log_file_paths),
+ )
+ .map_err(PythonError::from)
+ })?
.to_pyarrow(py)
.map(|b| b.unbind())
}
@@ -278,15 +291,17 @@ impl HudiTable {
#[new]
#[pyo3(signature = (base_uri, options=None))]
fn new_with_options(
+ py: Python,
base_uri: &str,
options: Option<HashMap<String, String>>,
) -> PyResult<Self> {
- let inner: Table = rt()
- .block_on(Table::new_with_options(
+ let inner: Table = py.detach(|| {
+ rt().block_on(Table::new_with_options(
base_uri,
options.unwrap_or_default(),
))
- .map_err(PythonError::from)?;
+ .map_err(PythonError::from)
+ })?;
Ok(HudiTable { inner })
}
@@ -318,27 +333,49 @@ impl HudiTable {
self.inner.timezone()
}
- fn get_avro_schema(&self, py: Python) -> PyResult<String> {
+ fn get_schema_in_avro_str(&self, py: Python) -> PyResult<String> {
+ py.detach(|| {
+ let avro_schema = rt()
+ .block_on(self.inner.get_schema_in_avro_str())
+ .map_err(PythonError::from)?;
+ Ok(avro_schema)
+ })
+ }
+
+ fn get_schema_in_avro_str_with_meta_fields(&self, py: Python) ->
PyResult<String> {
py.detach(|| {
let avro_schema = rt()
- .block_on(self.inner.get_avro_schema())
+ .block_on(self.inner.get_schema_in_avro_str_with_meta_fields())
.map_err(PythonError::from)?;
Ok(avro_schema)
})
}
fn get_schema(&self, py: Python) -> PyResult<Py<PyAny>> {
- rt().block_on(self.inner.get_schema())
- .map_err(PythonError::from)?
- .to_pyarrow(py)
- .map(|b| b.unbind())
+ py.detach(|| {
+ rt().block_on(self.inner.get_schema())
+ .map_err(PythonError::from)
+ })?
+ .to_pyarrow(py)
+ .map(|b| b.unbind())
+ }
+
+ fn get_schema_with_meta_fields(&self, py: Python) -> PyResult<Py<PyAny>> {
+ py.detach(|| {
+ rt().block_on(self.inner.get_schema_with_meta_fields())
+ .map_err(PythonError::from)
+ })?
+ .to_pyarrow(py)
+ .map(|b| b.unbind())
}
fn get_partition_schema(&self, py: Python) -> PyResult<Py<PyAny>> {
- rt().block_on(self.inner.get_partition_schema())
- .map_err(PythonError::from)?
- .to_pyarrow(py)
- .map(|b| b.unbind())
+ py.detach(|| {
+ rt().block_on(self.inner.get_partition_schema())
+ .map_err(PythonError::from)
+ })?
+ .to_pyarrow(py)
+ .map(|b| b.unbind())
}
fn get_timeline(&self, py: Python) -> HudiTimeline {
@@ -460,10 +497,12 @@ impl HudiTable {
filters: Option<Vec<(String, String, String)>>,
py: Python,
) -> PyResult<Py<PyAny>> {
- rt().block_on(self.inner.read_snapshot(filters.unwrap_or_default()))
- .map_err(PythonError::from)?
- .to_pyarrow(py)
- .map(|b| b.unbind())
+ py.detach(|| {
+
rt().block_on(self.inner.read_snapshot(filters.unwrap_or_default()))
+ .map_err(PythonError::from)
+ })?
+ .to_pyarrow(py)
+ .map(|b| b.unbind())
}
#[pyo3(signature = (timestamp, filters=None))]
@@ -473,11 +512,13 @@ impl HudiTable {
filters: Option<Vec<(String, String, String)>>,
py: Python,
) -> PyResult<Py<PyAny>> {
- rt().block_on(
- self.inner
- .read_snapshot_as_of(timestamp, filters.unwrap_or_default()),
- )
- .map_err(PythonError::from)?
+ py.detach(|| {
+ rt().block_on(
+ self.inner
+ .read_snapshot_as_of(timestamp,
filters.unwrap_or_default()),
+ )
+ .map_err(PythonError::from)
+ })?
.to_pyarrow(py)
.map(|b| b.unbind())
}
@@ -489,11 +530,13 @@ impl HudiTable {
end_timestamp: Option<&str>,
py: Python,
) -> PyResult<Py<PyAny>> {
- rt().block_on(
- self.inner
- .read_incremental_records(start_timestamp, end_timestamp),
- )
- .map_err(PythonError::from)?
+ py.detach(|| {
+ rt().block_on(
+ self.inner
+ .read_incremental_records(start_timestamp, end_timestamp),
+ )
+ .map_err(PythonError::from)
+ })?
.to_pyarrow(py)
.map(|b| b.unbind())
}
@@ -589,10 +632,12 @@ impl HudiTimeline {
}
pub fn get_latest_schema(&self, py: Python) -> PyResult<Py<PyAny>> {
- rt().block_on(self.inner.get_latest_schema())
- .map_err(PythonError::from)?
- .to_pyarrow(py)
- .map(|b| b.unbind())
+ py.detach(|| {
+ rt().block_on(self.inner.get_latest_schema())
+ .map_err(PythonError::from)
+ })?
+ .to_pyarrow(py)
+ .map(|b| b.unbind())
}
}
@@ -608,20 +653,22 @@ impl From<&Timeline> for HudiTimeline {
#[pyfunction]
#[pyo3(signature = (base_uri, hudi_options=None, storage_options=None,
options=None))]
pub fn build_hudi_table(
+ py: Python,
base_uri: String,
hudi_options: Option<HashMap<String, String>>,
storage_options: Option<HashMap<String, String>>,
options: Option<HashMap<String, String>>,
) -> PyResult<HudiTable> {
- let inner = rt()
- .block_on(
+ let inner = py.detach(|| {
+ rt().block_on(
TableBuilder::from_base_uri(&base_uri)
.with_hudi_options(hudi_options.unwrap_or_default())
.with_storage_options(storage_options.unwrap_or_default())
.with_options(options.unwrap_or_default())
.build(),
)
- .map_err(PythonError::from)?;
+ .map_err(PythonError::from)
+ })?;
Ok(HudiTable { inner })
}
diff --git a/python/tests/test_table_read.py b/python/tests/test_table_read.py
index 3a52fbb..6702b6f 100644
--- a/python/tests/test_table_read.py
+++ b/python/tests/test_table_read.py
@@ -32,7 +32,7 @@ from hudi import HudiTable
def test_read_table_has_correct_schema(v8_trips_table):
table = HudiTable(v8_trips_table)
- assert table.get_schema().names == [
+ assert table.get_schema_with_meta_fields().names == [
"_hoodie_commit_time",
"_hoodie_commit_seqno",
"_hoodie_record_key",