This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 6c98b09  feat: add blocking APIs for `Table` and `FileGroupReader` 
(#321)
6c98b09 is described below

commit 6c98b0931ef4f06278fa2c69af94f4616f5e3518
Author: Shiyan Xu <[email protected]>
AuthorDate: Sun Apr 27 00:57:47 2025 -0500

    feat: add blocking APIs for `Table` and `FileGroupReader` (#321)
    
    Add blocking versions of the `Table` and `FileGroupReader` APIs in Rust, with 
some minor refactoring of the API arguments.
---
 crates/core/src/config/util.rs       |   5 +
 crates/core/src/expr/filter.rs       |  11 +-
 crates/core/src/file_group/mod.rs    |  25 +-
 crates/core/src/file_group/reader.rs |  27 +-
 crates/core/src/table/mod.rs         | 513 +++++++++++++++++++++++------------
 crates/core/src/util/mod.rs          |  39 ---
 crates/datafusion/src/lib.rs         |   3 +-
 demo/table-api-rust/src/main.rs      |   3 +-
 python/src/internal.rs               |  56 ++--
 9 files changed, 423 insertions(+), 259 deletions(-)

diff --git a/crates/core/src/config/util.rs b/crates/core/src/config/util.rs
index 33921c6..44879b5 100644
--- a/crates/core/src/config/util.rs
+++ b/crates/core/src/config/util.rs
@@ -29,6 +29,11 @@ pub fn empty_options<'a>() -> std::iter::Empty<(&'a str, &'a 
str)> {
     std::iter::empty::<(&str, &str)>()
 }
 
+/// Returns an empty iterator to represent an empty set of filters.
+pub fn empty_filters<'a>() -> std::iter::Empty<(&'a str, &'a str, &'a str)> {
+    std::iter::empty::<(&str, &str, &str)>()
+}
+
 /// Splits the given options into two maps: one for Hudi options, and the 
other for others, which could be storage options for example.
 pub fn split_hudi_options_from_others<I, K, V>(
     all_options: I,
diff --git a/crates/core/src/expr/filter.rs b/crates/core/src/expr/filter.rs
index 847a2f5..b9cb7a6 100644
--- a/crates/core/src/expr/filter.rs
+++ b/crates/core/src/expr/filter.rs
@@ -72,8 +72,15 @@ impl TryFrom<(&str, &str, &str)> for Filter {
     }
 }
 
