This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hudi-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 2ac8882 refactor: make APIs async (#31)
2ac8882 is described below
commit 2ac888226f40c7c319b971d79201033fd4471bc1
Author: Shiyan Xu <[email protected]>
AuthorDate: Sun Jun 30 15:07:55 2024 -0500
refactor: make APIs async (#31)
---
Cargo.toml | 2 +-
crates/core/src/file_group/mod.rs | 11 ++++
crates/core/src/table/fs_view.rs | 33 ++++--------
crates/core/src/table/mod.rs | 109 +++++++++++++++++---------------------
crates/datafusion/Cargo.toml | 2 +-
crates/datafusion/src/bin/main.rs | 37 -------------
crates/datafusion/src/lib.rs | 105 ++++++++++++++++++++++++++++++------
python/Cargo.toml | 4 ++
python/src/lib.rs | 57 ++++++++++++--------
9 files changed, 200 insertions(+), 160 deletions(-)
diff --git a/Cargo.toml b/Cargo.toml
index 6d21195..e3c5b99 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -69,5 +69,5 @@ uuid = { version = "1" }
async-trait = { version = "0.1" }
async-recursion = { version = "1.1.1" }
futures = { version = "0.3" }
-tokio = { version = "1" }
+tokio = { version = "1", features = ["rt-multi-thread"]}
num_cpus = { version = "1" }
diff --git a/crates/core/src/file_group/mod.rs b/crates/core/src/file_group/mod.rs
index ec2e171..ece19a4 100644
--- a/crates/core/src/file_group/mod.rs
+++ b/crates/core/src/file_group/mod.rs
@@ -20,6 +20,7 @@
use std::collections::BTreeMap;
use std::fmt;
use std::fmt::Formatter;
+use std::path::PathBuf;
use crate::storage::file_info::FileInfo;
use crate::storage::file_stats::FileStats;
@@ -79,6 +80,16 @@ impl FileSlice {
self.base_file.info.uri.as_str()
}
+ pub fn base_file_relative_path(&self) -> String {
+ let partition_path = self.partition_path.clone().unwrap_or_default();
+ let file_name = &self.base_file.info.name;
+ PathBuf::from(partition_path)
+ .join(file_name)
+ .to_str()
+ .unwrap()
+ .to_string()
+ }
+
pub fn file_group_id(&self) -> &str {
&self.base_file.file_group_id
}
diff --git a/crates/core/src/table/fs_view.rs b/crates/core/src/table/fs_view.rs
index c7c20e1..f1976d8 100644
--- a/crates/core/src/table/fs_view.rs
+++ b/crates/core/src/table/fs_view.rs
@@ -87,14 +87,9 @@ impl FileSystemView {
Ok(file_groups)
}
- pub fn load_file_groups(&mut self) {
+ pub async fn load_file_groups(&mut self) {
let fs_view = self.clone();
- let rt = tokio::runtime::Builder::new_current_thread()
- .enable_all()
- .build()
- .unwrap();
- let wrapper = async { get_partitions_and_file_groups(&fs_view).await };
- let result = rt.block_on(wrapper).unwrap();
+ let result = get_partitions_and_file_groups(&fs_view).await.unwrap();
for (k, v) in result {
self.partition_to_file_groups.insert(k, v);
}
@@ -112,18 +107,13 @@ impl FileSystemView {
file_slices
}
- pub fn get_latest_file_slices_with_stats(&mut self) -> Vec<&mut FileSlice>
{
- let rt = tokio::runtime::Builder::new_current_thread()
- .enable_all()
- .build()
- .unwrap();
+ pub async fn get_latest_file_slices_with_stats(&mut self) -> Vec<&mut
FileSlice> {
let mut file_slices = Vec::new();
let file_groups = &mut self.partition_to_file_groups.values_mut();
for fgs in file_groups {
for fg in fgs {
if let Some(file_slice) = fg.get_latest_file_slice_mut() {
- let wrapper = async {
load_file_slice_stats(&self.base_url, file_slice).await };
- let _ = rt.block_on(wrapper);
+ let _ = load_file_slice_stats(&self.base_url,
file_slice).await;
file_slices.push(file_slice)
}
}
@@ -131,14 +121,9 @@ impl FileSystemView {
file_slices
}
- pub fn read_file_slice(&self, relative_path: &str) -> Vec<RecordBatch> {
- let rt = tokio::runtime::Builder::new_current_thread()
- .enable_all()
- .build()
- .unwrap();
+ pub async fn read_file_slice(&self, relative_path: &str) ->
Vec<RecordBatch> {
let storage = Storage::new(self.base_url.clone(), HashMap::new());
- let wrapper = async {
storage.get_parquet_file_data(relative_path).await };
- rt.block_on(wrapper)
+ storage.get_parquet_file_data(relative_path).await
}
}
@@ -217,11 +202,11 @@ mod tests {
)
}
- #[test]
- fn get_latest_file_slices() {
+ #[tokio::test]
+ async fn get_latest_file_slices() {
let base_url = TestTable::V6Nonpartitioned.url();
let mut fs_view = FileSystemView::new(base_url);
- fs_view.load_file_groups();
+ fs_view.load_file_groups().await;
let file_slices = fs_view.get_latest_file_slices();
assert_eq!(file_slices.len(), 1);
let mut fg_ids = Vec::new();
diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs
index d26decc..86f1008 100644
--- a/crates/core/src/table/mod.rs
+++ b/crates/core/src/table/mod.rs
@@ -49,9 +49,15 @@ pub struct Table {
}
impl Table {
- pub fn new(base_uri: &str, storage_options: HashMap<String, String>) ->
Self {
+ pub async fn new(base_uri: &str, storage_options: HashMap<String, String>)
-> Self {
let base_url =
Url::from_file_path(PathBuf::from(base_uri).as_path()).unwrap();
- match Self::load_properties(&base_url, ".hoodie/hoodie.properties",
&storage_options) {
+ match Self::load_properties(
+ base_url.clone(),
+ ".hoodie/hoodie.properties".to_string(),
+ storage_options.clone(),
+ )
+ .await
+ {
Ok(props) => Self {
base_url,
props,
@@ -64,18 +70,13 @@ impl Table {
}
}
- fn load_properties(
- base_url: &Url,
- props_path: &str,
- storage_options: &HashMap<String, String>,
+ async fn load_properties(
+ base_url: Url,
+ props_path: String,
+ storage_options: HashMap<String, String>,
) -> Result<HashMap<String, String>> {
- let rt = tokio::runtime::Builder::new_current_thread()
- .enable_all()
- .build()
- .unwrap();
- let storage = Storage::new(base_url.clone(), storage_options.clone());
- let get_data = async { storage.get_file_data(props_path).await };
- let data = rt.block_on(get_data);
+ let storage = Storage::new(base_url, storage_options);
+ let data = storage.get_file_data(props_path.as_str()).await;
let cursor = std::io::Cursor::new(data);
let reader = BufReader::new(cursor);
let lines = reader.lines();
@@ -102,30 +103,16 @@ impl Table {
}
#[cfg(test)]
- fn get_timeline(&self) -> Result<Timeline> {
- let rt = tokio::runtime::Builder::new_current_thread()
- .enable_all()
- .build()
- .unwrap();
- let init_timeline = async { Timeline::new(self.base_url.clone()).await
};
- rt.block_on(init_timeline)
+ async fn get_timeline(&self) -> Result<Timeline> {
+ Timeline::new(self.base_url.clone()).await
}
- pub fn get_latest_schema(&self) -> SchemaRef {
- let rt = tokio::runtime::Builder::new_current_thread()
- .enable_all()
- .build()
- .unwrap();
- let init_timeline = async { Timeline::new(self.base_url.clone()).await
};
- let timeline = rt.block_on(init_timeline);
- match timeline {
+ pub async fn get_latest_schema(&self) -> SchemaRef {
+ let timeline_result = Timeline::new(self.base_url.clone()).await;
+ match timeline_result {
Ok(timeline) => {
- let rt = tokio::runtime::Builder::new_current_thread()
- .enable_all()
- .build()
- .unwrap();
- let get_schema = async { timeline.get_latest_schema().await };
- match rt.block_on(get_schema) {
+ let schema_result = timeline.get_latest_schema().await;
+ match schema_result {
Ok(schema) => SchemaRef::from(schema),
Err(e) => panic!("Failed to resolve table schema: {}", e),
}
@@ -134,38 +121,39 @@ impl Table {
}
}
- pub fn get_latest_file_slices(&mut self) -> Result<Vec<FileSlice>> {
+ pub async fn get_latest_file_slices(&mut self) -> Result<Vec<FileSlice>> {
if self.file_system_view.is_none() {
let mut new_fs_view = FileSystemView::new(self.base_url.clone());
- new_fs_view.load_file_groups();
+ new_fs_view.load_file_groups().await;
self.file_system_view = Some(new_fs_view);
}
let fs_view = self.file_system_view.as_mut().unwrap();
let mut file_slices = Vec::new();
- for f in fs_view.get_latest_file_slices_with_stats() {
+ for f in fs_view.get_latest_file_slices_with_stats().await {
file_slices.push(f.clone());
}
Ok(file_slices)
}
- pub fn read_file_slice(&mut self, relative_path: &str) -> Vec<RecordBatch>
{
+ pub async fn read_file_slice(&mut self, relative_path: &str) ->
Vec<RecordBatch> {
if self.file_system_view.is_none() {
let mut new_fs_view = FileSystemView::new(self.base_url.clone());
- new_fs_view.load_file_groups();
+ new_fs_view.load_file_groups().await;
self.file_system_view = Some(new_fs_view);
}
let fs_view = self.file_system_view.as_ref().unwrap();
- fs_view.read_file_slice(relative_path)
+ fs_view.read_file_slice(relative_path).await
}
- pub fn get_latest_file_paths(&mut self) -> Result<Vec<String>> {
+ pub async fn get_latest_file_paths(&mut self) -> Result<Vec<String>> {
let mut file_paths = Vec::new();
- for f in self.get_latest_file_slices()? {
+ for f in self.get_latest_file_slices().await? {
file_paths.push(f.base_file_path().to_string());
}
+ println!("{:?}", file_paths);
Ok(file_paths)
}
}
@@ -267,12 +255,13 @@ mod tests {
use crate::table::metadata::ProvidesTableMetadata;
use crate::table::Table;
- #[test]
- fn hudi_table_get_latest_schema() {
+ #[tokio::test]
+ async fn hudi_table_get_latest_schema() {
let base_url = TestTable::V6Nonpartitioned.url();
- let hudi_table = Table::new(base_url.path(), HashMap::new());
+ let hudi_table = Table::new(base_url.path(), HashMap::new()).await;
let fields: Vec<String> = hudi_table
.get_latest_schema()
+ .await
.all_fields()
.into_iter()
.map(|f| f.name().to_string())
@@ -318,25 +307,27 @@ mod tests {
);
}
- #[test]
- fn hudi_table_read_file_slice() {
+ #[tokio::test]
+ async fn hudi_table_read_file_slice() {
let base_url = TestTable::V6Nonpartitioned.url();
- let mut hudi_table = Table::new(base_url.path(), HashMap::new());
- let batches = hudi_table.read_file_slice(
-
"a079bdb3-731c-4894-b855-abfcd6921007-0_0-203-274_20240418173551906.parquet",
- );
+ let mut hudi_table = Table::new(base_url.path(), HashMap::new()).await;
+ let batches = hudi_table
+ .read_file_slice(
+
"a079bdb3-731c-4894-b855-abfcd6921007-0_0-203-274_20240418173551906.parquet",
+ )
+ .await;
assert_eq!(batches.len(), 1);
assert_eq!(batches.first().unwrap().num_rows(), 4);
assert_eq!(batches.first().unwrap().num_columns(), 21);
}
- #[test]
- fn hudi_table_get_latest_file_paths() {
+ #[tokio::test]
+ async fn hudi_table_get_latest_file_paths() {
let base_url = TestTable::V6ComplexkeygenHivestyle.url();
- let mut hudi_table = Table::new(base_url.path(), HashMap::new());
- assert_eq!(hudi_table.get_timeline().unwrap().instants.len(), 2);
+ let mut hudi_table = Table::new(base_url.path(), HashMap::new()).await;
+ assert_eq!(hudi_table.get_timeline().await.unwrap().instants.len(), 2);
let actual: HashSet<String> =
- HashSet::from_iter(hudi_table.get_latest_file_paths().unwrap());
+
HashSet::from_iter(hudi_table.get_latest_file_paths().await.unwrap());
let expected: HashSet<String> = HashSet::from_iter(vec![
"byteField=10/shortField=300/a22e8257-e249-45e9-ba46-115bc85adcba-0_0-161-223_20240418173235694.parquet",
"byteField=20/shortField=100/bb7c3a45-387f-490d-aab2-981c3f1a8ada-0_0-140-198_20240418173213674.parquet",
@@ -347,11 +338,11 @@ mod tests {
assert_eq!(actual, expected);
}
- #[test]
- fn hudi_table_get_table_metadata() {
+ #[tokio::test]
+ async fn hudi_table_get_table_metadata() {
let base_path =
canonicalize(Path::new("fixtures/table_metadata/sample_table_properties")).unwrap();
- let table = Table::new(base_path.to_str().unwrap(), HashMap::new());
+ let table = Table::new(base_path.to_str().unwrap(),
HashMap::new()).await;
assert_eq!(table.base_file_format(), Parquet);
assert_eq!(table.checksum(), 3761586722);
assert_eq!(table.database_name(), "default");
diff --git a/crates/datafusion/Cargo.toml b/crates/datafusion/Cargo.toml
index 4f250ff..84c17a5 100644
--- a/crates/datafusion/Cargo.toml
+++ b/crates/datafusion/Cargo.toml
@@ -56,7 +56,7 @@ serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
# async
-tokio = { version = "1.37.0", features = ["rt", "rt-multi-thread", "macros"] }
+tokio = { workspace = true }
# "stdlib"
bytes = { workspace = true }
diff --git a/crates/datafusion/src/bin/main.rs b/crates/datafusion/src/bin/main.rs
deleted file mode 100644
index fc0e9da..0000000
--- a/crates/datafusion/src/bin/main.rs
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-use std::sync::Arc;
-
-use datafusion::error::Result;
-use datafusion::prelude::{DataFrame, SessionContext};
-
-use hudi_datafusion::HudiDataSource;
-
-#[tokio::main]
-async fn main() -> Result<()> {
- let ctx = SessionContext::new();
- let hudi = HudiDataSource::new("/tmp/trips_table");
- ctx.register_table("trips_table", Arc::new(hudi))?;
- let df: DataFrame = ctx
- .sql("SELECT * from trips_table where fare > 20.0")
- .await?;
- df.show().await?;
- Ok(())
-}
diff --git a/crates/datafusion/src/lib.rs b/crates/datafusion/src/lib.rs
index 7025a8c..f064961 100644
--- a/crates/datafusion/src/lib.rs
+++ b/crates/datafusion/src/lib.rs
@@ -17,13 +17,13 @@
* under the License.
*/
-use arrow_array::RecordBatch;
use std::any::Any;
use std::collections::HashMap;
use std::fmt::Debug;
-use std::fs::File;
use std::sync::Arc;
+use std::thread;
+use arrow_array::RecordBatch;
use arrow_schema::SchemaRef;
use async_trait::async_trait;
use datafusion::datasource::TableProvider;
@@ -34,7 +34,6 @@ use datafusion::physical_plan::{DisplayAs, DisplayFormatType,
ExecutionPlan};
use datafusion_common::{project_schema, DataFusionError};
use datafusion_expr::{Expr, TableType};
use datafusion_physical_expr::PhysicalSortExpr;
-use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use hudi_core::HudiTable;
@@ -44,11 +43,12 @@ pub struct HudiDataSource {
}
impl HudiDataSource {
- pub fn new(base_path: &str) -> Self {
+ pub async fn new(base_uri: &str, storage_options: HashMap<String, String>)
-> Self {
Self {
- table: HudiTable::new(base_path, HashMap::new()),
+ table: HudiTable::new(base_uri, storage_options).await,
}
}
+
pub(crate) async fn create_physical_plan(
&self,
projections: Option<&Vec<usize>>,
@@ -57,17 +57,14 @@ impl HudiDataSource {
Ok(Arc::new(HudiExec::new(projections, schema, self.clone())))
}
- fn get_record_batches(&mut self) ->
datafusion_common::Result<Vec<RecordBatch>> {
- match self.table.get_latest_file_paths() {
- Ok(file_paths) => {
+ async fn get_record_batches(&mut self) ->
datafusion_common::Result<Vec<RecordBatch>> {
+ match self.table.get_latest_file_slices().await {
+ Ok(file_slices) => {
let mut record_batches = Vec::new();
- for f in file_paths {
- let file = File::open(f)?;
- let builder =
ParquetRecordBatchReaderBuilder::try_new(file)?;
- let mut reader = builder.build()?;
- if let Ok(Some(result)) = reader.next().transpose() {
- record_batches.push(result)
- }
+ for f in file_slices {
+ let relative_path = f.base_file_relative_path();
+ let records =
self.table.read_file_slice(&relative_path).await;
+ record_batches.extend(records)
}
Ok(record_batches)
}
@@ -85,7 +82,12 @@ impl TableProvider for HudiDataSource {
}
fn schema(&self) -> SchemaRef {
- self.table.get_latest_schema()
+ let table = self.table.clone();
+ let handle = thread::spawn(move || {
+ let rt = tokio::runtime::Runtime::new().unwrap();
+ rt.block_on(async { table.get_latest_schema().await })
+ });
+ handle.join().unwrap()
}
fn table_type(&self) -> TableType {
@@ -163,7 +165,76 @@ impl ExecutionPlan for HudiExec {
_context: Arc<TaskContext>,
) -> datafusion_common::Result<SendableRecordBatchStream> {
let mut data_source = self.data_source.clone();
- let data = data_source.get_record_batches()?;
+ let handle = thread::spawn(move || {
+ let rt = tokio::runtime::Runtime::new().unwrap();
+ rt.block_on(data_source.get_record_batches()).unwrap()
+ });
+ let data = handle.join().unwrap();
Ok(Box::pin(MemoryStream::try_new(data, self.schema(), None)?))
}
}
+
+#[cfg(test)]
+mod tests {
+ use std::collections::HashMap;
+ use std::sync::Arc;
+
+ use arrow_array::{Array, Int32Array, StringArray};
+ use datafusion::dataframe::DataFrame;
+ use datafusion::prelude::{SessionConfig, SessionContext};
+ use datafusion_common::ScalarValue;
+
+ use hudi_tests::TestTable;
+
+ use crate::HudiDataSource;
+
+ #[tokio::test]
+ async fn datafusion_read_hudi_table() {
+ let config = SessionConfig::new().set(
+ "datafusion.sql_parser.enable_ident_normalization",
+ ScalarValue::from(false),
+ );
+ let ctx = SessionContext::new_with_config(config);
+ let base_url = TestTable::V6ComplexkeygenHivestyle.url();
+ let hudi = HudiDataSource::new(base_url.path(), HashMap::new()).await;
+ ctx.register_table("hudi_table_complexkeygen", Arc::new(hudi))
+ .unwrap();
+ let df: DataFrame = ctx
+ .sql("SELECT * from hudi_table_complexkeygen where structField.field2 > 30 order by name")
+ .await.unwrap();
+ let records = df
+ .collect()
+ .await
+ .unwrap()
+ .to_vec()
+ .first()
+ .unwrap()
+ .to_owned();
+ let files: Vec<String> = records
+ .column_by_name("_hoodie_file_name")
+ .unwrap()
+ .as_any()
+ .downcast_ref::<StringArray>()
+ .unwrap()
+ .iter()
+ .map(|s| s.unwrap_or_default().to_string())
+ .collect();
+ assert_eq!(
+ files,
+ vec![
+
"bb7c3a45-387f-490d-aab2-981c3f1a8ada-0_0-140-198_20240418173213674.parquet",
+
"4668e35e-bff8-4be9-9ff2-e7fb17ecb1a7-0_1-161-224_20240418173235694.parquet"
+ ]
+ );
+ let ids: Vec<i32> = records
+ .column_by_name("id")
+ .unwrap()
+ .as_any()
+ .downcast_ref::<Int32Array>()
+ .unwrap()
+ .iter()
+ .map(|i| i.unwrap_or_default())
+ .collect();
+ assert_eq!(ids, vec![2, 4])
+ }
+}
diff --git a/python/Cargo.toml b/python/Cargo.toml
index 3ce4986..3db2cc1 100644
--- a/python/Cargo.toml
+++ b/python/Cargo.toml
@@ -42,6 +42,10 @@ arrow-row = { workspace = true }
arrow-schema = { workspace = true }
arrow-select = { workspace = true }
+# runtime / async
+futures = { workspace = true }
+tokio = { workspace = true }
+
[dependencies.pyo3]
version = "0.20.3"
features = ["extension-module", "abi3", "abi3-py38"]
diff --git a/python/src/lib.rs b/python/src/lib.rs
index 07c81b0..89851bf 100644
--- a/python/src/lib.rs
+++ b/python/src/lib.rs
@@ -19,13 +19,16 @@
use std::collections::HashMap;
use std::path::PathBuf;
+use std::sync::OnceLock;
use arrow::pyarrow::ToPyArrow;
use pyo3::prelude::*;
+use tokio::runtime::Runtime;
use hudi::file_group::FileSlice;
use hudi::HudiTable;
+#[cfg(not(tarpaulin))]
#[pyclass]
struct HudiFileSlice {
#[pyo3(get)]
@@ -44,6 +47,7 @@ struct HudiFileSlice {
num_records: i64,
}
+#[cfg(not(tarpaulin))]
impl HudiFileSlice {
pub fn from_file_slice(f: FileSlice) -> Self {
let partition_path =
f.partition_path.clone().unwrap_or("".to_string());
@@ -62,53 +66,58 @@ impl HudiFileSlice {
}
}
+#[cfg(not(tarpaulin))]
#[pyclass]
struct BindingHudiTable {
_table: HudiTable,
}
+#[cfg(not(tarpaulin))]
#[pymethods]
impl BindingHudiTable {
#[new]
#[pyo3(signature = (table_uri, storage_options = None))]
- fn new(
- py: Python,
- table_uri: &str,
- storage_options: Option<HashMap<String, String>>,
- ) -> PyResult<Self> {
- py.allow_threads(|| {
- Ok(BindingHudiTable {
- _table: HudiTable::new(table_uri,
storage_options.unwrap_or_default()),
- })
- })
+ fn new(table_uri: &str, storage_options: Option<HashMap<String, String>>)
-> PyResult<Self> {
+ let _table = rt().block_on(HudiTable::new(
+ table_uri,
+ storage_options.unwrap_or_default(),
+ ));
+ Ok(BindingHudiTable { _table })
}
pub fn schema(&self, py: Python) -> PyResult<PyObject> {
- self._table.get_latest_schema().to_pyarrow(py)
+ rt().block_on(self._table.get_latest_schema())
+ .to_pyarrow(py)
}
- pub fn get_latest_file_slices(&mut self) -> PyResult<Vec<HudiFileSlice>> {
- match self._table.get_latest_file_slices() {
- Ok(file_slices) => Ok(file_slices
- .into_iter()
- .map(HudiFileSlice::from_file_slice)
- .collect()),
- Err(_e) => {
- panic!("Failed to retrieve the latest file slices.")
+ pub fn get_latest_file_slices(&mut self, py: Python) ->
PyResult<Vec<HudiFileSlice>> {
+ py.allow_threads(|| {
+ let res = rt().block_on(self._table.get_latest_file_slices());
+ match res {
+ Ok(file_slices) => Ok(file_slices
+ .into_iter()
+ .map(HudiFileSlice::from_file_slice)
+ .collect()),
+ Err(_e) => {
+ panic!("Failed to retrieve the latest file slices.")
+ }
}
- }
+ })
}
pub fn read_file_slice(&mut self, relative_path: &str, py: Python) ->
PyResult<PyObject> {
- self._table.read_file_slice(relative_path).to_pyarrow(py)
+ rt().block_on(self._table.read_file_slice(relative_path))
+ .to_pyarrow(py)
}
}
+#[cfg(not(tarpaulin))]
#[pyfunction]
fn rust_core_version() -> &'static str {
hudi::crate_version()
}
+#[cfg(not(tarpaulin))]
#[pymodule]
fn _internal(_py: Python, m: &PyModule) -> PyResult<()> {
m.add("__version__", env!("CARGO_PKG_VERSION"))?;
@@ -118,3 +127,9 @@ fn _internal(_py: Python, m: &PyModule) -> PyResult<()> {
m.add_class::<BindingHudiTable>()?;
Ok(())
}
+
+#[cfg(not(tarpaulin))]
+pub fn rt() -> &'static Runtime {
+ static TOKIO_RT: OnceLock<Runtime> = OnceLock::new();
+ TOKIO_RT.get_or_init(|| Runtime::new().expect("Failed to create a tokio runtime."))
+}