This is an automated email from the ASF dual-hosted git repository.

zhangyue19921010 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 99fb1dc  feat: modernize Table API with async-only design (#537)
99fb1dc is described below

commit 99fb1dc45c7590d1ea50e1480c989ab91fe89757
Author: Shiyan Xu <[email protected]>
AuthorDate: Sat Mar 21 19:23:21 2026 -0500

    feat: modernize Table API with async-only design (#537)
---
 .github/copilot-instructions.md                  |  84 +---
 .github/instructions/code-review.instructions.md | 126 +-----
 .github/instructions/python.instructions.md      |  19 -
 .github/instructions/rust.instructions.md        | 178 +-------
 .licenserc.yaml                                  |   2 +
 README.md                                        |   6 +-
 cpp/Cargo.toml                                   |   1 +
 cpp/src/lib.rs                                   |  81 ++--
 crates/core/src/config/internal.rs               |   5 +-
 crates/core/src/config/read.rs                   |   5 +-
 crates/core/src/file_group/reader.rs             | 152 +++----
 crates/core/src/lib.rs                           |   5 +-
 crates/core/src/metadata/table/mod.rs            |  76 ++--
 crates/core/src/schema/mod.rs                    |  75 ++++
 crates/core/src/schema/resolver.rs               |  91 ++--
 crates/core/src/table/mod.rs                     | 539 +++++++++--------------
 crates/core/src/timeline/mod.rs                  |  17 +-
 crates/core/tests/table_read_tests.rs            | 191 ++++----
 crates/datafusion/src/lib.rs                     |  22 +-
 python/hudi/_internal.pyi                        |  22 +-
 python/src/internal.rs                           | 145 +++---
 python/tests/test_table_read.py                  |   2 +-
 22 files changed, 773 insertions(+), 1071 deletions(-)

diff --git a/.github/copilot-instructions.md b/.github/copilot-instructions.md
index 3bb04ab..b8f43ad 100644
--- a/.github/copilot-instructions.md
+++ b/.github/copilot-instructions.md
@@ -1,22 +1,3 @@
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one
-  ~ or more contributor license agreements.  See the NOTICE file
-  ~ distributed with this work for additional information
-  ~ regarding copyright ownership.  The ASF licenses this file
-  ~ to you under the Apache License, Version 2.0 (the
-  ~ "License"); you may not use this file except in compliance
-  ~ with the License.  You may obtain a copy of the License at
-  ~
-  ~   http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing,
-  ~ software distributed under the License is distributed on an
-  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-  ~ KIND, either express or implied.  See the License for the
-  ~ specific language governing permissions and limitations
-  ~ under the License.
--->
-
 # GitHub Copilot Instructions for Apache Hudi-rs
 
 ## Project Overview
@@ -74,71 +55,10 @@ make format check
 make test
 ```
 
-## Code Review Guidelines
-
-### Multi-Round Review Behavior
-
-When reviewing subsequent commits on a PR:
-
-1. **Auto-resolve addressed comments**: If a new commit addresses a previous review comment, acknowledge it is resolved rather than re-raising the same issue.
-2. **Focus on new changes**: Prioritize reviewing newly added or modified code in the latest commits.
-3. **Track incremental progress**: Recognize when feedback has been incorporated and provide confirmation.
-4. **Avoid duplicate comments**: Do not repeat comments that have been addressed in subsequent commits.
-
-### Review Priority (High to Low)
-
-1. **Correctness**: Logic errors, data races, memory safety
-2. **API Design**: Public API consistency, breaking changes
-3. **Error Handling**: Proper use of `Result`, meaningful error messages
-4. **Performance**: Unnecessary allocations, inefficient patterns
-5. **Testing**: Test coverage, edge cases
-6. **Documentation**: Public API docs, complex logic explanation
-7. **Style**: Idiomatic Rust, consistency with codebase
-
-## What to Flag in Reviews
-
-### Critical Issues (Must Fix)
-
-- Panics in library code (use `Result` instead)
-- Unhandled errors (`.unwrap()` / `.expect()` in non-test code)
-- Data races or unsafe code without justification
-- Breaking public API changes without deprecation
-- Missing tests for new functionality
-
-### Important Issues (Should Fix)
-
-- Inefficient patterns (unnecessary `.clone()`, allocations in hot paths)
-- Missing documentation on public APIs
-- Large functions that should be split
-- Missing `#[must_use]` on functions returning important values
-- Blocking calls in async context
-
-### Suggestions (Nice to Have)
-
-- More idiomatic Rust patterns
-- Additional test cases for edge cases
-- Performance optimizations
-- Code organization improvements
-
-## Rust Conventions
-
-### Error Handling
-
-- Use `Result<T, E>` for fallible operations, never panic in library code
-- Propagate errors with `?` operator
-- Add context to errors when crossing module boundaries
-- Use `thiserror` for defining error types in library crates
-
-### Async Patterns
-
-- Use `tokio` as the async runtime
-- Prefer `async fn` over returning `impl Future`
-- Use `#[tokio::test]` for async tests
-- Avoid blocking calls (`std::fs`, `std::thread::sleep`) in async code
-
-### Testing
+## Testing
 
 - Unit tests in the same file as the code (`#[cfg(test)]` module)
 - Integration tests in dedicated test modules or test crate
 - Test both success and error paths
 - Use descriptive test names: `test_<function>_<scenario>_<expected_behavior>`
+- Use `#[tokio::test]` for async tests
diff --git a/.github/instructions/code-review.instructions.md b/.github/instructions/code-review.instructions.md
index 54e7780..9927205 100644
--- a/.github/instructions/code-review.instructions.md
+++ b/.github/instructions/code-review.instructions.md
@@ -3,56 +3,18 @@ applyTo: "**/*"
 excludeAgent: "coding-agent"
 ---
 
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one
-  ~ or more contributor license agreements.  See the NOTICE file
-  ~ distributed with this work for additional information
-  ~ regarding copyright ownership.  The ASF licenses this file
-  ~ to you under the Apache License, Version 2.0 (the
-  ~ "License"); you may not use this file except in compliance
-  ~ with the License.  You may obtain a copy of the License at
-  ~
-  ~   http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing,
-  ~ software distributed under the License is distributed on an
-  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-  ~ KIND, either express or implied.  See the License for the
-  ~ specific language governing permissions and limitations
-  ~ under the License.
--->
-
 # Code Review Instructions for Apache Hudi-rs
 
 ## Multi-Round Review Behavior
 
-### Handling Subsequent Commits
-
-When reviewing a PR that has been updated with new commits after a previous review:
-
-1. **Check if previous comments are addressed**:
-   - Compare the current code state against previous review comments
-   - If a comment has been addressed by new changes, acknowledge resolution
-   - Do NOT re-raise issues that have been fixed
-
-2. **Focus on incremental changes**:
-   - Prioritize reviewing code added or modified in the latest commits
-   - Identify any new issues introduced by the fixes
-   - Check if fixes introduced regressions elsewhere
-
-3. **Comment resolution signals**:
-   - If code now follows the suggested pattern → Comment is resolved
-   - If test coverage has been added as requested → Comment is resolved
-   - If documentation has been added → Comment is resolved
-   - If error handling has been improved → Comment is resolved
-
-4. **Acknowledge progress**:
-   - When significant improvements have been made, acknowledge them
-   - Be encouraging about progress while maintaining standards
+When reviewing updated PRs:
 
-### Review Comment Format
+- Do NOT re-raise issues addressed by new commits
+- Focus on code added or modified in latest commits
+- Check if fixes introduced regressions elsewhere
+- Acknowledge progress while maintaining standards
 
-When leaving comments, use this severity classification:
+### Severity Classification
 
 - **🔴 Critical**: Must fix before merge (correctness, safety, breaking changes)
 - **🟠 Important**: Should fix before merge (error handling, testing, docs)
@@ -116,63 +78,19 @@ When leaving comments, use this severity classification:
 
 ## Patterns to Flag
 
-### Always Flag (Critical)
-
-```rust
-// Unwrap in library code
-let value = result.unwrap();  // 🔴 Use ? or proper error handling
-
-// Panic in library code
-panic!("unexpected state");   // 🔴 Return Result with error
-
-// Blocking in async
-std::thread::sleep(duration); // 🔴 Use tokio::time::sleep
-
-// Hardcoded credentials
-let key = "AKIAXXXXXXXX";     // 🔴 Security issue
-```
-
-### Usually Flag (Important)
-
-```rust
-// Missing error context
-.map_err(HudiError::from)?   // 🟠 Add context about what failed
-
-// Clone when borrow would work
-fn process(data: Vec<u8>) { } // 🟠 Consider &[u8] if not consuming
-
-// Large type on stack
-let buffer: [u8; 1_000_000];  // 🟠 Use Vec or Box for large allocations
-
-// Missing docs on pub items
-pub fn important_function() { } // 🟠 Add doc comment
-```
-
-### Sometimes Flag (Suggestion)
-
-```rust
-// Could use iterator methods
-let mut result = Vec::new();
-for item in items {
-    if predicate(&item) {
-        result.push(transform(item));
-    }
-}
-// 🟡 Consider: items.into_iter().filter(predicate).map(transform).collect()
-
-// Nested Result handling
-match result {
-    Ok(inner) => match inner { ... }  // 🟡 Consider using and_then or ?
-    Err(e) => ...
-}
-```
-
-## Cross-File Impact Assessment
-
-When changes touch these areas, expand review scope:
-
-- **Core table implementation**: Check impacts on Python bindings
-- **Public API surface (`hudi` crate)**: Check for breaking API changes
-- **DataFusion integration**: Verify DataFusion compatibility
-- **Schema types or conversions**: Check all serialization/deserialization paths
-- **Configuration structs**: Verify backward compatibility
+- 🔴 `.unwrap()` / `.expect()` / `panic!()` in non-test code
+- 🔴 Blocking calls (`std::thread::sleep`, `std::fs`) in async context
+- 🔴 Hardcoded credentials or secrets
+- 🟠 Missing error context (bare `.map_err()` without message)
+- 🟠 Unnecessary `.clone()`, taking ownership when borrow suffices
+- 🟠 Missing doc comments on public items
+- 🟡 Imperative loops replaceable with iterator chains
+- 🟡 Nested `match` on `Result` replaceable with `and_then` or `?`
+
+## Cross-File Impact
+
+- **Core table changes** → check Python/C++ bindings
+- **Public API (`hudi` crate)** → check for breaking changes
+- **DataFusion integration** → verify compatibility
+- **Schema/type conversions** → check serialization paths
+- **Configuration structs** → verify backward compatibility
diff --git a/.github/instructions/python.instructions.md b/.github/instructions/python.instructions.md
index d787c1c..3657784 100644
--- a/.github/instructions/python.instructions.md
+++ b/.github/instructions/python.instructions.md
@@ -2,25 +2,6 @@
 applyTo: "python/**"
 ---
 
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one
-  ~ or more contributor license agreements.  See the NOTICE file
-  ~ distributed with this work for additional information
-  ~ regarding copyright ownership.  The ASF licenses this file
-  ~ to you under the Apache License, Version 2.0 (the
-  ~ "License"); you may not use this file except in compliance
-  ~ with the License.  You may obtain a copy of the License at
-  ~
-  ~   http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing,
-  ~ software distributed under the License is distributed on an
-  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-  ~ KIND, either express or implied.  See the License for the
-  ~ specific language governing permissions and limitations
-  ~ under the License.
--->
-
 # Python Bindings Instructions
 
 ## PyO3 Patterns
diff --git a/.github/instructions/rust.instructions.md b/.github/instructions/rust.instructions.md
index fb0ddff..6d357e7 100644
--- a/.github/instructions/rust.instructions.md
+++ b/.github/instructions/rust.instructions.md
@@ -2,129 +2,39 @@
 applyTo: "**/*.rs"
 ---
 
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one
-  ~ or more contributor license agreements.  See the NOTICE file
-  ~ distributed with this work for additional information
-  ~ regarding copyright ownership.  The ASF licenses this file
-  ~ to you under the Apache License, Version 2.0 (the
-  ~ "License"); you may not use this file except in compliance
-  ~ with the License.  You may obtain a copy of the License at
-  ~
-  ~   http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing,
-  ~ software distributed under the License is distributed on an
-  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-  ~ KIND, either express or implied.  See the License for the
-  ~ specific language governing permissions and limitations
-  ~ under the License.
--->
-
 # Rust Instructions for Apache Hudi-rs
 
-## Error Handling Patterns (Critical)
-
-### Must Use Result Over Panic
-
-- Flag any `.unwrap()` or `.expect()` in non-test code as **Critical**
-- Flag any `panic!()` macro in library code as **Critical**
-- Exception: `unreachable!()` for truly impossible states is acceptable with a comment explaining why
-
-```rust
-// GOOD
-fn parse_config(input: &str) -> Result<Config, HudiError> {
-    serde_json::from_str(input).map_err(HudiError::from)
-}
-
-// BAD - Will panic on invalid input
-fn parse_config(input: &str) -> Config {
-    serde_json::from_str(input).unwrap()
-}
-```
-
-### Error Context
+## Error Handling (Critical)
 
+- Flag `.unwrap()` / `.expect()` in non-test code as **Critical**
+- Flag `panic!()` in library code as **Critical**
+- Exception: `unreachable!()` is acceptable with a comment explaining why
 - Prefer `anyhow::Context` or custom error types with context over bare `.map_err()`
 - Error messages should be actionable and include relevant values
 
 ```rust
-// GOOD
+// GOOD - with context
 let file = File::open(&path)
     .with_context(|| format!("Failed to open table metadata at {}", path.display()))?;
 
-// LESS GOOD
+// BAD - no context
 let file = File::open(&path).map_err(|e| HudiError::Io(e))?;
 ```
 
 ## Memory and Performance
 
-### Avoid Unnecessary Allocations
-
-- Flag unnecessary `.clone()` calls, especially on large types like `Vec<RecordBatch>`
-- Prefer `&str` over `String` in function parameters when ownership isn't needed
+- Flag unnecessary `.clone()`, especially on large types like `Vec<RecordBatch>`
+- Prefer `&str` / `&[T]` over owned types in parameters when ownership isn't needed
 - Use `Cow<'_, str>` when a function might or might not need to allocate
-
-```rust
-// GOOD - Takes reference
-fn filter_partitions(partitions: &[String], predicate: &str) -> Vec<&String> { ... }
-
-// LESS EFFICIENT - Clones unnecessarily
-fn filter_partitions(partitions: Vec<String>, predicate: String) -> Vec<String> { ... }
-```
-
-### Arrow Memory Patterns
-
-- Prefer Arrow compute kernels over manual iteration
+- Prefer Arrow compute kernels over manual iteration on arrays
 - Use `arrow::compute::concat_batches` for combining batches
-- Be mindful of memory when working with large datasets
-
-```rust
-// GOOD - Uses Arrow compute kernel
-use arrow::compute::filter;
-let filtered = filter(&batch, &predicate)?;
-
-// AVOID - Manual iteration on Arrow arrays
-let mut result = Vec::new();
-for i in 0..array.len() {
-    if predicate.value(i) {
-        result.push(array.value(i));
-    }
-}
-```
 
 ## Async Code Patterns
 
-### Future Send Bounds
-
-- Ensure async functions return `Send` futures for compatibility with multi-threaded executors
-- Flag holding non-Send types (like `Rc`, `RefCell`) across await points
-
-```rust
-// GOOD - Future is Send
-pub async fn read_files(paths: Vec<PathBuf>) -> Result<Vec<RecordBatch>> {
-    let futures = paths.into_iter().map(|p| async move {
-        read_file(&p).await
-    });
-    futures::future::try_join_all(futures).await
-}
-```
-
-### Avoid Blocking in Async Context
-
-- Flag blocking I/O operations in async functions
+- Ensure async functions return `Send` futures (no `Rc`, `RefCell` across await points)
+- Flag blocking I/O (`std::fs`, `std::thread::sleep`) in async functions
 - Use `tokio::task::spawn_blocking` for CPU-intensive work
 
-```rust
-// GOOD
-let result = tokio::task::spawn_blocking(move || {
-    expensive_computation(&data)
-}).await?;
-
-// BAD - Blocks the async runtime
-let result = expensive_computation(&data);  // In async function
-```
-
 ## API Design
 
 ### Builder Pattern
@@ -133,7 +43,6 @@ let result = expensive_computation(&data);  // In async function
 - Builders should consume `self` and return `Self` for chaining
 
 ```rust
-// GOOD - Follows project convention
 pub struct TableBuilder {
     base_uri: String,
     options: HashMap<String, String>,
@@ -141,12 +50,7 @@ pub struct TableBuilder {
 
 impl TableBuilder {
     pub fn from_base_uri(uri: impl Into<String>) -> Self { ... }
-
-    pub fn with_option(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
-        self.options.insert(key.into(), value.into());
-        self
-    }
-
+    pub fn with_option(mut self, key: impl Into<String>, value: impl Into<String>) -> Self { ... }
     pub async fn build(self) -> Result<Table> { ... }
 }
 ```
@@ -156,61 +60,3 @@ impl TableBuilder {
 - All public items must have doc comments
 - Include examples in doc comments for complex APIs
 - Document panics, errors, and safety requirements
-
-```rust
-/// Reads a snapshot of the Hudi table.
-///
-/// # Arguments
-///
-/// * `filters` - Optional partition filters in the format `(column, op, value)`
-///
-/// # Errors
-///
-/// Returns an error if:
-/// - The table path is invalid
-/// - The table metadata cannot be read
-/// - Schema parsing fails
-///
-/// # Examples
-///
-/// ```rust
-/// let batches = table.read_snapshot(&[("city", "=", "san_francisco")]).await?;
-/// ```
-pub async fn read_snapshot(&self, filters: &[(&str, &str, &str)]) -> Result<Vec<RecordBatch>> {
-    // ...
-}
-```
-
-## Testing
-
-### Test Organization
-
-- Unit tests in `#[cfg(test)] mod tests` at bottom of file
-- Integration tests in dedicated test modules
-- Use `#[tokio::test]` for async tests
-
-```rust
-#[cfg(test)]
-mod tests {
-    use super::*;
-
-    #[tokio::test]
-    async fn test_read_snapshot_with_filters() {
-        let table = create_test_table().await;
-        let batches = table.read_snapshot(&[("city", "=", "test")]).await.unwrap();
-        assert!(!batches.is_empty());
-    }
-
-    #[test]
-    fn test_parse_config_invalid_json() {
-        let result = parse_config("not json");
-        assert!(result.is_err());
-    }
-}
-```
-
-### Test Both Paths
-
-- Test success cases AND error cases
-- Test edge cases (empty input, boundary values)
-- Test that errors contain useful information
diff --git a/.licenserc.yaml b/.licenserc.yaml
index 57569a6..cb2eb60 100644
--- a/.licenserc.yaml
+++ b/.licenserc.yaml
@@ -25,6 +25,8 @@ header:
     - 'NOTICE'
     - '**/data/**'
     - '.github/PULL_REQUEST_TEMPLATE.md'
+    - '.github/copilot-instructions.md'
+    - '.github/instructions/**'
     - 'crates/core/schemas/**'
     - 'benchmark/tpch/queries/**'
 
diff --git a/README.md b/README.md
index 7412299..4cdb8e4 100644
--- a/README.md
+++ b/README.md
@@ -206,11 +206,12 @@ record_batch = reader.read_file_slice_by_base_file_path("relative/path.parquet")
 
 #### Rust
 
-```rust
+```rust,ignore
 use hudi::file_group::reader::FileGroupReader;
 
+// Inside an async context
 let reader = FileGroupReader::new_with_options(
-    "/table/base/path", [("hoodie.read.file_group.start_timestamp", "0")])?;
+    "/table/base/path", [("hoodie.read.file_group.start_timestamp", "0")]).await?;
 
 // Returns an Arrow RecordBatch
 let record_batch = reader.read_file_slice_by_base_file_path("relative/path.parquet").await?;
@@ -223,6 +224,7 @@ let record_batch = reader.read_file_slice_by_base_file_path("relative/path.parqu
 #include "src/lib.rs.h"
 #include "arrow/c/abi.h"
 
+// Functions may throw rust::Error on failure
 auto reader = new_file_group_reader_with_options(
     "/table/base/path", {"hoodie.read.file_group.start_timestamp=0"});
 
diff --git a/cpp/Cargo.toml b/cpp/Cargo.toml
index 7501a6b..00af632 100644
--- a/cpp/Cargo.toml
+++ b/cpp/Cargo.toml
@@ -37,6 +37,7 @@ hudi-test = { path = "../crates/test", version = "0.5.0-dev" }
 cxx = { version = "1.0" }
 arrow = { workspace = true , features = ["ffi"]}
 arrow-array = { workspace = true , features = ["ffi"]}
+tokio = { workspace = true }
 
 [build-dependencies]
 cxx-build = { version = "1.0" }
diff --git a/cpp/src/lib.rs b/cpp/src/lib.rs
index 68eb579..eddf677 100644
--- a/cpp/src/lib.rs
+++ b/cpp/src/lib.rs
@@ -37,81 +37,95 @@ mod ffi {
         fn new_file_group_reader_with_options(
             base_uri: &CxxString,
             options: &CxxVector<CxxString>,
-        ) -> Box<HudiFileGroupReader>;
+        ) -> Result<Box<HudiFileGroupReader>>;
 
         type HudiFileSlice;
         fn new_file_slice_from_file_names(
             partition_path: &CxxString,
             base_file_name: &CxxString,
             log_file_names: &CxxVector<CxxString>,
-        ) -> Box<HudiFileSlice>;
+        ) -> Result<Box<HudiFileSlice>>;
 
         fn read_file_slice_by_base_file_path(
             self: &HudiFileGroupReader,
             relative_path: &CxxString,
-        ) -> *mut ArrowArrayStream;
+        ) -> Result<*mut ArrowArrayStream>;
 
         fn read_file_slice(
             self: &HudiFileGroupReader,
             file_slice: &HudiFileSlice,
-        ) -> *mut ArrowArrayStream;
+        ) -> Result<*mut ArrowArrayStream>;
     }
 }
 
 pub struct HudiFileGroupReader {
     inner: FileGroupReader,
+    rt: tokio::runtime::Runtime,
 }
 
 pub fn new_file_group_reader_with_options(
     base_uri: &CxxString,
     options: &CxxVector<CxxString>,
-) -> Box<HudiFileGroupReader> {
+) -> std::result::Result<Box<HudiFileGroupReader>, String> {
     let base_uri = base_uri
         .to_str()
-        .expect("Failed to convert CxxString to str: Invalid UTF-8 sequence");
+        .map_err(|e| format!("Failed to convert CxxString to str: {e}"))?;
 
     let mut opt_vec = Vec::new();
     for opt in options.iter() {
         let opt_str = opt
             .to_str()
-            .expect("Failed to convert CxxString to str: Invalid UTF-8 sequence");
+            .map_err(|e| format!("Failed to convert CxxString to str: {e}"))?;
         if let Some((key, value)) = opt_str.split_once('=') {
             opt_vec.push((key, value))
         }
     }
 
-    let reader = FileGroupReader::new_with_options(base_uri, opt_vec)
-        .expect("Failed to create FileGroupReader with options");
-    let reader_wrapper = HudiFileGroupReader { inner: reader };
-    Box::new(reader_wrapper)
+    let rt = tokio::runtime::Builder::new_current_thread()
+        .enable_all()
+        .build()
+        .map_err(|e| format!("Failed to create tokio runtime: {e}"))?;
+    let reader = rt
+        .block_on(FileGroupReader::new_with_options(base_uri, opt_vec))
+        .map_err(|e| format!("Failed to create FileGroupReader: {e}"))?;
+    Ok(Box::new(HudiFileGroupReader { inner: reader, rt }))
 }
 
 impl HudiFileGroupReader {
     pub fn read_file_slice_by_base_file_path(
         &self,
         relative_path: &CxxString,
-    ) -> *mut ffi::ArrowArrayStream {
+    ) -> std::result::Result<*mut ffi::ArrowArrayStream, String> {
         let relative_path = relative_path
             .to_str()
-            .expect("Failed to convert CxxString to str: Invalid UTF-8 sequence");
+            .map_err(|e| format!("Failed to convert CxxString to str: {e}"))?;
 
         let record_batch = self
-            .inner
-            .read_file_slice_by_base_file_path_blocking(relative_path)
-            .expect("Failed to read file batch");
+            .rt
+            .block_on(self.inner.read_file_slice_by_base_file_path(relative_path))
+            .map_err(|e| format!("Failed to read file batch: {e}"))?;
         let schema = record_batch.schema();
 
-        create_raw_pointer_for_record_batches(vec![record_batch], schema)
+        Ok(create_raw_pointer_for_record_batches(
+            vec![record_batch],
+            schema,
+        ))
     }
 
-    pub fn read_file_slice(&self, file_slice: &HudiFileSlice) -> *mut ffi::ArrowArrayStream {
+    pub fn read_file_slice(
+        &self,
+        file_slice: &HudiFileSlice,
+    ) -> std::result::Result<*mut ffi::ArrowArrayStream, String> {
         let record_batch = self
-            .inner
-            .read_file_slice_blocking(&file_slice.inner)
-            .expect("Failed to read file slice");
+            .rt
+            .block_on(self.inner.read_file_slice(&file_slice.inner))
+            .map_err(|e| format!("Failed to read file slice: {e}"))?;
         let schema = record_batch.schema();
 
-        create_raw_pointer_for_record_batches(vec![record_batch], schema)
+        Ok(create_raw_pointer_for_record_batches(
+            vec![record_batch],
+            schema,
+        ))
     }
 }
 
@@ -123,38 +137,35 @@ pub fn new_file_slice_from_file_names(
     partition_path: &CxxString,
     base_file_name: &CxxString,
     log_file_names: &CxxVector<CxxString>,
-) -> Box<HudiFileSlice> {
+) -> std::result::Result<Box<HudiFileSlice>, String> {
     let partition_path = partition_path
         .to_str()
-        .expect("Failed to convert CxxString to str: Invalid UTF-8 sequence");
+        .map_err(|e| format!("Failed to convert CxxString to str: {e}"))?;
     let base_file_name = base_file_name
         .to_str()
-        .expect("Failed to convert CxxString to str: Invalid UTF-8 sequence");
+        .map_err(|e| format!("Failed to convert CxxString to str: {e}"))?;
 
     let log_file_names = log_file_names
         .iter()
         .map(|name| {
             name.to_str()
-                .expect("Failed to convert CxxString to str: Invalid UTF-8 sequence")
+                .map_err(|e| format!("Failed to convert CxxString to str: {e}"))
         })
-        .collect::<Vec<_>>();
+        .collect::<std::result::Result<Vec<_>, _>>()?;
 
     let mut file_group = FileGroup::new_with_base_file_name(base_file_name, partition_path)
-        .expect("Failed to create FileGroup");
+        .map_err(|e| format!("Failed to create FileGroup: {e}"))?;
     file_group
         .add_log_files_from_names(&log_file_names)
-        .expect("Failed to add files to FileGroup");
+        .map_err(|e| format!("Failed to add files to FileGroup: {e}"))?;
 
     let (_, file_slice) = file_group
         .file_slices
         .iter()
         .next()
-        .expect("Failed to get file slice from FileGroup");
+        .ok_or_else(|| format!("Failed to get file slice from FileGroup: {file_group:?}"))?;
 
-    // todo: add api to create file slice from names to avoid cloning
-    let file_slice_wrapper = HudiFileSlice {
+    Ok(Box::new(HudiFileSlice {
         inner: file_slice.clone(),
-    };
-
-    Box::new(file_slice_wrapper)
+    }))
 }
diff --git a/crates/core/src/config/internal.rs b/crates/core/src/config/internal.rs
index 965df95..9f1b7cb 100644
--- a/crates/core/src/config/internal.rs
+++ b/crates/core/src/config/internal.rs
@@ -36,8 +36,11 @@ use crate::config::{ConfigParser, HudiConfigValue};
 /// use hudi_core::config::internal::HudiInternalConfig::SkipConfigValidation;
 /// use hudi_core::table::Table as HudiTable;
 ///
+/// # #[tokio::main]
+/// # async fn main() {
 /// let options = [(SkipConfigValidation, "true")];
-/// HudiTable::new_with_options_blocking("/tmp/hudi_data", options);
+/// HudiTable::new_with_options("/tmp/hudi_data", options).await;
+/// # }
 /// ```
 ///
 #[derive(Clone, Debug, PartialEq, Eq, Hash, EnumIter)]
diff --git a/crates/core/src/config/read.rs b/crates/core/src/config/read.rs
index a5acbc5..9352702 100644
--- a/crates/core/src/config/read.rs
+++ b/crates/core/src/config/read.rs
@@ -36,8 +36,11 @@ use crate::config::{ConfigParser, HudiConfigValue};
 /// use hudi_core::config::read::HudiReadConfig::InputPartitions;
 /// use hudi_core::table::Table as HudiTable;
 ///
+/// # #[tokio::main]
+/// # async fn main() {
 /// let options = [(InputPartitions, "2")];
-/// HudiTable::new_with_options_blocking("/tmp/hudi_data", options);
+/// HudiTable::new_with_options("/tmp/hudi_data", options).await;
+/// # }
 /// ```
 ///
 
diff --git a/crates/core/src/file_group/reader.rs b/crates/core/src/file_group/reader.rs
index 8de9d86..1031a39 100644
--- a/crates/core/src/file_group/reader.rs
+++ b/crates/core/src/file_group/reader.rs
@@ -87,27 +87,21 @@ impl FileGroupReader {
     ///
     /// # Notes
     /// This API uses [`OptionResolver`] that loads table properties from storage to resolve options.
-    pub fn new_with_options<I, K, V>(base_uri: &str, options: I) -> Result<Self>
+    pub async fn new_with_options<I, K, V>(base_uri: &str, options: I) -> Result<Self>
     where
         I: IntoIterator<Item = (K, V)>,
         K: AsRef<str>,
         V: Into<String>,
     {
-        tokio::runtime::Builder::new_current_thread()
-            .enable_all()
-            .build()?
-            .block_on(async {
-                let mut resolver = OptionResolver::new_with_options(base_uri, options);
-                resolver.resolve_options().await?;
-                let hudi_configs = Arc::new(HudiConfigs::new(resolver.hudi_options));
-                let storage =
-                    Storage::new(Arc::new(resolver.storage_options), hudi_configs.clone())?;
-
-                Ok(Self {
-                    hudi_configs,
-                    storage,
-                })
-            })
+        let mut resolver = OptionResolver::new_with_options(base_uri, options);
+        resolver.resolve_options().await?;
+        let hudi_configs = Arc::new(HudiConfigs::new(resolver.hudi_options));
+        let storage = Storage::new(Arc::new(resolver.storage_options), hudi_configs.clone())?;
+
+        Ok(Self {
+            hudi_configs,
+            storage,
+        })
     }
 
     /// Reads the data from the base file at the given relative path.
@@ -130,17 +124,6 @@ impl FileGroupReader {
         apply_commit_time_filter(&self.hudi_configs, records)
     }
 
-    /// Same as [FileGroupReader::read_file_slice_by_base_file_path], but blocking.
-    pub fn read_file_slice_by_base_file_path_blocking(
-        &self,
-        relative_path: &str,
-    ) -> Result<RecordBatch> {
-        tokio::runtime::Builder::new_current_thread()
-            .enable_all()
-            .build()?
-            .block_on(self.read_file_slice_by_base_file_path(relative_path))
-    }
-
     fn create_instant_range_for_log_file_scan(&self) -> InstantRange {
         let timezone = self
             .hudi_configs
@@ -179,14 +162,6 @@ impl FileGroupReader {
             .await
     }
 
-    /// Same as [FileGroupReader::read_file_slice], but blocking.
-    pub fn read_file_slice_blocking(&self, file_slice: &FileSlice) -> Result<RecordBatch> {
-        tokio::runtime::Builder::new_current_thread()
-            .enable_all()
-            .build()?
-            .block_on(self.read_file_slice(file_slice))
-    }
-
     /// Reads a file slice from a base file and a list of log files.
     ///
     /// # Arguments
@@ -248,22 +223,6 @@ impl FileGroupReader {
         }
     }
 
-    /// Same as [FileGroupReader::read_file_slice_from_paths], but blocking.
-    pub fn read_file_slice_from_paths_blocking<I, S>(
-        &self,
-        base_file_path: &str,
-        log_file_paths: I,
-    ) -> Result<RecordBatch>
-    where
-        I: IntoIterator<Item = S>,
-        S: AsRef<str>,
-    {
-        tokio::runtime::Builder::new_current_thread()
-            .enable_all()
-            .build()?
-            .block_on(self.read_file_slice_from_paths(base_file_path, log_file_paths))
-    }
-
     // =========================================================================
     // Streaming Read APIs
     // =========================================================================
@@ -694,11 +653,13 @@ mod tests {
         url.as_ref().to_string()
     }
 
-    #[test]
-    fn test_new_with_options() {
+    #[tokio::test]
+    async fn test_new_with_options() {
         let options = vec![("key1", "value1"), ("key2", "value2")];
         let base_uri = get_base_uri_with_valid_props();
-        let reader = FileGroupReader::new_with_options(&base_uri, options).unwrap();
+        let reader = FileGroupReader::new_with_options(&base_uri, options)
+            .await
+            .unwrap();
         assert!(!reader.storage.options.is_empty());
         assert!(
             reader
@@ -708,14 +669,14 @@ mod tests {
         );
     }
 
-    #[test]
-    fn test_new_with_options_invalid_base_uri_or_invalid_props() {
+    #[tokio::test]
+    async fn test_new_with_options_invalid_base_uri_or_invalid_props() {
         let base_uri = get_non_existent_base_uri();
-        let result = FileGroupReader::new_with_options(&base_uri, empty_options());
+        let result = FileGroupReader::new_with_options(&base_uri, empty_options()).await;
         assert!(result.is_err());
 
         let base_uri = get_base_uri_with_invalid_props();
-        let result = FileGroupReader::new_with_options(&base_uri, empty_options());
+        let result = FileGroupReader::new_with_options(&base_uri, empty_options()).await;
         assert!(result.is_err())
     }
 
@@ -736,8 +697,8 @@ mod tests {
         RecordBatch::try_new(schema, vec![commit_times, names, ages]).map_err(CoreError::ArrowError)
     }
 
-    #[test]
-    fn test_create_commit_time_filter_mask() -> Result<()> {
+    #[tokio::test]
+    async fn test_create_commit_time_filter_mask() -> Result<()> {
         let base_uri = get_base_uri_with_valid_props_minimum();
         let records = create_test_record_batch()?;
 
@@ -748,12 +709,13 @@ mod tests {
                 (HudiTableConfig::PopulatesMetaFields.as_ref(), "false"),
                 (HudiReadConfig::FileGroupStartTimestamp.as_ref(), "2"),
             ],
-        )?;
+        )
+        .await?;
         let mask = create_commit_time_filter_mask(&reader.hudi_configs, &records)?;
         assert_eq!(mask, None, "Commit time filtering should not be needed");
 
         // Test case 2: No commit time filtering options
-        let reader = FileGroupReader::new_with_options(&base_uri, empty_options())?;
+        let reader = FileGroupReader::new_with_options(&base_uri, empty_options()).await?;
         let mask = create_commit_time_filter_mask(&reader.hudi_configs, &records)?;
         assert_eq!(mask, None);
 
@@ -761,7 +723,8 @@ mod tests {
         let reader = FileGroupReader::new_with_options(
             &base_uri,
             [(HudiReadConfig::FileGroupStartTimestamp, "2")],
-        )?;
+        )
+        .await?;
         let mask = create_commit_time_filter_mask(&reader.hudi_configs, &records)?;
         assert_eq!(
             mask,
@@ -773,7 +736,8 @@ mod tests {
         let reader = FileGroupReader::new_with_options(
             &base_uri,
             [(HudiReadConfig::FileGroupEndTimestamp, "4")],
-        )?;
+        )
+        .await?;
         let mask = create_commit_time_filter_mask(&reader.hudi_configs, &records)?;
         assert_eq!(mask, None, "Commit time filtering should not be needed");
 
@@ -784,7 +748,8 @@ mod tests {
                 (HudiReadConfig::FileGroupStartTimestamp, "2"),
                 (HudiReadConfig::FileGroupEndTimestamp, "4"),
             ],
-        )?;
+        )
+        .await?;
         let mask = create_commit_time_filter_mask(&reader.hudi_configs, &records)?;
         assert_eq!(
             mask,
@@ -795,16 +760,18 @@ mod tests {
         Ok(())
     }
 
-    #[test]
-    fn test_read_file_slice_from_paths_with_base_file_only() -> Result<()> {
+    #[tokio::test]
+    async fn test_read_file_slice_from_paths_with_base_file_only() -> Result<()> {
         let base_uri = get_base_uri_with_valid_props_minimum();
-        let reader = FileGroupReader::new_with_options(&base_uri, empty_options())?;
+        let reader = FileGroupReader::new_with_options(&base_uri, empty_options()).await?;
 
         // Test with actual test files and empty log files - should trigger base_file_only logic
         let base_file_path = TEST_SAMPLE_BASE_FILE;
         let log_file_paths: Vec<&str> = vec![];
 
-        let result = reader.read_file_slice_from_paths_blocking(base_file_path, log_file_paths);
+        let result = reader
+            .read_file_slice_from_paths(base_file_path, log_file_paths)
+            .await;
 
         match result {
             Ok(batch) => {
@@ -821,18 +788,21 @@ mod tests {
         Ok(())
     }
 
-    #[test]
-    fn test_read_file_slice_from_paths_read_optimized_mode() -> Result<()> {
+    #[tokio::test]
+    async fn test_read_file_slice_from_paths_read_optimized_mode() -> Result<()> {
         let base_uri = get_base_uri_with_valid_props_minimum();
         let reader = FileGroupReader::new_with_options(
             &base_uri,
             [(HudiReadConfig::UseReadOptimizedMode.as_ref(), "true")],
-        )?;
+        )
+        .await?;
 
         let base_file_path = TEST_SAMPLE_BASE_FILE;
         let log_file_paths = vec![TEST_SAMPLE_LOG_FILE.to_string()];
 
-        let result = reader.read_file_slice_from_paths_blocking(base_file_path, log_file_paths);
+        let result = reader
+            .read_file_slice_from_paths(base_file_path, log_file_paths)
+            .await;
 
         // In read-optimized mode, log files should be ignored
         // This should behave the same as read_file_slice_by_base_file_path
@@ -853,15 +823,17 @@ mod tests {
         Ok(())
     }
 
-    #[test]
-    fn test_read_file_slice_from_paths_with_log_files() -> Result<()> {
+    #[tokio::test]
+    async fn test_read_file_slice_from_paths_with_log_files() -> Result<()> {
         let base_uri = get_base_uri_with_valid_props_minimum();
-        let reader = FileGroupReader::new_with_options(&base_uri, empty_options())?;
+        let reader = FileGroupReader::new_with_options(&base_uri, empty_options()).await?;
 
         let base_file_path = TEST_SAMPLE_BASE_FILE;
         let log_file_paths = vec![TEST_SAMPLE_LOG_FILE.to_string()];
 
-        let result = reader.read_file_slice_from_paths_blocking(base_file_path, log_file_paths);
+        let result = reader
+            .read_file_slice_from_paths(base_file_path, log_file_paths)
+            .await;
 
         // The actual file reading might fail due to missing test data, which is expected
         match result {
@@ -881,16 +853,18 @@ mod tests {
         Ok(())
     }
 
-    #[test]
-    fn test_read_file_slice_from_paths_error_handling() -> Result<()> {
+    #[tokio::test]
+    async fn test_read_file_slice_from_paths_error_handling() -> Result<()> {
         let base_uri = get_base_uri_with_valid_props_minimum();
-        let reader = FileGroupReader::new_with_options(&base_uri, empty_options())?;
+        let reader = FileGroupReader::new_with_options(&base_uri, empty_options()).await?;
 
         // Test with non-existent base file
         let base_file_path = "non_existent_file.parquet";
         let log_file_paths: Vec<&str> = vec![];
 
-        let result = reader.read_file_slice_from_paths_blocking(base_file_path, log_file_paths);
+        let result = reader
+            .read_file_slice_from_paths(base_file_path, log_file_paths)
+            .await;
 
         assert!(result.is_err(), "Should return error for non-existent file");
 
@@ -903,17 +877,17 @@ mod tests {
         Ok(())
     }
 
-    #[test]
-    fn test_read_file_slice_blocking() -> Result<()> {
+    #[tokio::test]
+    async fn test_read_file_slice() -> Result<()> {
         let base_uri = get_base_uri_with_valid_props_minimum();
-        let reader = FileGroupReader::new_with_options(&base_uri, empty_options())?;
+        let reader = FileGroupReader::new_with_options(&base_uri, empty_options()).await?;
 
         // Create a FileSlice from the test sample base file
         let base_file = BaseFile::from_str(TEST_SAMPLE_BASE_FILE)?;
         let file_slice = FileSlice::new(base_file, String::new()); // empty partition path
 
-        // Call read_file_slice_blocking
-        let result = reader.read_file_slice_blocking(&file_slice);
+        // Call read_file_slice
+        let result = reader.read_file_slice(&file_slice).await;
 
         match result {
             Ok(batch) => {
@@ -1279,11 +1253,11 @@ mod tests {
         FileGroupReader::new_with_configs_and_overwriting_options(hudi_configs, empty_options())
     }
 
-    #[test]
-    fn test_is_metadata_table_detection() -> Result<()> {
+    #[tokio::test]
+    async fn test_is_metadata_table_detection() -> Result<()> {
         // Regular table should return false
         let base_uri = get_base_uri_with_valid_props();
-        let reader = FileGroupReader::new_with_options(&base_uri, empty_options())?;
+        let reader = FileGroupReader::new_with_options(&base_uri, empty_options()).await?;
         assert!(!reader.is_metadata_table());
 
         // Metadata table should return true
diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs
index 24deae4..fbbe9d9 100644
--- a/crates/core/src/lib.rs
+++ b/crates/core/src/lib.rs
@@ -26,8 +26,11 @@
 //! use hudi_core::config::read::HudiReadConfig::InputPartitions;
 //! use hudi_core::table::Table as HudiTable;
 //!
+//! # #[tokio::main]
+//! # async fn main() {
 //! let options = [(InputPartitions, "2")];
-//! HudiTable::new_with_options_blocking("/tmp/hudi_data", options);
+//! HudiTable::new_with_options("/tmp/hudi_data", options).await;
+//! # }
 //! ```
 //!
 //! # The [table] module is responsible for managing Hudi tables.
diff --git a/crates/core/src/metadata/table/mod.rs b/crates/core/src/metadata/table/mod.rs
index e5e211c..7a2d3e6 100644
--- a/crates/core/src/metadata/table/mod.rs
+++ b/crates/core/src/metadata/table/mod.rs
@@ -140,14 +140,6 @@ impl Table {
         .await
     }
 
-    /// Same as [Table::new_metadata_table], but blocking.
-    pub fn new_metadata_table_blocking(&self) -> Result<Table> {
-        tokio::runtime::Builder::new_current_thread()
-            .enable_all()
-            .build()?
-            .block_on(async { self.new_metadata_table().await })
-    }
-
     /// Fetch records from the `files` partition of metadata table
     /// with optional data table partition pruning.
     ///
@@ -167,17 +159,6 @@ impl Table {
             .await
     }
 
-    /// Same as [Table::read_metadata_table_files_partition], but blocking.
-    pub fn read_metadata_table_files_partition_blocking(
-        &self,
-        partition_pruner: &PartitionPruner,
-    ) -> Result<HashMap<String, FilesPartitionRecord>> {
-        tokio::runtime::Builder::new_current_thread()
-            .enable_all()
-            .build()?
-            .block_on(self.read_metadata_table_files_partition(partition_pruner))
-    }
-
     /// Fetch records from the `files` partition with optional partition pruning.
     ///
     /// For non-partitioned tables, directly fetches the "." record.
@@ -297,20 +278,21 @@ mod tests {
     use records::{FilesPartitionRecord, MetadataRecordType};
     use std::collections::HashSet;
 
-    fn get_data_table() -> Table {
+    async fn get_data_table() -> Table {
         let table_path = QuickstartTripsTable::V8Trips8I3U1D.path_to_mor_avro();
-        Table::new_blocking(&table_path).unwrap()
+        Table::new(&table_path).await.unwrap()
     }
 
-    #[test]
-    fn hudi_table_read_metadata_table_files_partition() {
-        let data_table = get_data_table();
-        let partition_schema = data_table.get_partition_schema_blocking().unwrap();
+    #[tokio::test]
+    async fn hudi_table_read_metadata_table_files_partition() {
+        let data_table = get_data_table().await;
+        let partition_schema = data_table.get_partition_schema().await.unwrap();
         let partition_pruner =
             PartitionPruner::new(&[], &partition_schema, data_table.hudi_configs.as_ref()).unwrap();
 
         let records = data_table
-            .read_metadata_table_files_partition_blocking(&partition_pruner)
+            .read_metadata_table_files_partition(&partition_pruner)
+            .await
             .unwrap();
 
         // Should have 4 records: __all_partitions__ + 3 city partitions
@@ -346,9 +328,9 @@ mod tests {
         assert!(chennai.total_size() > 0);
     }
 
-    #[test]
-    fn hudi_table_get_metadata_table_partitions() {
-        let data_table = get_data_table();
+    #[tokio::test]
+    async fn hudi_table_get_metadata_table_partitions() {
+        let data_table = get_data_table().await;
 
         // Verify we can get the metadata table partitions from the data table
         let partitions = data_table.get_metadata_table_partitions();
@@ -376,11 +358,11 @@ mod tests {
         }
     }
 
-    #[test]
-    fn hudi_table_is_metadata_table_enabled() {
+    #[tokio::test]
+    async fn hudi_table_is_metadata_table_enabled() {
         // V8 table with files partition configured should enable metadata table
         // even without explicit hoodie.metadata.enable=true
-        let data_table = get_data_table();
+        let data_table = get_data_table().await;
 
         // Verify it's a v8 table
         let table_version: isize = data_table
@@ -404,11 +386,11 @@ mod tests {
         );
     }
 
-    #[test]
-    fn hudi_table_v6_metadata_table_not_enabled() {
+    #[tokio::test]
+    async fn hudi_table_v6_metadata_table_not_enabled() {
         // V6 tables should NOT have metadata table enabled, even with explicit setting
         let base_url = SampleTable::V6Nonpartitioned.url_to_cow();
-        let hudi_table = Table::new_blocking(base_url.path()).unwrap();
+        let hudi_table = Table::new(base_url.path()).await.unwrap();
 
         // Verify it's a v6 table
         let table_version: isize = hudi_table
@@ -425,35 +407,35 @@ mod tests {
         );
     }
 
-    #[test]
-    fn hudi_table_is_not_metadata_table() {
+    #[tokio::test]
+    async fn hudi_table_is_not_metadata_table() {
         // A regular data table should not be a metadata table
         let base_url = SampleTable::V6Nonpartitioned.url_to_cow();
-        let hudi_table = Table::new_blocking(base_url.path()).unwrap();
+        let hudi_table = Table::new(base_url.path()).await.unwrap();
         assert!(
             !hudi_table.is_metadata_table(),
             "Regular data table should not be a metadata table"
         );
     }
 
-    #[test]
-    fn hudi_table_metadata_table_is_metadata_table() {
+    #[tokio::test]
+    async fn hudi_table_metadata_table_is_metadata_table() {
         // Create a metadata table and verify it's recognized as such
-        let data_table = get_data_table();
-        let metadata_table = data_table.new_metadata_table_blocking().unwrap();
+        let data_table = get_data_table().await;
+        let metadata_table = data_table.new_metadata_table().await.unwrap();
         assert!(
             metadata_table.is_metadata_table(),
             "Metadata table should be recognized as a metadata table"
         );
     }
 
-    #[test]
-    fn hudi_table_new_metadata_table_from_metadata_table_errors() {
+    #[tokio::test]
+    async fn hudi_table_new_metadata_table_from_metadata_table_errors() {
         // Trying to create a metadata table from a metadata table should fail
-        let data_table = get_data_table();
-        let metadata_table = data_table.new_metadata_table_blocking().unwrap();
+        let data_table = get_data_table().await;
+        let metadata_table = data_table.new_metadata_table().await.unwrap();
 
-        let result = metadata_table.new_metadata_table_blocking();
+        let result = metadata_table.new_metadata_table().await;
         assert!(result.is_err());
         let err = result.unwrap_err();
         assert!(
diff --git a/crates/core/src/schema/mod.rs b/crates/core/src/schema/mod.rs
index 52ce19a..e351581 100644
--- a/crates/core/src/schema/mod.rs
+++ b/crates/core/src/schema/mod.rs
@@ -18,7 +18,9 @@
  */
 use crate::error::{CoreError, Result};
 use crate::metadata::meta_field::MetaField;
+use crate::schema::resolver::sanitize_avro_schema_str;
 use arrow_schema::{Schema, SchemaRef};
+use serde_json::Value;
 
 pub mod delete;
 pub mod resolver;
@@ -36,6 +38,48 @@ pub fn prepend_meta_fields_with_operation(schema: SchemaRef) -> Result<Schema> {
         .map_err(CoreError::ArrowError)
 }
 
+pub fn prepend_meta_fields_to_avro_schema_str(avro_schema_str: &str) -> Result<String> {
+    let mut schema: Value = serde_json::from_str(&sanitize_avro_schema_str(avro_schema_str))
+        .map_err(|e| CoreError::Schema(format!("Failed to parse Avro schema JSON: {e}")))?;
+
+    let fields = schema
+        .get_mut("fields")
+        .and_then(|f| f.as_array_mut())
+        .ok_or_else(|| CoreError::Schema("Avro schema has no 'fields' array".to_string()))?;
+
+    let meta_field_defs: Vec<Value> = MetaField::field_names()
+        .iter()
+        .map(|name| {
+            serde_json::json!({
+                "name": name,
+                "type": ["null", "string"],
+                "default": null
+            })
+        })
+        .collect();
+
+    let existing_names: std::collections::HashSet<&str> = fields
+        .iter()
+        .filter_map(|f| f.get("name").and_then(|n| n.as_str()))
+        .collect();
+
+    let new_meta_fields: Vec<Value> = meta_field_defs
+        .into_iter()
+        .filter(|f| {
+            f.get("name")
+                .and_then(|n| n.as_str())
+                .is_none_or(|name| !existing_names.contains(name))
+        })
+        .collect();
+
+    let mut all_fields = new_meta_fields;
+    all_fields.append(fields);
+    *fields = all_fields;
+
+    serde_json::to_string(&schema)
+        .map_err(|e| CoreError::Schema(format!("Failed to serialize Avro schema: {e}")))
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -62,4 +106,35 @@ mod tests {
             [MetaField::field_names_with_operation(), vec!["field1"]].concat()
         )
     }
+
+    #[test]
+    fn test_prepend_meta_fields_to_avro_schema_str() {
+        let avro_schema =
+            r#"{"type":"record","name":"TestRecord","fields":[{"name":"id","type":"int"}]}"#;
+        let result = prepend_meta_fields_to_avro_schema_str(avro_schema).unwrap();
+        let parsed: Value = serde_json::from_str(&result).unwrap();
+        let fields = parsed["fields"].as_array().unwrap();
+        assert_eq!(fields.len(), 6, "Expected 5 meta fields + 1 data field");
+        assert_eq!(fields[0]["name"], "_hoodie_commit_time");
+        assert_eq!(fields[5]["name"], "id");
+    }
+
+    #[test]
+    fn test_prepend_meta_fields_to_avro_schema_str_dedup() {
+        let avro_schema = r#"{"type":"record","name":"TestRecord","fields":[{"name":"_hoodie_commit_time","type":["null","string"],"default":null},{"name":"id","type":"int"}]}"#;
+        let result = prepend_meta_fields_to_avro_schema_str(avro_schema).unwrap();
+        let parsed: Value = serde_json::from_str(&result).unwrap();
+        let fields = parsed["fields"].as_array().unwrap();
+        assert_eq!(
+            fields.len(),
+            6,
+            "Expected 5 meta fields + 1 data field (deduped existing meta field)"
+        );
+        // The existing _hoodie_commit_time should not be duplicated
+        let commit_time_count = fields
+            .iter()
+            .filter(|f| f["name"] == "_hoodie_commit_time")
+            .count();
+        assert_eq!(commit_time_count, 1);
+    }
 }
diff --git a/crates/core/src/schema/resolver.rs b/crates/core/src/schema/resolver.rs
index b1fcd2e..306acec 100644
--- a/crates/core/src/schema/resolver.rs
+++ b/crates/core/src/schema/resolver.rs
@@ -20,7 +20,7 @@ use crate::avro_to_arrow::to_arrow_schema;
 use crate::config::table::HudiTableConfig;
 use crate::error::{CoreError, Result};
 use crate::metadata::commit::HoodieCommitMetadata;
-use crate::schema::prepend_meta_fields;
+use crate::schema::{prepend_meta_fields, prepend_meta_fields_to_avro_schema_str};
 use crate::storage::Storage;
 use crate::table::Table;
 use apache_avro::schema::Schema as AvroSchema;
@@ -30,62 +30,72 @@ use std::path::PathBuf;
 use std::str::FromStr;
 use std::sync::Arc;
 
-/// Resolves the [`arrow_schema::Schema`] for a given Hudi table.
+/// Resolves the data [`arrow_schema::Schema`] for a given Hudi table, without Hudi meta fields.
 ///
 /// The resolution process follows these steps:
-/// - If the timeline has commit metadata, read the schema field from it.
-///   - If the commit metadata has no schema, read the schema from the base file pointed by the first entry in the write-status of the commit metadata.
-/// - If the timeline has no commit metadata, read [`HudiTableConfig::CreateSchema`] from `hoodie.properties`.
-pub async fn resolve_schema(table: &Table) -> Result<Schema> {
-    let timeline = table.get_timeline();
-    match timeline.get_latest_commit_metadata().await {
-        Ok(metadata) => {
-            resolve_schema_from_commit_metadata(&metadata, timeline.storage.clone()).await
-        }
-        Err(CoreError::TimelineNoCommit) => {
-            if let Some(create_schema) = table.hudi_configs.try_get(HudiTableConfig::CreateSchema) {
-                let avro_schema_str: String = create_schema.into();
-                let arrow_schema = arrow_schema_from_avro_schema_str(&avro_schema_str)?;
-                prepend_meta_fields(SchemaRef::new(arrow_schema))
-            } else {
-                Err(CoreError::SchemaNotFound(
-                    "No completed commit, and no create schema for the table.".to_string(),
-                ))
-            }
-        }
+/// 1. Try to get the schema from the timeline (commit metadata or base file).
+/// 2. Fall back to [`HudiTableConfig::CreateSchema`] from `hoodie.properties`.
+pub async fn resolve_data_schema(table: &Table) -> Result<Schema> {
+    match table.get_timeline().get_latest_schema().await {
+        Ok(schema) => Ok(schema),
+        Err(CoreError::TimelineNoCommit) => resolve_data_schema_from_create_schema(table),
         Err(e) => Err(e),
     }
 }
 
+/// Resolves the [`arrow_schema::Schema`] for a given Hudi table, with Hudi meta fields prepended.
+pub async fn resolve_schema(table: &Table) -> Result<Schema> {
+    let data_schema = resolve_data_schema(table).await?;
+    prepend_meta_fields(SchemaRef::new(data_schema))
+}
+
 /// Resolves the [`apache_avro::schema::Schema`] as a [`String`] for a given Hudi table.
 ///
 /// The resolution process follows these steps:
-/// - If the timeline has commit metadata, read the schema field from it.
-/// - If the timeline has no commit metadata, read [`HudiTableConfig::CreateSchema`] from `hoodie.properties`.
+/// 1. Try to get the Avro schema from the timeline (commit metadata).
+/// 2. Fall back to [`HudiTableConfig::CreateSchema`] from `hoodie.properties`.
 ///
 /// ### Note
 ///
 /// - For resolving Avro schema, we don't read the schema from a base file like we do when resolving Arrow schema.
 /// - Avro schema does not contain [`MetaField`]s.
 pub async fn resolve_avro_schema(table: &Table) -> Result<String> {
-    let timeline = table.get_timeline();
-    match timeline.get_latest_commit_metadata().await {
-        Ok(metadata) => resolve_avro_schema_from_commit_metadata(&metadata),
-        Err(CoreError::TimelineNoCommit) => {
-            if let Some(create_schema) = table.hudi_configs.try_get(HudiTableConfig::CreateSchema) {
-                let create_schema: String = create_schema.into();
-                Ok(sanitize_avro_schema_str(&create_schema))
-            } else {
-                Err(CoreError::SchemaNotFound(
-                    "No completed commit, and no create schema for the table.".to_string(),
-                ))
-            }
-        }
+    match table.get_timeline().get_latest_avro_schema().await {
+        Ok(schema) => Ok(schema),
+        Err(CoreError::TimelineNoCommit) => resolve_avro_schema_from_create_schema(table),
         Err(e) => Err(e),
     }
 }
 
-pub(crate) async fn resolve_schema_from_commit_metadata(
+/// Same as [`resolve_avro_schema`] but with Hudi meta fields prepended to the schema.
+pub async fn resolve_avro_schema_with_meta_fields(table: &Table) -> Result<String> {
+    let avro_schema_str = resolve_avro_schema(table).await?;
+    prepend_meta_fields_to_avro_schema_str(&avro_schema_str)
+}
+
+fn resolve_data_schema_from_create_schema(table: &Table) -> Result<Schema> {
+    if let Some(create_schema) = table.hudi_configs.try_get(HudiTableConfig::CreateSchema) {
+        let avro_schema_str: String = create_schema.into();
+        arrow_schema_from_avro_schema_str(&avro_schema_str)
+    } else {
+        Err(CoreError::SchemaNotFound(
+            "No completed commit, and no create schema for the table.".to_string(),
+        ))
+    }
+}
+
+fn resolve_avro_schema_from_create_schema(table: &Table) -> Result<String> {
+    if let Some(create_schema) = table.hudi_configs.try_get(HudiTableConfig::CreateSchema) {
+        let create_schema: String = create_schema.into();
+        Ok(sanitize_avro_schema_str(&create_schema))
+    } else {
+        Err(CoreError::SchemaNotFound(
+            "No completed commit, and no create schema for the table.".to_string(),
+        ))
+    }
+}
+
+pub(crate) async fn resolve_data_schema_from_commit_metadata(
     commit_metadata: &Map<String, Value>,
     storage: Arc<Storage>,
 ) -> Result<Schema> {
@@ -97,8 +107,7 @@ pub(crate) async fn resolve_schema_from_commit_metadata(
         Err(e) => return Err(e),
     };
 
-    let arrow_schema = arrow_schema_from_avro_schema_str(&avro_schema_str)?;
-    prepend_meta_fields(SchemaRef::new(arrow_schema))
+    arrow_schema_from_avro_schema_str(&avro_schema_str)
 }
 
 pub(crate) fn resolve_avro_schema_from_commit_metadata(
@@ -158,7 +167,7 @@ async fn resolve_schema_from_base_file(
     ))
 }
 
-fn sanitize_avro_schema_str(avro_schema_str: &str) -> String {
+pub(crate) fn sanitize_avro_schema_str(avro_schema_str: &str) -> String {
     avro_schema_str.trim().replace("\\:", ":")
 }
 
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index c151a63..e861ed1 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -106,7 +106,9 @@ use crate::expr::filter::{Filter, from_str_tuples};
 use crate::file_group::file_slice::FileSlice;
 use crate::file_group::reader::FileGroupReader;
 use crate::metadata::METADATA_TABLE_PARTITION_FIELD;
-use crate::schema::resolver::{resolve_avro_schema, resolve_schema};
+use crate::schema::resolver::{
+    resolve_avro_schema, resolve_avro_schema_with_meta_fields, resolve_data_schema, resolve_schema,
+};
 use crate::table::builder::TableBuilder;
 use crate::table::file_pruner::FilePruner;
 use crate::table::fs_view::FileSystemView;
@@ -135,14 +137,6 @@ impl Table {
         TableBuilder::from_base_uri(base_uri).build().await
     }
 
-    /// Same as [Table::new], but blocking.
-    pub fn new_blocking(base_uri: &str) -> Result<Self> {
-        tokio::runtime::Builder::new_current_thread()
-            .enable_all()
-            .build()?
-            .block_on(async { Table::new(base_uri).await })
-    }
-
     /// Create hudi table with options
     pub async fn new_with_options<I, K, V>(base_uri: &str, options: I) -> Result<Self>
     where
@@ -156,19 +150,6 @@ impl Table {
             .await
     }
 
-    /// Same as [Table::new_with_options], but blocking.
-    pub fn new_with_options_blocking<I, K, V>(base_uri: &str, options: I) -> Result<Self>
-    where
-        I: IntoIterator<Item = (K, V)>,
-        K: AsRef<str>,
-        V: Into<String>,
-    {
-        tokio::runtime::Builder::new_current_thread()
-            .enable_all()
-            .build()?
-            .block_on(async { Table::new_with_options(base_uri, options).await })
-    }
-
     pub fn hudi_options(&self) -> HashMap<String, String> {
         self.hudi_configs.as_options()
     }
@@ -225,44 +206,52 @@ impl Table {
             .into()
     }
 
-    /// Get the latest Avro schema string of the table.
+    /// Get the latest Avro schema string of the table, without Hudi meta fields (`_hoodie_*`).
     ///
     /// The implementation looks for the schema in the following order:
     /// 1. Timeline commit metadata.
     /// 2. `hoodie.properties` file's [HudiTableConfig::CreateSchema].
-    ///
-    /// ### Note
-    ///
-    /// The schema returned does not contain Hudi's [MetaField]s,
-    /// which is different from the one returned by [Table::get_schema].
-    pub async fn get_avro_schema(&self) -> Result<String> {
-        resolve_avro_schema(self).await
+    pub async fn get_schema_in_avro_str(&self) -> Result<String> {
+        self.get_schema_in_avro_str_internal(false).await
+    }
+
+    /// Get the latest Avro schema string of the table, with Hudi meta fields (`_hoodie_*`)
+    /// prepended.
+    pub async fn get_schema_in_avro_str_with_meta_fields(&self) -> Result<String> {
+        self.get_schema_in_avro_str_internal(true).await
     }
 
-    /// Same as [Table::get_avro_schema], but blocking.
-    pub fn get_avro_schema_blocking(&self) -> Result<String> {
-        tokio::runtime::Builder::new_current_thread()
-            .enable_all()
-            .build()?
-            .block_on(async { self.get_avro_schema().await })
+    async fn get_schema_in_avro_str_internal(&self, includes_meta_fields: bool) -> Result<String> {
+        if includes_meta_fields {
+            resolve_avro_schema_with_meta_fields(self).await
+        } else {
+            resolve_avro_schema(self).await
+        }
     }
 
-    /// Get the latest [arrow_schema::Schema] of the table.
+    /// Get the latest [arrow_schema::Schema] of the table, without Hudi meta fields
+    /// (`_hoodie_*`).
     ///
     /// The implementation looks for the schema in the following order:
     /// 1. Timeline commit metadata.
     /// 2. Base file schema.
     /// 3. `hoodie.properties` file's [HudiTableConfig::CreateSchema].
     pub async fn get_schema(&self) -> Result<Schema> {
-        resolve_schema(self).await
+        self.get_schema_internal(false).await
+    }
+
+    /// Get the latest [arrow_schema::Schema] of the table, with Hudi meta fields (`_hoodie_*`)
+    /// prepended.
+    pub async fn get_schema_with_meta_fields(&self) -> Result<Schema> {
+        self.get_schema_internal(true).await
     }
 
-    /// Same as [Table::get_schema], but blocking.
-    pub fn get_schema_blocking(&self) -> Result<Schema> {
-        tokio::runtime::Builder::new_current_thread()
-            .enable_all()
-            .build()?
-            .block_on(async { self.get_schema().await })
+    async fn get_schema_internal(&self, includes_meta_fields: bool) -> Result<Schema> {
+        if includes_meta_fields {
+            resolve_schema(self).await
+        } else {
+            resolve_data_schema(self).await
+        }
     }
 
     /// Get the latest partition [arrow_schema::Schema] of the table.
@@ -298,14 +287,6 @@ impl Table {
         Ok(Schema::new(partition_fields))
     }
 
-    /// Same as [Table::get_partition_schema], but blocking.
-    pub fn get_partition_schema_blocking(&self) -> Result<Schema> {
-        tokio::runtime::Builder::new_current_thread()
-            .enable_all()
-            .build()?
-            .block_on(async { self.get_partition_schema().await })
-    }
-
     /// Get the [Timeline] of the table.
     pub fn get_timeline(&self) -> &Timeline {
         &self.timeline
@@ -314,8 +295,8 @@ impl Table {
     /// Get all the [FileSlice]s in splits from the table.
     ///
     /// # Arguments
-    ///     * `num_splits` - The number of chunks to split the file slices 
into.
-    ///     * `filters` - Partition filters to apply.
+    /// * `num_splits` - The number of chunks to split the file slices into.
+    /// * `filters` - Partition filters to apply.
     pub async fn get_file_slices_splits<I, S>(
         &self,
         num_splits: usize,
@@ -334,28 +315,12 @@ impl Table {
         }
     }
 
-    /// Same as [Table::get_file_slices_splits], but blocking.
-    pub fn get_file_slices_splits_blocking<I, S>(
-        &self,
-        num_splits: usize,
-        filters: I,
-    ) -> Result<Vec<Vec<FileSlice>>>
-    where
-        I: IntoIterator<Item = (S, S, S)>,
-        S: AsRef<str>,
-    {
-        tokio::runtime::Builder::new_current_thread()
-            .enable_all()
-            .build()?
-            .block_on(async { self.get_file_slices_splits(num_splits, 
filters).await })
-    }
-
     /// Get all the [FileSlice]s in splits from the table at a given timestamp.
     ///
     /// # Arguments
-    ///     * `num_splits` - The number of chunks to split the file slices 
into.
-    ///     * `timestamp` - The timestamp which file slices associated with.
-    ///     * `filters` - Partition filters to apply.
+    /// * `num_splits` - The number of chunks to split the file slices into.
+    /// * `timestamp` - The timestamp that the file slices are associated with.
+    /// * `filters` - Partition filters to apply.
     pub async fn get_file_slices_splits_as_of<I, S>(
         &self,
         num_splits: usize,
@@ -372,26 +337,6 @@ impl Table {
             .await
     }
 
-    /// Same as [Table::get_file_slices_splits_as_of], but blocking.
-    pub fn get_file_slices_splits_as_of_blocking<I, S>(
-        &self,
-        num_splits: usize,
-        timestamp: &str,
-        filters: I,
-    ) -> Result<Vec<Vec<FileSlice>>>
-    where
-        I: IntoIterator<Item = (S, S, S)>,
-        S: AsRef<str>,
-    {
-        tokio::runtime::Builder::new_current_thread()
-            .enable_all()
-            .build()?
-            .block_on(async {
-                self.get_file_slices_splits_as_of(num_splits, timestamp, 
filters)
-                    .await
-            })
-    }
-
     async fn get_file_slices_splits_internal(
         &self,
         num_splits: usize,
@@ -405,10 +350,10 @@ impl Table {
     /// Get all the [FileSlice]s in the table.
     ///
     /// # Arguments
-    ///     * `filters` - Partition filters to apply.
+    /// * `filters` - Partition filters to apply.
     ///
     /// # Notes
-    ///     * This API is useful for implementing snapshot query.
+    /// * This API is useful for implementing snapshot queries.
     pub async fn get_file_slices<I, S>(&self, filters: I) -> 
Result<Vec<FileSlice>>
     where
         I: IntoIterator<Item = (S, S, S)>,
@@ -422,26 +367,14 @@ impl Table {
         }
     }
 
-    /// Same as [Table::get_file_slices], but blocking.
-    pub fn get_file_slices_blocking<I, S>(&self, filters: I) -> 
Result<Vec<FileSlice>>
-    where
-        I: IntoIterator<Item = (S, S, S)>,
-        S: AsRef<str>,
-    {
-        tokio::runtime::Builder::new_current_thread()
-            .enable_all()
-            .build()?
-            .block_on(async { self.get_file_slices(filters).await })
-    }
-
     /// Get all the [FileSlice]s in the table at a given timestamp.
     ///
     /// # Arguments
-    ///     * `timestamp` - The timestamp which file slices associated with.
-    ///     * `filters` - Partition filters to apply.
+    /// * `timestamp` - The timestamp that the file slices are associated with.
+    /// * `filters` - Partition filters to apply.
     ///
     /// # Notes
-    ///     * This API is useful for implementing time travel query.
+    /// * This API is useful for implementing time travel queries.
     pub async fn get_file_slices_as_of<I, S>(
         &self,
         timestamp: &str,
@@ -456,22 +389,6 @@ impl Table {
         self.get_file_slices_internal(&timestamp, &filters).await
     }
 
-    /// Same as [Table::get_file_slices_as_of], but blocking.
-    pub fn get_file_slices_as_of_blocking<I, S>(
-        &self,
-        timestamp: &str,
-        filters: I,
-    ) -> Result<Vec<FileSlice>>
-    where
-        I: IntoIterator<Item = (S, S, S)>,
-        S: AsRef<str>,
-    {
-        tokio::runtime::Builder::new_current_thread()
-            .enable_all()
-            .build()?
-            .block_on(async { self.get_file_slices_as_of(timestamp, 
filters).await })
-    }
-
     async fn get_file_slices_internal(
         &self,
         timestamp: &str,
@@ -519,11 +436,11 @@ impl Table {
     /// Get all the changed [FileSlice]s in the table between the given 
timestamps.
     ///
     /// # Arguments
-    ///     * `start_timestamp` - If provided, only file slices that were 
changed after this timestamp will be returned.
-    ///     * `end_timestamp` - If provided, only file slices that were 
changed before or at this timestamp will be returned.
+    /// * `start_timestamp` - If provided, only file slices that were changed 
after this timestamp will be returned.
+    /// * `end_timestamp` - If provided, only file slices that were changed 
before or at this timestamp will be returned.
     ///
     /// # Notes
-    ///     * This API is useful for implementing incremental query.
+    /// * This API is useful for implementing incremental queries.
     pub async fn get_file_slices_between(
         &self,
         start_timestamp: Option<&str>,
@@ -543,31 +460,16 @@ impl Table {
         self.get_file_slices_between_internal(start, end).await
     }
 
-    /// Same as [Table::get_file_slices_between], but blocking.
-    pub fn get_file_slices_between_blocking(
-        &self,
-        start_timestamp: Option<&str>,
-        end_timestamp: Option<&str>,
-    ) -> Result<Vec<FileSlice>> {
-        tokio::runtime::Builder::new_current_thread()
-            .enable_all()
-            .build()?
-            .block_on(async {
-                self.get_file_slices_between(start_timestamp, end_timestamp)
-                    .await
-            })
-    }
-
     /// Get all the changed [FileSlice]s in splits from the table between the 
given timestamps.
     ///
     /// # Arguments
-    ///     * `num_splits` - The number of chunks to split the file slices 
into.
-    ///     * `start_timestamp` - If provided, only file slices that were 
changed after this timestamp will be returned.
-    ///     * `end_timestamp` - If provided, only file slices that were 
changed before or at this timestamp will be returned.
+    /// * `num_splits` - The number of chunks to split the file slices into.
+    /// * `start_timestamp` - If provided, only file slices that were changed 
after this timestamp will be returned.
+    /// * `end_timestamp` - If provided, only file slices that were changed 
before or at this timestamp will be returned.
     ///
     /// # Notes
-    ///     * This API is useful for implementing incremental query with read 
parallelism.
-    ///     * Uses the same splitting flow as the time-travel API to respect 
read parallelism config.
+    /// * This API is useful for implementing incremental query with read 
parallelism.
+    /// * Uses the same splitting flow as the time-travel API to respect read 
parallelism config.
     pub async fn get_file_slices_splits_between(
         &self,
         num_splits: usize,
@@ -589,22 +491,6 @@ impl Table {
             .await
     }
 
-    /// Same as [Table::get_file_slices_splits_between], but blocking.
-    pub fn get_file_slices_splits_between_blocking(
-        &self,
-        num_splits: usize,
-        start_timestamp: Option<&str>,
-        end_timestamp: Option<&str>,
-    ) -> Result<Vec<Vec<FileSlice>>> {
-        tokio::runtime::Builder::new_current_thread()
-            .enable_all()
-            .build()?
-            .block_on(async {
-                self.get_file_slices_splits_between(num_splits, 
start_timestamp, end_timestamp)
-                    .await
-            })
-    }
-
     async fn get_file_slices_splits_between_internal(
         &self,
         num_splits: usize,
@@ -662,7 +548,7 @@ impl Table {
     /// Get all the latest records in the table.
     ///
     /// # Arguments
-    ///     * `filters` - Partition filters to apply.
+    /// * `filters` - Partition filters to apply.
     pub async fn read_snapshot<I, S>(&self, filters: I) -> 
Result<Vec<RecordBatch>>
     where
         I: IntoIterator<Item = (S, S, S)>,
@@ -676,23 +562,11 @@ impl Table {
         }
     }
 
-    /// Same as [Table::read_snapshot], but blocking.
-    pub fn read_snapshot_blocking<I, S>(&self, filters: I) -> 
Result<Vec<RecordBatch>>
-    where
-        I: IntoIterator<Item = (S, S, S)>,
-        S: AsRef<str>,
-    {
-        tokio::runtime::Builder::new_current_thread()
-            .enable_all()
-            .build()?
-            .block_on(async { self.read_snapshot(filters).await })
-    }
-
     /// Get all the records in the table at a given timestamp.
     ///
     /// # Arguments
-    ///     * `timestamp` - The timestamp which records associated with.
-    ///     * `filters` - Partition filters to apply.
+    /// * `timestamp` - The timestamp that the records are associated with.
+    /// * `filters` - Partition filters to apply.
     pub async fn read_snapshot_as_of<I, S>(
         &self,
         timestamp: &str,
@@ -707,22 +581,6 @@ impl Table {
         self.read_snapshot_internal(&timestamp, &filters).await
     }
 
-    /// Same as [Table::read_snapshot_as_of], but blocking.
-    pub fn read_snapshot_as_of_blocking<I, S>(
-        &self,
-        timestamp: &str,
-        filters: I,
-    ) -> Result<Vec<RecordBatch>>
-    where
-        I: IntoIterator<Item = (S, S, S)>,
-        S: AsRef<str>,
-    {
-        tokio::runtime::Builder::new_current_thread()
-            .enable_all()
-            .build()?
-            .block_on(async { self.read_snapshot_as_of(timestamp, 
filters).await })
-    }
-
     async fn read_snapshot_internal(
         &self,
         timestamp: &str,
@@ -745,8 +603,8 @@ impl Table {
     /// the time span being returned.
     ///
     /// # Arguments
-    ///     * `start_timestamp` - Only records that were inserted or updated 
after this timestamp will be returned.
-    ///     * `end_timestamp` - If provided, only records that were inserted 
or updated before or at this timestamp will be returned.
+    /// * `start_timestamp` - Only records that were inserted or updated after 
this timestamp will be returned.
+    /// * `end_timestamp` - If provided, only records that were inserted or 
updated before or at this timestamp will be returned.
     pub async fn read_incremental_records(
         &self,
         start_timestamp: &str,
@@ -780,21 +638,6 @@ impl Table {
         Ok(batches)
     }
 
-    /// Same as [Table::read_incremental_records], but blocking.
-    pub fn read_incremental_records_blocking(
-        &self,
-        start_timestamp: &str,
-        end_timestamp: Option<&str>,
-    ) -> Result<Vec<RecordBatch>> {
-        tokio::runtime::Builder::new_current_thread()
-            .enable_all()
-            .build()?
-            .block_on(async {
-                self.read_incremental_records(start_timestamp, end_timestamp)
-                    .await
-            })
-    }
-
     /// Get the change-data-capture (CDC) records between the given timestamps.
     ///
     /// The CDC records should reflect the records that were inserted, 
updated, and deleted
@@ -987,26 +830,27 @@ mod tests {
     /// # Arguments
     ///
     /// * `table_dir_name` - Name of the table root directory; all under 
`crates/core/tests/data/`.
-    fn get_test_table_without_validation(table_dir_name: &str) -> Table {
+    async fn get_test_table_without_validation(table_dir_name: &str) -> Table {
         let base_url = Url::from_file_path(
             
canonicalize(PathBuf::from("tests").join("data").join(table_dir_name)).unwrap(),
         )
         .unwrap();
-        Table::new_with_options_blocking(
+        Table::new_with_options(
             base_url.as_str(),
             [("hoodie.internal.skip.config.validation", "true")],
         )
+        .await
         .unwrap()
     }
 
     /// Test helper to get relative file paths from the table with filters.
-    fn get_file_paths_with_filters(
+    async fn get_file_paths_with_filters(
         table: &Table,
         filters: &[(&str, &str, &str)],
     ) -> Result<Vec<String>> {
         let mut file_paths = Vec::new();
         let base_url = table.base_url();
-        for f in table.get_file_slices_blocking(filters.to_vec())? {
+        for f in table.get_file_slices(filters.to_vec()).await? {
             let relative_path = f.base_file_relative_path()?;
             let file_url = join_url_segments(&base_url, 
&[relative_path.as_str()])?;
             file_paths.push(file_url.to_string());
@@ -1014,10 +858,10 @@ mod tests {
         Ok(file_paths)
     }
 
-    #[test]
-    fn test_hudi_table_get_hudi_options() {
+    #[tokio::test]
+    async fn test_hudi_table_get_hudi_options() {
         let base_url = SampleTable::V6Nonpartitioned.url_to_cow();
-        let hudi_table = Table::new_blocking(base_url.path()).unwrap();
+        let hudi_table = Table::new(base_url.path()).await.unwrap();
         let hudi_options = hudi_table.hudi_options();
         for (k, v) in hudi_options.iter() {
             assert!(k.starts_with("hoodie."));
@@ -1025,10 +869,10 @@ mod tests {
         }
     }
 
-    #[test]
-    fn test_hudi_table_get_storage_options() {
+    #[tokio::test]
+    async fn test_hudi_table_get_storage_options() {
         let base_url = SampleTable::V6Nonpartitioned.url_to_cow();
-        let hudi_table = Table::new_blocking(base_url.path()).unwrap();
+        let hudi_table = Table::new(base_url.path()).await.unwrap();
 
         let cloud_prefixes: HashSet<_> = Storage::CLOUD_STORAGE_PREFIXES
             .iter()
@@ -1050,27 +894,33 @@ mod tests {
         }
     }
 
-    #[test]
+    #[tokio::test]
     #[serial(env_vars)]
-    fn hudi_table_get_schema_from_empty_table_without_create_schema() {
-        let table = 
get_test_table_without_validation("table_props_no_create_schema");
+    async fn hudi_table_get_schema_from_empty_table_without_create_schema() {
+        let table = 
get_test_table_without_validation("table_props_no_create_schema").await;
 
-        let schema = table.get_schema_blocking();
+        let schema = table.get_schema().await;
         assert!(schema.is_err());
         assert!(matches!(schema.unwrap_err(), CoreError::SchemaNotFound(_)));
 
-        let schema = table.get_avro_schema_blocking();
+        let schema = table.get_schema_in_avro_str().await;
         assert!(schema.is_err());
         assert!(matches!(schema.unwrap_err(), CoreError::SchemaNotFound(_)));
     }
 
-    #[test]
-    fn 
hudi_table_get_schema_from_empty_table_resolves_to_table_create_schema() {
+    #[tokio::test]
+    async fn 
hudi_table_get_schema_from_empty_table_resolves_to_table_create_schema() {
         for base_url in SampleTable::V6Empty.urls() {
-            let hudi_table = Table::new_blocking(base_url.path()).unwrap();
+            let hudi_table = Table::new(base_url.path()).await.unwrap();
+
+            // Validate the Arrow schema without meta fields
+            let schema = hudi_table.get_schema().await;
+            assert!(schema.is_ok());
+            let schema = schema.unwrap();
+            assert_arrow_field_names_eq!(schema, ["id", "name", "isActive"]);
 
-            // Validate the Arrow schema
-            let schema = hudi_table.get_schema_blocking();
+            // Validate the Arrow schema with meta fields
+            let schema = hudi_table.get_schema_with_meta_fields().await;
             assert!(schema.is_ok());
             let schema = schema.unwrap();
             assert_arrow_field_names_eq!(
@@ -1078,18 +928,31 @@ mod tests {
                 [MetaField::field_names(), vec!["id", "name", 
"isActive"]].concat()
             );
 
-            // Validate the Avro schema
-            let avro_schema = hudi_table.get_avro_schema_blocking();
+            // Validate the Avro schema without meta fields
+            let avro_schema = hudi_table.get_schema_in_avro_str().await;
             assert!(avro_schema.is_ok());
             let avro_schema = avro_schema.unwrap();
-            assert_avro_field_names_eq!(&avro_schema, ["id", "name", 
"isActive"])
+            assert_avro_field_names_eq!(&avro_schema, ["id", "name", 
"isActive"]);
+
+            // Validate the Avro schema with meta fields
+            let avro_schema = 
hudi_table.get_schema_in_avro_str_with_meta_fields().await;
+            assert!(avro_schema.is_ok());
+            let avro_schema = avro_schema.unwrap();
+            assert_avro_field_names_eq!(
+                &avro_schema,
+                [
+                    MetaField::field_names().as_slice(),
+                    &["id", "name", "isActive"]
+                ]
+                .concat()
+            );
         }
     }
 
-    #[test]
-    fn hudi_table_get_schema() {
+    #[tokio::test]
+    async fn hudi_table_get_schema() {
         let base_url = SampleTable::V6Nonpartitioned.url_to_cow();
-        let hudi_table = Table::new_blocking(base_url.path()).unwrap();
+        let hudi_table = Table::new(base_url.path()).await.unwrap();
         let original_field_names = [
             "id",
             "name",
@@ -1109,8 +972,14 @@ mod tests {
             "structField",
         ];
 
-        // Check Arrow schema
-        let arrow_schema = hudi_table.get_schema_blocking();
+        // Check Arrow schema without meta fields
+        let arrow_schema = hudi_table.get_schema().await;
+        assert!(arrow_schema.is_ok());
+        let arrow_schema = arrow_schema.unwrap();
+        assert_arrow_field_names_eq!(arrow_schema, original_field_names);
+
+        // Check Arrow schema with meta fields
+        let arrow_schema = hudi_table.get_schema_with_meta_fields().await;
         assert!(arrow_schema.is_ok());
         let arrow_schema = arrow_schema.unwrap();
         assert_arrow_field_names_eq!(
@@ -1118,27 +987,27 @@ mod tests {
             [MetaField::field_names(), original_field_names.to_vec()].concat()
         );
 
-        // Check Avro schema
-        let avro_schema = hudi_table.get_avro_schema_blocking();
+        // Check Avro schema without meta fields
+        let avro_schema = hudi_table.get_schema_in_avro_str().await;
         assert!(avro_schema.is_ok());
         let avro_schema = avro_schema.unwrap();
         assert_avro_field_names_eq!(&avro_schema, original_field_names);
     }
 
-    #[test]
-    fn hudi_table_get_partition_schema() {
+    #[tokio::test]
+    async fn hudi_table_get_partition_schema() {
         let base_url = SampleTable::V6TimebasedkeygenNonhivestyle.url_to_cow();
-        let hudi_table = Table::new_blocking(base_url.path()).unwrap();
-        let schema = hudi_table.get_partition_schema_blocking();
+        let hudi_table = Table::new(base_url.path()).await.unwrap();
+        let schema = hudi_table.get_partition_schema().await;
         assert!(schema.is_ok());
         let schema = schema.unwrap();
         assert_arrow_field_names_eq!(schema, ["ts_str"]);
     }
 
-    #[test]
+    #[tokio::test]
     #[serial(env_vars)]
-    fn validate_invalid_table_props() {
-        let table = get_test_table_without_validation("table_props_invalid");
+    async fn validate_invalid_table_props() {
+        let table = 
get_test_table_without_validation("table_props_invalid").await;
         let configs = table.hudi_configs;
         assert!(
             configs.validate(BaseFileFormat).is_err(),
@@ -1188,10 +1057,10 @@ mod tests {
         );
     }
 
-    #[test]
+    #[tokio::test]
     #[serial(env_vars)]
-    fn get_invalid_table_props() {
-        let table = get_test_table_without_validation("table_props_invalid");
+    async fn get_invalid_table_props() {
+        let table = 
get_test_table_without_validation("table_props_invalid").await;
         let configs = table.hudi_configs;
         assert!(configs.get(BaseFileFormat).is_err());
         assert!(configs.get(Checksum).is_err());
@@ -1211,10 +1080,10 @@ mod tests {
         assert!(configs.get(TimelineTimezone).is_err());
     }
 
-    #[test]
+    #[tokio::test]
     #[serial(env_vars)]
-    fn get_default_for_invalid_table_props() {
-        let table = get_test_table_without_validation("table_props_invalid");
+    async fn get_default_for_invalid_table_props() {
+        let table = 
get_test_table_without_validation("table_props_invalid").await;
         let configs = table.hudi_configs;
         let actual: String = configs.get_or_default(BaseFileFormat).into();
         assert_eq!(actual, "parquet");
@@ -1243,10 +1112,10 @@ mod tests {
         assert_eq!(actual, "utc");
     }
 
-    #[test]
+    #[tokio::test]
     #[serial(env_vars)]
-    fn get_valid_table_props() {
-        let table = get_test_table_without_validation("table_props_valid");
+    async fn get_valid_table_props() {
+        let table = 
get_test_table_without_validation("table_props_valid").await;
         let configs = table.hudi_configs;
         let actual: String = configs.get(BaseFileFormat).unwrap().into();
         assert_eq!(actual, "parquet");
@@ -1282,11 +1151,11 @@ mod tests {
         assert_eq!(actual, "local");
     }
 
-    #[test]
+    #[tokio::test]
     #[serial(env_vars)]
-    fn get_global_table_props() {
+    async fn get_global_table_props() {
         // Without the environment variable HUDI_CONF_DIR
-        let table = get_test_table_without_validation("table_props_partial");
+        let table = 
get_test_table_without_validation("table_props_partial").await;
         let configs = table.hudi_configs;
         assert!(configs.get(DatabaseName).is_err());
         assert!(configs.get(TableType).is_err());
@@ -1299,7 +1168,7 @@ mod tests {
         unsafe {
             env::set_var(HUDI_CONF_DIR, hudi_conf_dir.as_os_str());
         }
-        let table = get_test_table_without_validation("table_props_partial");
+        let table = 
get_test_table_without_validation("table_props_partial").await;
         let configs = table.hudi_configs;
         assert!(configs.get(DatabaseName).is_err());
         assert!(configs.get(TableType).is_err());
@@ -1312,7 +1181,7 @@ mod tests {
         unsafe {
             env::set_var(HUDI_CONF_DIR, hudi_conf_dir.as_os_str());
         }
-        let table = get_test_table_without_validation("table_props_partial");
+        let table = 
get_test_table_without_validation("table_props_partial").await;
         let configs = table.hudi_configs;
         let actual: String = configs.get(DatabaseName).unwrap().into();
         assert_eq!(actual, "tmpdb");
@@ -1325,54 +1194,58 @@ mod tests {
         }
     }
 
-    #[test]
-    fn hudi_table_read_file_slice() {
+    #[tokio::test]
+    async fn hudi_table_read_file_slice() {
         let base_url = SampleTable::V6Nonpartitioned.url_to_cow();
-        let hudi_table = Table::new_blocking(base_url.path()).unwrap();
+        let hudi_table = Table::new(base_url.path()).await.unwrap();
         let batches = hudi_table
             .create_file_group_reader_with_options(empty_options())
             .unwrap()
-            .read_file_slice_by_base_file_path_blocking(
+            .read_file_slice_by_base_file_path(
                 
"a079bdb3-731c-4894-b855-abfcd6921007-0_0-203-274_20240418173551906.parquet",
             )
+            .await
             .unwrap();
         assert_eq!(batches.num_rows(), 4);
         assert_eq!(batches.num_columns(), 21);
     }
 
-    #[test]
-    fn empty_hudi_table_get_file_slices_splits() {
+    #[tokio::test]
+    async fn empty_hudi_table_get_file_slices_splits() {
         let base_url = SampleTable::V6Empty.url_to_cow();
 
-        let hudi_table = Table::new_blocking(base_url.path()).unwrap();
+        let hudi_table = Table::new(base_url.path()).await.unwrap();
         let file_slices_splits = hudi_table
-            .get_file_slices_splits_blocking(2, empty_filters())
+            .get_file_slices_splits(2, empty_filters())
+            .await
             .unwrap();
         assert!(file_slices_splits.is_empty());
     }
 
-    #[test]
-    fn hudi_table_get_file_slices_splits() {
+    #[tokio::test]
+    async fn hudi_table_get_file_slices_splits() {
         let base_url = SampleTable::V6SimplekeygenNonhivestyle.url_to_cow();
 
-        let hudi_table = Table::new_blocking(base_url.path()).unwrap();
+        let hudi_table = Table::new(base_url.path()).await.unwrap();
         let file_slices_splits = hudi_table
-            .get_file_slices_splits_blocking(2, empty_filters())
+            .get_file_slices_splits(2, empty_filters())
+            .await
             .unwrap();
         assert_eq!(file_slices_splits.len(), 2);
         assert_eq!(file_slices_splits[0].len(), 2);
         assert_eq!(file_slices_splits[1].len(), 1);
     }
 
-    #[test]
-    fn hudi_table_get_file_slices_splits_as_of_timestamps() {
+    #[tokio::test]
+    async fn hudi_table_get_file_slices_splits_as_of_timestamps() {
         let base_url = 
SampleTable::V6SimplekeygenNonhivestyleOverwritetable.url_to_mor_parquet();
-        let hudi_table = Table::new_blocking(base_url.path()).unwrap();
+        let hudi_table = Table::new(base_url.path()).await.unwrap();
 
         // before replacecommit (insert overwrite table)
         let second_latest_timestamp = "20250121000656060";
         let file_slices_splits = hudi_table
-            .get_file_slices_splits_as_of_blocking(2, second_latest_timestamp, 
empty_filters())
+            .get_file_slices_splits_as_of(2, second_latest_timestamp, 
empty_filters())
+            .await
             .unwrap();
         assert_eq!(file_slices_splits.len(), 2);
         assert_eq!(file_slices_splits[0].len(), 2);
@@ -1405,20 +1278,19 @@ mod tests {
         // as of replacecommit (insert overwrite table)
         let latest_timestamp = "20250121000702475";
         let file_slices_splits = hudi_table
-            .get_file_slices_splits_as_of_blocking(2, latest_timestamp, 
empty_filters())
+            .get_file_slices_splits_as_of(2, latest_timestamp, empty_filters())
+            .await
             .unwrap();
         assert_eq!(file_slices_splits.len(), 1);
         assert_eq!(file_slices_splits[0].len(), 1);
     }
 
-    #[test]
-    fn hudi_table_get_file_slices_as_of_timestamps() {
+    #[tokio::test]
+    async fn hudi_table_get_file_slices_as_of_timestamps() {
         let base_url = SampleTable::V6Nonpartitioned.url_to_cow();
 
-        let hudi_table = Table::new_blocking(base_url.path()).unwrap();
-        let file_slices = hudi_table
-            .get_file_slices_blocking(empty_filters())
-            .unwrap();
+        let hudi_table = Table::new(base_url.path()).await.unwrap();
+        let file_slices = 
hudi_table.get_file_slices(empty_filters()).await.unwrap();
         assert_eq!(
             file_slices
                 .iter()
@@ -1428,9 +1300,10 @@ mod tests {
         );
 
         // as of the latest timestamp
-        let hudi_table = Table::new_blocking(base_url.path()).unwrap();
+        let hudi_table = Table::new(base_url.path()).await.unwrap();
         let file_slices = hudi_table
-            .get_file_slices_as_of_blocking("20240418173551906", 
empty_filters())
+            .get_file_slices_as_of("20240418173551906", empty_filters())
+            .await
             .unwrap();
         assert_eq!(
             file_slices
@@ -1441,10 +1314,12 @@ mod tests {
         );
 
         // as of just smaller than the latest timestamp
-        let hudi_table =
-            Table::new_with_options_blocking(base_url.path(), 
empty_options()).unwrap();
+        let hudi_table = Table::new_with_options(base_url.path(), 
empty_options())
+            .await
+            .unwrap();
         let file_slices = hudi_table
-            .get_file_slices_as_of_blocking("20240418173551905", 
empty_filters())
+            .get_file_slices_as_of("20240418173551905", empty_filters())
+            .await
             .unwrap();
         assert_eq!(
             file_slices
@@ -1455,10 +1330,12 @@ mod tests {
         );
 
         // as of non-exist old timestamp
-        let hudi_table =
-            Table::new_with_options_blocking(base_url.path(), 
empty_options()).unwrap();
+        let hudi_table = Table::new_with_options(base_url.path(), 
empty_options())
+            .await
+            .unwrap();
         let file_slices = hudi_table
-            .get_file_slices_as_of_blocking("19700101000000", empty_filters())
+            .get_file_slices_as_of("19700101000000", empty_filters())
+            .await
             .unwrap();
         assert_eq!(
             file_slices
@@ -1469,22 +1346,24 @@ mod tests {
         );
     }
 
-    #[test]
-    fn empty_hudi_table_get_file_slices_between_timestamps() {
+    #[tokio::test]
+    async fn empty_hudi_table_get_file_slices_between_timestamps() {
         let base_url = SampleTable::V6Empty.url_to_cow();
-        let hudi_table = Table::new_blocking(base_url.path()).unwrap();
+        let hudi_table = Table::new(base_url.path()).await.unwrap();
         let file_slices = hudi_table
-            .get_file_slices_between_blocking(Some(EARLIEST_START_TIMESTAMP), 
None)
+            .get_file_slices_between(Some(EARLIEST_START_TIMESTAMP), None)
+            .await
             .unwrap();
         assert!(file_slices.is_empty())
     }
 
-    #[test]
-    fn hudi_table_get_file_slices_between_timestamps() {
+    #[tokio::test]
+    async fn hudi_table_get_file_slices_between_timestamps() {
         let base_url = 
SampleTable::V6SimplekeygenNonhivestyleOverwritetable.url_to_mor_parquet();
-        let hudi_table = Table::new_blocking(base_url.path()).unwrap();
+        let hudi_table = Table::new(base_url.path()).await.unwrap();
         let mut file_slices = hudi_table
-            .get_file_slices_between_blocking(None, Some("20250121000656060"))
+            .get_file_slices_between(None, Some("20250121000656060"))
+            .await
             .unwrap();
         assert_eq!(file_slices.len(), 3);
 
@@ -1515,22 +1394,24 @@ mod tests {
         assert!(file_slice_2.log_files.is_empty());
     }
 
-    #[test]
-    fn empty_hudi_table_get_file_slices_splits_between() {
+    #[tokio::test]
+    async fn empty_hudi_table_get_file_slices_splits_between() {
         let base_url = SampleTable::V6Empty.url_to_cow();
-        let hudi_table = Table::new_blocking(base_url.path()).unwrap();
+        let hudi_table = Table::new(base_url.path()).await.unwrap();
         let file_slices_splits = hudi_table
-            .get_file_slices_splits_between_blocking(2, 
Some(EARLIEST_START_TIMESTAMP), None)
+            .get_file_slices_splits_between(2, Some(EARLIEST_START_TIMESTAMP), 
None)
+            .await
             .unwrap();
         assert!(file_slices_splits.is_empty())
     }
 
-    #[test]
-    fn hudi_table_get_file_slices_splits_between() {
+    #[tokio::test]
+    async fn hudi_table_get_file_slices_splits_between() {
         let base_url = 
SampleTable::V6SimplekeygenNonhivestyleOverwritetable.url_to_mor_parquet();
-        let hudi_table = Table::new_blocking(base_url.path()).unwrap();
+        let hudi_table = Table::new(base_url.path()).await.unwrap();
         let file_slices_splits = hudi_table
-            .get_file_slices_splits_between_blocking(2, None, 
Some("20250121000656060"))
+            .get_file_slices_splits_between(2, None, Some("20250121000656060"))
+            .await
             .unwrap();
 
         assert_eq!(file_slices_splits.len(), 2);
@@ -1540,12 +1421,13 @@ mod tests {
         assert_eq!(file_slices_splits[1].len(), 1);
     }
 
-    #[test]
-    fn hudi_table_get_file_slices_splits_between_with_single_split() {
+    #[tokio::test]
+    async fn hudi_table_get_file_slices_splits_between_with_single_split() {
         let base_url = SampleTable::V6SimplekeygenNonhivestyleOverwritetable.url_to_mor_parquet();
-        let hudi_table = Table::new_blocking(base_url.path()).unwrap();
+        let hudi_table = Table::new(base_url.path()).await.unwrap();
         let file_slices_splits = hudi_table
-            .get_file_slices_splits_between_blocking(1, None, Some("20250121000656060"))
+            .get_file_slices_splits_between(1, None, Some("20250121000656060"))
+            .await
             .unwrap();
 
         // Should have 1 split with all 3 file slices
@@ -1553,12 +1435,13 @@ mod tests {
         assert_eq!(file_slices_splits[0].len(), 3);
     }
 
-    #[test]
-    fn hudi_table_get_file_slices_splits_between_with_many_splits() {
+    #[tokio::test]
+    async fn hudi_table_get_file_slices_splits_between_with_many_splits() {
         let base_url = SampleTable::V6SimplekeygenNonhivestyleOverwritetable.url_to_mor_parquet();
-        let hudi_table = Table::new_blocking(base_url.path()).unwrap();
+        let hudi_table = Table::new(base_url.path()).await.unwrap();
         let file_slices_splits = hudi_table
-            .get_file_slices_splits_between_blocking(10, None, Some("20250121000656060"))
+            .get_file_slices_splits_between(10, None, Some("20250121000656060"))
+            .await
             .unwrap();
 
         assert_eq!(file_slices_splits.len(), 3);
@@ -1567,14 +1450,15 @@ mod tests {
         }
     }
 
-    #[test]
-    fn hudi_table_get_file_paths_for_simple_keygen_non_hive_style() {
+    #[tokio::test]
+    async fn hudi_table_get_file_paths_for_simple_keygen_non_hive_style() {
         let base_url = SampleTable::V6SimplekeygenNonhivestyle.url_to_cow();
-        let hudi_table = Table::new_blocking(base_url.path()).unwrap();
+        let hudi_table = Table::new(base_url.path()).await.unwrap();
         assert_eq!(hudi_table.timeline.completed_commits.len(), 2);
 
         let partition_filters = &[];
         let actual = get_file_paths_with_filters(&hudi_table, partition_filters)
+            .await
             .unwrap()
             .into_iter()
             .collect::<HashSet<_>>();
@@ -1590,6 +1474,7 @@ mod tests {
 
         let filters = [("byteField", ">=", "10"), ("byteField", "<", "30")];
         let actual = get_file_paths_with_filters(&hudi_table, &filters)
+            .await
             .unwrap()
             .into_iter()
             .collect::<HashSet<_>>();
@@ -1603,6 +1488,7 @@ mod tests {
         assert_eq!(actual, expected);
 
         let actual = get_file_paths_with_filters(&hudi_table, &[("byteField", ">", "30")])
+            .await
             .unwrap()
             .into_iter()
             .collect::<HashSet<_>>();
@@ -1610,14 +1496,15 @@ mod tests {
         assert_eq!(actual, expected);
     }
 
-    #[test]
-    fn hudi_table_get_file_paths_for_complex_keygen_hive_style() {
+    #[tokio::test]
+    async fn hudi_table_get_file_paths_for_complex_keygen_hive_style() {
         let base_url = SampleTable::V6ComplexkeygenHivestyle.url_to_cow();
-        let hudi_table = Table::new_blocking(base_url.path()).unwrap();
+        let hudi_table = Table::new(base_url.path()).await.unwrap();
         assert_eq!(hudi_table.timeline.completed_commits.len(), 2);
 
         let partition_filters = &[];
         let actual = get_file_paths_with_filters(&hudi_table, partition_filters)
+            .await
             .unwrap()
             .into_iter()
             .collect::<HashSet<_>>();
@@ -1637,6 +1524,7 @@ mod tests {
             ("shortField", "!=", "100"),
         ];
         let actual = get_file_paths_with_filters(&hudi_table, &filters)
+            .await
             .unwrap()
             .into_iter()
             .collect::<HashSet<_>>();
@@ -1650,6 +1538,7 @@ mod tests {
 
         let filters = [("byteField", ">=", "20"), ("shortField", "=", "300")];
         let actual = get_file_paths_with_filters(&hudi_table, &filters)
+            .await
             .unwrap()
             .into_iter()
             .collect::<HashSet<_>>();
diff --git a/crates/core/src/timeline/mod.rs b/crates/core/src/timeline/mod.rs
index 9b7b846..76ced9b 100644
--- a/crates/core/src/timeline/mod.rs
+++ b/crates/core/src/timeline/mod.rs
@@ -31,7 +31,7 @@ use crate::error::CoreError;
 use crate::file_group::FileGroup;
 use crate::file_group::builder::replaced_file_groups_from_replace_commit;
 use crate::schema::resolver::{
-    resolve_avro_schema_from_commit_metadata, resolve_schema_from_commit_metadata,
+    resolve_avro_schema_from_commit_metadata, resolve_data_schema_from_commit_metadata,
 };
 use crate::storage::Storage;
 use crate::timeline::builder::TimelineBuilder;
@@ -270,14 +270,14 @@ impl Timeline {
         resolve_avro_schema_from_commit_metadata(&commit_metadata)
     }
 
-    /// Get the latest [arrow_schema::Schema] from the [Timeline].
+    /// Get the latest data [arrow_schema::Schema] from the [Timeline], without Hudi meta fields.
     ///
     /// ### Note
     /// This API behaves differently from [crate::table::Table::get_schema],
     /// which additionally looks for [HudiTableConfig::CreateSchema] in the table config.
     pub async fn get_latest_schema(&self) -> Result<Schema> {
         let commit_metadata = self.get_latest_commit_metadata().await?;
-        resolve_schema_from_commit_metadata(&commit_metadata, self.storage.clone()).await
+        resolve_data_schema_from_commit_metadata(&commit_metadata, self.storage.clone()).await
     }
 
     pub(crate) async fn get_replaced_file_groups_as_of(
@@ -485,7 +485,8 @@ mod tests {
         let base_url = SampleTable::V6Nonpartitioned.url_to_cow();
         let timeline = create_test_timeline(base_url).await;
         let table_schema = timeline.get_latest_schema().await.unwrap();
-        assert_eq!(table_schema.fields.len(), 21)
+        // get_latest_schema returns data schema without meta fields
+        assert_eq!(table_schema.fields.len(), 16)
     }
 
     #[tokio::test]
@@ -594,17 +595,13 @@ mod tests {
         .unwrap();
         let timeline = create_test_timeline(base_url).await;
 
-        // Check Arrow schema
+        // Check Arrow schema — get_latest_schema returns data schema without meta fields
         let arrow_schema = timeline.get_latest_schema().await;
         assert!(arrow_schema.is_ok());
         let arrow_schema = arrow_schema.unwrap();
         assert_arrow_field_names_eq!(
             arrow_schema,
-            [
-                MetaField::field_names(),
-                vec!["ts", "uuid", "rider", "driver", "fare", "city"]
-            ]
-            .concat()
+            vec!["ts", "uuid", "rider", "driver", "fare", "city"]
         );
 
         // Check Avro schema
diff --git a/crates/core/tests/table_read_tests.rs b/crates/core/tests/table_read_tests.rs
index 89596ee..e3d424f 100644
--- a/crates/core/tests/table_read_tests.rs
+++ b/crates/core/tests/table_read_tests.rs
@@ -35,21 +35,21 @@ mod v6_tables {
     mod snapshot_queries {
         use super::*;
 
-        #[test]
-        fn test_empty_table() -> Result<()> {
+        #[tokio::test]
+        async fn test_empty_table() -> Result<()> {
             for base_url in SampleTable::V6Empty.urls() {
-                let hudi_table = Table::new_blocking(base_url.path())?;
-                let records = hudi_table.read_snapshot_blocking(empty_filters())?;
+                let hudi_table = Table::new(base_url.path()).await?;
+                let records = hudi_table.read_snapshot(empty_filters()).await?;
                 assert!(records.is_empty());
             }
             Ok(())
         }
 
-        #[test]
-        fn test_non_partitioned() -> Result<()> {
+        #[tokio::test]
+        async fn test_non_partitioned() -> Result<()> {
             for base_url in SampleTable::V6Nonpartitioned.urls() {
-                let hudi_table = Table::new_blocking(base_url.path())?;
-                let records = hudi_table.read_snapshot_blocking(empty_filters())?;
+                let hudi_table = Table::new(base_url.path()).await?;
+                let records = hudi_table.read_snapshot(empty_filters()).await?;
                 let schema = &records[0].schema();
                 let records = concat_batches(schema, &records)?;
 
@@ -67,13 +67,14 @@ mod v6_tables {
             Ok(())
         }
 
-        #[test]
-        fn test_non_partitioned_read_optimized() -> Result<()> {
+        #[tokio::test]
+        async fn test_non_partitioned_read_optimized() -> Result<()> {
             let base_url = SampleTable::V6Nonpartitioned.url_to_mor_parquet();
-            let hudi_table = Table::new_with_options_blocking(
+            let hudi_table = Table::new_with_options(
                 base_url.path(),
                 [(HudiReadConfig::UseReadOptimizedMode.as_ref(), "true")],
-            )?;
+            )
+            .await?;
             let commit_timestamps = hudi_table
                 .timeline
                 .completed_commits
@@ -81,8 +82,9 @@ mod v6_tables {
                 .map(|i| i.timestamp.as_str())
                 .collect::<Vec<_>>();
             let latest_commit = commit_timestamps.last().unwrap();
-            let records =
-                hudi_table.read_snapshot_as_of_blocking(latest_commit, empty_filters())?;
+            let records = hudi_table
+                .read_snapshot_as_of(latest_commit, empty_filters())
+                .await?;
             let schema = &records[0].schema();
             let records = concat_batches(schema, &records)?;
 
@@ -99,11 +101,11 @@ mod v6_tables {
             Ok(())
         }
 
-        #[test]
-        fn test_non_partitioned_rollback() -> Result<()> {
+        #[tokio::test]
+        async fn test_non_partitioned_rollback() -> Result<()> {
             let base_url = SampleTable::V6NonpartitionedRollback.url_to_mor_parquet();
-            let hudi_table = Table::new_blocking(base_url.path())?;
-            let records = hudi_table.read_snapshot_blocking(empty_filters())?;
+            let hudi_table = Table::new(base_url.path()).await?;
+            let records = hudi_table.read_snapshot(empty_filters()).await?;
             let schema = &records[0].schema();
             let records = concat_batches(schema, &records)?;
 
@@ -119,17 +121,17 @@ mod v6_tables {
             Ok(())
         }
 
-        #[test]
-        fn test_complex_keygen_hive_style_with_filters() -> Result<()> {
+        #[tokio::test]
+        async fn test_complex_keygen_hive_style_with_filters() -> Result<()> {
             for base_url in SampleTable::V6ComplexkeygenHivestyle.urls() {
-                let hudi_table = Table::new_blocking(base_url.path())?;
+                let hudi_table = Table::new(base_url.path()).await?;
 
                 let filters = vec![
                     ("byteField", ">=", "10"),
                     ("byteField", "<", "20"),
                     ("shortField", "!=", "100"),
                 ];
-                let records = hudi_table.read_snapshot_blocking(filters)?;
+                let records = hudi_table.read_snapshot(filters).await?;
                 let schema = &records[0].schema();
                 let records = concat_batches(schema, &records)?;
 
@@ -139,11 +141,11 @@ mod v6_tables {
             Ok(())
         }
 
-        #[test]
-        fn test_simple_keygen_hivestyle_no_metafields() -> Result<()> {
+        #[tokio::test]
+        async fn test_simple_keygen_hivestyle_no_metafields() -> Result<()> {
             for base_url in SampleTable::V6SimplekeygenHivestyleNoMetafields.urls() {
-                let hudi_table = Table::new_blocking(base_url.path())?;
-                let records = hudi_table.read_snapshot_blocking(empty_filters())?;
+                let hudi_table = Table::new(base_url.path()).await?;
+                let records = hudi_table.read_snapshot(empty_filters()).await?;
                 let schema = &records[0].schema();
                 let records = concat_batches(schema, &records)?;
 
@@ -165,10 +167,10 @@ mod v6_tables {
     mod time_travel_queries {
         use super::*;
 
-        #[test]
-        fn test_simple_keygen_nonhivestyle_time_travel() -> Result<()> {
+        #[tokio::test]
+        async fn test_simple_keygen_nonhivestyle_time_travel() -> Result<()> {
             for base_url in SampleTable::V6SimplekeygenNonhivestyle.urls() {
-                let hudi_table = Table::new_blocking(base_url.path())?;
+                let hudi_table = Table::new(base_url.path()).await?;
                 let commit_timestamps = hudi_table
                     .timeline
                     .completed_commits
@@ -176,8 +178,9 @@ mod v6_tables {
                     .map(|i| i.timestamp.as_str())
                     .collect::<Vec<_>>();
                 let first_commit = commit_timestamps[0];
-                let records =
-                    hudi_table.read_snapshot_as_of_blocking(first_commit, empty_filters())?;
+                let records = hudi_table
+                    .read_snapshot_as_of(first_commit, empty_filters())
+                    .await?;
                 let schema = &records[0].schema();
                 let records = concat_batches(schema, &records)?;
 
@@ -194,15 +197,15 @@ mod v6_tables {
     mod mor_log_file_queries {
         use super::*;
 
-        #[test]
-        fn test_quickstart_trips_inserts_updates() -> Result<()> {
+        #[tokio::test]
+        async fn test_quickstart_trips_inserts_updates() -> Result<()> {
             let base_url = QuickstartTripsTable::V6Trips8I1U.url_to_mor_avro();
-            let hudi_table = Table::new_blocking(base_url.path())?;
+            let hudi_table = Table::new(base_url.path()).await?;
 
             let updated_rider = "rider-D";
 
             // verify updated record as of the latest commit
-            let records = hudi_table.read_snapshot_blocking(empty_filters())?;
+            let records = hudi_table.read_snapshot(empty_filters()).await?;
             let schema = &records[0].schema();
             let records = concat_batches(schema, &records)?;
             let uuid_rider_and_fare = QuickstartTripsTable::uuid_rider_and_fare(&records)
@@ -224,7 +227,9 @@ mod v6_tables {
                 .map(|i| i.timestamp.as_str())
                 .collect::<Vec<_>>();
             let first_commit = commit_timestamps[0];
-            let records = hudi_table.read_snapshot_as_of_blocking(first_commit, empty_filters())?;
+            let records = hudi_table
+                .read_snapshot_as_of(first_commit, empty_filters())
+                .await?;
             let schema = &records[0].schema();
             let records = concat_batches(schema, &records)?;
             let uuid_rider_and_fare = QuickstartTripsTable::uuid_rider_and_fare(&records)
@@ -241,15 +246,15 @@ mod v6_tables {
             Ok(())
         }
 
-        #[test]
-        fn test_quickstart_trips_inserts_deletes() -> Result<()> {
+        #[tokio::test]
+        async fn test_quickstart_trips_inserts_deletes() -> Result<()> {
             let base_url = QuickstartTripsTable::V6Trips8I3D.url_to_mor_avro();
-            let hudi_table = Table::new_blocking(base_url.path())?;
+            let hudi_table = Table::new(base_url.path()).await?;
 
             let deleted_riders = ["rider-A", "rider-C", "rider-D"];
 
             // verify deleted record as of the latest commit
-            let records = hudi_table.read_snapshot_blocking(empty_filters())?;
+            let records = hudi_table.read_snapshot(empty_filters()).await?;
             let schema = &records[0].schema();
             let records = concat_batches(schema, &records)?;
             let riders = QuickstartTripsTable::uuid_rider_and_fare(&records)
@@ -270,7 +275,9 @@ mod v6_tables {
                 .map(|i| i.timestamp.as_str())
                 .collect::<Vec<_>>();
             let first_commit = commit_timestamps[0];
-            let records = hudi_table.read_snapshot_as_of_blocking(first_commit, empty_filters())?;
+            let records = hudi_table
+                .read_snapshot_as_of(first_commit, empty_filters())
+                .await?;
             let schema = &records[0].schema();
             let records = concat_batches(schema, &records)?;
             let mut uuid_rider_and_fare = QuickstartTripsTable::uuid_rider_and_fare(&records)
@@ -293,20 +300,20 @@ mod v6_tables {
     mod incremental_queries {
         use super::*;
 
-        #[test]
-        fn test_empty_table() -> Result<()> {
+        #[tokio::test]
+        async fn test_empty_table() -> Result<()> {
             for base_url in SampleTable::V6Empty.urls() {
-                let hudi_table = Table::new_blocking(base_url.path())?;
-                let records = hudi_table.read_incremental_records_blocking("0", None)?;
+                let hudi_table = Table::new(base_url.path()).await?;
+                let records = hudi_table.read_incremental_records("0", None).await?;
                 assert!(records.is_empty())
             }
             Ok(())
         }
 
-        #[test]
-        fn test_simplekeygen_nonhivestyle_overwritetable() -> Result<()> {
+        #[tokio::test]
+        async fn test_simplekeygen_nonhivestyle_overwritetable() -> Result<()> {
             for base_url in SampleTable::V6SimplekeygenNonhivestyleOverwritetable.urls() {
-                let hudi_table = Table::new_blocking(base_url.path())?;
+                let hudi_table = Table::new(base_url.path()).await?;
                 let commit_timestamps = hudi_table
                     .timeline
                     .completed_commits
@@ -320,7 +327,8 @@ mod v6_tables {
 
                 // read records changed from the beginning to the 1st commit
                 let records = hudi_table
-                    .read_incremental_records_blocking("19700101000000", Some(first_commit))?;
+                    .read_incremental_records("19700101000000", Some(first_commit))
+                    .await?;
                 let schema = &records[0].schema();
                 let records = concat_batches(schema, &records)?;
                 let sample_data = SampleTable::sample_data_order_by_id(&records);
@@ -332,7 +340,8 @@ mod v6_tables {
 
                 // read records changed from the 1st to the 2nd commit
                 let records = hudi_table
-                    .read_incremental_records_blocking(first_commit, Some(second_commit))?;
+                    .read_incremental_records(first_commit, Some(second_commit))
+                    .await?;
                 let schema = &records[0].schema();
                 let records = concat_batches(schema, &records)?;
                 let sample_data = SampleTable::sample_data_order_by_id(&records);
@@ -344,7 +353,8 @@ mod v6_tables {
 
                 // read records changed from the 2nd to the 3rd commit
                 let records = hudi_table
-                    .read_incremental_records_blocking(second_commit, Some(third_commit))?;
+                    .read_incremental_records(second_commit, Some(third_commit))
+                    .await?;
                 let schema = &records[0].schema();
                 let records = concat_batches(schema, &records)?;
                 let sample_data = SampleTable::sample_data_order_by_id(&records);
@@ -355,7 +365,9 @@ mod v6_tables {
                 );
 
                 // read records changed from the 1st commit
-                let records = hudi_table.read_incremental_records_blocking(first_commit, None)?;
+                let records = hudi_table
+                    .read_incremental_records(first_commit, None)
+                    .await?;
                 let schema = &records[0].schema();
                 let records = concat_batches(schema, &records)?;
                 let sample_data = SampleTable::sample_data_order_by_id(&records);
@@ -366,7 +378,9 @@ mod v6_tables {
                 );
 
                 // read records changed from the 3rd commit
-                let records = hudi_table.read_incremental_records_blocking(third_commit, None)?;
+                let records = hudi_table
+                    .read_incremental_records(third_commit, None)
+                    .await?;
                 assert!(
                     records.is_empty(),
                     "Should return 0 record as it's the latest commit"
@@ -384,20 +398,20 @@ mod v8_tables {
     mod snapshot_queries {
         use super::*;
 
-        #[test]
-        fn test_empty_table() -> Result<()> {
+        #[tokio::test]
+        async fn test_empty_table() -> Result<()> {
             let base_url = SampleTable::V8Empty.url_to_cow();
-            let hudi_table = Table::new_blocking(base_url.path())?;
-            let records = hudi_table.read_snapshot_blocking(empty_filters())?;
+            let hudi_table = Table::new(base_url.path()).await?;
+            let records = hudi_table.read_snapshot(empty_filters()).await?;
             assert!(records.is_empty());
             Ok(())
         }
 
-        #[test]
-        fn test_non_partitioned() -> Result<()> {
+        #[tokio::test]
+        async fn test_non_partitioned() -> Result<()> {
             let base_url = SampleTable::V8Nonpartitioned.url_to_cow();
-            let hudi_table = Table::new_blocking(base_url.path())?;
-            let records = hudi_table.read_snapshot_blocking(empty_filters())?;
+            let hudi_table = Table::new(base_url.path()).await?;
+            let records = hudi_table.read_snapshot(empty_filters()).await?;
             let schema = &records[0].schema();
             let records = concat_batches(schema, &records)?;
 
@@ -414,12 +428,12 @@ mod v8_tables {
             Ok(())
         }
 
-        #[test]
-        fn test_complex_keygen_hive_style() -> Result<()> {
+        #[tokio::test]
+        async fn test_complex_keygen_hive_style() -> Result<()> {
             let base_url = SampleTable::V8ComplexkeygenHivestyle.url_to_cow();
-            let hudi_table = Table::new_blocking(base_url.path())?;
+            let hudi_table = Table::new(base_url.path()).await?;
 
-            let records = hudi_table.read_snapshot_blocking(empty_filters())?;
+            let records = hudi_table.read_snapshot(empty_filters()).await?;
             let schema = &records[0].schema();
             let records = concat_batches(schema, &records)?;
 
@@ -436,12 +450,12 @@ mod v8_tables {
             Ok(())
         }
 
-        #[test]
-        fn test_simple_keygen_nonhivestyle() -> Result<()> {
+        #[tokio::test]
+        async fn test_simple_keygen_nonhivestyle() -> Result<()> {
             let base_url = SampleTable::V8SimplekeygenNonhivestyle.url_to_cow();
-            let hudi_table = Table::new_blocking(base_url.path())?;
+            let hudi_table = Table::new(base_url.path()).await?;
 
-            let records = hudi_table.read_snapshot_blocking(empty_filters())?;
+            let records = hudi_table.read_snapshot(empty_filters()).await?;
             let schema = &records[0].schema();
             let records = concat_batches(schema, &records)?;
 
@@ -458,12 +472,12 @@ mod v8_tables {
             Ok(())
         }
 
-        #[test]
-        fn test_simple_keygen_hivestyle_no_metafields() -> Result<()> {
+        #[tokio::test]
+        async fn test_simple_keygen_hivestyle_no_metafields() -> Result<()> {
             let base_url = SampleTable::V8SimplekeygenHivestyleNoMetafields.url_to_cow();
-            let hudi_table = Table::new_blocking(base_url.path())?;
+            let hudi_table = Table::new(base_url.path()).await?;
 
-            let records = hudi_table.read_snapshot_blocking(empty_filters())?;
+            let records = hudi_table.read_snapshot(empty_filters()).await?;
             let schema = &records[0].schema();
             let records = concat_batches(schema, &records)?;
 
@@ -485,16 +499,16 @@ mod v8_tables {
     mod mor_log_file_queries {
         use super::*;
 
-        #[test]
-        fn test_quickstart_trips_inserts_updates_deletes() -> Result<()> {
+        #[tokio::test]
+        async fn test_quickstart_trips_inserts_updates_deletes() -> Result<()> {
             // V8Trips8I3U1D: 8 inserts, 3 updates (A, J, G fare=0), 2 deletes (F, J)
             let base_url = QuickstartTripsTable::V8Trips8I3U1D.url_to_mor_avro();
-            let hudi_table = Table::new_blocking(base_url.path())?;
+            let hudi_table = Table::new(base_url.path()).await?;
 
             let deleted_riders = ["rider-F", "rider-J"];
 
             // verify deleted records are not present in latest snapshot
-            let records = hudi_table.read_snapshot_blocking(empty_filters())?;
+            let records = hudi_table.read_snapshot(empty_filters()).await?;
             let schema = &records[0].schema();
             let records = concat_batches(schema, &records)?;
             let uuid_rider_and_fare = QuickstartTripsTable::uuid_rider_and_fare(&records);
@@ -534,7 +548,9 @@ mod v8_tables {
                 .map(|i| i.timestamp.as_str())
                 .collect::<Vec<_>>();
             let first_commit = commit_timestamps[0];
-            let records = hudi_table.read_snapshot_as_of_blocking(first_commit, empty_filters())?;
+            let records = hudi_table
+                .read_snapshot_as_of(first_commit, empty_filters())
+                .await?;
             let schema = &records[0].schema();
             let records = concat_batches(schema, &records)?;
             let mut uuid_rider_and_fare = QuickstartTripsTable::uuid_rider_and_fare(&records)
@@ -1154,10 +1170,10 @@ mod mdt_enabled_tables {
         /// 1. Table can be read correctly via MDT file listing
         /// 2. MDT partition key normalization ("." -> "") works correctly
         /// 3. File slices are retrieved correctly from MDT
-        #[test]
-        fn test_v9_nonpartitioned_with_mdt() -> Result<()> {
+        #[tokio::test]
+        async fn test_v9_nonpartitioned_with_mdt() -> Result<()> {
             let base_url = SampleTable::V9TxnsNonpartMeta.url_to_mor_avro();
-            let hudi_table = Table::new_blocking(base_url.path())?;
+            let hudi_table = Table::new(base_url.path()).await?;
 
             // Verify MDT is enabled
             assert!(
@@ -1166,7 +1182,7 @@ mod mdt_enabled_tables {
             );
 
             // Get file slices - this uses MDT file listing
-            let file_slices = hudi_table.get_file_slices_blocking(empty_filters())?;
+            let file_slices = hudi_table.get_file_slices(empty_filters()).await?;
 
             // Should have file slices for the non-partitioned table
             assert!(
@@ -1189,15 +1205,16 @@ mod mdt_enabled_tables {
         /// The metadata table stores "." as partition key, but external API should see "".
         /// For non-partitioned tables, we use a fast path that directly fetches "." without
         /// going through __all_partitions__ lookup.
-        #[test]
-        fn test_v9_nonpartitioned_mdt_partition_normalization() -> Result<()> {
+        #[tokio::test]
+        async fn test_v9_nonpartitioned_mdt_partition_normalization() -> Result<()> {
             let base_url = SampleTable::V9TxnsNonpartMeta.url_to_mor_avro();
-            let hudi_table = Table::new_blocking(base_url.path())?;
+            let hudi_table = Table::new(base_url.path()).await?;
 
             // Read MDT files partition records
             let partition_pruner = PartitionPruner::empty();
-            let records =
-                hudi_table.read_metadata_table_files_partition_blocking(&partition_pruner)?;
+            let records = hudi_table
+                .read_metadata_table_files_partition(&partition_pruner)
+                .await?;
 
             // For non-partitioned tables, the fast path only fetches the files record.
             // __all_partitions__ is not fetched to avoid redundant HFile lookup.
diff --git a/crates/datafusion/src/lib.rs b/crates/datafusion/src/lib.rs
index 0674cd3..b7e3200 100644
--- a/crates/datafusion/src/lib.rs
+++ b/crates/datafusion/src/lib.rs
@@ -23,7 +23,6 @@ use std::any::Any;
 use std::collections::HashMap;
 use std::fmt::Debug;
 use std::sync::Arc;
-use std::thread;
 
 use arrow_schema::{Schema, SchemaRef};
 use async_trait::async_trait;
@@ -84,6 +83,8 @@ use hudi_core::table::Table as HudiTable;
 #[derive(Clone)]
 pub struct HudiDataSource {
     table: Arc<HudiTable>,
+    /// Cached table schema (with meta fields) for synchronous access in `TableProvider::schema()`.
+    schema: SchemaRef,
     /// Cached partition schema for determining partition columns.
     /// This is cached at construction since partition schema rarely changes
     /// and is needed synchronously in `supports_filters_pushdown`.
@@ -122,6 +123,16 @@ impl HudiDataSource {
             .await
             .map_err(|e| Execution(format!("Failed to create Hudi table: {e}")))?;
 
+        // Cache schema with meta fields at construction for synchronous access
+        let schema = table
+            .get_schema_with_meta_fields()
+            .await
+            .map(SchemaRef::from)
+            .unwrap_or_else(|e| {
+                warn!("Failed to get table schema, using empty schema: {e}");
+                SchemaRef::from(Schema::empty())
+            });
+
         // Cache partition schema at construction for use in supports_filters_pushdown
         let partition_schema = match table.get_partition_schema().await {
             Ok(s) => s,
@@ -133,6 +144,7 @@ impl HudiDataSource {
 
         Ok(Self {
             table: Arc::new(table),
+            schema,
             partition_schema,
         })
     }
@@ -266,13 +278,7 @@ impl TableProvider for HudiDataSource {
     }
 
     fn schema(&self) -> SchemaRef {
-        let table = self.table.clone();
-        let handle = thread::spawn(move || {
-            let rt = tokio::runtime::Runtime::new().unwrap();
-            rt.block_on(async { table.get_schema().await })
-        });
-        let result = handle.join().unwrap().unwrap_or_else(|_| Schema::empty());
-        SchemaRef::from(result)
+        self.schema.clone()
     }
 
     fn table_type(&self) -> TableType {
diff --git a/python/hudi/_internal.pyi b/python/hudi/_internal.pyi
index 89c72b9..502d425 100644
--- a/python/hudi/_internal.pyi
+++ b/python/hudi/_internal.pyi
@@ -208,9 +208,17 @@ class HudiTable:
             str: The timezone of the table.
         """
         ...
-    def get_avro_schema(self) -> str:
+    def get_schema_in_avro_str(self) -> str:
         """
-        Returns the Avro schema of the Hudi table.
+        Returns the Avro schema of the Hudi table, without meta fields.
+
+        Returns:
+            str: The Avro schema of the table.
+        """
+        ...
+    def get_schema_in_avro_str_with_meta_fields(self) -> str:
+        """
+        Returns the Avro schema of the Hudi table, with meta fields prepended.
 
         Returns:
             str: The Avro schema of the table.
@@ -218,7 +226,15 @@ class HudiTable:
         ...
     def get_schema(self) -> "pyarrow.Schema":
         """
-        Returns the schema of the Hudi table.
+        Returns the schema of the Hudi table, without meta fields.
+
+        Returns:
+            pyarrow.Schema: The schema of the table.
+        """
+        ...
+    def get_schema_with_meta_fields(self) -> "pyarrow.Schema":
+        """
+        Returns the schema of the Hudi table, with meta fields prepended.
 
         Returns:
             pyarrow.Schema: The schema of the table.
diff --git a/python/src/internal.rs b/python/src/internal.rs
index e6224d3..ca5081c 100644
--- a/python/src/internal.rs
+++ b/python/src/internal.rs
@@ -83,11 +83,17 @@ impl HudiFileGroupReader {
     #[new]
     #[pyo3(signature = (base_uri, options=None))]
     fn new_with_options(
+        py: Python,
         base_uri: &str,
         options: Option<HashMap<String, String>>,
     ) -> PyResult<Self> {
-        let inner = FileGroupReader::new_with_options(base_uri, options.unwrap_or_default())
-            .map_err(PythonError::from)?;
+        let inner = py.detach(|| {
+            rt().block_on(FileGroupReader::new_with_options(
+                base_uri,
+                options.unwrap_or_default(),
+            ))
+            .map_err(PythonError::from)
+        })?;
         Ok(HudiFileGroupReader { inner })
     }
 
@@ -96,11 +102,14 @@ impl HudiFileGroupReader {
         relative_path: &str,
         py: Python,
     ) -> PyResult<Py<PyAny>> {
-        rt().block_on(self.inner.read_file_slice_by_base_file_path(relative_path))
-            .map_err(PythonError::from)?
-            .to_pyarrow(py)
-            .map(|b| b.unbind())
+        py.detach(|| {
+            rt().block_on(self.inner.read_file_slice_by_base_file_path(relative_path))
+                .map_err(PythonError::from)
+        })?
+        .to_pyarrow(py)
+        .map(|b| b.unbind())
     }
+
     fn read_file_slice(&self, file_slice: &HudiFileSlice, py: Python) -> PyResult<Py<PyAny>> {
         let mut file_group = FileGroup::new_with_base_file_name(
             &file_slice.base_file_name,
@@ -121,10 +130,12 @@ impl HudiFileGroupReader {
                 ))
             })
             .map_err(PythonError::from)?;
-        rt().block_on(self.inner.read_file_slice(file_slice))
-            .map_err(PythonError::from)?
-            .to_pyarrow(py)
-            .map(|b| b.unbind())
+        py.detach(|| {
+            rt().block_on(self.inner.read_file_slice(file_slice))
+                .map_err(PythonError::from)
+        })?
+        .to_pyarrow(py)
+        .map(|b| b.unbind())
     }
 
     fn read_file_slice_from_paths(
@@ -133,11 +144,13 @@ impl HudiFileGroupReader {
         log_file_paths: Vec<String>,
         py: Python,
     ) -> PyResult<Py<PyAny>> {
-        rt().block_on(
-            self.inner
-                .read_file_slice_from_paths(base_file_path, log_file_paths),
-        )
-        .map_err(PythonError::from)?
+        py.detach(|| {
+            rt().block_on(
+                self.inner
+                    .read_file_slice_from_paths(base_file_path, log_file_paths),
+            )
+            .map_err(PythonError::from)
+        })?
         .to_pyarrow(py)
         .map(|b| b.unbind())
     }
@@ -278,15 +291,17 @@ impl HudiTable {
     #[new]
     #[pyo3(signature = (base_uri, options=None))]
     fn new_with_options(
+        py: Python,
         base_uri: &str,
         options: Option<HashMap<String, String>>,
     ) -> PyResult<Self> {
-        let inner: Table = rt()
-            .block_on(Table::new_with_options(
+        let inner: Table = py.detach(|| {
+            rt().block_on(Table::new_with_options(
                 base_uri,
                 options.unwrap_or_default(),
             ))
-            .map_err(PythonError::from)?;
+            .map_err(PythonError::from)
+        })?;
         Ok(HudiTable { inner })
     }
 
@@ -318,27 +333,49 @@ impl HudiTable {
         self.inner.timezone()
     }
 
-    fn get_avro_schema(&self, py: Python) -> PyResult<String> {
+    fn get_schema_in_avro_str(&self, py: Python) -> PyResult<String> {
+        py.detach(|| {
+            let avro_schema = rt()
+                .block_on(self.inner.get_schema_in_avro_str())
+                .map_err(PythonError::from)?;
+            Ok(avro_schema)
+        })
+    }
+
+    fn get_schema_in_avro_str_with_meta_fields(&self, py: Python) -> PyResult<String> {
         py.detach(|| {
             let avro_schema = rt()
-                .block_on(self.inner.get_avro_schema())
+                .block_on(self.inner.get_schema_in_avro_str_with_meta_fields())
                 .map_err(PythonError::from)?;
             Ok(avro_schema)
         })
     }
 
     fn get_schema(&self, py: Python) -> PyResult<Py<PyAny>> {
-        rt().block_on(self.inner.get_schema())
-            .map_err(PythonError::from)?
-            .to_pyarrow(py)
-            .map(|b| b.unbind())
+        py.detach(|| {
+            rt().block_on(self.inner.get_schema())
+                .map_err(PythonError::from)
+        })?
+        .to_pyarrow(py)
+        .map(|b| b.unbind())
+    }
+
+    fn get_schema_with_meta_fields(&self, py: Python) -> PyResult<Py<PyAny>> {
+        py.detach(|| {
+            rt().block_on(self.inner.get_schema_with_meta_fields())
+                .map_err(PythonError::from)
+        })?
+        .to_pyarrow(py)
+        .map(|b| b.unbind())
     }
 
     fn get_partition_schema(&self, py: Python) -> PyResult<Py<PyAny>> {
-        rt().block_on(self.inner.get_partition_schema())
-            .map_err(PythonError::from)?
-            .to_pyarrow(py)
-            .map(|b| b.unbind())
+        py.detach(|| {
+            rt().block_on(self.inner.get_partition_schema())
+                .map_err(PythonError::from)
+        })?
+        .to_pyarrow(py)
+        .map(|b| b.unbind())
     }
 
     fn get_timeline(&self, py: Python) -> HudiTimeline {
@@ -460,10 +497,12 @@ impl HudiTable {
         filters: Option<Vec<(String, String, String)>>,
         py: Python,
     ) -> PyResult<Py<PyAny>> {
-        rt().block_on(self.inner.read_snapshot(filters.unwrap_or_default()))
-            .map_err(PythonError::from)?
-            .to_pyarrow(py)
-            .map(|b| b.unbind())
+        py.detach(|| {
+            rt().block_on(self.inner.read_snapshot(filters.unwrap_or_default()))
+                .map_err(PythonError::from)
+        })?
+        .to_pyarrow(py)
+        .map(|b| b.unbind())
     }
 
     #[pyo3(signature = (timestamp, filters=None))]
@@ -473,11 +512,13 @@ impl HudiTable {
         filters: Option<Vec<(String, String, String)>>,
         py: Python,
     ) -> PyResult<Py<PyAny>> {
-        rt().block_on(
-            self.inner
-                .read_snapshot_as_of(timestamp, filters.unwrap_or_default()),
-        )
-        .map_err(PythonError::from)?
+        py.detach(|| {
+            rt().block_on(
+                self.inner
+                    .read_snapshot_as_of(timestamp, filters.unwrap_or_default()),
+            )
+            .map_err(PythonError::from)
+        })?
         .to_pyarrow(py)
         .map(|b| b.unbind())
     }
@@ -489,11 +530,13 @@ impl HudiTable {
         end_timestamp: Option<&str>,
         py: Python,
     ) -> PyResult<Py<PyAny>> {
-        rt().block_on(
-            self.inner
-                .read_incremental_records(start_timestamp, end_timestamp),
-        )
-        .map_err(PythonError::from)?
+        py.detach(|| {
+            rt().block_on(
+                self.inner
+                    .read_incremental_records(start_timestamp, end_timestamp),
+            )
+            .map_err(PythonError::from)
+        })?
         .to_pyarrow(py)
         .map(|b| b.unbind())
     }
@@ -589,10 +632,12 @@ impl HudiTimeline {
     }
 
     pub fn get_latest_schema(&self, py: Python) -> PyResult<Py<PyAny>> {
-        rt().block_on(self.inner.get_latest_schema())
-            .map_err(PythonError::from)?
-            .to_pyarrow(py)
-            .map(|b| b.unbind())
+        py.detach(|| {
+            rt().block_on(self.inner.get_latest_schema())
+                .map_err(PythonError::from)
+        })?
+        .to_pyarrow(py)
+        .map(|b| b.unbind())
     }
 }
 
@@ -608,20 +653,22 @@ impl From<&Timeline> for HudiTimeline {
 #[pyfunction]
 #[pyo3(signature = (base_uri, hudi_options=None, storage_options=None, options=None))]
 pub fn build_hudi_table(
+    py: Python,
     base_uri: String,
     hudi_options: Option<HashMap<String, String>>,
     storage_options: Option<HashMap<String, String>>,
     options: Option<HashMap<String, String>>,
 ) -> PyResult<HudiTable> {
-    let inner = rt()
-        .block_on(
+    let inner = py.detach(|| {
+        rt().block_on(
             TableBuilder::from_base_uri(&base_uri)
                 .with_hudi_options(hudi_options.unwrap_or_default())
                 .with_storage_options(storage_options.unwrap_or_default())
                 .with_options(options.unwrap_or_default())
                 .build(),
         )
-        .map_err(PythonError::from)?;
+        .map_err(PythonError::from)
+    })?;
     Ok(HudiTable { inner })
 }
 
diff --git a/python/tests/test_table_read.py b/python/tests/test_table_read.py
index 3a52fbb..6702b6f 100644
--- a/python/tests/test_table_read.py
+++ b/python/tests/test_table_read.py
@@ -32,7 +32,7 @@ from hudi import HudiTable
 def test_read_table_has_correct_schema(v8_trips_table):
     table = HudiTable(v8_trips_table)
 
-    assert table.get_schema().names == [
+    assert table.get_schema_with_meta_fields().names == [
         "_hoodie_commit_time",
         "_hoodie_commit_seqno",
         "_hoodie_record_key",


Reply via email to