-pub fn from_str_tuples(tuples: &[(&str, &str, &str)]) -> Result<Vec<Filter>> {
-    tuples.iter().map(|t| Filter::try_from(*t)).collect()
+pub fn from_str_tuples<I, S>(tuples: I) -> Result<Vec<Filter>>
+where
+    I: IntoIterator<Item = (S, S, S)>,
+    S: AsRef<str>,
+{
+    tuples
+        .into_iter()
+        .map(|t| Filter::try_from((t.0.as_ref(), t.1.as_ref(), t.2.as_ref())))
+        .collect()
 }
 
 pub struct FilterField {
diff --git a/crates/core/src/file_group/mod.rs 
b/crates/core/src/file_group/mod.rs
index cff9a78..cf5a4ae 100644
--- a/crates/core/src/file_group/mod.rs
+++ b/crates/core/src/file_group/mod.rs
@@ -87,13 +87,11 @@ impl FileGroup {
     }
 
     /// Create a new [FileGroup] with a [BaseFile]'s file name.
-    pub fn new_with_base_file_name(
-        id: String,
-        partition_path: String,
-        file_name: &str,
-    ) -> Result<Self> {
-        let mut file_group = Self::new(id, partition_path);
-        file_group.add_base_file_from_name(file_name)?;
+    pub fn new_with_base_file_name(file_name: &str, partition_path: &str) -> 
Result<Self> {
+        let base_file = BaseFile::from_str(file_name)?;
+        let file_id = base_file.file_id.clone();
+        let mut file_group = Self::new(file_id, partition_path.to_string());
+        file_group.add_base_file(base_file)?;
         Ok(file_group)
     }
 
@@ -156,6 +154,19 @@ impl FileGroup {
         self.add_log_file(log_file)
     }
 
+    /// Add multiple [LogFile]s based on the file names to the corresponding 
[FileSlice]s in the
+    /// [FileGroup].
+    pub fn add_log_files_from_names<I, S>(&mut self, log_file_names: I) -> 
Result<&Self>
+    where
+        I: IntoIterator<Item = S>,
+        S: AsRef<str>,
+    {
+        for file_name in log_file_names {
+            self.add_log_file_from_name(file_name.as_ref())?;
+        }
+        Ok(self)
+    }
+
     /// Add a [LogFile] to the corresponding [FileSlice] in the [FileGroup].
     ///
     /// TODO: support adding log files to file group without base files.
diff --git a/crates/core/src/file_group/reader.rs 
b/crates/core/src/file_group/reader.rs
index 69f9cf4..5d79ce8 100644
--- a/crates/core/src/file_group/reader.rs
+++ b/crates/core/src/file_group/reader.rs
@@ -168,6 +168,17 @@ impl FileGroupReader {
         }
     }
 
+    /// Same as [FileGroupReader::read_file_slice_by_base_file_path], but 
blocking.
+    pub fn read_file_slice_by_base_file_path_blocking(
+        &self,
+        relative_path: &str,
+    ) -> Result<RecordBatch> {
+        tokio::runtime::Builder::new_current_thread()
+            .enable_all()
+            .build()?
+            .block_on(self.read_file_slice_by_base_file_path(relative_path))
+    }
+
     fn create_instant_range_for_log_file_scan(&self) -> InstantRange {
         let timezone = self
             .hudi_configs
@@ -225,6 +236,14 @@ impl FileGroupReader {
             merger.merge_record_batches(&schema, &all_record_batches)
         }
     }
+
+    /// Same as [FileGroupReader::read_file_slice], but blocking.
+    pub fn read_file_slice_blocking(&self, file_slice: &FileSlice) -> 
Result<RecordBatch> {
+        tokio::runtime::Builder::new_current_thread()
+            .enable_all()
+            .build()?
+            .block_on(self.read_file_slice(file_slice))
+    }
 }
 
 #[cfg(test)]
@@ -249,14 +268,12 @@ mod tests {
         Ok(())
     }
 
-    #[tokio::test]
-    async fn test_read_file_slice_returns_error() {
+    #[test]
+    fn test_read_file_slice_returns_error() {
         let reader =
             
FileGroupReader::new_with_options("file:///non-existent-path/table", 
empty_options())
                 .unwrap();
-        let result = reader
-            .read_file_slice_by_base_file_path("non_existent_file")
-            .await;
+        let result = 
reader.read_file_slice_by_base_file_path_blocking("non_existent_file");
         assert!(matches!(result.unwrap_err(), ReadFileSliceError(_)));
     }
 
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index e47ff9b..acb5286 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -47,18 +47,20 @@
 //! 3. read hudi table
 //! ```rust
 //! use url::Url;
+//! use hudi_core::config::util::empty_filters;
 //! use hudi_core::table::Table;
 //!
 //! pub async fn test() {
 //!     let base_uri = Url::from_file_path("/tmp/hudi_data").unwrap();
 //!     let hudi_table = Table::new(base_uri.path()).await.unwrap();
-//!     let record_read = hudi_table.read_snapshot(&[]).await.unwrap();
+//!     let record_read = 
hudi_table.read_snapshot(empty_filters()).await.unwrap();
 //! }
 //! ```
 //! 4. get file slice
 //!    Users can obtain metadata to customize reading methods, read in 
batches, perform parallel reads, and more.
 //! ```rust
 //! use url::Url;
+//! use hudi_core::config::util::empty_filters;
 //! use hudi_core::table::Table;
 //! use hudi_core::storage::util::parse_uri;
 //! use hudi_core::storage::util::join_url_segments;
@@ -67,7 +69,7 @@
 //!     let base_uri = Url::from_file_path("/tmp/hudi_data").unwrap();
 //!     let hudi_table = Table::new(base_uri.path()).await.unwrap();
 //!     let file_slices = hudi_table
-//!             .get_file_slices_splits(2, &[])
+//!             .get_file_slices_splits(2, empty_filters())
 //!             .await.unwrap();
 //!     // define every parquet task reader how many slice
 //!     let mut parquet_file_groups: Vec<Vec<String>> = Vec::new();
@@ -124,6 +126,14 @@ impl Table {
         TableBuilder::from_base_uri(base_uri).build().await
     }
 
+    /// Same as [Table::new], but blocking.
+    pub fn new_blocking(base_uri: &str) -> Result<Self> {
+        tokio::runtime::Builder::new_current_thread()
+            .enable_all()
+            .build()?
+            .block_on(async { Table::new(base_uri).await })
+    }
+
     /// Create hudi table with options
     pub async fn new_with_options<I, K, V>(base_uri: &str, options: I) -> 
Result<Self>
     where
@@ -137,6 +147,19 @@ impl Table {
             .await
     }
 
+    /// Same as [Table::new_with_options], but blocking.
+    pub fn new_with_options_blocking<I, K, V>(base_uri: &str, options: I) -> 
Result<Self>
+    where
+        I: IntoIterator<Item = (K, V)>,
+        K: AsRef<str>,
+        V: Into<String>,
+    {
+        tokio::runtime::Builder::new_current_thread()
+            .enable_all()
+            .build()?
+            .block_on(async { Table::new_with_options(base_uri, options).await 
})
+    }
+
     pub fn hudi_options(&self) -> HashMap<String, String> {
         self.hudi_configs.as_options()
     }
@@ -198,11 +221,27 @@ impl Table {
         self.timeline.get_latest_avro_schema().await
     }
 
+    /// Same as [Table::get_avro_schema], but blocking.
+    pub fn get_avro_schema_blocking(&self) -> Result<String> {
+        tokio::runtime::Builder::new_current_thread()
+            .enable_all()
+            .build()?
+            .block_on(async { self.get_avro_schema().await })
+    }
+
     /// Get the latest [arrow_schema::Schema] of the table.
     pub async fn get_schema(&self) -> Result<Schema> {
         self.timeline.get_latest_schema().await
     }
 
+    /// Same as [Table::get_schema], but blocking.
+    pub fn get_schema_blocking(&self) -> Result<Schema> {
+        tokio::runtime::Builder::new_current_thread()
+            .enable_all()
+            .build()?
+            .block_on(async { self.get_schema().await })
+    }
+
     /// Get the latest partition [arrow_schema::Schema] of the table.
     pub async fn get_partition_schema(&self) -> Result<Schema> {
         let partition_fields: HashSet<String> = self
@@ -223,6 +262,14 @@ impl Table {
         Ok(Schema::new(partition_fields))
     }
 
+    /// Same as [Table::get_partition_schema], but blocking.
+    pub fn get_partition_schema_blocking(&self) -> Result<Schema> {
+        tokio::runtime::Builder::new_current_thread()
+            .enable_all()
+            .build()?
+            .block_on(async { self.get_partition_schema().await })
+    }
+
     /// Get the [Timeline] of the table.
     pub fn get_timeline(&self) -> &Timeline {
         &self.timeline
@@ -233,11 +280,15 @@ impl Table {
     /// # Arguments
     ///     * `n` - The number of chunks to split the file slices into.
     ///     * `filters` - Partition filters to apply.
-    pub async fn get_file_slices_splits(
+    pub async fn get_file_slices_splits<I, S>(
         &self,
         n: usize,
-        filters: &[(&str, &str, &str)],
-    ) -> Result<Vec<Vec<FileSlice>>> {
+        filters: I,
+    ) -> Result<Vec<Vec<FileSlice>>>
+    where
+        I: IntoIterator<Item = (S, S, S)>,
+        S: AsRef<str>,
+    {
         if let Some(timestamp) = 
self.timeline.get_latest_commit_timestamp_as_option() {
             let filters = from_str_tuples(filters)?;
             self.get_file_slices_splits_internal(n, timestamp, &filters)
@@ -247,23 +298,63 @@ impl Table {
         }
     }
 
+    /// Same as [Table::get_file_slices_splits], but blocking.
+    pub fn get_file_slices_splits_blocking<I, S>(
+        &self,
+        n: usize,
+        filters: I,
+    ) -> Result<Vec<Vec<FileSlice>>>
+    where
+        I: IntoIterator<Item = (S, S, S)>,
+        S: AsRef<str>,
+    {
+        tokio::runtime::Builder::new_current_thread()
+            .enable_all()
+            .build()?
+            .block_on(async { self.get_file_slices_splits(n, filters).await })
+    }
+
     /// Get all the [FileSlice]s in splits from the table at a given timestamp.
     ///
     /// # Arguments
     ///     * `n` - The number of chunks to split the file slices into.
     ///     * `timestamp` - The timestamp which file slices associated with.
     ///     * `filters` - Partition filters to apply.
-    pub async fn get_file_slices_splits_as_of(
+    pub async fn get_file_slices_splits_as_of<I, S>(
         &self,
         n: usize,
         timestamp: &str,
-        filters: &[(&str, &str, &str)],
-    ) -> Result<Vec<Vec<FileSlice>>> {
+        filters: I,
+    ) -> Result<Vec<Vec<FileSlice>>>
+    where
+        I: IntoIterator<Item = (S, S, S)>,
+        S: AsRef<str>,
+    {
         let filters = from_str_tuples(filters)?;
         self.get_file_slices_splits_internal(n, timestamp, &filters)
             .await
     }
 
+    /// Same as [Table::get_file_slices_splits_as_of], but blocking.
+    pub fn get_file_slices_splits_as_of_blocking<I, S>(
+        &self,
+        n: usize,
+        timestamp: &str,
+        filters: I,
+    ) -> Result<Vec<Vec<FileSlice>>>
+    where
+        I: IntoIterator<Item = (S, S, S)>,
+        S: AsRef<str>,
+    {
+        tokio::runtime::Builder::new_current_thread()
+            .enable_all()
+            .build()?
+            .block_on(async {
+                self.get_file_slices_splits_as_of(n, timestamp, filters)
+                    .await
+            })
+    }
+
     async fn get_file_slices_splits_internal(
         &self,
         n: usize,
@@ -291,7 +382,11 @@ impl Table {
     ///
     /// # Notes
     ///     * This API is useful for implementing snapshot query.
-    pub async fn get_file_slices(&self, filters: &[(&str, &str, &str)]) -> 
Result<Vec<FileSlice>> {
+    pub async fn get_file_slices<I, S>(&self, filters: I) -> 
Result<Vec<FileSlice>>
+    where
+        I: IntoIterator<Item = (S, S, S)>,
+        S: AsRef<str>,
+    {
         if let Some(timestamp) = 
self.timeline.get_latest_commit_timestamp_as_option() {
             let filters = from_str_tuples(filters)?;
             self.get_file_slices_internal(timestamp, &filters).await
@@ -300,6 +395,18 @@ impl Table {
         }
     }
 
+    /// Same as [Table::get_file_slices], but blocking.
+    pub fn get_file_slices_blocking<I, S>(&self, filters: I) -> 
Result<Vec<FileSlice>>
+    where
+        I: IntoIterator<Item = (S, S, S)>,
+        S: AsRef<str>,
+    {
+        tokio::runtime::Builder::new_current_thread()
+            .enable_all()
+            .build()?
+            .block_on(async { self.get_file_slices(filters).await })
+    }
+
     /// Get all the [FileSlice]s in the table at a given timestamp.
     ///
     /// # Arguments
@@ -308,15 +415,35 @@ impl Table {
     ///
     /// # Notes
     ///     * This API is useful for implementing time travel query.
-    pub async fn get_file_slices_as_of(
+    pub async fn get_file_slices_as_of<I, S>(
         &self,
         timestamp: &str,
-        filters: &[(&str, &str, &str)],
-    ) -> Result<Vec<FileSlice>> {
+        filters: I,
+    ) -> Result<Vec<FileSlice>>
+    where
+        I: IntoIterator<Item = (S, S, S)>,
+        S: AsRef<str>,
+    {
         let filters = from_str_tuples(filters)?;
         self.get_file_slices_internal(timestamp, &filters).await
     }
 
+    /// Same as [Table::get_file_slices_as_of], but blocking.
+    pub fn get_file_slices_as_of_blocking<I, S>(
+        &self,
+        timestamp: &str,
+        filters: I,
+    ) -> Result<Vec<FileSlice>>
+    where
+        I: IntoIterator<Item = (S, S, S)>,
+        S: AsRef<str>,
+    {
+        tokio::runtime::Builder::new_current_thread()
+            .enable_all()
+            .build()?
+            .block_on(async { self.get_file_slices_as_of(timestamp, 
filters).await })
+    }
+
     async fn get_file_slices_internal(
         &self,
         timestamp: &str,
@@ -360,6 +487,21 @@ impl Table {
         self.get_file_slices_between_internal(start, end).await
     }
 
+    /// Same as [Table::get_file_slices_between], but blocking.
+    pub fn get_file_slices_between_blocking(
+        &self,
+        start_timestamp: Option<&str>,
+        end_timestamp: Option<&str>,
+    ) -> Result<Vec<FileSlice>> {
+        tokio::runtime::Builder::new_current_thread()
+            .enable_all()
+            .build()?
+            .block_on(async {
+                self.get_file_slices_between(start_timestamp, end_timestamp)
+                    .await
+            })
+    }
+
     async fn get_file_slices_between_internal(
         &self,
         start_timestamp: &str,
@@ -406,7 +548,11 @@ impl Table {
     ///
     /// # Arguments
     ///     * `filters` - Partition filters to apply.
-    pub async fn read_snapshot(&self, filters: &[(&str, &str, &str)]) -> 
Result<Vec<RecordBatch>> {
+    pub async fn read_snapshot<I, S>(&self, filters: I) -> 
Result<Vec<RecordBatch>>
+    where
+        I: IntoIterator<Item = (S, S, S)>,
+        S: AsRef<str>,
+    {
         if let Some(timestamp) = 
self.timeline.get_latest_commit_timestamp_as_option() {
             let filters = from_str_tuples(filters)?;
             self.read_snapshot_internal(timestamp, &filters).await
@@ -415,20 +561,52 @@ impl Table {
         }
     }
 
+    /// Same as [Table::read_snapshot], but blocking.
+    pub fn read_snapshot_blocking<I, S>(&self, filters: I) -> 
Result<Vec<RecordBatch>>
+    where
+        I: IntoIterator<Item = (S, S, S)>,
+        S: AsRef<str>,
+    {
+        tokio::runtime::Builder::new_current_thread()
+            .enable_all()
+            .build()?
+            .block_on(async { self.read_snapshot(filters).await })
+    }
+
     /// Get all the records in the table at a given timestamp.
     ///
     /// # Arguments
     ///     * `timestamp` - The timestamp which records associated with.
     ///     * `filters` - Partition filters to apply.
-    pub async fn read_snapshot_as_of(
+    pub async fn read_snapshot_as_of<I, S>(
         &self,
         timestamp: &str,
-        filters: &[(&str, &str, &str)],
-    ) -> Result<Vec<RecordBatch>> {
+        filters: I,
+    ) -> Result<Vec<RecordBatch>>
+    where
+        I: IntoIterator<Item = (S, S, S)>,
+        S: AsRef<str>,
+    {
         let filters = from_str_tuples(filters)?;
         self.read_snapshot_internal(timestamp, &filters).await
     }
 
+    /// Same as [Table::read_snapshot_as_of], but blocking.
+    pub fn read_snapshot_as_of_blocking<I, S>(
+        &self,
+        timestamp: &str,
+        filters: I,
+    ) -> Result<Vec<RecordBatch>>
+    where
+        I: IntoIterator<Item = (S, S, S)>,
+        S: AsRef<str>,
+    {
+        tokio::runtime::Builder::new_current_thread()
+            .enable_all()
+            .build()?
+            .block_on(async { self.read_snapshot_as_of(timestamp, 
filters).await })
+    }
+
     async fn read_snapshot_internal(
         &self,
         timestamp: &str,
@@ -480,6 +658,21 @@ impl Table {
         Ok(batches)
     }
 
+    /// Same as [Table::read_incremental_records], but blocking.
+    pub fn read_incremental_records_blocking(
+        &self,
+        start_timestamp: &str,
+        end_timestamp: Option<&str>,
+    ) -> Result<Vec<RecordBatch>> {
+        tokio::runtime::Builder::new_current_thread()
+            .enable_all()
+            .build()?
+            .block_on(async {
+                self.read_incremental_records(start_timestamp, end_timestamp)
+                    .await
+            })
+    }
+
     /// Get the change-data-capture (CDC) records between the given timestamps.
     ///
     /// The CDC records should reflect the records that were inserted, 
updated, and deleted
@@ -503,7 +696,7 @@ mod tests {
         PrecombineField, RecordKeyFields, TableName, TableType, TableVersion,
         TimelineLayoutVersion, TimelineTimezone,
     };
-    use crate::config::util::empty_options;
+    use crate::config::util::{empty_filters, empty_options};
     use crate::config::HUDI_CONF_DIR;
     use crate::storage::util::join_url_segments;
     use crate::storage::Storage;
@@ -518,27 +711,26 @@ mod tests {
     /// # Arguments
     ///
     /// * `table_dir_name` - Name of the table root directory; all under 
`crates/core/tests/data/`.
-    async fn get_test_table_without_validation(table_dir_name: &str) -> Table {
+    fn get_test_table_without_validation(table_dir_name: &str) -> Table {
         let base_url = Url::from_file_path(
             
canonicalize(PathBuf::from("tests").join("data").join(table_dir_name)).unwrap(),
         )
         .unwrap();
-        Table::new_with_options(
+        Table::new_with_options_blocking(
             base_url.as_str(),
             [("hoodie.internal.skip.config.validation", "true")],
         )
-        .await
         .unwrap()
     }
 
     /// Test helper to get relative file paths from the table with filters.
-    async fn get_file_paths_with_filters(
+    fn get_file_paths_with_filters(
         table: &Table,
         filters: &[(&str, &str, &str)],
     ) -> Result<Vec<String>> {
         let mut file_paths = Vec::new();
         let base_url = table.base_url();
-        for f in table.get_file_slices(filters).await? {
+        for f in table.get_file_slices_blocking(filters.to_vec())? {
             let relative_path = f.base_file_relative_path()?;
             let file_url = join_url_segments(&base_url, 
&[relative_path.as_str()])?;
             file_paths.push(file_url.to_string());
@@ -546,10 +738,10 @@ mod tests {
         Ok(file_paths)
     }
 
-    #[tokio::test]
-    async fn test_hudi_table_get_hudi_options() {
+    #[test]
+    fn test_hudi_table_get_hudi_options() {
         let base_url = SampleTable::V6Nonpartitioned.url_to_cow();
-        let hudi_table = Table::new(base_url.path()).await.unwrap();
+        let hudi_table = Table::new_blocking(base_url.path()).unwrap();
         let hudi_options = hudi_table.hudi_options();
         for (k, v) in hudi_options.iter() {
             assert!(k.starts_with("hoodie."));
@@ -557,10 +749,10 @@ mod tests {
         }
     }
 
-    #[tokio::test]
-    async fn test_hudi_table_get_storage_options() {
+    #[test]
+    fn test_hudi_table_get_storage_options() {
         let base_url = SampleTable::V6Nonpartitioned.url_to_cow();
-        let hudi_table = Table::new(base_url.path()).await.unwrap();
+        let hudi_table = Table::new_blocking(base_url.path()).unwrap();
 
         let cloud_prefixes: HashSet<_> = Storage::CLOUD_STORAGE_PREFIXES
             .iter()
@@ -584,13 +776,12 @@ mod tests {
         }
     }
 
-    #[tokio::test]
-    async fn hudi_table_get_schema() {
+    #[test]
+    fn hudi_table_get_schema() {
         let base_url = SampleTable::V6Nonpartitioned.url_to_cow();
-        let hudi_table = Table::new(base_url.path()).await.unwrap();
+        let hudi_table = Table::new_blocking(base_url.path()).unwrap();
         let fields: Vec<String> = hudi_table
-            .get_schema()
-            .await
+            .get_schema_blocking()
             .unwrap()
             .flattened_fields()
             .into_iter()
@@ -637,13 +828,12 @@ mod tests {
         );
     }
 
-    #[tokio::test]
-    async fn hudi_table_get_partition_schema() {
+    #[test]
+    fn hudi_table_get_partition_schema() {
         let base_url = SampleTable::V6TimebasedkeygenNonhivestyle.url_to_cow();
-        let hudi_table = Table::new(base_url.path()).await.unwrap();
+        let hudi_table = Table::new_blocking(base_url.path()).unwrap();
         let fields: Vec<String> = hudi_table
-            .get_partition_schema()
-            .await
+            .get_partition_schema_blocking()
             .unwrap()
             .flattened_fields()
             .into_iter()
@@ -652,9 +842,9 @@ mod tests {
         assert_eq!(fields, vec!["ts_str"]);
     }
 
-    #[tokio::test]
-    async fn validate_invalid_table_props() {
-        let table = 
get_test_table_without_validation("table_props_invalid").await;
+    #[test]
+    fn validate_invalid_table_props() {
+        let table = get_test_table_without_validation("table_props_invalid");
         let configs = table.hudi_configs;
         assert!(
             configs.validate(BaseFileFormat).is_err(),
@@ -704,9 +894,9 @@ mod tests {
         );
     }
 
-    #[tokio::test]
-    async fn get_invalid_table_props() {
-        let table = 
get_test_table_without_validation("table_props_invalid").await;
+    #[test]
+    fn get_invalid_table_props() {
+        let table = get_test_table_without_validation("table_props_invalid");
         let configs = table.hudi_configs;
         assert!(configs.get(BaseFileFormat).is_err());
         assert!(configs.get(Checksum).is_err());
@@ -726,9 +916,9 @@ mod tests {
         assert!(configs.get(TimelineTimezone).is_err());
     }
 
-    #[tokio::test]
-    async fn get_default_for_invalid_table_props() {
-        let table = 
get_test_table_without_validation("table_props_invalid").await;
+    #[test]
+    fn get_default_for_invalid_table_props() {
+        let table = get_test_table_without_validation("table_props_invalid");
         let configs = table.hudi_configs;
         assert_eq!(
             configs.get_or_default(BaseFileFormat).to::<String>(),
@@ -763,9 +953,9 @@ mod tests {
         );
     }
 
-    #[tokio::test]
-    async fn get_valid_table_props() {
-        let table = 
get_test_table_without_validation("table_props_valid").await;
+    #[test]
+    fn get_valid_table_props() {
+        let table = get_test_table_without_validation("table_props_valid");
         let configs = table.hudi_configs;
         assert_eq!(
             configs.get(BaseFileFormat).unwrap().to::<String>(),
@@ -803,10 +993,10 @@ mod tests {
         );
     }
 
-    #[tokio::test]
-    async fn get_global_table_props() {
+    #[test]
+    fn get_global_table_props() {
         // Without the environment variable HUDI_CONF_DIR
-        let table = 
get_test_table_without_validation("table_props_partial").await;
+        let table = get_test_table_without_validation("table_props_partial");
         let configs = table.hudi_configs;
         assert!(configs.get(DatabaseName).is_err());
         assert!(configs.get(TableType).is_err());
@@ -816,7 +1006,7 @@ mod tests {
         let base_path = env::current_dir().unwrap();
         let hudi_conf_dir = base_path.join("random/wrong/dir");
         env::set_var(HUDI_CONF_DIR, hudi_conf_dir.as_os_str());
-        let table = 
get_test_table_without_validation("table_props_partial").await;
+        let table = get_test_table_without_validation("table_props_partial");
         let configs = table.hudi_configs;
         assert!(configs.get(DatabaseName).is_err());
         assert!(configs.get(TableType).is_err());
@@ -826,7 +1016,7 @@ mod tests {
         let base_path = env::current_dir().unwrap();
         let hudi_conf_dir = base_path.join("tests/data/hudi_conf_dir");
         env::set_var(HUDI_CONF_DIR, hudi_conf_dir.as_os_str());
-        let table = 
get_test_table_without_validation("table_props_partial").await;
+        let table = get_test_table_without_validation("table_props_partial");
         let configs = table.hudi_configs;
         assert_eq!(configs.get(DatabaseName).unwrap().to::<String>(), "tmpdb");
         assert_eq!(
@@ -837,52 +1027,54 @@ mod tests {
         env::remove_var(HUDI_CONF_DIR)
     }
 
-    #[tokio::test]
-    async fn hudi_table_read_file_slice() {
+    #[test]
+    fn hudi_table_read_file_slice() {
         let base_url = SampleTable::V6Nonpartitioned.url_to_cow();
-        let hudi_table = Table::new(base_url.path()).await.unwrap();
+        let hudi_table = Table::new_blocking(base_url.path()).unwrap();
         let batches = hudi_table
             .create_file_group_reader_with_options(empty_options())
             .unwrap()
-            .read_file_slice_by_base_file_path(
+            .read_file_slice_by_base_file_path_blocking(
                 
"a079bdb3-731c-4894-b855-abfcd6921007-0_0-203-274_20240418173551906.parquet",
             )
-            .await
             .unwrap();
         assert_eq!(batches.num_rows(), 4);
         assert_eq!(batches.num_columns(), 21);
     }
 
-    #[tokio::test]
-    async fn empty_hudi_table_get_file_slices_splits() {
+    #[test]
+    fn empty_hudi_table_get_file_slices_splits() {
         let base_url = SampleTable::V6Empty.url_to_cow();
 
-        let hudi_table = Table::new(base_url.path()).await.unwrap();
-        let file_slices_splits = hudi_table.get_file_slices_splits(2, 
&[]).await.unwrap();
+        let hudi_table = Table::new_blocking(base_url.path()).unwrap();
+        let file_slices_splits = hudi_table
+            .get_file_slices_splits_blocking(2, empty_filters())
+            .unwrap();
         assert!(file_slices_splits.is_empty());
     }
 
-    #[tokio::test]
-    async fn hudi_table_get_file_slices_splits() {
+    #[test]
+    fn hudi_table_get_file_slices_splits() {
         let base_url = SampleTable::V6SimplekeygenNonhivestyle.url_to_cow();
 
-        let hudi_table = Table::new(base_url.path()).await.unwrap();
-        let file_slices_splits = hudi_table.get_file_slices_splits(2, 
&[]).await.unwrap();
+        let hudi_table = Table::new_blocking(base_url.path()).unwrap();
+        let file_slices_splits = hudi_table
+            .get_file_slices_splits_blocking(2, empty_filters())
+            .unwrap();
         assert_eq!(file_slices_splits.len(), 2);
         assert_eq!(file_slices_splits[0].len(), 2);
         assert_eq!(file_slices_splits[1].len(), 1);
     }
 
-    #[tokio::test]
-    async fn hudi_table_get_file_slices_splits_as_of_timestamps() {
+    #[test]
+    fn hudi_table_get_file_slices_splits_as_of_timestamps() {
         let base_url = 
SampleTable::V6SimplekeygenNonhivestyleOverwritetable.url_to_mor_parquet();
-        let hudi_table = Table::new(base_url.path()).await.unwrap();
+        let hudi_table = Table::new_blocking(base_url.path()).unwrap();
 
         // before replacecommit (insert overwrite table)
         let second_latest_timestamp = "20250121000656060";
         let file_slices_splits = hudi_table
-            .get_file_slices_splits_as_of(2, second_latest_timestamp, &[])
-            .await
+            .get_file_slices_splits_as_of_blocking(2, second_latest_timestamp, 
empty_filters())
             .unwrap();
         assert_eq!(file_slices_splits.len(), 2);
         assert_eq!(file_slices_splits[0].len(), 2);
@@ -915,19 +1107,20 @@ mod tests {
         // as of replacecommit (insert overwrite table)
         let latest_timestamp = "20250121000702475";
         let file_slices_splits = hudi_table
-            .get_file_slices_splits_as_of(2, latest_timestamp, &[])
-            .await
+            .get_file_slices_splits_as_of_blocking(2, latest_timestamp, 
empty_filters())
             .unwrap();
         assert_eq!(file_slices_splits.len(), 1);
         assert_eq!(file_slices_splits[0].len(), 1);
     }
 
-    #[tokio::test]
-    async fn hudi_table_get_file_slices_as_of_timestamps() {
+    #[test]
+    fn hudi_table_get_file_slices_as_of_timestamps() {
         let base_url = SampleTable::V6Nonpartitioned.url_to_cow();
 
-        let hudi_table = Table::new(base_url.path()).await.unwrap();
-        let file_slices = hudi_table.get_file_slices(&[]).await.unwrap();
+        let hudi_table = Table::new_blocking(base_url.path()).unwrap();
+        let file_slices = hudi_table
+            .get_file_slices_blocking(empty_filters())
+            .unwrap();
         assert_eq!(
             file_slices
                 .iter()
@@ -937,10 +1130,9 @@ mod tests {
         );
 
         // as of the latest timestamp
-        let hudi_table = Table::new(base_url.path()).await.unwrap();
+        let hudi_table = Table::new_blocking(base_url.path()).unwrap();
         let file_slices = hudi_table
-            .get_file_slices_as_of("20240418173551906", &[])
-            .await
+            .get_file_slices_as_of_blocking("20240418173551906", 
empty_filters())
             .unwrap();
         assert_eq!(
             file_slices
@@ -951,12 +1143,10 @@ mod tests {
         );
 
         // as of just smaller than the latest timestamp
-        let hudi_table = Table::new_with_options(base_url.path(), 
empty_options())
-            .await
-            .unwrap();
+        let hudi_table =
+            Table::new_with_options_blocking(base_url.path(), 
empty_options()).unwrap();
         let file_slices = hudi_table
-            .get_file_slices_as_of("20240418173551905", &[])
-            .await
+            .get_file_slices_as_of_blocking("20240418173551905", 
empty_filters())
             .unwrap();
         assert_eq!(
             file_slices
@@ -967,12 +1157,10 @@ mod tests {
         );
 
         // as of non-exist old timestamp
-        let hudi_table = Table::new_with_options(base_url.path(), 
empty_options())
-            .await
-            .unwrap();
+        let hudi_table =
+            Table::new_with_options_blocking(base_url.path(), 
empty_options()).unwrap();
         let file_slices = hudi_table
-            .get_file_slices_as_of("19700101000000", &[])
-            .await
+            .get_file_slices_as_of_blocking("19700101000000", empty_filters())
             .unwrap();
         assert_eq!(
             file_slices
@@ -983,24 +1171,22 @@ mod tests {
         );
     }
 
-    #[tokio::test]
-    async fn empty_hudi_table_get_file_slices_between_timestamps() {
+    #[test]
+    fn empty_hudi_table_get_file_slices_between_timestamps() {
         let base_url = SampleTable::V6Empty.url_to_cow();
-        let hudi_table = Table::new(base_url.path()).await.unwrap();
+        let hudi_table = Table::new_blocking(base_url.path()).unwrap();
         let file_slices = hudi_table
-            .get_file_slices_between(Some(EARLIEST_START_TIMESTAMP), None)
-            .await
+            .get_file_slices_between_blocking(Some(EARLIEST_START_TIMESTAMP), 
None)
             .unwrap();
         assert!(file_slices.is_empty())
     }
 
-    #[tokio::test]
-    async fn hudi_table_get_file_slices_between_timestamps() {
+    #[test]
+    fn hudi_table_get_file_slices_between_timestamps() {
         let base_url = 
SampleTable::V6SimplekeygenNonhivestyleOverwritetable.url_to_mor_parquet();
-        let hudi_table = Table::new(base_url.path()).await.unwrap();
+        let hudi_table = Table::new_blocking(base_url.path()).unwrap();
         let mut file_slices = hudi_table
-            .get_file_slices_between(None, Some("20250121000656060"))
-            .await
+            .get_file_slices_between_blocking(None, Some("20250121000656060"))
             .unwrap();
         assert_eq!(file_slices.len(), 3);
 
@@ -1031,15 +1217,14 @@ mod tests {
         assert!(file_slice_2.log_files.is_empty());
     }
 
-    #[tokio::test]
-    async fn hudi_table_get_file_paths_for_simple_keygen_non_hive_style() {
+    #[test]
+    fn hudi_table_get_file_paths_for_simple_keygen_non_hive_style() {
         let base_url = SampleTable::V6SimplekeygenNonhivestyle.url_to_cow();
-        let hudi_table = Table::new(base_url.path()).await.unwrap();
+        let hudi_table = Table::new_blocking(base_url.path()).unwrap();
         assert_eq!(hudi_table.timeline.completed_commits.len(), 2);
 
         let partition_filters = &[];
         let actual = get_file_paths_with_filters(&hudi_table, 
partition_filters)
-            .await
             .unwrap()
             .into_iter()
             .collect::<HashSet<_>>();
@@ -1055,7 +1240,6 @@ mod tests {
 
         let filters = [("byteField", ">=", "10"), ("byteField", "<", "30")];
         let actual = get_file_paths_with_filters(&hudi_table, &filters)
-            .await
             .unwrap()
             .into_iter()
             .collect::<HashSet<_>>();
@@ -1069,7 +1253,6 @@ mod tests {
         assert_eq!(actual, expected);
 
         let actual = get_file_paths_with_filters(&hudi_table, &[("byteField", 
">", "30")])
-            .await
             .unwrap()
             .into_iter()
             .collect::<HashSet<_>>();
@@ -1077,15 +1260,14 @@ mod tests {
         assert_eq!(actual, expected);
     }
 
-    #[tokio::test]
-    async fn hudi_table_get_file_paths_for_complex_keygen_hive_style() {
+    #[test]
+    fn hudi_table_get_file_paths_for_complex_keygen_hive_style() {
         let base_url = SampleTable::V6ComplexkeygenHivestyle.url_to_cow();
-        let hudi_table = Table::new(base_url.path()).await.unwrap();
+        let hudi_table = Table::new_blocking(base_url.path()).unwrap();
         assert_eq!(hudi_table.timeline.completed_commits.len(), 2);
 
         let partition_filters = &[];
         let actual = get_file_paths_with_filters(&hudi_table, 
partition_filters)
-            .await
             .unwrap()
             .into_iter()
             .collect::<HashSet<_>>();
@@ -1105,7 +1287,6 @@ mod tests {
             ("shortField", "!=", "100"),
         ];
         let actual = get_file_paths_with_filters(&hudi_table, &filters)
-            .await
             .unwrap()
             .into_iter()
             .collect::<HashSet<_>>();
@@ -1119,7 +1300,6 @@ mod tests {
 
         let filters = [("byteField", ">=", "20"), ("shortField", "=", "300")];
         let actual = get_file_paths_with_filters(&hudi_table, &filters)
-            .await
             .unwrap()
             .into_iter()
             .collect::<HashSet<_>>();
@@ -1129,28 +1309,29 @@ mod tests {
 
     mod test_snapshot_and_time_travel_queries {
         use super::super::*;
+        use crate::config::util::empty_filters;
         use arrow::compute::concat_batches;
         use hudi_test::{QuickstartTripsTable, SampleTable};
 
-        #[tokio::test]
-        async fn test_empty() -> Result<()> {
+        #[test]
+        fn test_empty() -> Result<()> {
             for base_url in SampleTable::V6Empty.urls() {
-                let hudi_table = Table::new(base_url.path()).await?;
-                let records = hudi_table.read_snapshot(&[]).await?;
+                let hudi_table = Table::new_blocking(base_url.path())?;
+                let records = 
hudi_table.read_snapshot_blocking(empty_filters())?;
                 assert!(records.is_empty());
             }
             Ok(())
         }
 
-        #[tokio::test]
-        async fn test_quickstart_trips_table() -> Result<()> {
+        #[test]
+        fn test_quickstart_trips_table() -> Result<()> {
             let base_url = QuickstartTripsTable::V6Trips8I1U.url_to_mor_avro();
-            let hudi_table = Table::new(base_url.path()).await?;
+            let hudi_table = Table::new_blocking(base_url.path())?;
 
             let updated_rider = "rider-D";
 
             // verify updated record as of the latest commit
-            let records = hudi_table.read_snapshot(&[]).await?;
+            let records = hudi_table.read_snapshot_blocking(empty_filters())?;
             let schema = &records[0].schema();
             let records = concat_batches(schema, &records)?;
             let uuid_rider_and_fare = 
QuickstartTripsTable::uuid_rider_and_fare(&records)
@@ -1172,7 +1353,7 @@ mod tests {
                 .map(|i| i.timestamp.as_str())
                 .collect::<Vec<_>>();
             let first_commit = commit_timestamps[0];
-            let records = hudi_table.read_snapshot_as_of(first_commit, 
&[]).await?;
+            let records = 
hudi_table.read_snapshot_as_of_blocking(first_commit, empty_filters())?;
             let schema = &records[0].schema();
             let records = concat_batches(schema, &records)?;
             let uuid_rider_and_fare = 
QuickstartTripsTable::uuid_rider_and_fare(&records)
@@ -1189,11 +1370,11 @@ mod tests {
             Ok(())
         }
 
-        #[tokio::test]
-        async fn test_non_partitioned() -> Result<()> {
+        #[test]
+        fn test_non_partitioned() -> Result<()> {
             for base_url in SampleTable::V6Nonpartitioned.urls() {
-                let hudi_table = Table::new(base_url.path()).await?;
-                let records = hudi_table.read_snapshot(&[]).await?;
+                let hudi_table = Table::new_blocking(base_url.path())?;
+                let records = 
hudi_table.read_snapshot_blocking(empty_filters())?;
                 let schema = &records[0].schema();
                 let records = concat_batches(schema, &records)?;
 
@@ -1211,14 +1392,13 @@ mod tests {
             Ok(())
         }
 
-        #[tokio::test]
-        async fn test_non_partitioned_read_optimized() -> Result<()> {
+        #[test]
+        fn test_non_partitioned_read_optimized() -> Result<()> {
             let base_url = SampleTable::V6Nonpartitioned.url_to_mor_parquet();
-            let hudi_table = Table::new_with_options(
+            let hudi_table = Table::new_with_options_blocking(
                 base_url.path(),
                 [(HudiReadConfig::UseReadOptimizedMode.as_ref(), "true")],
-            )
-            .await?;
+            )?;
             let commit_timestamps = hudi_table
                 .timeline
                 .completed_commits
@@ -1226,7 +1406,8 @@ mod tests {
                 .map(|i| i.timestamp.as_str())
                 .collect::<Vec<_>>();
             let latest_commit = commit_timestamps.last().unwrap();
-            let records = hudi_table.read_snapshot_as_of(latest_commit, 
&[]).await?;
+            let records =
+                hudi_table.read_snapshot_as_of_blocking(latest_commit, 
empty_filters())?;
             let schema = &records[0].schema();
             let records = concat_batches(schema, &records)?;
 
@@ -1243,11 +1424,11 @@ mod tests {
             Ok(())
         }
 
-        #[tokio::test]
-        async fn test_non_partitioned_rollback() -> Result<()> {
+        #[test]
+        fn test_non_partitioned_rollback() -> Result<()> {
             let base_url = 
SampleTable::V6NonpartitionedRollback.url_to_mor_parquet();
-            let hudi_table = Table::new(base_url.path()).await?;
-            let records = hudi_table.read_snapshot(&[]).await?;
+            let hudi_table = Table::new_blocking(base_url.path())?;
+            let records = hudi_table.read_snapshot_blocking(empty_filters())?;
             let schema = &records[0].schema();
             let records = concat_batches(schema, &records)?;
 
@@ -1263,17 +1444,17 @@ mod tests {
             Ok(())
         }
 
-        #[tokio::test]
-        async fn test_complex_keygen_hive_style_with_filters() -> Result<()> {
+        #[test]
+        fn test_complex_keygen_hive_style_with_filters() -> Result<()> {
             for base_url in SampleTable::V6ComplexkeygenHivestyle.urls() {
-                let hudi_table = Table::new(base_url.path()).await?;
+                let hudi_table = Table::new_blocking(base_url.path())?;
 
-                let filters = &[
+                let filters = vec![
                     ("byteField", ">=", "10"),
                     ("byteField", "<", "20"),
                     ("shortField", "!=", "100"),
                 ];
-                let records = hudi_table.read_snapshot(filters).await?;
+                let records = hudi_table.read_snapshot_blocking(filters)?;
                 let schema = &records[0].schema();
                 let records = concat_batches(schema, &records)?;
 
@@ -1283,10 +1464,10 @@ mod tests {
             Ok(())
         }
 
-        #[tokio::test]
-        async fn test_simple_keygen_nonhivestyle_time_travel() -> Result<()> {
+        #[test]
+        fn test_simple_keygen_nonhivestyle_time_travel() -> Result<()> {
             for base_url in SampleTable::V6SimplekeygenNonhivestyle.urls() {
-                let hudi_table = Table::new(base_url.path()).await?;
+                let hudi_table = Table::new_blocking(base_url.path())?;
                 let commit_timestamps = hudi_table
                     .timeline
                     .completed_commits
@@ -1294,7 +1475,8 @@ mod tests {
                     .map(|i| i.timestamp.as_str())
                     .collect::<Vec<_>>();
                 let first_commit = commit_timestamps[0];
-                let records = hudi_table.read_snapshot_as_of(first_commit, 
&[]).await?;
+                let records =
+                    hudi_table.read_snapshot_as_of_blocking(first_commit, 
empty_filters())?;
                 let schema = &records[0].schema();
                 let records = concat_batches(schema, &records)?;
 
@@ -1307,11 +1489,11 @@ mod tests {
             Ok(())
         }
 
-        #[tokio::test]
-        async fn test_simple_keygen_hivestyle_no_metafields() -> Result<()> {
+        #[test]
+        fn test_simple_keygen_hivestyle_no_metafields() -> Result<()> {
             for base_url in 
SampleTable::V6SimplekeygenHivestyleNoMetafields.urls() {
-                let hudi_table = Table::new(base_url.path()).await?;
-                let records = hudi_table.read_snapshot(&[]).await?;
+                let hudi_table = Table::new_blocking(base_url.path())?;
+                let records = 
hudi_table.read_snapshot_blocking(empty_filters())?;
                 let schema = &records[0].schema();
                 let records = concat_batches(schema, &records)?;
 
@@ -1335,20 +1517,20 @@ mod tests {
         use arrow_select::concat::concat_batches;
         use hudi_test::SampleTable;
 
-        #[tokio::test]
-        async fn test_empty() -> Result<()> {
+        #[test]
+        fn test_empty() -> Result<()> {
             for base_url in SampleTable::V6Empty.urls() {
-                let hudi_table = Table::new(base_url.path()).await?;
-                let records = hudi_table.read_incremental_records("0", 
None).await?;
+                let hudi_table = Table::new_blocking(base_url.path())?;
+                let records = 
hudi_table.read_incremental_records_blocking("0", None)?;
                 assert!(records.is_empty())
             }
             Ok(())
         }
 
-        #[tokio::test]
-        async fn test_simplekeygen_nonhivestyle_overwritetable() -> Result<()> 
{
+        #[test]
+        fn test_simplekeygen_nonhivestyle_overwritetable() -> Result<()> {
             for base_url in 
SampleTable::V6SimplekeygenNonhivestyleOverwritetable.urls() {
-                let hudi_table = Table::new(base_url.path()).await?;
+                let hudi_table = Table::new_blocking(base_url.path())?;
                 let commit_timestamps = hudi_table
                     .timeline
                     .completed_commits
@@ -1362,8 +1544,7 @@ mod tests {
 
                 // read records changed from the beginning to the 1st commit
                 let records = hudi_table
-                    .read_incremental_records("19700101000000", 
Some(first_commit))
-                    .await?;
+                    .read_incremental_records_blocking("19700101000000", 
Some(first_commit))?;
                 let schema = &records[0].schema();
                 let records = concat_batches(schema, &records)?;
                 let sample_data = 
SampleTable::sample_data_order_by_id(&records);
@@ -1375,8 +1556,7 @@ mod tests {
 
                 // read records changed from the 1st to the 2nd commit
                 let records = hudi_table
-                    .read_incremental_records(first_commit, 
Some(second_commit))
-                    .await?;
+                    .read_incremental_records_blocking(first_commit, 
Some(second_commit))?;
                 let schema = &records[0].schema();
                 let records = concat_batches(schema, &records)?;
                 let sample_data = 
SampleTable::sample_data_order_by_id(&records);
@@ -1388,8 +1568,7 @@ mod tests {
 
                 // read records changed from the 2nd to the 3rd commit
                 let records = hudi_table
-                    .read_incremental_records(second_commit, 
Some(third_commit))
-                    .await?;
+                    .read_incremental_records_blocking(second_commit, 
Some(third_commit))?;
                 let schema = &records[0].schema();
                 let records = concat_batches(schema, &records)?;
                 let sample_data = 
SampleTable::sample_data_order_by_id(&records);
@@ -1400,9 +1579,7 @@ mod tests {
                 );
 
                 // read records changed from the 1st commit
-                let records = hudi_table
-                    .read_incremental_records(first_commit, None)
-                    .await?;
+                let records = 
hudi_table.read_incremental_records_blocking(first_commit, None)?;
                 let schema = &records[0].schema();
                 let records = concat_batches(schema, &records)?;
                 let sample_data = 
SampleTable::sample_data_order_by_id(&records);
@@ -1413,9 +1590,7 @@ mod tests {
                 );
 
                 // read records changed from the 3rd commit
-                let records = hudi_table
-                    .read_incremental_records(third_commit, None)
-                    .await?;
+                let records = 
hudi_table.read_incremental_records_blocking(third_commit, None)?;
                 assert!(
                     records.is_empty(),
                     "Should return 0 record as it's the latest commit"
diff --git a/crates/core/src/util/mod.rs b/crates/core/src/util/mod.rs
index 21306c9..68bc85e 100644
--- a/crates/core/src/util/mod.rs
+++ b/crates/core/src/util/mod.rs
@@ -17,42 +17,3 @@
  * under the License.
  */
 pub mod arrow;
-
-pub trait StrTupleRef {
-    fn as_strs(&self) -> Vec<(&str, &str, &str)>;
-}
-
-impl StrTupleRef for Vec<(String, String, String)> {
-    fn as_strs(&self) -> Vec<(&str, &str, &str)> {
-        self.iter()
-            .map(|(s1, s2, s3)| (s1.as_str(), s2.as_str(), s3.as_str()))
-            .collect()
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-
-    #[test]
-    fn test_convert_vec_of_string_to_vec_of_str_slice() {
-        let vec_of_strings = vec![
-            (
-                String::from("date"),
-                String::from("="),
-                String::from("2022-01-02"),
-            ),
-            (
-                String::from("foo"),
-                String::from("bar"),
-                String::from("baz"),
-            ),
-        ];
-        let binding = vec_of_strings.as_strs();
-        let str_slice = &binding[..];
-        assert_eq!(
-            str_slice,
-            [("date", "=", "2022-01-02"), ("foo", "bar", "baz")]
-        );
-    }
-}
diff --git a/crates/datafusion/src/lib.rs b/crates/datafusion/src/lib.rs
index 9beb612..f3a0601 100644
--- a/crates/datafusion/src/lib.rs
+++ b/crates/datafusion/src/lib.rs
@@ -47,7 +47,6 @@ use hudi_core::config::read::HudiReadConfig::InputPartitions;
 use hudi_core::config::util::empty_options;
 use hudi_core::storage::util::{get_scheme_authority, join_url_segments};
 use hudi_core::table::Table as HudiTable;
-use hudi_core::util::StrTupleRef;
 
 /// Create a `HudiDataSource`.
 /// Used for Datafusion to query Hudi tables
@@ -179,7 +178,7 @@ impl TableProvider for HudiDataSource {
         let pushdown_filters = exprs_to_filters(filters);
         let file_slices = self
             .table
-            .get_file_slices_splits(self.get_input_partitions(), 
&pushdown_filters.as_strs())
+            .get_file_slices_splits(self.get_input_partitions(), 
pushdown_filters)
             .await
             .map_err(|e| Execution(format!("Failed to get file slices from 
Hudi table: {}", e)))?;
         let base_url = self.table.base_url();
diff --git a/demo/table-api-rust/src/main.rs b/demo/table-api-rust/src/main.rs
index 5a5c40c..056c247 100644
--- a/demo/table-api-rust/src/main.rs
+++ b/demo/table-api-rust/src/main.rs
@@ -18,6 +18,7 @@
  */
 
 use arrow::compute::concat_batches;
+use hudi::config::util::empty_filters;
 use hudi::error::Result;
 use hudi::table::builder::TableBuilder as HudiTableBuilder;
 
@@ -28,7 +29,7 @@ async fn main() -> Result<()> {
         "s3://hudi-demo/mor/parquet/v6_complexkeygen_hivestyle",
     ] {
         let hudi_table = HudiTableBuilder::from_base_uri(url).build().await?;
-        let batches = hudi_table.read_snapshot(&[]).await?;
+        let batches = hudi_table.read_snapshot(empty_filters()).await?;
 
         let batch = concat_batches(&batches[0].schema(), &batches)?;
         assert_eq!(
diff --git a/python/src/internal.rs b/python/src/internal.rs
index 86c50e0..aac3658 100644
--- a/python/src/internal.rs
+++ b/python/src/internal.rs
@@ -33,7 +33,6 @@ use hudi::table::builder::TableBuilder;
 use hudi::table::Table;
 use hudi::timeline::instant::Instant;
 use hudi::timeline::Timeline;
-use hudi::util::StrTupleRef;
 use pyo3::exceptions::PyException;
 use pyo3::{create_exception, pyclass, pyfunction, pymethods, PyErr, PyObject, 
PyResult, Python};
 
@@ -86,19 +85,16 @@ impl HudiFileGroupReader {
             .to_pyarrow(py)
     }
     fn read_file_slice(&self, file_slice: &HudiFileSlice, py: Python) -> 
PyResult<PyObject> {
-        let mut file_group = FileGroup::new(
-            file_slice.file_id.clone(),
-            file_slice.partition_path.clone(),
-        );
+        let mut file_group = FileGroup::new_with_base_file_name(
+            &file_slice.base_file_name,
+            &file_slice.partition_path,
+        )
+        .map_err(PythonError::from)?;
+        let log_file_names = &file_slice.log_file_names;
         file_group
-            .add_base_file_from_name(&file_slice.base_file_name)
+            .add_log_files_from_names(log_file_names)
             .map_err(PythonError::from)?;
-        for name in file_slice.log_file_names.iter() {
-            file_group
-                .add_log_file_from_name(name)
-                .map_err(PythonError::from)?;
-        }
-        let (_, inner_file_slice) = file_group
+        let (_, file_slice) = file_group
             .file_slices
             .iter()
             .next()
@@ -109,7 +105,7 @@ impl HudiFileGroupReader {
                 ))
             })
             .map_err(PythonError::from)?;
-        rt().block_on(self.inner.read_file_slice(inner_file_slice))
+        rt().block_on(self.inner.read_file_slice(file_slice))
             .map_err(PythonError::from)?
             .to_pyarrow(py)
     }
@@ -327,11 +323,12 @@ impl HudiTable {
         filters: Option<Vec<(String, String, String)>>,
         py: Python,
     ) -> PyResult<Vec<Vec<HudiFileSlice>>> {
-        let filters = filters.unwrap_or_default();
-
         py.allow_threads(|| {
             let file_slices = rt()
-                .block_on(self.inner.get_file_slices_splits(n, 
&filters.as_strs()))
+                .block_on(
+                    self.inner
+                        .get_file_slices_splits(n, 
filters.unwrap_or_default()),
+                )
                 .map_err(PythonError::from)?;
             Ok(file_slices
                 .iter()
@@ -348,14 +345,13 @@ impl HudiTable {
         filters: Option<Vec<(String, String, String)>>,
         py: Python,
     ) -> PyResult<Vec<Vec<HudiFileSlice>>> {
-        let filters = filters.unwrap_or_default();
-
         py.allow_threads(|| {
             let file_slices = rt()
-                .block_on(
-                    self.inner
-                        .get_file_slices_splits_as_of(n, timestamp, 
&filters.as_strs()),
-                )
+                .block_on(self.inner.get_file_slices_splits_as_of(
+                    n,
+                    timestamp,
+                    filters.unwrap_or_default(),
+                ))
                 .map_err(PythonError::from)?;
             Ok(file_slices
                 .iter()
@@ -370,11 +366,9 @@ impl HudiTable {
         filters: Option<Vec<(String, String, String)>>,
         py: Python,
     ) -> PyResult<Vec<HudiFileSlice>> {
-        let filters = filters.unwrap_or_default();
-
         py.allow_threads(|| {
             let file_slices = rt()
-                .block_on(self.inner.get_file_slices(&filters.as_strs()))
+                
.block_on(self.inner.get_file_slices(filters.unwrap_or_default()))
                 .map_err(PythonError::from)?;
             Ok(file_slices.iter().map(HudiFileSlice::from).collect())
         })
@@ -387,13 +381,11 @@ impl HudiTable {
         filters: Option<Vec<(String, String, String)>>,
         py: Python,
     ) -> PyResult<Vec<HudiFileSlice>> {
-        let filters = filters.unwrap_or_default();
-
         py.allow_threads(|| {
             let file_slices = rt()
                 .block_on(
                     self.inner
-                        .get_file_slices_as_of(timestamp, &filters.as_strs()),
+                        .get_file_slices_as_of(timestamp, 
filters.unwrap_or_default()),
                 )
                 .map_err(PythonError::from)?;
             Ok(file_slices.iter().map(HudiFileSlice::from).collect())
@@ -436,9 +428,7 @@ impl HudiTable {
         filters: Option<Vec<(String, String, String)>>,
         py: Python,
     ) -> PyResult<PyObject> {
-        let filters = filters.unwrap_or_default();
-
-        rt().block_on(self.inner.read_snapshot(&filters.as_strs()))
+        rt().block_on(self.inner.read_snapshot(filters.unwrap_or_default()))
             .map_err(PythonError::from)?
             .to_pyarrow(py)
     }
@@ -450,11 +440,9 @@ impl HudiTable {
         filters: Option<Vec<(String, String, String)>>,
         py: Python,
     ) -> PyResult<PyObject> {
-        let filters = filters.unwrap_or_default();
-
         rt().block_on(
             self.inner
-                .read_snapshot_as_of(timestamp, &filters.as_strs()),
+                .read_snapshot_as_of(timestamp, filters.unwrap_or_default()),
         )
         .map_err(PythonError::from)?
         .to_pyarrow(py)

Reply via email to