blackmwk commented on code in PR #2308:
URL: https://github.com/apache/iceberg-rust/pull/2308#discussion_r3233682246
##########
crates/iceberg/src/table.rs:
##########
@@ -159,6 +170,9 @@ pub struct Table {
identifier: TableIdent,
readonly: bool,
object_cache: Arc<ObjectCache>,
+ /// Runtime explicitly attached at build time. `None` means "resolve from
+ /// the ambient tokio runtime on demand"
+ runtime: Option<Runtime>,
Review Comment:
I don't think it's a good idea to mark it `Option`: we now require `Runtime`
support, so it should be passed explicitly by the user.
##########
crates/iceberg/src/runtime/mod.rs:
##########
@@ -17,59 +17,304 @@
// This module contains the async runtime abstraction for iceberg.
+use std::fmt;
use std::future::Future;
use std::pin::Pin;
+use std::sync::Arc;
use std::task::{Context, Poll};
use tokio::task;
+use crate::{Error, ErrorKind, Result};
+
+/// Wrapper around tokio's `JoinHandle` that converts task failures into
+/// [`iceberg::Error`].
+///
+/// Tokio's `JoinHandle<T>` resolves to `Result<T, JoinError>`, where a
+/// `JoinError` means the task either panicked or was cancelled (typically from
+/// runtime shutdown or `abort`). Both are surfaced here as
+/// `ErrorKind::Unexpected` with the original `JoinError` preserved as the
+/// source.
pub struct JoinHandle<T>(task::JoinHandle<T>);
impl<T> Unpin for JoinHandle<T> {}
impl<T: Send + 'static> Future for JoinHandle<T> {
- type Output = T;
+ type Output = crate::Result<T>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
- match self.get_mut() {
- JoinHandle(handle) => Pin::new(handle)
- .poll(cx)
- .map(|r| r.expect("tokio spawned task failed")),
- }
+ Pin::new(&mut self.get_mut().0).poll(cx).map(|r| {
+ r.map_err(|e| Error::new(ErrorKind::Unexpected, "spawned task failed").with_source(e))
+ })
+ }
+}
+
+/// Handle to a single tokio runtime.
+///
+/// Wraps a [`tokio::runtime::Handle`], which is cheap to clone. The caller is
+/// responsible for keeping the underlying runtime alive while this handle is
+/// in use; spawning on a shut-down runtime will surface as a `JoinError` via
+/// [`JoinHandle`].
+#[derive(Clone)]
+pub struct RuntimeHandle {
+ handle: tokio::runtime::Handle,
+}
+
+impl fmt::Debug for RuntimeHandle {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("RuntimeHandle").finish()
+ }
+}
+
+impl RuntimeHandle {
+ fn from_tokio_handle(handle: tokio::runtime::Handle) -> Self {
+ Self { handle }
+ }
+
+ /// Spawn an async task.
+ pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
+ where
+ F: Future + Send + 'static,
+ F::Output: Send + 'static,
+ {
+ JoinHandle(self.handle.spawn(future))
+ }
+
+ /// Spawn a blocking task.
+ pub fn spawn_blocking<F, T>(&self, f: F) -> JoinHandle<T>
+ where
+ F: FnOnce() -> T + Send + 'static,
+ T: Send + 'static,
+ {
+ JoinHandle(self.handle.spawn_blocking(f))
}
}
-#[allow(dead_code)]
-pub fn spawn<F>(f: F) -> JoinHandle<F::Output>
-where
- F: std::future::Future + Send + 'static,
- F::Output: Send + 'static,
-{
- JoinHandle(task::spawn(f))
+/// Iceberg's runtime abstraction.
+///
+/// Contains separate handles for IO-bound and CPU-bound work. When constructed
+/// with a single tokio runtime, both `io()` and `cpu()` route to the same one.
+/// Use [`Runtime::new_with_split`] to provide dedicated runtimes for each
+/// category.
+///
+/// # Lifetime
+///
+/// A `Runtime` stores only `tokio::runtime::Handle`s (weak references). The
+/// caller owns the tokio runtime's lifetime. If the underlying runtime is
+/// dropped while iceberg is still using it, subsequent spawns will surface as
+/// task cancellation errors via [`JoinHandle`].
+///
+/// Cloning is cheap.
+#[derive(Clone)]
+pub struct Runtime {
+ io: RuntimeHandle,
+ cpu: RuntimeHandle,
}
-#[allow(dead_code)]
-pub fn spawn_blocking<F, T>(f: F) -> JoinHandle<T>
-where
- F: FnOnce() -> T + Send + 'static,
- T: Send + 'static,
-{
- JoinHandle(task::spawn_blocking(f))
+impl fmt::Debug for Runtime {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("Runtime").finish()
+ }
+}
+
+impl Runtime {
+ /// Create a Runtime backed by a single tokio runtime for all work.
+ pub fn new(runtime: Arc<tokio::runtime::Runtime>) -> Self {
Review Comment:
```suggestion
pub fn new(runtime: &tokio::runtime::Runtime) -> Self {
```
##########
crates/iceberg/src/table.rs:
##########
@@ -230,6 +244,15 @@ impl Table {
MetadataTable::new(self)
}
+ /// Returns a resolved [`Runtime`] for this table.
+ ///
+ /// If a runtime was set via [`TableBuilder::runtime`], it is returned.
+ /// Otherwise, this borrows the ambient tokio runtime via
+ /// [`Runtime::current`], which panics if called outside a tokio context.
+ pub(crate) fn runtime(&self) -> Runtime {
Review Comment:
This is incorrect. We have `TableBuilder`, and it should be the only place
where a `Runtime` is passed to `Table`; `Table` should not fall back to the
ambient tokio runtime.
##########
crates/iceberg/src/table.rs:
##########
@@ -146,6 +156,7 @@ impl TableBuilder {
identifier,
readonly,
object_cache,
+ runtime,
Review Comment:
We should return an error here if `Runtime` is not passed.
##########
crates/iceberg/src/runtime/mod.rs:
##########
@@ -17,59 +17,304 @@
// This module contains the async runtime abstraction for iceberg.
+use std::fmt;
use std::future::Future;
use std::pin::Pin;
+use std::sync::Arc;
use std::task::{Context, Poll};
use tokio::task;
+use crate::{Error, ErrorKind, Result};
+
+/// Wrapper around tokio's `JoinHandle` that converts task failures into
+/// [`iceberg::Error`].
+///
+/// Tokio's `JoinHandle<T>` resolves to `Result<T, JoinError>`, where a
+/// `JoinError` means the task either panicked or was cancelled (typically from
+/// runtime shutdown or `abort`). Both are surfaced here as
+/// `ErrorKind::Unexpected` with the original `JoinError` preserved as the
+/// source.
pub struct JoinHandle<T>(task::JoinHandle<T>);
impl<T> Unpin for JoinHandle<T> {}
impl<T: Send + 'static> Future for JoinHandle<T> {
- type Output = T;
+ type Output = crate::Result<T>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
- match self.get_mut() {
- JoinHandle(handle) => Pin::new(handle)
- .poll(cx)
- .map(|r| r.expect("tokio spawned task failed")),
- }
+ Pin::new(&mut self.get_mut().0).poll(cx).map(|r| {
+ r.map_err(|e| Error::new(ErrorKind::Unexpected, "spawned task failed").with_source(e))
+ })
+ }
+}
+
+/// Handle to a single tokio runtime.
+///
+/// Wraps a [`tokio::runtime::Handle`], which is cheap to clone. The caller is
+/// responsible for keeping the underlying runtime alive while this handle is
+/// in use; spawning on a shut-down runtime will surface as a `JoinError` via
+/// [`JoinHandle`].
+#[derive(Clone)]
+pub struct RuntimeHandle {
+ handle: tokio::runtime::Handle,
+}
+
+impl fmt::Debug for RuntimeHandle {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("RuntimeHandle").finish()
+ }
+}
+
+impl RuntimeHandle {
+ fn from_tokio_handle(handle: tokio::runtime::Handle) -> Self {
+ Self { handle }
+ }
+
+ /// Spawn an async task.
+ pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
+ where
+ F: Future + Send + 'static,
+ F::Output: Send + 'static,
+ {
+ JoinHandle(self.handle.spawn(future))
+ }
+
+ /// Spawn a blocking task.
+ pub fn spawn_blocking<F, T>(&self, f: F) -> JoinHandle<T>
+ where
+ F: FnOnce() -> T + Send + 'static,
+ T: Send + 'static,
+ {
+ JoinHandle(self.handle.spawn_blocking(f))
}
}
-#[allow(dead_code)]
-pub fn spawn<F>(f: F) -> JoinHandle<F::Output>
-where
- F: std::future::Future + Send + 'static,
- F::Output: Send + 'static,
-{
- JoinHandle(task::spawn(f))
+/// Iceberg's runtime abstraction.
+///
+/// Contains separate handles for IO-bound and CPU-bound work. When constructed
+/// with a single tokio runtime, both `io()` and `cpu()` route to the same one.
+/// Use [`Runtime::new_with_split`] to provide dedicated runtimes for each
+/// category.
+///
+/// # Lifetime
+///
+/// A `Runtime` stores only `tokio::runtime::Handle`s (weak references). The
+/// caller owns the tokio runtime's lifetime. If the underlying runtime is
+/// dropped while iceberg is still using it, subsequent spawns will surface as
+/// task cancellation errors via [`JoinHandle`].
+///
+/// Cloning is cheap.
+#[derive(Clone)]
+pub struct Runtime {
+ io: RuntimeHandle,
+ cpu: RuntimeHandle,
}
-#[allow(dead_code)]
-pub fn spawn_blocking<F, T>(f: F) -> JoinHandle<T>
-where
- F: FnOnce() -> T + Send + 'static,
- T: Send + 'static,
-{
- JoinHandle(task::spawn_blocking(f))
+impl fmt::Debug for Runtime {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("Runtime").finish()
+ }
+}
+
+impl Runtime {
+ /// Create a Runtime backed by a single tokio runtime for all work.
+ pub fn new(runtime: Arc<tokio::runtime::Runtime>) -> Self {
+ let handle = RuntimeHandle::from_tokio_handle(runtime.handle().clone());
+ Self {
+ io: handle.clone(),
+ cpu: handle,
+ }
+ }
+
+ /// Create a Runtime with separate tokio runtimes for IO and CPU work.
+ pub fn new_with_split(
+ io_runtime: Arc<tokio::runtime::Runtime>,
+ cpu_runtime: Arc<tokio::runtime::Runtime>,
Review Comment:
```suggestion
pub fn new_with_split(
io_runtime: &tokio::runtime::Runtime,
cpu_runtime: &tokio::runtime::Runtime,
```
##########
crates/iceberg/Cargo.toml:
##########
@@ -74,7 +74,7 @@ serde_json = { workspace = true }
serde_repr = { workspace = true }
serde_with = { workspace = true }
strum = { workspace = true, features = ["derive"] }
-tokio = { workspace = true, optional = false, features = ["sync"] }
+tokio = { workspace = true, optional = false, features = ["sync", "rt-multi-thread"] }
Review Comment:
Do all crates require the same `tokio` feature set? If so, we should move
the `features` list to the workspace root `Cargo.toml`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]