This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git
The following commit(s) were added to refs/heads/main by this push:
new d61fec5 fix: keep Vortex runtime alive during async IO (#375)
d61fec5 is described below
commit d61fec5c14312025813fe2bd79018938a2549a4d
Author: QuakeWang <[email protected]>
AuthorDate: Wed Jun 10 21:14:08 2026 +0800
fix: keep Vortex runtime alive during async IO (#375)
---
crates/paimon/src/arrow/format/vortex.rs | 63 ++++++++++++++++++++++++++++++--
1 file changed, 60 insertions(+), 3 deletions(-)
diff --git a/crates/paimon/src/arrow/format/vortex.rs b/crates/paimon/src/arrow/format/vortex.rs
index f0a3a48..6dfce9c 100644
--- a/crates/paimon/src/arrow/format/vortex.rs
+++ b/crates/paimon/src/arrow/format/vortex.rs
@@ -24,9 +24,12 @@ use arrow_array::RecordBatch;
use arrow_schema::{DataType as ArrowDataType, SchemaRef};
use async_trait::async_trait;
use futures::future::BoxFuture;
+use futures::Stream;
use futures::StreamExt;
+use std::pin::Pin;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
+use std::task::{Context, Poll};
use vortex::array::arrow::{FromArrowArray, IntoArrowArray};
use vortex::array::dtype::arrow::FromArrowType;
use vortex::array::dtype::DType;
@@ -38,6 +41,9 @@ use vortex::array::ArrayRef;
use vortex::buffer::{Alignment, ByteBuffer};
use vortex::error::VortexResult;
use vortex::file::{OpenOptionsSessionExt, WriteOptionsSessionExt};
+use vortex::io::runtime::tokio::TokioRuntime;
+use vortex::io::runtime::BlockingRuntime;
+use vortex::io::session::RuntimeSessionExt;
use vortex::io::{IoBuf, VortexReadAt, VortexWrite};
use vortex::scan::selection::Selection;
use vortex::session::VortexSession;
@@ -50,6 +56,51 @@ use vortex::VortexSessionDefault;
/// Maximum number of concurrent read requests for Vortex file IO.
const DEFAULT_READ_CONCURRENCY: usize = 10;
+// ---------------------------------------------------------------------------
+// Vortex Runtime
+// ---------------------------------------------------------------------------
+
+struct PaimonVortexRuntime {
+ runtime: TokioRuntime,
+}
+
+impl PaimonVortexRuntime {
+ fn new() -> crate::Result<Arc<Self>> {
+ let handle = tokio::runtime::Handle::try_current().map_err(|e| Error::DataInvalid {
+ message: format!("Vortex requires an active Tokio runtime: {e}"),
+ source: None,
+ })?;
+ Ok(Arc::new(Self {
+ runtime: TokioRuntime::new(handle),
+ }))
+ }
+
+ fn session(&self) -> VortexSession {
+ VortexSession::default().with_handle(self.runtime.handle())
+ }
+}
+
+fn new_vortex_session() -> crate::Result<(VortexSession, Arc<PaimonVortexRuntime>)> {
+ let runtime = PaimonVortexRuntime::new()?;
+ let session = runtime.session();
+ Ok((session, runtime))
+}
+
+struct VortexRecordBatchStream {
+ inner: ArrowRecordBatchStream,
+ _runtime: Arc<PaimonVortexRuntime>,
+}
+
+impl Unpin for VortexRecordBatchStream {}
+
+impl Stream for VortexRecordBatchStream {
+ type Item = crate::Result<RecordBatch>;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ self.get_mut().inner.as_mut().poll_next(cx)
+ }
+}
+
// ---------------------------------------------------------------------------
// IO Adapters
// ---------------------------------------------------------------------------
@@ -110,7 +161,7 @@ impl FormatFileReader for VortexFormatReader {
_batch_size: Option<usize>,
row_selection: Option<Vec<RowRange>>,
) -> crate::Result<ArrowRecordBatchStream> {
- let session = VortexSession::default();
+ let (session, runtime) = new_vortex_session()?;
let source = Arc::new(PaimonVortexReadAt {
file_size,
@@ -201,7 +252,10 @@ impl FormatFileReader for VortexFormatReader {
result.and_then(|vortex_array| vortex_array_to_record_batch(vortex_array, &schema))
});
- Ok(Box::pin(stream))
+ Ok(Box::pin(VortexRecordBatchStream {
+ inner: Box::pin(stream),
+ _runtime: runtime,
+ }))
}
}
@@ -552,10 +606,13 @@ pub(crate) struct VortexFormatWriter {
write_task: Option<tokio::task::JoinHandle<VortexResult<vortex::file::WriteSummary>>>,
/// Bytes already flushed to storage (updated by the background task).
bytes_written: Arc<AtomicU64>,
+ /// Keeps the Vortex runtime handle alive while the write task uses the session.
+ _runtime: Arc<PaimonVortexRuntime>,
}
impl VortexFormatWriter {
pub(crate) async fn new(output: &OutputFile, schema: SchemaRef) -> crate::Result<Self> {
+ let (session, runtime) = new_vortex_session()?;
let dtype = DType::from_arrow(schema);
// Create channel for streaming arrays to the background writer.
@@ -575,7 +632,6 @@ impl VortexFormatWriter {
};
// Spawn the background write task.
- let session = VortexSession::default();
let write_task = tokio::spawn(async move {
let mut sink = sink;
let result = session
@@ -593,6 +649,7 @@ impl VortexFormatWriter {
sender: Some(sender),
write_task: Some(write_task),
bytes_written,
+ _runtime: runtime,
})
}
}