This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 0e46d3a  fix(datafusion): wrap system table scan IO in 
await_with_runtime to support FFI (#295)
0e46d3a is described below

commit 0e46d3a14369b17836fb468b14ce874b30e9da9a
Author: Jiajia Li <[email protected]>
AuthorDate: Sat May 9 09:16:08 2026 +0800

    fix(datafusion): wrap system table scan IO in await_with_runtime to support 
FFI (#295)
---
 .../datafusion/src/system_tables/branches.rs       | 43 ++++++++++------------
 .../datafusion/src/system_tables/manifests.rs      |  8 ++--
 .../datafusion/src/system_tables/schemas.rs        |  9 +++--
 .../datafusion/src/system_tables/snapshots.rs      |  4 +-
 .../datafusion/src/system_tables/tags.rs           |  4 +-
 5 files changed, 36 insertions(+), 32 deletions(-)

diff --git a/crates/integrations/datafusion/src/system_tables/branches.rs 
b/crates/integrations/datafusion/src/system_tables/branches.rs
index 581fb97..468a9ad 100644
--- a/crates/integrations/datafusion/src/system_tables/branches.rs
+++ b/crates/integrations/datafusion/src/system_tables/branches.rs
@@ -79,31 +79,11 @@ impl TableProvider for BranchesTable {
         _filters: &[Expr],
         _limit: Option<usize>,
     ) -> DFResult<Arc<dyn ExecutionPlan>> {
-        let bm = BranchManager::new(
-            self.table.file_io().clone(),
-            self.table.location().to_string(),
-        );
-        let branch_names = bm.list_all().await.map_err(to_datafusion_error)?;
-
-        let n = branch_names.len();
-        let mut names: Vec<String> = Vec::with_capacity(n);
-        let mut create_times = Vec::with_capacity(n);
-
-        for name in branch_names {
-            let branch_path = bm.branch_path(&name);
-            let status = self
-                .table
-                .file_io()
-                .get_status(&branch_path)
+        let table = self.table.clone();
+        let (names, create_times) =
+            crate::runtime::await_with_runtime(async move { 
collect_branches(&table).await })
                 .await
                 .map_err(to_datafusion_error)?;
-            let ts_millis = status
-                .last_modified
-                .map(|dt| dt.timestamp_millis())
-                .unwrap_or(0);
-            names.push(name);
-            create_times.push(ts_millis);
-        }
 
         let schema = branches_schema();
         let batch = RecordBatch::try_new(
@@ -121,3 +101,20 @@ impl TableProvider for BranchesTable {
         )?)
     }
 }
+
+async fn collect_branches(table: &Table) -> paimon::Result<(Vec<String>, 
Vec<i64>)> {
+    let file_io = table.file_io();
+    let bm = BranchManager::new(file_io.clone(), table.location().to_string());
+    let names = bm.list_all().await?;
+    let mut create_times = Vec::with_capacity(names.len());
+    for name in &names {
+        let status = file_io.get_status(&bm.branch_path(name)).await?;
+        create_times.push(
+            status
+                .last_modified
+                .map(|dt| dt.timestamp_millis())
+                .unwrap_or(0),
+        );
+    }
+    Ok((names, create_times))
+}
diff --git a/crates/integrations/datafusion/src/system_tables/manifests.rs 
b/crates/integrations/datafusion/src/system_tables/manifests.rs
index bc80805..12b2864 100644
--- a/crates/integrations/datafusion/src/system_tables/manifests.rs
+++ b/crates/integrations/datafusion/src/system_tables/manifests.rs
@@ -83,9 +83,11 @@ impl TableProvider for ManifestsTable {
         _filters: &[Expr],
         _limit: Option<usize>,
     ) -> DFResult<Arc<dyn ExecutionPlan>> {
-        let metas = collect_manifests(&self.table)
-            .await
-            .map_err(to_datafusion_error)?;
+        let table = self.table.clone();
+        let metas =
+            crate::runtime::await_with_runtime(async move { 
collect_manifests(&table).await })
+                .await
+                .map_err(to_datafusion_error)?;
 
         let n = metas.len();
         let mut file_names: Vec<String> = Vec::with_capacity(n);
diff --git a/crates/integrations/datafusion/src/system_tables/schemas.rs 
b/crates/integrations/datafusion/src/system_tables/schemas.rs
index 03da793..2ff9d40 100644
--- a/crates/integrations/datafusion/src/system_tables/schemas.rs
+++ b/crates/integrations/datafusion/src/system_tables/schemas.rs
@@ -85,10 +85,11 @@ impl TableProvider for SchemasTable {
         _filters: &[Expr],
         _limit: Option<usize>,
     ) -> DFResult<Arc<dyn ExecutionPlan>> {
-        let schemas = self
-            .table
-            .schema_manager()
-            .list_all()
+        let table = self.table.clone();
+        let schemas =
+            crate::runtime::await_with_runtime(
+                async move { table.schema_manager().list_all().await },
+            )
             .await
             .map_err(to_datafusion_error)?;
 
diff --git a/crates/integrations/datafusion/src/system_tables/snapshots.rs 
b/crates/integrations/datafusion/src/system_tables/snapshots.rs
index 6d0ae0b..9751ef3 100644
--- a/crates/integrations/datafusion/src/system_tables/snapshots.rs
+++ b/crates/integrations/datafusion/src/system_tables/snapshots.rs
@@ -95,7 +95,9 @@ impl TableProvider for SnapshotsTable {
             self.table.file_io().clone(),
             self.table.location().to_string(),
         );
-        let snapshots = sm.list_all().await.map_err(to_datafusion_error)?;
+        let snapshots = crate::runtime::await_with_runtime(async move { 
sm.list_all().await })
+            .await
+            .map_err(to_datafusion_error)?;
 
         let n = snapshots.len();
         let mut snapshot_ids = Vec::with_capacity(n);
diff --git a/crates/integrations/datafusion/src/system_tables/tags.rs 
b/crates/integrations/datafusion/src/system_tables/tags.rs
index 1559b67..ab0316f 100644
--- a/crates/integrations/datafusion/src/system_tables/tags.rs
+++ b/crates/integrations/datafusion/src/system_tables/tags.rs
@@ -94,7 +94,9 @@ impl TableProvider for TagsTable {
             self.table.file_io().clone(),
             self.table.location().to_string(),
         );
-        let tags = tm.list_all().await.map_err(to_datafusion_error)?;
+        let tags = crate::runtime::await_with_runtime(async move { 
tm.list_all().await })
+            .await
+            .map_err(to_datafusion_error)?;
 
         let n = tags.len();
         let mut tag_names: Vec<String> = Vec::with_capacity(n);

Reply via email to