LantaoJin opened a new issue, #68:
URL: https://github.com/apache/datafusion-java/issues/68
### Is your feature request related to a problem or challenge?
DataFrames in `datafusion-java` cannot be interrupted. A long-running
`collect()` / `executeStream()` blocks the calling Java thread for the duration
of the query; even `Thread.interrupt()` on the Java side has no effect because
the thread is blocked inside a JNI call on `runtime().block_on(...)`, which does
not observe Java interrupts. There is no way to free the native resources of an
in-flight query without waiting for it to complete.
**Relation to existing issues:** complementary to open issue **#40**
(close()/JNI use-after-free race) but distinct: #40 is about safely tearing
down a *finished* handle; this is about signalling an *in-flight* future to
stop. Both eventually share the same atomic-handle scaffolding (#40's option
2), so it's worth coordinating.
### Describe the solution you'd like
Two surface options on `SessionContext`, both standalone and composable:
```java
try (var ctx = new SessionContext();
     var token = ctx.newCancellationToken()) {
    Future<ArrowReader> fut = pool.submit(() -> df.collect(allocator, token));
    // from another thread, on timeout / user-cancel:
    token.cancel();
}
```
Plus, optionally, the Spark-shaped tag form for callers managing many
concurrent queries:
```java
ctx.addTag("query-1234");
df.collect(allocator); // inherits the tag from the calling thread (ThreadLocal)
// from another thread:
ctx.cancelTag("query-1234");
```
The token form is the primitive; the tag form is sugar on top (each tag
tracks its set of live tokens in the session). Both forms cancel via the same
Rust mechanism.
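
To make the "tag is sugar over tokens" relationship concrete, here is a minimal Java-only sketch of the session-side bookkeeping. `SketchToken` and `TagRegistry` are hypothetical names used for illustration; they are not part of `datafusion-java`, and a real token would wrap a native handle rather than an `AtomicBoolean`.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for the proposed cancellation token.
final class SketchToken {
    private final AtomicBoolean cancelled = new AtomicBoolean(false);

    void cancel() { cancelled.set(true); }

    boolean isCancelled() { return cancelled.get(); }
}

// Hypothetical per-session registry: each tag tracks its set of live tokens.
final class TagRegistry {
    private final Map<String, Set<SketchToken>> liveTokens = new ConcurrentHashMap<>();

    // Creates a token and records it under the tag, atomically per key.
    SketchToken register(String tag) {
        SketchToken token = new SketchToken();
        liveTokens.compute(tag, (t, set) -> {
            if (set == null) {
                set = ConcurrentHashMap.newKeySet();
            }
            set.add(token);
            return set;
        });
        return token;
    }

    // Called when a query finishes, so a later cancelTag does not touch it.
    void release(String tag, SketchToken token) {
        liveTokens.computeIfPresent(tag, (t, set) -> {
            set.remove(token);
            return set.isEmpty() ? null : set; // returning null drops the mapping
        });
    }

    // Cancels every live token under the tag and returns how many were signalled.
    int cancelTag(String tag) {
        int signalled = 0;
        for (SketchToken token : liveTokens.getOrDefault(tag, Set.of())) {
            token.cancel();
            signalled++;
        }
        return signalled;
    }
}
```

Under this shape, `ctx.cancelTag(...)` would reduce to a loop over `token.cancel()`, which is why the token can ship first and the tag form can follow without touching the native side.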
**Native shape.** One `tokio_util::sync::CancellationToken` per Java token, plus
a `tokio::select!` at every `block_on` site (`collectDataFrame`,
`executeStreamDataFrame`, `countRows`, `showDataFrame`). On cancel, the
`select!` resolves to `Err(DataFusionError::ResourcesExhausted("query cancelled"))`
(or a new `DataFusionError::Cancelled` once that lands upstream) and the JNI
handler translates it into a Java `CancellationException` (or
`DataFusionException.cancelled(...)` once #12 lands).
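
For readers more at home in Java than Rust, the race between the query and the cancel signal can be approximated with `CompletableFuture.anyOf`. This is only an analogy to the proposed `select!`, not the native implementation: `SelectSketch` and `awaitOrCancel` are hypothetical names, and unlike `tokio::select!`, which drops the losing future, this sketch merely stops waiting and leaves the query future untouched.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;

final class SelectSketch {
    // Waits for whichever completes first: the query result or the cancel signal.
    // If the query fails, join() rethrows its failure as a CompletionException.
    static <T> T awaitOrCancel(CompletableFuture<T> query,
                               CompletableFuture<Void> cancelSignal) {
        // anyOf completes as soon as either input completes.
        CompletableFuture.anyOf(query, cancelSignal).join();
        if (cancelSignal.isDone()) {
            // Assumption: cancellation wins if both are done by the time we check.
            throw new CancellationException("query cancelled");
        }
        return query.join();
    }

    public static void main(String[] args) {
        CompletableFuture<String> query = new CompletableFuture<>(); // never completes
        CompletableFuture<Void> cancel = new CompletableFuture<>();
        cancel.complete(null); // simulate token.cancel() from another thread
        try {
            awaitOrCancel(query, cancel);
        } catch (CancellationException e) {
            System.out.println("caught: " + e.getMessage());
        }
    }
}
```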
**Cooperation with `DataFrame.executeStream` (#5, done).** The token, if
passed, is held by the streaming `ArrowReader` for its full lifetime; each
`loadNextBatch()` is wrapped in `select!` against the token so a cancel
mid-stream aborts on the next batch poll, not after the stream drains.
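
The per-batch behaviour described above can be illustrated with a plain Java iterator that consults a cancel flag before every batch. This is a sketch under stated assumptions: `CancellableBatches` is a hypothetical class, an `AtomicBoolean` stands in for the token, and a generic `Iterator<T>` stands in for the streaming `ArrowReader`. A pure-Java check like this can only stop between batches; the proposed native `select!` would also be able to abort a batch that is still being produced.

```java
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical wrapper: holds the cancel flag for the stream's full lifetime
// and checks it before each batch is pulled from the underlying source.
final class CancellableBatches<T> implements Iterator<T> {
    private final Iterator<T> source;
    private final AtomicBoolean cancelled;

    CancellableBatches(Iterator<T> source, AtomicBoolean cancelled) {
        this.source = source;
        this.cancelled = cancelled;
    }

    private void checkCancelled() {
        if (cancelled.get()) {
            throw new CancellationException("query cancelled");
        }
    }

    @Override
    public boolean hasNext() {
        checkCancelled();
        return source.hasNext();
    }

    @Override
    public T next() {
        checkCancelled();
        return source.next();
    }
}

final class CancellableBatchesDemo {
    public static void main(String[] args) {
        AtomicBoolean cancelled = new AtomicBoolean(false);
        Iterator<Integer> batches =
                new CancellableBatches<>(List.of(1, 2, 3).iterator(), cancelled);
        System.out.println("batch " + batches.next());
        cancelled.set(true); // cancel mid-stream, before the stream drains
        try {
            batches.next();
        } catch (CancellationException e) {
            System.out.println("stopped early: " + e.getMessage());
        }
    }
}
```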
**Cooperation with `close()` race (open #40).** Cancelling does not free the
native handle; it only short-circuits the running future. The handle is still
released by the existing `close()` path, which is where #40's atomic-handle
scaffolding belongs.
### Describe alternatives you've considered
_No response_
### Additional context
### Out of scope
- *Tag form.* Ship the token primitive first; tag is sugar that can land in
a follow-up if a user actually asks for it.
- *Sync-API breakage.* `df.collect(allocator)` keeps working unchanged; the
new method is `df.collect(allocator, token)` (overload).
- *Per-operator cancel granularity.* Today the cancel point is each
`block_on` site; sub-operator cancellation is upstream-DataFusion territory.