alexanderbianchi opened a new issue, #1945:
URL: https://github.com/apache/iceberg-rust/issues/1945
### Is your feature request related to a problem or challenge?
Add support for configuring a custom Tokio runtime handle for OpenDAL
operations. This enables proper runtime segregation in DataFusion applications
that run CPU-bound and I/O-bound workloads on separate Tokio runtimes.
### Describe the solution you'd like
## Proposed Solution
### API Design
Leverage iceberg-rust's existing `Extensions` mechanism to allow users to
configure a Tokio runtime handle:
```rust
// In iceberg-rust: crates/iceberg/src/io/file_io.rs

/// Runtime handle for executing async I/O operations.
/// When provided, OpenDAL operations will use this runtime for spawning tasks.
#[derive(Clone, Debug)]
pub struct RuntimeHandle(pub tokio::runtime::Handle);

impl RuntimeHandle {
    /// Create a new `RuntimeHandle` from a Tokio runtime handle.
    pub fn new(handle: tokio::runtime::Handle) -> Self {
        Self(handle)
    }

    /// Capture the handle of the runtime the caller is currently running on.
    ///
    /// Panics if called outside a Tokio runtime context, exactly like
    /// `tokio::runtime::Handle::current`.
    pub fn current() -> Self {
        Self(tokio::runtime::Handle::current())
    }
}
```
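The lookup performed by `extensions.get::<RuntimeHandle>()` can be pictured as a map keyed by type. The sketch below is a simplified, std-only model for illustration: the `Extensions` here is a hypothetical stand-in, not iceberg-rust's type, and `RuntimeHandle` wraps a pool name instead of a `tokio::runtime::Handle` so that the example runs without Tokio. It shows why a dedicated newtype is useful: each distinct type gets its own slot, so the runtime handle cannot collide with any other extension.

```rust
use std::any::{Any, TypeId};
use std::collections::HashMap;
use std::sync::Arc;

/// Hypothetical, simplified stand-in for iceberg-rust's `Extensions`:
/// at most one value per type, stored behind an `Arc` and keyed by `TypeId`.
#[derive(Default, Clone)]
struct Extensions(HashMap<TypeId, Arc<dyn Any + Send + Sync>>);

impl Extensions {
    /// Store `value`, replacing any previous value of the same type.
    fn add<T: Any + Send + Sync>(&mut self, value: T) {
        self.0.insert(TypeId::of::<T>(), Arc::new(value));
    }

    /// Look up the value stored for type `T`, if any.
    fn get<T: Any + Send + Sync>(&self) -> Option<Arc<T>> {
        self.0
            .get(&TypeId::of::<T>())
            .cloned()
            .and_then(|value| value.downcast::<T>().ok())
    }
}

/// Stand-in for `RuntimeHandle`: wraps a pool name instead of a
/// `tokio::runtime::Handle` so this sketch runs without Tokio.
#[derive(Clone, Debug, PartialEq)]
struct RuntimeHandle(&'static str);
```

For example, after `ext.add(RuntimeHandle("io-pool"))`, `ext.get::<RuntimeHandle>()` returns `Some(Arc<RuntimeHandle>)`, while a lookup for any type that was never added returns `None`.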
### Usage Example
```rust
// Create a dedicated I/O runtime. It must outlive every FileIO or catalog
// that holds its handle.
let io_runtime = tokio::runtime::Builder::new_multi_thread()
    .worker_threads(8)
    .thread_name("io-pool")
    .enable_io()
    .enable_time()
    .build()?;

// Configure FileIO with the runtime handle
let file_io = FileIOBuilder::new("s3")
    .with_extension(RuntimeHandle::new(io_runtime.handle().clone()))
    .with_props(s3_config)
    .build()?;

// Or configure it via the catalog
let catalog = RestCatalogBuilder::new()
    .with_file_io_extension(RuntimeHandle::new(io_runtime.handle().clone()))
    .with_props(catalog_config)
    .build()?;
```
### Implementation Approach
1. **Create a custom OpenDAL executor.** Implement OpenDAL's `Execute` trait with an executor that spawns tasks on the configured Tokio runtime:
```rust
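// In crates/iceberg/src/io/storage.rs
use std::future::Future;
use std::pin::Pin;

/// OpenDAL executor that spawns every task onto a caller-supplied Tokio runtime.
#[derive(Clone)]
struct CustomTokioExecutor {
    handle: tokio::runtime::Handle,
}

impl opendal::Execute for CustomTokioExecutor {
    fn execute(&self, f: Pin<Box<dyn Future<Output = ()> + Send>>) {
        // Fire-and-forget: the JoinHandle is dropped, the task keeps running.
        self.handle.spawn(f);
    }
}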
```
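To make the dispatch concrete without pulling in Tokio or OpenDAL, here is a std-only model of the same pattern. The `Execute` trait below is a hypothetical local copy that only mirrors the signature above (it is not OpenDAL's trait), and `DedicatedPoolExecutor` is a hypothetical stand-in for a dedicated runtime: a single named worker thread that drives each submitted future to completion using a park/unpark waker. Unlike a Tokio multi-thread runtime it runs tasks one at a time, so a future that never completes would block the ones queued behind it.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::mpsc::{self, Sender};
use std::sync::Arc;
use std::task::{Context, Wake, Waker};
use std::thread::{self, Thread};

/// Boxed, sendable unit of work (same shape as the `execute` argument above).
type BoxedTask = Pin<Box<dyn Future<Output = ()> + Send>>;

/// Hypothetical local trait mirroring the shape of `opendal::Execute`.
trait Execute: Send + Sync + 'static {
    fn execute(&self, f: BoxedTask);
}

/// Waker that unparks the worker thread so it re-polls its current task.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Hypothetical stand-in for a dedicated I/O runtime: one named worker
/// thread that runs submitted futures to completion, one at a time.
struct DedicatedPoolExecutor {
    tx: Sender<BoxedTask>,
}

impl DedicatedPoolExecutor {
    fn new(name: &str) -> Self {
        let (tx, rx) = mpsc::channel::<BoxedTask>();
        thread::Builder::new()
            .name(name.to_string())
            .spawn(move || {
                let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
                let mut cx = Context::from_waker(&waker);
                // The loop ends once the executor (the only `Sender`) is dropped.
                for mut task in rx {
                    while task.as_mut().poll(&mut cx).is_pending() {
                        thread::park();
                    }
                }
            })
            .expect("failed to spawn worker thread");
        Self { tx }
    }
}

impl Execute for DedicatedPoolExecutor {
    fn execute(&self, f: BoxedTask) {
        // Fire-and-forget, like `Handle::spawn`: no result is returned.
        // `send` only fails if the worker has exited; the task is then dropped.
        let _ = self.tx.send(f);
    }
}
```

A future submitted through `execute` that records `std::thread::current().name()` reports `io-pool` rather than the caller's thread, which is the segregation property the proposal wants for OpenDAL's spawned tasks.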
2. **Extract `RuntimeHandle` from extensions.** Modify the storage backend builders to check for a `RuntimeHandle` in the extensions:
```rust
// In crates/iceberg/src/io/storage.rs (assumes `std::sync::Arc` is in scope)
pub(crate) fn build(file_io_builder: FileIOBuilder) -> Result<Self> {
    let (scheme_str, props, extensions) = file_io_builder.into_parts();

    // Extract the runtime handle if one was provided
    let executor = if let Some(runtime_handle) = extensions.get::<RuntimeHandle>() {
        let exec = CustomTokioExecutor {
            // `get` returns an `Arc`; take the inner handle, cloning if shared
            handle: Arc::unwrap_or_clone(runtime_handle).0,
        };
        Some(opendal::Executor::with(exec))
    } else {
        None // Use OpenDAL's default executor
    };

    // ... storage initialization
}
```
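The selection logic in step 2 reduces to "use the custom executor if a handle is present, otherwise keep the default". A std-only sketch of just that decision, assuming a plain type-keyed map in place of `Extensions` and a hypothetical `ExecutorChoice` enum in place of `Option<opendal::Executor>`:

```rust
use std::any::{Any, TypeId};
use std::collections::HashMap;
use std::sync::Arc;

/// Hypothetical type-keyed map standing in for iceberg-rust's `Extensions`.
type ExtensionMap = HashMap<TypeId, Arc<dyn Any + Send + Sync>>;

/// Stand-in for `RuntimeHandle`: wraps a pool name instead of a Tokio handle.
#[derive(Clone, Debug, PartialEq)]
struct RuntimeHandle(String);

/// Hypothetical stand-in for `Option<opendal::Executor>`.
#[derive(Debug, PartialEq)]
enum ExecutorChoice {
    /// Spawn OpenDAL tasks on the user-supplied runtime (named here).
    Custom(String),
    /// Leave OpenDAL's default executor in place.
    Default,
}

/// Use the custom runtime if a handle was registered, otherwise the default.
fn choose_executor(extensions: &ExtensionMap) -> ExecutorChoice {
    extensions
        .get(&TypeId::of::<RuntimeHandle>())
        .cloned()
        .and_then(|value| value.downcast::<RuntimeHandle>().ok())
        // The map still holds a reference, so this clones the inner value.
        .map(|handle| ExecutorChoice::Custom(Arc::unwrap_or_clone(handle).0))
        .unwrap_or(ExecutorChoice::Default)
}
```

Because the extension map keeps its own reference, `Arc::unwrap_or_clone` clones the value rather than moving it out. Cloning a Tokio `Handle` is cheap, so `runtime_handle.0.clone()` would behave the same in the real code.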
3. **Apply the executor to operators.** Configure OpenDAL operators with the custom executor:
```rust
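// Build the operator, attach the custom executor if one was configured,
// then add the retry layer as before
let mut operator = Operator::new(builder)?.finish();
if let Some(executor) = executor {
    operator = operator.with_executor(executor);
}
operator = operator.layer(RetryLayer::new());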
```
### Willingness to contribute
I can contribute to this feature independently
--
This is an automated message from the Apache Git Service.