alamb commented on code in PR #10026:
URL: 
https://github.com/apache/arrow-datafusion/pull/10026#discussion_r1560988198


##########
datafusion/physical-plan/src/repartition/distributor_channels.rs:
##########
@@ -53,19 +57,20 @@ pub fn channels<T>(
 ) -> (Vec<DistributionSender<T>>, Vec<DistributionReceiver<T>>) {
     let channels = (0..n)
         .map(|id| {
-            Arc::new(Mutex::new(Channel {
-                data: VecDeque::default(),
-                n_senders: 1,
-                recv_alive: true,
-                recv_wakers: Vec::default(),
+            Arc::new(Channel {

Review Comment:
   It would help me with readability / encapsulation to make this
`Channel::new()` or `Channel::default()`, so that it is easier to follow the
locking logic.



##########
datafusion/physical-plan/src/repartition/distributor_channels.rs:
##########
@@ -188,33 +205,48 @@ impl<'a, T> Future for SendFuture<'a, T> {
         let this = &mut *self;
         assert!(this.element.is_some(), "polled ready future");
 
-        let mut guard_channel = this.channel.lock();
-
-        // receiver end still alive?
-        if !guard_channel.recv_alive {
-            return Poll::Ready(Err(SendError(
-                this.element.take().expect("just checked"),
-            )));
-        }
-
-        let mut guard_gate = this.gate.lock();
+        // lock scope
+        let to_wake = {
+            let mut guard_channel_state = this.channel.state.lock();
+
+            let Some(data) = guard_channel_state.data.as_mut() else {
+                // receiver end dead
+                return Poll::Ready(Err(SendError(
+                    this.element.take().expect("just checked"),
+                )));
+            };
+
+            // does ANY receiver need data?
+            // if so, allow sender to create another
+            if this.gate.empty_channels.load(Ordering::SeqCst) == 0 {

Review Comment:
   Is accessing this field before taking the send_wakers a race condition? 
   
   I think the answer is no, as `self.channel.state.lock()` is preventing
`empty_channels` from changing between this line and taking the `send_wakers`
guard.
   
   However, if this is the case, wouldn't it be clearer not to use an atomic
usize here and instead just hang the count on the channel state?
   
   



##########
datafusion/physical-plan/src/repartition/distributor_channels.rs:
##########
@@ -188,33 +205,48 @@ impl<'a, T> Future for SendFuture<'a, T> {
         let this = &mut *self;
         assert!(this.element.is_some(), "polled ready future");
 
-        let mut guard_channel = this.channel.lock();
-
-        // receiver end still alive?
-        if !guard_channel.recv_alive {
-            return Poll::Ready(Err(SendError(
-                this.element.take().expect("just checked"),
-            )));
-        }
-
-        let mut guard_gate = this.gate.lock();
+        // lock scope
+        let to_wake = {
+            let mut guard_channel_state = this.channel.state.lock();
+
+            let Some(data) = guard_channel_state.data.as_mut() else {
+                // receiver end dead
+                return Poll::Ready(Err(SendError(
+                    this.element.take().expect("just checked"),
+                )));
+            };
+
+            // does ANY receiver need data?
+            // if so, allow sender to create another
+            if this.gate.empty_channels.load(Ordering::SeqCst) == 0 {
+                let mut guard = this.gate.send_wakers.lock();
+                if let Some(send_wakers) = guard.deref_mut() {
+                    send_wakers.push((cx.waker().clone(), this.channel.id));
+                    return Poll::Pending;
+                }
+            }
 
-        // does ANY receiver need data?
-        // if so, allow sender to create another
-        if guard_gate.empty_channels == 0 {
-            guard_gate
-                .send_wakers
-                .push((cx.waker().clone(), guard_channel.id));
-            return Poll::Pending;
-        }
+            let was_empty = data.is_empty();
+            data.push_back(this.element.take().expect("just checked"));
+
+            if was_empty {
+                this.gate.decr_empty_channels();
+
+                let to_wake = guard_channel_state
+                    .recv_wakers
+                    .as_mut()
+                    .expect("not closed");
+                let mut tmp = Vec::with_capacity(to_wake.capacity());
+                std::mem::swap(to_wake, &mut tmp);
+                tmp

Review Comment:
   This might be more readable if it was in a function like
   
   ```rust
   guard_channel_state.take_wakers()
   ```
   
   Or something similar - that would also give you a place to document what is
happening in more detail.



##########
datafusion/physical-plan/src/repartition/distributor_channels.rs:
##########
@@ -155,19 +159,32 @@ impl<T> Clone for DistributionSender<T> {
 
 impl<T> Drop for DistributionSender<T> {
     fn drop(&mut self) {
-        let mut guard_channel = self.channel.lock();
-        guard_channel.n_senders -= 1;
+        let n_senders_pre = self.channel.n_senders.fetch_sub(1, Ordering::SeqCst);
+        if n_senders_pre > 1 {
+            return;
+        }
+
+        let receivers = {
+            let mut state = self.channel.state.lock();
 
-        if guard_channel.n_senders == 0 {
             // Note: the recv_alive check is so that we don't double-clear the status
-            if guard_channel.data.is_empty() && guard_channel.recv_alive {
+            if state
+                .data
+                .as_ref()
+                .map(|data| data.is_empty())
+                .unwrap_or_default()

Review Comment:
   Could we put this into ChannelState as a function with a more readable name? 
Like
   
   ```rust
   if state.channel_gone() {
   ...
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to