crepererum commented on a change in pull request #1112:
URL: https://github.com/apache/arrow-datafusion/pull/1112#discussion_r728738176



##########
File path: datafusion/src/physical_plan/repartition.rs
##########
@@ -365,56 +382,98 @@ impl RepartitionExec {
 
         Ok(())
     }
+}
+
+#[derive(Debug)]

Review comment:
       Done. That even allows us to stick to the old `wait_for_task` version.

##########
File path: datafusion/src/physical_plan/repartition.rs
##########
@@ -365,56 +382,98 @@ impl RepartitionExec {
 
         Ok(())
     }
+}
+
+#[derive(Debug)]
+struct AbortOnDrop(Vec<JoinHandle<()>>);
+
+impl Drop for AbortOnDrop {
+    fn drop(&mut self) {
+        for join_handle in &self.0 {
+            join_handle.abort();
+        }
+    }
+}
 
+pin_project! {
     /// Waits for `input_task` which is consuming one of the inputs to
     /// complete. Upon each successful completion, sends a `None` to
     /// each of the output tx channels to signal one of the inputs is
     /// complete. Upon error, propagates the errors to all output tx
     /// channels.
-    async fn wait_for_task(
+    struct WaitForTask {
+        #[pin]
         input_task: JoinHandle<Result<()>>,
         txs: HashMap<usize, UnboundedSender<Option<ArrowResult<RecordBatch>>>>,
-    ) {
+    }
+
+    impl PinnedDrop for WaitForTask {

Review comment:
       done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to