This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 0e46d3a fix(datafusion): wrap system table scan IO in await_with_runtime to support FFI (#295)
0e46d3a is described below
commit 0e46d3a14369b17836fb468b14ce874b30e9da9a
Author: Jiajia Li <[email protected]>
AuthorDate: Sat May 9 09:16:08 2026 +0800
fix(datafusion): wrap system table scan IO in await_with_runtime to support FFI (#295)
---
.../datafusion/src/system_tables/branches.rs | 43 ++++++++++------------
.../datafusion/src/system_tables/manifests.rs | 8 ++--
.../datafusion/src/system_tables/schemas.rs | 9 +++--
.../datafusion/src/system_tables/snapshots.rs | 4 +-
.../datafusion/src/system_tables/tags.rs | 4 +-
5 files changed, 36 insertions(+), 32 deletions(-)
diff --git a/crates/integrations/datafusion/src/system_tables/branches.rs b/crates/integrations/datafusion/src/system_tables/branches.rs
index 581fb97..468a9ad 100644
--- a/crates/integrations/datafusion/src/system_tables/branches.rs
+++ b/crates/integrations/datafusion/src/system_tables/branches.rs
@@ -79,31 +79,11 @@ impl TableProvider for BranchesTable {
_filters: &[Expr],
_limit: Option<usize>,
) -> DFResult<Arc<dyn ExecutionPlan>> {
- let bm = BranchManager::new(
- self.table.file_io().clone(),
- self.table.location().to_string(),
- );
- let branch_names = bm.list_all().await.map_err(to_datafusion_error)?;
-
- let n = branch_names.len();
- let mut names: Vec<String> = Vec::with_capacity(n);
- let mut create_times = Vec::with_capacity(n);
-
- for name in branch_names {
- let branch_path = bm.branch_path(&name);
- let status = self
- .table
- .file_io()
- .get_status(&branch_path)
+ let table = self.table.clone();
+ let (names, create_times) =
+ crate::runtime::await_with_runtime(async move { collect_branches(&table).await })
.await
.map_err(to_datafusion_error)?;
- let ts_millis = status
- .last_modified
- .map(|dt| dt.timestamp_millis())
- .unwrap_or(0);
- names.push(name);
- create_times.push(ts_millis);
- }
let schema = branches_schema();
let batch = RecordBatch::try_new(
@@ -121,3 +101,20 @@ impl TableProvider for BranchesTable {
)?)
}
}
+
+async fn collect_branches(table: &Table) -> paimon::Result<(Vec<String>, Vec<i64>)> {
+ let file_io = table.file_io();
+ let bm = BranchManager::new(file_io.clone(), table.location().to_string());
+ let names = bm.list_all().await?;
+ let mut create_times = Vec::with_capacity(names.len());
+ for name in &names {
+ let status = file_io.get_status(&bm.branch_path(name)).await?;
+ create_times.push(
+ status
+ .last_modified
+ .map(|dt| dt.timestamp_millis())
+ .unwrap_or(0),
+ );
+ }
+ Ok((names, create_times))
+}
diff --git a/crates/integrations/datafusion/src/system_tables/manifests.rs b/crates/integrations/datafusion/src/system_tables/manifests.rs
index bc80805..12b2864 100644
--- a/crates/integrations/datafusion/src/system_tables/manifests.rs
+++ b/crates/integrations/datafusion/src/system_tables/manifests.rs
@@ -83,9 +83,11 @@ impl TableProvider for ManifestsTable {
_filters: &[Expr],
_limit: Option<usize>,
) -> DFResult<Arc<dyn ExecutionPlan>> {
- let metas = collect_manifests(&self.table)
- .await
- .map_err(to_datafusion_error)?;
+ let table = self.table.clone();
+ let metas =
+ crate::runtime::await_with_runtime(async move { collect_manifests(&table).await })
+ .await
+ .map_err(to_datafusion_error)?;
let n = metas.len();
let mut file_names: Vec<String> = Vec::with_capacity(n);
diff --git a/crates/integrations/datafusion/src/system_tables/schemas.rs b/crates/integrations/datafusion/src/system_tables/schemas.rs
index 03da793..2ff9d40 100644
--- a/crates/integrations/datafusion/src/system_tables/schemas.rs
+++ b/crates/integrations/datafusion/src/system_tables/schemas.rs
@@ -85,10 +85,11 @@ impl TableProvider for SchemasTable {
_filters: &[Expr],
_limit: Option<usize>,
) -> DFResult<Arc<dyn ExecutionPlan>> {
- let schemas = self
- .table
- .schema_manager()
- .list_all()
+ let table = self.table.clone();
+ let schemas =
+ crate::runtime::await_with_runtime(
+ async move { table.schema_manager().list_all().await },
+ )
.await
.map_err(to_datafusion_error)?;
diff --git a/crates/integrations/datafusion/src/system_tables/snapshots.rs b/crates/integrations/datafusion/src/system_tables/snapshots.rs
index 6d0ae0b..9751ef3 100644
--- a/crates/integrations/datafusion/src/system_tables/snapshots.rs
+++ b/crates/integrations/datafusion/src/system_tables/snapshots.rs
@@ -95,7 +95,9 @@ impl TableProvider for SnapshotsTable {
self.table.file_io().clone(),
self.table.location().to_string(),
);
- let snapshots = sm.list_all().await.map_err(to_datafusion_error)?;
+ let snapshots = crate::runtime::await_with_runtime(async move { sm.list_all().await })
+ .await
+ .map_err(to_datafusion_error)?;
let n = snapshots.len();
let mut snapshot_ids = Vec::with_capacity(n);
diff --git a/crates/integrations/datafusion/src/system_tables/tags.rs b/crates/integrations/datafusion/src/system_tables/tags.rs
index 1559b67..ab0316f 100644
--- a/crates/integrations/datafusion/src/system_tables/tags.rs
+++ b/crates/integrations/datafusion/src/system_tables/tags.rs
@@ -94,7 +94,9 @@ impl TableProvider for TagsTable {
self.table.file_io().clone(),
self.table.location().to_string(),
);
- let tags = tm.list_all().await.map_err(to_datafusion_error)?;
+ let tags = crate::runtime::await_with_runtime(async move { tm.list_all().await })
+ .await
+ .map_err(to_datafusion_error)?;
let n = tags.len();
let mut tag_names: Vec<String> = Vec::with_capacity(n);