On 2013-11-20 07:34:36 +0000, Chris Williams said:
On Wednesday, 20 November 2013 at 04:24:14 UTC, Daniel Murphy wrote:
This is the correct forum to post phobos proposals on.
Well then, here's what I had written:
A few applications I've considered implementing seem like they would be
easier if there were a channel-based messaging system in
std.concurrency. I'm happy to do this implementation, but I thought I
would try to get some sort of sign-off before doing so. Below, I
lay out my argument for the addition, and then the API that I am
considering.
---
One fairly common task is thread pooling. With the send/receive model
currently implemented, you have to choose a specific thread to target
when you send a task. You can simply cycle through your list of threads
over and over to spread the load across them, but that presumes that
all tasks take roughly the same processing time. It makes more sense to
push data into a shared channel (secretly a work queue), so that the
first thread to finish its previous task immediately pulls the next one
before everyone else. This also removes the need to pass around
references to your threads just so that they can be looped over.
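For comparison, here is a minimal sketch of the "shared channel as a secret work queue" idea, built directly on core.sync rather than on std.concurrency. WorkQueue, makeWorker and sumSquaresWithPool are hypothetical names, not existing Phobos APIs, and the sketch assumes an unbounded in-memory queue guarded by one mutex and one condition variable. Because every worker blocks on the same queue, whichever thread finishes first picks up the next task, and the dispatcher never needs to know which thread is free.

```d
import core.sync.condition : Condition;
import core.sync.mutex : Mutex;
import core.thread : Thread;

/// Hypothetical shared work queue: all workers wait on the same queue,
/// so whichever worker becomes idle first takes the next item.
final class WorkQueue(T)
{
    private T[] items;
    private bool closed;
    private Mutex mtx;
    private Condition cond;

    this()
    {
        mtx = new Mutex;
        cond = new Condition(mtx);
    }

    /// Enqueue one item and wake one waiting worker.
    void put(T item)
    {
        synchronized (mtx)
        {
            items ~= item;
            cond.notify();
        }
    }

    /// No more items will be added; wake every waiting worker.
    void close()
    {
        synchronized (mtx)
        {
            closed = true;
            cond.notifyAll();
        }
    }

    /// Block until an item is available. Returns false once the queue
    /// has been closed and fully drained.
    bool take(out T item)
    {
        synchronized (mtx)
        {
            while (items.length == 0 && !closed)
                cond.wait();
            if (items.length == 0)
                return false;
            item = items[0];
            items = items[1 .. $];
            return true;
        }
    }
}

/// Build one worker loop; a separate function so each closure gets its own `w`.
private void delegate() makeWorker(WorkQueue!int queue, long[] partial, int w)
{
    return () {
        int item;
        while (queue.take(item))
            partial[w] += cast(long) item * item;
    };
}

/// Hypothetical demo: `workers` threads pull the numbers 1..n from one
/// shared queue and sum their squares.
long sumSquaresWithPool(int n, int workers)
{
    auto queue = new WorkQueue!int;
    auto partial = new long[workers];   // one slot per worker, never shared
    Thread[] threads;
    foreach (w; 0 .. workers)
        threads ~= new Thread(makeWorker(queue, partial, w)).start();
    foreach (i; 1 .. n + 1)
        queue.put(i);
    queue.close();
    foreach (t; threads)
        t.join();
    long total = 0;
    foreach (p; partial)
        total += p;
    return total;
}
```

For example, sumSquaresWithPool(100, 4) should return 338350 no matter how the 100 items end up split across the four workers.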
I haven't tested it, but it looks like this sort of thing might be
quasi-possible using the register/unregister/locate functions. As each
thread starts, it can register itself with a named group (i.e., a
channel), and then anyone who wants to send an item to an arbitrary
thread in that group can call locate() to retrieve one thread and call
send() against the Tid. The target thread would then need to unregister
itself while it is doing work and re-register itself afterwards. My
complaint about this is the need to unregister and re-register. If the
thread issuing commands sends a large number of tasks all at once, they
will all go to the same thread (if coded poorly), or the caller will
need to use yield() or sleep() to give the target thread time to receive
the task and unregister, so that locate() can find a different thread.
That's not terribly efficient. I am also concerned that all threads
might be unregistered (busy) at the moment we call locate(), whereas a
channel-based system could simply let the mailbox grow while all
threads are busy.
The actual implementation in concurrency.d also concerns me because, if
I read it correctly, the most recently registered thread is the one that
locate() finds, rather than the thread that has been registered the
longest. I suppose it's not a large issue if the same two threads keep
taking all the tasks (that just means your load never exceeds two
threads' worth of processing power), but a FIFO system still seems
better. The registry is also based on an array rather than a set, which
can make removal an O(n) operation if the remaining contents of the
registry have to be shifted left to fill the empty spot.
Overall, I think that adding a shared message box system would be a
straightforward way to improve the handling of thread pooling via the
actor model.
---
A less common use case: I was also considering some world simulators
(e.g. for studying economics or building a game map), and here the
ability to broadcast messages to a large set of other actors, based on
location, interest, etc., seems useful. In this case, messages would
need to be copied out to each subscriber in the channel rather than
travelling over a single point-to-point connection. For a networked
game, you would most likely want to break each channel into two:
locally, all senders on a channel push to a single listener that pipes
the messages over the network, and remotely, the messages are broadcast
to many listeners again. That is a reasonably straightforward task for
someone to implement on top of the channel functionality, and I don't
think it needs to be in Phobos itself. Mostly, having broadcast
functionality in the standard library lets people use the easy and
safe actor model for more creative purposes than a straight one-to-one
pipe.
---
Overall, my hope would be to develop something that is conceptually no
more difficult to deal with than the current send()/receive() model,
but also able to be used in a wide variety of ways. The API that I
would propose to develop is:
interface Channel {
    void send(T...)(T vals);
    void prioritySend(T...)(T vals);
    void receive(T...)(out Tid sender, T ops);
    receiveOnlyRet!(T) receiveOnly(T...)();
    bool receiveTimeout(T...)(Duration d, T ops);
    void setMaxMailboxSize(Tid tid, size_t messages, OnCrowding doThis);
    void setMaxMailboxSize(Tid tid, size_t messages, bool function(Tid) doThisFunc);
}

// Send inserts a message into a shared message box. Receive removes the message.
class SingleChannel : Channel {}

// Send inserts the message into a message box per recipient. Receive removes
// the message from the calling thread's channel message box. If echo is false,
// messages will not be sent back to the sender, even if they are a registered
// listener.
class DuplicateChannel(bool echo = true) : Channel {}

// Used by sendAll(). The channel can be of either type.
void registerSend(Channel c, Tid tid = thisTid);
void unregisterSend(Channel c, Tid tid = thisTid);

// Used by receiveAll(). The channel can be of either type.
void registerReceive(Channel c, Tid tid = thisTid);
void unregisterReceive(Channel c, Tid tid = thisTid);

// Sends a copy of the message to all channels this thread has registered for.
void sendAll(T...)(T ops);

// Receives a message of type T from any channel that we are registered for.
// Returns the channel and the sender.
void receiveAll(T...)(out Channel c, out Tid sender, T ops);
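To make the SingleChannel/DuplicateChannel distinction concrete, a rough single-process sketch of the duplicate (broadcast) semantics follows. BroadcastChannel, subscribe, publish and drain are hypothetical names; subscribers are plain integer ids instead of Tids, and there is no blocking receive, only a drain of whatever is queued. The echo flag mirrors the proposed DuplicateChannel(bool echo = true).

```d
import core.sync.mutex : Mutex;

/// Hypothetical sketch of DuplicateChannel semantics: every subscriber has
/// its own box, and publish() appends a copy of the message to each box.
/// When echo is false, the publisher's own box is skipped. Subscribers are
/// identified by integer ids here (an assumption), not by Tids.
final class BroadcastChannel(T, bool echo = true)
{
    private T[][size_t] boxes;   // subscriber id -> pending messages
    private Mutex mtx;

    this()
    {
        mtx = new Mutex;
    }

    /// Give `id` its own (initially empty) message box.
    void subscribe(size_t id)
    {
        synchronized (mtx)
        {
            if (id !in boxes)
                boxes[id] = null;
        }
    }

    /// Copy `msg` into every subscriber's box.
    void publish(size_t senderId, T msg)
    {
        synchronized (mtx)
        {
            foreach (id, ref box; boxes)
            {
                static if (!echo)
                {
                    if (id == senderId)
                        continue;   // do not echo back to the sender
                }
                box ~= msg;
            }
        }
    }

    /// Remove and return every message queued for `id` (null if none).
    T[] drain(size_t id)
    {
        synchronized (mtx)
        {
            if (auto p = id in boxes)
            {
                auto pending = *p;
                *p = null;
                return pending;
            }
            return null;
        }
    }
}
```

With echo disabled, a subscriber that publishes sees nothing in its own box while every other subscriber gets its own copy; with the default echo = true, the sender receives a copy too.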
I believe that the look and feel stays fairly consistent with the
current set of functions in std.concurrency. I've added a way for the
recipient to learn who the sender was since, in the duplication model,
I believe there are quite a few cases where that would be important
information. And of course, I've added the option to
register/unregister threads other than ourselves to allow a greater
range of code layouts, though perhaps the lack of this sort of thing in
the original code is due to some safety concern?
The most straightforward way to implement DuplicateChannel would be to
use the individual threads' message boxes, but that would mean data put
into a channel could also be pulled out via the traditional receive()
method. My current intention is to keep these two systems (the direct
send()/receive() model and the channel model) separate. Does anyone see
a reason to merge them into a single whole?
Those are my thoughts, anyways. Comments? Complaints?
How does one receive from multiple channels out of order? I would
rather this delivered the message to the subscribed Tid via send,
instead of having an additional queue. It could possibly send a
ChannelMessage which has a reference to the sending channel and the
message. I understand this is a different model than what Go and
whatnot use, but I think it's more practical in some circumstances.
Maybe both ways would be good? I personally use this method in my
vibe-d server.
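A minimal sketch of the alternative suggested in the reply, assuming channel messages are delivered through each subscriber's normal std.concurrency mailbox wrapped in an envelope. ChannelMessage, subscribe and publish are hypothetical names. The envelope carries the channel's name rather than a reference to the channel object, since std.concurrency's send() rejects message types that alias mutable thread-local data; that substitution is an assumption, not part of the reply.

```d
import std.concurrency : receiveOnly, send, thisTid, Tid;

/// Hypothetical envelope: a channel message delivered through the
/// subscriber's ordinary mailbox, tagged with the channel it came from.
struct ChannelMessage(T)
{
    string channel;   // channel name instead of a channel reference
    T payload;
}

/// Hypothetical channel-name -> subscribers registry. Module-level
/// variables are thread-local in D, so this sketch assumes a single
/// publishing thread; a registry shared between threads would need locking.
private Tid[][string] subscribers;

void subscribe(string channel, Tid tid)
{
    subscribers[channel] ~= tid;
}

/// Send one ChannelMessage copy to every Tid subscribed to `channel`.
void publish(T)(string channel, T payload)
{
    if (auto list = channel in subscribers)
        foreach (tid; *list)
            send(tid, ChannelMessage!T(channel, payload));
}
```

Because everything lands in one mailbox, a single receive() call with handlers such as (ChannelMessage!int m) and (string s) can service several channels and ordinary messages in whatever order they arrive, with m.channel telling them apart.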