crepererum commented on code in PR #4867:
URL: https://github.com/apache/arrow-datafusion/pull/4867#discussion_r1069038446


##########
datafusion/core/src/physical_plan/repartition/distributor_channels.rs:
##########
@@ -0,0 +1,669 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Special channel construction to distribute data from various inputs into N 
outputs.
+//!
+//! # Design
+//!
+//! ```text
+//! +----+      +------+
+//! | TX |==||  | Gate |
+//! +----+  ||  |      |  +--------+  +----+
+//!         ====|      |==| Buffer |==| RX |
+//! +----+  ||  |      |  +--------+  +----+
+//! | TX |==||  |      |
+//! +----+      |      |
+//!             |      |
+//! +----+      |      |  +--------+  +----+
+//! | TX |======|      |==| Buffer |==| RX |
+//! +----+      +------+  +--------+  +----+
+//! ```
+//!
+//! There are `N` virtual MPSC (multi-producer, single consumer) channels with 
unbounded capacity. However, if all
+//! buffers/channels are non-empty, then a global gate will be closed 
preventing new data from being written (the
+//! sender futures will be [pending](Poll::Pending)) until at least one 
channel is empty (and not closed).
+use std::{
+    collections::VecDeque,
+    future::Future,
+    pin::Pin,
+    sync::Arc,
+    task::{Context, Poll, Waker},
+};
+
+use parking_lot::Mutex;
+
+/// Create `n` empty channels.
+pub fn channels<T>(
+    n: usize,
+) -> (Vec<DistributionSender<T>>, Vec<DistributionReceiver<T>>) {
+    let channels = (0..n)
+        .map(|id| {
+            Arc::new(Mutex::new(Channel {
+                data: VecDeque::default(),
+                n_senders: 1,
+                recv_alive: true,
+                recv_wakers: Vec::default(),
+                id,
+            }))
+        })
+        .collect::<Vec<_>>();
+    let gate = Arc::new(Mutex::new(Gate {
+        empty_channels: n,
+        send_wakers: Vec::default(),
+    }));
+    let senders = channels
+        .iter()
+        .map(|channel| DistributionSender {
+            channel: Arc::clone(channel),
+            gate: Arc::clone(&gate),
+        })
+        .collect();
+    let receivers = channels
+        .into_iter()
+        .map(|channel| DistributionReceiver {
+            channel,
+            gate: Arc::clone(&gate),
+        })
+        .collect();
+    (senders, receivers)
+}
+
+/// Error returned by [send](DistributionSender::send).
+///
+/// This occurs when the [receiver](DistributionReceiver) is gone.
+#[derive(PartialEq, Eq)]
+pub struct SendError<T>(pub T);
+
+impl<T> std::fmt::Debug for SendError<T> {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_tuple("SendError").finish()
+    }
+}
+
+impl<T> std::fmt::Display for SendError<T> {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "cannot send data, receiver is gone")
+    }
+}
+
+impl<T> std::error::Error for SendError<T> {}
+
+/// Sender side of distribution [channels].
+///
+/// This handle can be cloned. All clones will write into the same channel. 
Dropping the last sender will close the
+/// channel. In this case, the [receiver](DistributionReceiver) will still be 
able to poll the remaining data, but will
+/// receive `None` afterwards.
+#[derive(Debug)]
+pub struct DistributionSender<T> {
+    channel: SharedChannel<T>,
+    gate: SharedGate,
+}
+
+impl<T> DistributionSender<T> {
+    /// Send data.
+    ///
+    /// This fails if the [receiver](DistributionReceiver) is gone.
+    pub fn send(&self, element: T) -> SendFuture<'_, T> {
+        SendFuture {
+            channel: &self.channel,
+            gate: &self.gate,
+            element: Box::new(Some(element)),
+        }
+    }
+}
+
+impl<T> Clone for DistributionSender<T> {
+    fn clone(&self) -> Self {
+        let mut guard = self.channel.lock();
+        guard.n_senders += 1;
+
+        Self {
+            channel: Arc::clone(&self.channel),
+            gate: Arc::clone(&self.gate),
+        }
+    }
+}
+
+impl<T> Drop for DistributionSender<T> {
+    fn drop(&mut self) {
+        let mut guard_channel = self.channel.lock();
+        guard_channel.n_senders -= 1;
+
+        if guard_channel.n_senders == 0 {
+            // Note: the recv_alive check is so that we don't double-clear the 
status
+            if guard_channel.data.is_empty() && guard_channel.recv_alive {
+                // channel is gone, so we need to clear our signal
+                let mut guard_gate = self.gate.lock();
+                guard_gate.empty_channels -= 1;
+            }
+
+            // receiver may be waiting for data, but should return `None` now 
since the channel is closed
+            guard_channel.wake_receivers();
+        }
+    }
+}
+
+/// Future backing [send](DistributionSender::send).
+#[derive(Debug)]
+pub struct SendFuture<'a, T> {
+    channel: &'a SharedChannel<T>,
+    gate: &'a SharedGate,
+    // the additional Box is required for `Self: Unpin`
+    element: Box<Option<T>>,
+}
+
+impl<'a, T> Future for SendFuture<'a, T> {
+    type Output = Result<(), SendError<T>>;
+
+    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> 
Poll<Self::Output> {
+        let this = &mut *self;
+        assert!(this.element.is_some(), "polled ready future");
+
+        let mut guard_channel = this.channel.lock();
+
+        // receiver end still alive?
+        if !guard_channel.recv_alive {
+            return Poll::Ready(Err(SendError(
+                this.element.take().expect("just checked"),
+            )));
+        }
+
+        let mut guard_gate = this.gate.lock();
+
+        // does ANY receiver need data?
+        if guard_gate.empty_channels == 0 {
+            guard_gate
+                .send_wakers
+                .push((cx.waker().clone(), guard_channel.id));
+            return Poll::Pending;
+        }
+
+        let was_empty = guard_channel.data.is_empty();
+        guard_channel
+            .data
+            .push_back(this.element.take().expect("just checked"));
+        if was_empty {
+            guard_gate.empty_channels -= 1;
+            guard_channel.wake_receivers();
+        }
+
+        Poll::Ready(Ok(()))
+    }
+}
+
+/// Receiver side of distribution [channels].
+#[derive(Debug)]
+pub struct DistributionReceiver<T> {
+    channel: SharedChannel<T>,
+    gate: SharedGate,
+}
+
+impl<T> DistributionReceiver<T> {
+    /// Receive data from channel.
+    ///
+    /// Returns `None` if the channel is empty and no 
[senders](DistributionSender) are left.
+    pub fn recv(&mut self) -> RecvFuture<'_, T> {
+        RecvFuture {
+            channel: &mut self.channel,
+            gate: &mut self.gate,
+            rdy: false,
+        }
+    }
+}
+
+impl<T> Drop for DistributionReceiver<T> {
+    fn drop(&mut self) {
+        let mut guard_channel = self.channel.lock();
+        let mut guard_gate = self.gate.lock();
+        guard_channel.recv_alive = false;
+
+        // Note: n_senders check is here so we don't double-clear the signal
+        if guard_channel.data.is_empty() && (guard_channel.n_senders > 0) {
+            // channel is gone, so we need to clear our signal
+            guard_gate.empty_channels -= 1;
+        }

Review Comment:
   Your proposal relies on the recv side to be dropped, which is technically 
suboptimal. You may want to close the gate (for the remaining channels) if you 
know that your single empty channel cannot receive any data anymore. This is 
tested in `test_close_channel_by_dropping_tx` (see comment at the end saying 
`channel closed => also close gate`).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to