alamb commented on a change in pull request #1112:
URL: https://github.com/apache/arrow-datafusion/pull/1112#discussion_r728302645
##########
File path: datafusion/src/physical_plan/repartition.rs
##########
@@ -365,56 +382,98 @@ impl RepartitionExec {
Ok(())
}
+}
+
+#[derive(Debug)]
Review comment:
The `AbortOnDrop` structure nicely encapsulates RAII style, aborting a
bunch of `JoinHandle`s on drop. What would you think about using that same
structure in all of the operators that need this treatment (e.g. `SortExec`, etc.)
rather than using `PinnedDrop` and raw `JoinHandle`s for the other operators?
You might even be able to encapsulate the `JoinHandle` creation into
`AbortOnDrop` so it would be easier to ensure that all tasks spawned in physical
plans were doing the right thing (i.e. we could avoid using `tokio::task::spawn`
directly). Having a controlled wrapper around task spawning might be useful for
other activities too (like being able to control which threadpool is used to
run the task).
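For illustration only (this is not code from the PR), here is a minimal sketch of what such a spawn-encapsulating wrapper could look like; the struct name `AbortOnDropSpawner` and its API are hypothetical:
```rust
use std::future::Future;
use std::time::Duration;

use tokio::task::JoinHandle;

/// Hypothetical helper: owns every task it spawns and aborts them all
/// when dropped, so operators never call `tokio::task::spawn` directly
/// or hold raw `JoinHandle`s.
#[derive(Debug, Default)]
struct AbortOnDropSpawner {
    handles: Vec<JoinHandle<()>>,
}

impl AbortOnDropSpawner {
    /// Spawn a background task whose lifetime is tied to this spawner.
    fn spawn<F>(&mut self, fut: F)
    where
        F: Future<Output = ()> + Send + 'static,
    {
        // A wrapper like this could also choose which runtime/threadpool to use.
        self.handles.push(tokio::spawn(fut));
    }
}

impl Drop for AbortOnDropSpawner {
    fn drop(&mut self) {
        // Abort (rather than detach) all outstanding tasks.
        for handle in &self.handles {
            handle.abort();
        }
    }
}

#[tokio::main]
async fn main() {
    let mut spawner = AbortOnDropSpawner::default();
    spawner.spawn(async {
        // Stand-in for a long-running repartition/sort/window task.
        tokio::time::sleep(Duration::from_secs(3600)).await;
    });
    // Dropping the spawner cancels the background task immediately.
    drop(spawner);
}
```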
##########
File path: datafusion/src/physical_plan/repartition.rs
##########
@@ -853,4 +919,26 @@ mod tests {
let schema = batch1.schema();
        BarrierExec::new(vec![vec![batch1, batch2], vec![batch3, batch4]], schema)
}
+
+ #[tokio::test]
+ async fn test_drop_cancel() -> Result<()> {
+ let schema =
+            Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, true)]));
+
+        let blocking_exec = Arc::new(BlockingExec::new(Arc::clone(&schema), 2));
+ let refs = blocking_exec.refs();
+ let sort_exec = Arc::new(RepartitionExec::try_new(
+ blocking_exec,
+ Partitioning::UnknownPartitioning(1),
+ )?);
+
+ let fut = collect(sort_exec);
+ let mut fut = fut.boxed();
+
+ assert_is_pending(&mut fut);
Review comment:
👍
##########
File path: datafusion/src/physical_plan/repartition.rs
##########
@@ -55,11 +56,14 @@ pub struct RepartitionExec {
/// Partitioning scheme to use
partitioning: Partitioning,
    /// Channels for sending batches from input partitions to output partitions.
- /// Key is the partition number
- channels: Arc<
- Mutex<
+ /// Key is the partition number.
+ ///
+    /// Stored alongside is an abort marker that will kill the background job once it's no longer needed.
+ channels_and_abort_helper: Arc<
Review comment:
What would you think about using a named struct (that can have
additional documentation on it) rather than
```rust
(
    HashMap<usize, (UnboundedSender<MaybeBatch>, UnboundedReceiver<MaybeBatch>)>,
    Arc<AbortOnDrop>,
)
```
Something like:
```rust
struct ChannelsAndAbortHelper {
    channels: HashMap<usize, (UnboundedSender<MaybeBatch>, UnboundedReceiver<MaybeBatch>)>,
abort: Arc<AbortOnDrop>
}
```
And then you could rename `channels_and_abort_helper` to `state` or
something shorter, and have the inner field name add the semantic value.
We can do that as a follow-on refactor, but I wanted to get your opinion.
##########
File path: datafusion/src/physical_plan/windows/window_agg_exec.rs
##########
@@ -240,16 +248,19 @@ impl WindowAggStream {
let (tx, rx) = futures::channel::oneshot::channel();
let schema_clone = schema.clone();
let elapsed_compute = baseline_metrics.elapsed_compute().clone();
- tokio::spawn(async move {
+ let join_handle = tokio::spawn(async move {
let schema = schema_clone.clone();
let result =
                WindowAggStream::process(input, window_expr, schema, elapsed_compute)
.await;
- tx.send(result)
+
+            // failing here is OK, the receiver is gone and does not care about the result
Review comment:
👍
##########
File path: datafusion/src/physical_plan/repartition.rs
##########
@@ -365,56 +382,98 @@ impl RepartitionExec {
Ok(())
}
+}
+
+#[derive(Debug)]
+struct AbortOnDrop(Vec<JoinHandle<()>>);
+
+impl Drop for AbortOnDrop {
+ fn drop(&mut self) {
+ for join_handle in &self.0 {
+ join_handle.abort();
+ }
+ }
+}
+pin_project! {
/// Waits for `input_task` which is consuming one of the inputs to
/// complete. Upon each successful completion, sends a `None` to
/// each of the output tx channels to signal one of the inputs is
/// complete. Upon error, propagates the errors to all output tx
/// channels.
- async fn wait_for_task(
+ struct WaitForTask {
+ #[pin]
input_task: JoinHandle<Result<()>>,
txs: HashMap<usize, UnboundedSender<Option<ArrowResult<RecordBatch>>>>,
- ) {
+ }
+
+ impl PinnedDrop for WaitForTask {
Review comment:
As above, I wonder if this could have a single `abort_on_drop` field
instead of `input_task` and this `PinnedDrop` implementation.
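As a rough sketch of that idea (not from this PR; the name `AbortOnDropHandle` is hypothetical), a small `Future` wrapper around the `JoinHandle` that aborts on drop would let `WaitForTask` hold it as an ordinary field, with no `#[pin]` and no `PinnedDrop` implementation:
```rust
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

use tokio::task::{JoinError, JoinHandle};

/// Hypothetical wrapper: polls like the wrapped `JoinHandle`, but aborts
/// the task when dropped, so the owning struct needs no custom drop glue.
#[derive(Debug)]
struct AbortOnDropHandle<T>(JoinHandle<T>);

impl<T> Future for AbortOnDropHandle<T> {
    type Output = Result<T, JoinError>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // `JoinHandle` is `Unpin`, so re-pinning the inner handle is fine.
        Pin::new(&mut self.0).poll(cx)
    }
}

impl<T> Drop for AbortOnDropHandle<T> {
    fn drop(&mut self) {
        self.0.abort();
    }
}
```
`WaitForTask` could then store something like `input_task: AbortOnDropHandle<Result<()>>` and await it directly.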
##########
File path: datafusion/src/physical_plan/windows/mod.rs
##########
@@ -258,4 +260,35 @@ mod tests {
Ok(())
}
+
+ #[tokio::test]
+ async fn test_drop_cancel() -> Result<()> {
+ let schema =
+            Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, true)]));
+
+        let blocking_exec = Arc::new(BlockingExec::new(Arc::clone(&schema), 1));
+ let refs = blocking_exec.refs();
+ let sort_exec = Arc::new(WindowAggExec::try_new(
Review comment:
```suggestion
let window_agg_exec = Arc::new(WindowAggExec::try_new(
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]