This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 2ac8882  refactor: make APIs async (#31)
2ac8882 is described below

commit 2ac888226f40c7c319b971d79201033fd4471bc1
Author: Shiyan Xu <[email protected]>
AuthorDate: Sun Jun 30 15:07:55 2024 -0500

    refactor: make APIs async (#31)
---
 Cargo.toml                        |   2 +-
 crates/core/src/file_group/mod.rs |  11 ++++
 crates/core/src/table/fs_view.rs  |  33 ++++--------
 crates/core/src/table/mod.rs      | 109 +++++++++++++++++---------------------
 crates/datafusion/Cargo.toml      |   2 +-
 crates/datafusion/src/bin/main.rs |  37 -------------
 crates/datafusion/src/lib.rs      | 105 ++++++++++++++++++++++++++++++------
 python/Cargo.toml                 |   4 ++
 python/src/lib.rs                 |  57 ++++++++++++--------
 9 files changed, 200 insertions(+), 160 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 6d21195..e3c5b99 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -69,5 +69,5 @@ uuid = { version = "1" }
 async-trait = { version = "0.1" }
 async-recursion = { version = "1.1.1" }
 futures = { version = "0.3" }
-tokio = { version = "1" }
+tokio = { version = "1", features = ["rt-multi-thread"]}
 num_cpus = { version = "1" }
diff --git a/crates/core/src/file_group/mod.rs b/crates/core/src/file_group/mod.rs
index ec2e171..ece19a4 100644
--- a/crates/core/src/file_group/mod.rs
+++ b/crates/core/src/file_group/mod.rs
@@ -20,6 +20,7 @@
 use std::collections::BTreeMap;
 use std::fmt;
 use std::fmt::Formatter;
+use std::path::PathBuf;
 
 use crate::storage::file_info::FileInfo;
 use crate::storage::file_stats::FileStats;
@@ -79,6 +80,16 @@ impl FileSlice {
         self.base_file.info.uri.as_str()
     }
 
+    pub fn base_file_relative_path(&self) -> String {
+        let partition_path = self.partition_path.clone().unwrap_or_default();
+        let file_name = &self.base_file.info.name;
+        PathBuf::from(partition_path)
+            .join(file_name)
+            .to_str()
+            .unwrap()
+            .to_string()
+    }
+
     pub fn file_group_id(&self) -> &str {
         &self.base_file.file_group_id
     }
diff --git a/crates/core/src/table/fs_view.rs b/crates/core/src/table/fs_view.rs
index c7c20e1..f1976d8 100644
--- a/crates/core/src/table/fs_view.rs
+++ b/crates/core/src/table/fs_view.rs
@@ -87,14 +87,9 @@ impl FileSystemView {
         Ok(file_groups)
     }
 
-    pub fn load_file_groups(&mut self) {
+    pub async fn load_file_groups(&mut self) {
         let fs_view = self.clone();
-        let rt = tokio::runtime::Builder::new_current_thread()
-            .enable_all()
-            .build()
-            .unwrap();
-        let wrapper = async { get_partitions_and_file_groups(&fs_view).await };
-        let result = rt.block_on(wrapper).unwrap();
+        let result = get_partitions_and_file_groups(&fs_view).await.unwrap();
         for (k, v) in result {
             self.partition_to_file_groups.insert(k, v);
         }
@@ -112,18 +107,13 @@ impl FileSystemView {
         file_slices
     }
 
-    pub fn get_latest_file_slices_with_stats(&mut self) -> Vec<&mut FileSlice> 
{
-        let rt = tokio::runtime::Builder::new_current_thread()
-            .enable_all()
-            .build()
-            .unwrap();
+    pub async fn get_latest_file_slices_with_stats(&mut self) -> Vec<&mut 
FileSlice> {
         let mut file_slices = Vec::new();
         let file_groups = &mut self.partition_to_file_groups.values_mut();
         for fgs in file_groups {
             for fg in fgs {
                 if let Some(file_slice) = fg.get_latest_file_slice_mut() {
-                    let wrapper = async { 
load_file_slice_stats(&self.base_url, file_slice).await };
-                    let _ = rt.block_on(wrapper);
+                    let _ = load_file_slice_stats(&self.base_url, 
file_slice).await;
                     file_slices.push(file_slice)
                 }
             }
@@ -131,14 +121,9 @@ impl FileSystemView {
         file_slices
     }
 
-    pub fn read_file_slice(&self, relative_path: &str) -> Vec<RecordBatch> {
-        let rt = tokio::runtime::Builder::new_current_thread()
-            .enable_all()
-            .build()
-            .unwrap();
+    pub async fn read_file_slice(&self, relative_path: &str) -> 
Vec<RecordBatch> {
         let storage = Storage::new(self.base_url.clone(), HashMap::new());
-        let wrapper = async { 
storage.get_parquet_file_data(relative_path).await };
-        rt.block_on(wrapper)
+        storage.get_parquet_file_data(relative_path).await
     }
 }
 
@@ -217,11 +202,11 @@ mod tests {
         )
     }
 
-    #[test]
-    fn get_latest_file_slices() {
+    #[tokio::test]
+    async fn get_latest_file_slices() {
         let base_url = TestTable::V6Nonpartitioned.url();
         let mut fs_view = FileSystemView::new(base_url);
-        fs_view.load_file_groups();
+        fs_view.load_file_groups().await;
         let file_slices = fs_view.get_latest_file_slices();
         assert_eq!(file_slices.len(), 1);
         let mut fg_ids = Vec::new();
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index d26decc..86f1008 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -49,9 +49,15 @@ pub struct Table {
 }
 
 impl Table {
-    pub fn new(base_uri: &str, storage_options: HashMap<String, String>) -> 
Self {
+    pub async fn new(base_uri: &str, storage_options: HashMap<String, String>) 
-> Self {
         let base_url = 
Url::from_file_path(PathBuf::from(base_uri).as_path()).unwrap();
-        match Self::load_properties(&base_url, ".hoodie/hoodie.properties", 
&storage_options) {
+        match Self::load_properties(
+            base_url.clone(),
+            ".hoodie/hoodie.properties".to_string(),
+            storage_options.clone(),
+        )
+        .await
+        {
             Ok(props) => Self {
                 base_url,
                 props,
@@ -64,18 +70,13 @@ impl Table {
         }
     }
 
-    fn load_properties(
-        base_url: &Url,
-        props_path: &str,
-        storage_options: &HashMap<String, String>,
+    async fn load_properties(
+        base_url: Url,
+        props_path: String,
+        storage_options: HashMap<String, String>,
     ) -> Result<HashMap<String, String>> {
-        let rt = tokio::runtime::Builder::new_current_thread()
-            .enable_all()
-            .build()
-            .unwrap();
-        let storage = Storage::new(base_url.clone(), storage_options.clone());
-        let get_data = async { storage.get_file_data(props_path).await };
-        let data = rt.block_on(get_data);
+        let storage = Storage::new(base_url, storage_options);
+        let data = storage.get_file_data(props_path.as_str()).await;
         let cursor = std::io::Cursor::new(data);
         let reader = BufReader::new(cursor);
         let lines = reader.lines();
@@ -102,30 +103,16 @@ impl Table {
     }
 
     #[cfg(test)]
-    fn get_timeline(&self) -> Result<Timeline> {
-        let rt = tokio::runtime::Builder::new_current_thread()
-            .enable_all()
-            .build()
-            .unwrap();
-        let init_timeline = async { Timeline::new(self.base_url.clone()).await 
};
-        rt.block_on(init_timeline)
+    async fn get_timeline(&self) -> Result<Timeline> {
+        Timeline::new(self.base_url.clone()).await
     }
 
-    pub fn get_latest_schema(&self) -> SchemaRef {
-        let rt = tokio::runtime::Builder::new_current_thread()
-            .enable_all()
-            .build()
-            .unwrap();
-        let init_timeline = async { Timeline::new(self.base_url.clone()).await 
};
-        let timeline = rt.block_on(init_timeline);
-        match timeline {
+    pub async fn get_latest_schema(&self) -> SchemaRef {
+        let timeline_result = Timeline::new(self.base_url.clone()).await;
+        match timeline_result {
             Ok(timeline) => {
-                let rt = tokio::runtime::Builder::new_current_thread()
-                    .enable_all()
-                    .build()
-                    .unwrap();
-                let get_schema = async { timeline.get_latest_schema().await };
-                match rt.block_on(get_schema) {
+                let schema_result = timeline.get_latest_schema().await;
+                match schema_result {
                     Ok(schema) => SchemaRef::from(schema),
                     Err(e) => panic!("Failed to resolve table schema: {}", e),
                 }
@@ -134,38 +121,39 @@ impl Table {
         }
     }
 
-    pub fn get_latest_file_slices(&mut self) -> Result<Vec<FileSlice>> {
+    pub async fn get_latest_file_slices(&mut self) -> Result<Vec<FileSlice>> {
         if self.file_system_view.is_none() {
             let mut new_fs_view = FileSystemView::new(self.base_url.clone());
-            new_fs_view.load_file_groups();
+            new_fs_view.load_file_groups().await;
             self.file_system_view = Some(new_fs_view);
         }
 
         let fs_view = self.file_system_view.as_mut().unwrap();
 
         let mut file_slices = Vec::new();
-        for f in fs_view.get_latest_file_slices_with_stats() {
+        for f in fs_view.get_latest_file_slices_with_stats().await {
             file_slices.push(f.clone());
         }
         Ok(file_slices)
     }
 
-    pub fn read_file_slice(&mut self, relative_path: &str) -> Vec<RecordBatch> 
{
+    pub async fn read_file_slice(&mut self, relative_path: &str) -> 
Vec<RecordBatch> {
         if self.file_system_view.is_none() {
             let mut new_fs_view = FileSystemView::new(self.base_url.clone());
-            new_fs_view.load_file_groups();
+            new_fs_view.load_file_groups().await;
             self.file_system_view = Some(new_fs_view);
         }
 
         let fs_view = self.file_system_view.as_ref().unwrap();
-        fs_view.read_file_slice(relative_path)
+        fs_view.read_file_slice(relative_path).await
     }
 
-    pub fn get_latest_file_paths(&mut self) -> Result<Vec<String>> {
+    pub async fn get_latest_file_paths(&mut self) -> Result<Vec<String>> {
         let mut file_paths = Vec::new();
-        for f in self.get_latest_file_slices()? {
+        for f in self.get_latest_file_slices().await? {
             file_paths.push(f.base_file_path().to_string());
         }
+        println!("{:?}", file_paths);
         Ok(file_paths)
     }
 }
@@ -267,12 +255,13 @@ mod tests {
     use crate::table::metadata::ProvidesTableMetadata;
     use crate::table::Table;
 
-    #[test]
-    fn hudi_table_get_latest_schema() {
+    #[tokio::test]
+    async fn hudi_table_get_latest_schema() {
         let base_url = TestTable::V6Nonpartitioned.url();
-        let hudi_table = Table::new(base_url.path(), HashMap::new());
+        let hudi_table = Table::new(base_url.path(), HashMap::new()).await;
         let fields: Vec<String> = hudi_table
             .get_latest_schema()
+            .await
             .all_fields()
             .into_iter()
             .map(|f| f.name().to_string())
@@ -318,25 +307,27 @@ mod tests {
         );
     }
 
-    #[test]
-    fn hudi_table_read_file_slice() {
+    #[tokio::test]
+    async fn hudi_table_read_file_slice() {
         let base_url = TestTable::V6Nonpartitioned.url();
-        let mut hudi_table = Table::new(base_url.path(), HashMap::new());
-        let batches = hudi_table.read_file_slice(
-            
"a079bdb3-731c-4894-b855-abfcd6921007-0_0-203-274_20240418173551906.parquet",
-        );
+        let mut hudi_table = Table::new(base_url.path(), HashMap::new()).await;
+        let batches = hudi_table
+            .read_file_slice(
+                
"a079bdb3-731c-4894-b855-abfcd6921007-0_0-203-274_20240418173551906.parquet",
+            )
+            .await;
         assert_eq!(batches.len(), 1);
         assert_eq!(batches.first().unwrap().num_rows(), 4);
         assert_eq!(batches.first().unwrap().num_columns(), 21);
     }
 
-    #[test]
-    fn hudi_table_get_latest_file_paths() {
+    #[tokio::test]
+    async fn hudi_table_get_latest_file_paths() {
         let base_url = TestTable::V6ComplexkeygenHivestyle.url();
-        let mut hudi_table = Table::new(base_url.path(), HashMap::new());
-        assert_eq!(hudi_table.get_timeline().unwrap().instants.len(), 2);
+        let mut hudi_table = Table::new(base_url.path(), HashMap::new()).await;
+        assert_eq!(hudi_table.get_timeline().await.unwrap().instants.len(), 2);
         let actual: HashSet<String> =
-            HashSet::from_iter(hudi_table.get_latest_file_paths().unwrap());
+            
HashSet::from_iter(hudi_table.get_latest_file_paths().await.unwrap());
         let expected: HashSet<String> = HashSet::from_iter(vec![
             
"byteField=10/shortField=300/a22e8257-e249-45e9-ba46-115bc85adcba-0_0-161-223_20240418173235694.parquet",
             
"byteField=20/shortField=100/bb7c3a45-387f-490d-aab2-981c3f1a8ada-0_0-140-198_20240418173213674.parquet",
@@ -347,11 +338,11 @@ mod tests {
         assert_eq!(actual, expected);
     }
 
-    #[test]
-    fn hudi_table_get_table_metadata() {
+    #[tokio::test]
+    async fn hudi_table_get_table_metadata() {
         let base_path =
             
canonicalize(Path::new("fixtures/table_metadata/sample_table_properties")).unwrap();
-        let table = Table::new(base_path.to_str().unwrap(), HashMap::new());
+        let table = Table::new(base_path.to_str().unwrap(), 
HashMap::new()).await;
         assert_eq!(table.base_file_format(), Parquet);
         assert_eq!(table.checksum(), 3761586722);
         assert_eq!(table.database_name(), "default");
diff --git a/crates/datafusion/Cargo.toml b/crates/datafusion/Cargo.toml
index 4f250ff..84c17a5 100644
--- a/crates/datafusion/Cargo.toml
+++ b/crates/datafusion/Cargo.toml
@@ -56,7 +56,7 @@ serde = { workspace = true, features = ["derive"] }
 serde_json = { workspace = true }
 
 # async
-tokio = { version = "1.37.0", features = ["rt", "rt-multi-thread", "macros"] }
+tokio = { workspace = true }
 
 # "stdlib"
 bytes = { workspace = true }
diff --git a/crates/datafusion/src/bin/main.rs b/crates/datafusion/src/bin/main.rs
deleted file mode 100644
index fc0e9da..0000000
--- a/crates/datafusion/src/bin/main.rs
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-use std::sync::Arc;
-
-use datafusion::error::Result;
-use datafusion::prelude::{DataFrame, SessionContext};
-
-use hudi_datafusion::HudiDataSource;
-
-#[tokio::main]
-async fn main() -> Result<()> {
-    let ctx = SessionContext::new();
-    let hudi = HudiDataSource::new("/tmp/trips_table");
-    ctx.register_table("trips_table", Arc::new(hudi))?;
-    let df: DataFrame = ctx
-        .sql("SELECT * from trips_table where fare > 20.0")
-        .await?;
-    df.show().await?;
-    Ok(())
-}
diff --git a/crates/datafusion/src/lib.rs b/crates/datafusion/src/lib.rs
index 7025a8c..f064961 100644
--- a/crates/datafusion/src/lib.rs
+++ b/crates/datafusion/src/lib.rs
@@ -17,13 +17,13 @@
  * under the License.
  */
 
-use arrow_array::RecordBatch;
 use std::any::Any;
 use std::collections::HashMap;
 use std::fmt::Debug;
-use std::fs::File;
 use std::sync::Arc;
+use std::thread;
 
+use arrow_array::RecordBatch;
 use arrow_schema::SchemaRef;
 use async_trait::async_trait;
 use datafusion::datasource::TableProvider;
@@ -34,7 +34,6 @@ use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan};
 use datafusion_common::{project_schema, DataFusionError};
 use datafusion_expr::{Expr, TableType};
 use datafusion_physical_expr::PhysicalSortExpr;
-use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
 
 use hudi_core::HudiTable;
 
@@ -44,11 +43,12 @@ pub struct HudiDataSource {
 }
 
 impl HudiDataSource {
-    pub fn new(base_path: &str) -> Self {
+    pub async fn new(base_uri: &str, storage_options: HashMap<String, String>) 
-> Self {
         Self {
-            table: HudiTable::new(base_path, HashMap::new()),
+            table: HudiTable::new(base_uri, storage_options).await,
         }
     }
+
     pub(crate) async fn create_physical_plan(
         &self,
         projections: Option<&Vec<usize>>,
@@ -57,17 +57,14 @@ impl HudiDataSource {
         Ok(Arc::new(HudiExec::new(projections, schema, self.clone())))
     }
 
-    fn get_record_batches(&mut self) -> 
datafusion_common::Result<Vec<RecordBatch>> {
-        match self.table.get_latest_file_paths() {
-            Ok(file_paths) => {
+    async fn get_record_batches(&mut self) -> 
datafusion_common::Result<Vec<RecordBatch>> {
+        match self.table.get_latest_file_slices().await {
+            Ok(file_slices) => {
                 let mut record_batches = Vec::new();
-                for f in file_paths {
-                    let file = File::open(f)?;
-                    let builder = 
ParquetRecordBatchReaderBuilder::try_new(file)?;
-                    let mut reader = builder.build()?;
-                    if let Ok(Some(result)) = reader.next().transpose() {
-                        record_batches.push(result)
-                    }
+                for f in file_slices {
+                    let relative_path = f.base_file_relative_path();
+                    let records = 
self.table.read_file_slice(&relative_path).await;
+                    record_batches.extend(records)
                 }
                 Ok(record_batches)
             }
@@ -85,7 +82,12 @@ impl TableProvider for HudiDataSource {
     }
 
     fn schema(&self) -> SchemaRef {
-        self.table.get_latest_schema()
+        let table = self.table.clone();
+        let handle = thread::spawn(move || {
+            let rt = tokio::runtime::Runtime::new().unwrap();
+            rt.block_on(async { table.get_latest_schema().await })
+        });
+        handle.join().unwrap()
     }
 
     fn table_type(&self) -> TableType {
@@ -163,7 +165,76 @@ impl ExecutionPlan for HudiExec {
         _context: Arc<TaskContext>,
     ) -> datafusion_common::Result<SendableRecordBatchStream> {
         let mut data_source = self.data_source.clone();
-        let data = data_source.get_record_batches()?;
+        let handle = thread::spawn(move || {
+            let rt = tokio::runtime::Runtime::new().unwrap();
+            rt.block_on(data_source.get_record_batches()).unwrap()
+        });
+        let data = handle.join().unwrap();
         Ok(Box::pin(MemoryStream::try_new(data, self.schema(), None)?))
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use std::collections::HashMap;
+    use std::sync::Arc;
+
+    use arrow_array::{Array, Int32Array, StringArray};
+    use datafusion::dataframe::DataFrame;
+    use datafusion::prelude::{SessionConfig, SessionContext};
+    use datafusion_common::ScalarValue;
+
+    use hudi_tests::TestTable;
+
+    use crate::HudiDataSource;
+
+    #[tokio::test]
+    async fn datafusion_read_hudi_table() {
+        let config = SessionConfig::new().set(
+            "datafusion.sql_parser.enable_ident_normalization",
+            ScalarValue::from(false),
+        );
+        let ctx = SessionContext::new_with_config(config);
+        let base_url = TestTable::V6ComplexkeygenHivestyle.url();
+        let hudi = HudiDataSource::new(base_url.path(), HashMap::new()).await;
+        ctx.register_table("hudi_table_complexkeygen", Arc::new(hudi))
+            .unwrap();
+        let df: DataFrame = ctx
+            .sql("SELECT * from hudi_table_complexkeygen where 
structField.field2 > 30 order by name")
+            .await.unwrap();
+        let records = df
+            .collect()
+            .await
+            .unwrap()
+            .to_vec()
+            .first()
+            .unwrap()
+            .to_owned();
+        let files: Vec<String> = records
+            .column_by_name("_hoodie_file_name")
+            .unwrap()
+            .as_any()
+            .downcast_ref::<StringArray>()
+            .unwrap()
+            .iter()
+            .map(|s| s.unwrap_or_default().to_string())
+            .collect();
+        assert_eq!(
+            files,
+            vec![
+                
"bb7c3a45-387f-490d-aab2-981c3f1a8ada-0_0-140-198_20240418173213674.parquet",
+                
"4668e35e-bff8-4be9-9ff2-e7fb17ecb1a7-0_1-161-224_20240418173235694.parquet"
+            ]
+        );
+        let ids: Vec<i32> = records
+            .column_by_name("id")
+            .unwrap()
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .unwrap()
+            .iter()
+            .map(|i| i.unwrap_or_default())
+            .collect();
+        assert_eq!(ids, vec![2, 4])
+    }
+}
diff --git a/python/Cargo.toml b/python/Cargo.toml
index 3ce4986..3db2cc1 100644
--- a/python/Cargo.toml
+++ b/python/Cargo.toml
@@ -42,6 +42,10 @@ arrow-row = { workspace = true }
 arrow-schema = { workspace = true }
 arrow-select = { workspace = true }
 
+# runtime / async
+futures = { workspace = true }
+tokio = { workspace = true }
+
 [dependencies.pyo3]
 version = "0.20.3"
 features = ["extension-module", "abi3", "abi3-py38"]
diff --git a/python/src/lib.rs b/python/src/lib.rs
index 07c81b0..89851bf 100644
--- a/python/src/lib.rs
+++ b/python/src/lib.rs
@@ -19,13 +19,16 @@
 
 use std::collections::HashMap;
 use std::path::PathBuf;
+use std::sync::OnceLock;
 
 use arrow::pyarrow::ToPyArrow;
 use pyo3::prelude::*;
+use tokio::runtime::Runtime;
 
 use hudi::file_group::FileSlice;
 use hudi::HudiTable;
 
+#[cfg(not(tarpaulin))]
 #[pyclass]
 struct HudiFileSlice {
     #[pyo3(get)]
@@ -44,6 +47,7 @@ struct HudiFileSlice {
     num_records: i64,
 }
 
+#[cfg(not(tarpaulin))]
 impl HudiFileSlice {
     pub fn from_file_slice(f: FileSlice) -> Self {
         let partition_path = 
f.partition_path.clone().unwrap_or("".to_string());
@@ -62,53 +66,58 @@ impl HudiFileSlice {
     }
 }
 
+#[cfg(not(tarpaulin))]
 #[pyclass]
 struct BindingHudiTable {
     _table: HudiTable,
 }
 
+#[cfg(not(tarpaulin))]
 #[pymethods]
 impl BindingHudiTable {
     #[new]
     #[pyo3(signature = (table_uri, storage_options = None))]
-    fn new(
-        py: Python,
-        table_uri: &str,
-        storage_options: Option<HashMap<String, String>>,
-    ) -> PyResult<Self> {
-        py.allow_threads(|| {
-            Ok(BindingHudiTable {
-                _table: HudiTable::new(table_uri, 
storage_options.unwrap_or_default()),
-            })
-        })
+    fn new(table_uri: &str, storage_options: Option<HashMap<String, String>>) 
-> PyResult<Self> {
+        let _table = rt().block_on(HudiTable::new(
+            table_uri,
+            storage_options.unwrap_or_default(),
+        ));
+        Ok(BindingHudiTable { _table })
     }
 
     pub fn schema(&self, py: Python) -> PyResult<PyObject> {
-        self._table.get_latest_schema().to_pyarrow(py)
+        rt().block_on(self._table.get_latest_schema())
+            .to_pyarrow(py)
     }
 
-    pub fn get_latest_file_slices(&mut self) -> PyResult<Vec<HudiFileSlice>> {
-        match self._table.get_latest_file_slices() {
-            Ok(file_slices) => Ok(file_slices
-                .into_iter()
-                .map(HudiFileSlice::from_file_slice)
-                .collect()),
-            Err(_e) => {
-                panic!("Failed to retrieve the latest file slices.")
+    pub fn get_latest_file_slices(&mut self, py: Python) -> 
PyResult<Vec<HudiFileSlice>> {
+        py.allow_threads(|| {
+            let res = rt().block_on(self._table.get_latest_file_slices());
+            match res {
+                Ok(file_slices) => Ok(file_slices
+                    .into_iter()
+                    .map(HudiFileSlice::from_file_slice)
+                    .collect()),
+                Err(_e) => {
+                    panic!("Failed to retrieve the latest file slices.")
+                }
             }
-        }
+        })
     }
 
     pub fn read_file_slice(&mut self, relative_path: &str, py: Python) -> 
PyResult<PyObject> {
-        self._table.read_file_slice(relative_path).to_pyarrow(py)
+        rt().block_on(self._table.read_file_slice(relative_path))
+            .to_pyarrow(py)
     }
 }
 
+#[cfg(not(tarpaulin))]
 #[pyfunction]
 fn rust_core_version() -> &'static str {
     hudi::crate_version()
 }
 
+#[cfg(not(tarpaulin))]
 #[pymodule]
 fn _internal(_py: Python, m: &PyModule) -> PyResult<()> {
     m.add("__version__", env!("CARGO_PKG_VERSION"))?;
@@ -118,3 +127,9 @@ fn _internal(_py: Python, m: &PyModule) -> PyResult<()> {
     m.add_class::<BindingHudiTable>()?;
     Ok(())
 }
+
+#[cfg(not(tarpaulin))]
+pub fn rt() -> &'static Runtime {
+    static TOKIO_RT: OnceLock<Runtime> = OnceLock::new();
+    TOKIO_RT.get_or_init(|| Runtime::new().expect("Failed to create a tokio 
runtime."))
+}

Reply via email to