This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new d61fec5  fix: keep Vortex runtime alive during async IO (#375)
d61fec5 is described below

commit d61fec5c14312025813fe2bd79018938a2549a4d
Author: QuakeWang <[email protected]>
AuthorDate: Wed Jun 10 21:14:08 2026 +0800

    fix: keep Vortex runtime alive during async IO (#375)
---
 crates/paimon/src/arrow/format/vortex.rs | 63 ++++++++++++++++++++++++++++++--
 1 file changed, 60 insertions(+), 3 deletions(-)

diff --git a/crates/paimon/src/arrow/format/vortex.rs b/crates/paimon/src/arrow/format/vortex.rs
index f0a3a48..6dfce9c 100644
--- a/crates/paimon/src/arrow/format/vortex.rs
+++ b/crates/paimon/src/arrow/format/vortex.rs
@@ -24,9 +24,12 @@ use arrow_array::RecordBatch;
 use arrow_schema::{DataType as ArrowDataType, SchemaRef};
 use async_trait::async_trait;
 use futures::future::BoxFuture;
+use futures::Stream;
 use futures::StreamExt;
+use std::pin::Pin;
 use std::sync::atomic::{AtomicU64, Ordering};
 use std::sync::Arc;
+use std::task::{Context, Poll};
 use vortex::array::arrow::{FromArrowArray, IntoArrowArray};
 use vortex::array::dtype::arrow::FromArrowType;
 use vortex::array::dtype::DType;
@@ -38,6 +41,9 @@ use vortex::array::ArrayRef;
 use vortex::buffer::{Alignment, ByteBuffer};
 use vortex::error::VortexResult;
 use vortex::file::{OpenOptionsSessionExt, WriteOptionsSessionExt};
+use vortex::io::runtime::tokio::TokioRuntime;
+use vortex::io::runtime::BlockingRuntime;
+use vortex::io::session::RuntimeSessionExt;
 use vortex::io::{IoBuf, VortexReadAt, VortexWrite};
 use vortex::scan::selection::Selection;
 use vortex::session::VortexSession;
@@ -50,6 +56,51 @@ use vortex::VortexSessionDefault;
 /// Maximum number of concurrent read requests for Vortex file IO.
 const DEFAULT_READ_CONCURRENCY: usize = 10;
 
+// ---------------------------------------------------------------------------
+// Vortex Runtime
+// ---------------------------------------------------------------------------
+
+struct PaimonVortexRuntime {
+    runtime: TokioRuntime,
+}
+
+impl PaimonVortexRuntime {
+    fn new() -> crate::Result<Arc<Self>> {
+        let handle = tokio::runtime::Handle::try_current().map_err(|e| Error::DataInvalid {
+            message: format!("Vortex requires an active Tokio runtime: {e}"),
+            source: None,
+        })?;
+        Ok(Arc::new(Self {
+            runtime: TokioRuntime::new(handle),
+        }))
+    }
+
+    fn session(&self) -> VortexSession {
+        VortexSession::default().with_handle(self.runtime.handle())
+    }
+}
+
+fn new_vortex_session() -> crate::Result<(VortexSession, Arc<PaimonVortexRuntime>)> {
+    let runtime = PaimonVortexRuntime::new()?;
+    let session = runtime.session();
+    Ok((session, runtime))
+}
+
+struct VortexRecordBatchStream {
+    inner: ArrowRecordBatchStream,
+    _runtime: Arc<PaimonVortexRuntime>,
+}
+
+impl Unpin for VortexRecordBatchStream {}
+
+impl Stream for VortexRecordBatchStream {
+    type Item = crate::Result<RecordBatch>;
+
+    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        self.get_mut().inner.as_mut().poll_next(cx)
+    }
+}
+
 // ---------------------------------------------------------------------------
 // IO Adapters
 // ---------------------------------------------------------------------------
@@ -110,7 +161,7 @@ impl FormatFileReader for VortexFormatReader {
         _batch_size: Option<usize>,
         row_selection: Option<Vec<RowRange>>,
     ) -> crate::Result<ArrowRecordBatchStream> {
-        let session = VortexSession::default();
+        let (session, runtime) = new_vortex_session()?;
 
         let source = Arc::new(PaimonVortexReadAt {
             file_size,
@@ -201,7 +252,10 @@ impl FormatFileReader for VortexFormatReader {
                 result.and_then(|vortex_array| vortex_array_to_record_batch(vortex_array, &schema))
             });
 
-        Ok(Box::pin(stream))
+        Ok(Box::pin(VortexRecordBatchStream {
+            inner: Box::pin(stream),
+            _runtime: runtime,
+        }))
     }
 }
 
@@ -552,10 +606,13 @@ pub(crate) struct VortexFormatWriter {
     write_task: Option<tokio::task::JoinHandle<VortexResult<vortex::file::WriteSummary>>>,
     /// Bytes already flushed to storage (updated by the background task).
     bytes_written: Arc<AtomicU64>,
+    /// Keeps the Vortex runtime handle alive while the write task uses the session.
+    _runtime: Arc<PaimonVortexRuntime>,
 }
 
 impl VortexFormatWriter {
     pub(crate) async fn new(output: &OutputFile, schema: SchemaRef) -> crate::Result<Self> {
+        let (session, runtime) = new_vortex_session()?;
         let dtype = DType::from_arrow(schema);
 
         // Create channel for streaming arrays to the background writer.
@@ -575,7 +632,6 @@ impl VortexFormatWriter {
         };
 
         // Spawn the background write task.
-        let session = VortexSession::default();
         let write_task = tokio::spawn(async move {
             let mut sink = sink;
             let result = session
@@ -593,6 +649,7 @@ impl VortexFormatWriter {
             sender: Some(sender),
             write_task: Some(write_task),
             bytes_written,
+            _runtime: runtime,
         })
     }
 }

Reply via email to