I have been using std.concurrency for a while, and have been very impressed by it. In particular, it was easy to get a complex multi-threaded application going with std.concurrency. The thread-local-storage behaviour of D is really cool too.

However, I have had some problems with std.concurrency (most of which I'm sure are already being addressed by Sean). In particular, I wanted a way to select on multiple file-descriptors so that I could (say) send commands to a thread that also reads from a blocking file-descriptor like a serial port, UDP socket or GTK main loop. I also wanted to experiment with the amazing template capabilities of D that I found out about in the book, and see if I could generate boiler-plate message-passing code that doesn't rely on "magic" like Variant.

The attached file is where I am at so far. It works, but still needs a lot of work to bring it up to scratch. The reason for posting it here is to stimulate some discussion about how D's standard concurrency module should behave (and maybe get a few helpful tips).

Please take a look and let me know what you think.

--
Graham St Jack
module alt.concurrency;

//public import alt.io;

import std.algorithm;
import std.contracts;
import std.stdio;
import std.conv;
import std.traits;
import std.typecons;
import std.typetuple;

import core.sync.condition;
import core.sync.mutex;
import core.thread;
import core.stdc.stdlib;


//
// Provides an alternative message-passing concurrency implementation
// to std.concurrency. It is similar to std.concurrency, but is different in that:
// * Messages cannot contain references to mutable data.
// * Message queues (Channels) are explicitly created and passed to threads as
//   arguments to spawn().
// * Channels are synchronized (and thus shared).
// * The types of messages a Channel supports are well-defined.
// * Messages are processed in order.
// * Multiple threads can receive from the same Channel.
// * Out of band messages are not supported directly, but can be implemented via
//   additional Channels.
// * Channels have an internal pipe that can be used in a Selector to allow
//   a thread to receive input from multiple Channels and file-descriptor devices
//   such as sockets and pipes.
// * There is no automatic thread-termination behaviour.
//
// The motivation for this module is to stimulate some discussion about
// how D's standard message-passing support should behave.
// It is also a learning experience for the author.
// The std.concurrency issues of most interest to the author are:
// * Messages are not named - they are inferred from the types.
// * Lack of compile-time checking.
// * It allows mutable references to be sent in messages.
//
// Graham St Jack, July 2010.
// This is free software - do with it as you will.
//
// Currently only Linux is supported.
//
// TODO - prevent non-shareable data from being passed to spawn().
// TODO - handling of signals.
// TODO - print demangled stack trace on unexpected thread death.
// TODO - provide Windows and Mac support in alt.io, tidy up alt.io
//        and debug it properly.
//


//
// A Channel for sending messages between threads.
// The internal pipe is used to block removal until a message is available,
// and to provide a file-descriptor to select on. Multiple threads
// may add and remove messages from the queue, but usually one thread
// adds and one other thread removes.
//

public class ChannelFull : Exception {
    this() { super("channel full"); }
}

public class ChannelFinalized: Exception {
    this() { super("channel finalized"); }
}

// A SelectableReader returned from a Channel for use in a Selector
// The Channel itself cannot be used because it is shared.
/*
class Selectable : ISelectableReader {
    private int mFd;

    this(int fd) {
        mFd = fd;
    }

    // ISelectableReader implementation:
    override uint read(void * buffer, uint count) {
        assert(false, "read is not supported");
    }
    override int read_descriptor() {
        return mFd;
    }
}
*/

// FIXME - class should be synchronized, but druntime.core.sync's
// Condition class doesn't work with synchronized classes or methods yet.
class Channel(T) {
    private {
        struct Node {
            T     payload;
            Node* next;

            this(T payload) {
                this.payload = payload;
            }
        }

        Node*     mFront;
        Node*     mBack;
        uint      mCapacity;
        uint      mCount;
        //ubyte     mZero;
        //Pipe      mPipe;
        bool      mFinalized;
        Mutex     mMutex;
        Condition mReadCondition;
        Condition mWriteCondition;
    }

    this(uint capacity = 0) {
        mCapacity       = capacity;
        //mPipe           = new Pipe();
        mMutex          = new Mutex();
        mReadCondition  = new Condition(mMutex);
        mWriteCondition = new Condition(mMutex);
    }

    // Return a new Selectable that can be used in a Selector to discover
    // when the Channel is not empty.
    /*
    Selectable new_selectable() {
        synchronized(mMutex) {
            return new Selectable(mPipe.read_descriptor);
        }
    }
    */

    // Finalise the channel, causing it to throw ChannelFinalised on remove() when empty.
    final void finalize() {
        synchronized(mMutex) {
            mFinalized = true;
            if (!mCount) {
                //mPipe.write(&mZero, mZero.sizeof);
                mReadCondition.notifyAll;
            }
        }
    }

    // Add a message to the back of the channel, throwing if full
    final void add(T msg) {
        synchronized(mMutex) {
            if (mCapacity && mCount == mCapacity) {
                throw new ChannelFull();
            }
            auto n = new Node(msg);
            n.next = null;
            if (!mFront) mFront = n;
            if (!mBack)  mBack = n;
            else         mBack.next = n;
            mBack = n;
            if (!mCount) {
                //mPipe.write(&mZero, mZero.sizeof);
                mReadCondition.notifyAll;
            }
            ++mCount;
        }
    }

    // Blocking version of add
    final void add_with_block(T msg) {
        synchronized(mMutex) {
            while (mCapacity && mCount == mCapacity) {
                mWriteCondition.wait;
            }
            add(msg);
        }
    }

    // Remove and return the front message, blocking until one is available
    // or throwing if finalized and empty
    T remove() {
        synchronized(mMutex) {
            while (!mCount) {
                if (mFinalized) {
                    throw new ChannelFinalized();
                }
                else {
                    mReadCondition.wait;
                }
            }
            if (mCapacity && mCount == mCapacity) {
                mWriteCondition.notifyAll;
            }
            T msg = mFront.payload;
            assert(mFront);
            mFront = mFront.next;
            if (!mFront) mBack = null;
            --mCount;
            if (!mCount) {
                //mPipe.read(&mZero, mZero.sizeof);
            }
            return msg;
        }
    }
}


//
// Spawn and start a thread on the given run function.
// FIXME - insist on arguments being shared or !hasAliasing.
//
void spawn(T...)(void function(T) run, T args)
{
    void exec() {
        try {
            run(args);
        }
        catch (Throwable ex) {
            writeln("Thread died with: " ~ ex.msg);
            exit(1);
        }
    }
    auto t = new Thread(&exec);
    t.start();
}



//
// Protocol - defines:
// * a family of messages,
// * a Channel type to send them through,
// * send methods to send the messages,
// * a process method to dispatch received messages, and
// * an interface to process them with.
//
// Synopsis:
//
// alias Protocol!(Message!("message1", string, "action"),
//                 Message!("message2", int, "val1", int, "val2")) workProtocol;
//
// class Worker : workProtocol.IHandler {
//     override void message1(string action) {...}
//     override void message2(int val1, int val2) {...}
// }
//
// void do_work(workProtocol.Chan channel, string name) nothrow {
//     writefln("%s starting", name);
//     auto worker = new Worker();
//     try {
//         for (;;) {
//             workProtocol.receive(channel, worker);
//         }
//     }
//     catch (ChannelFinalized ex) { normal_exit_stuff; }
//     catch (Exception ex)        { abnormal_exit_stuff; }
// }
//
// void main() {
//     auto channel = new workProtocol.Chan;
//     spawn(&do_work, channel);
//     workProtocol.message1(channel, "fred");
//     workProtocol.message2(channel, 1, 2);
//     channel.finalize;
// }
//
//
// 
// Notes:
// * A worker function can be passed any number of channels, and may receive
//   input from any number of other file-decriptor-based event sources,
//   using a Selector to avoid making blocking I/O calls.
// * Message is a cut-down version of Tuple that is intended for use only
//   in composing protocols. It deliberately omits operators, etc that might
//   tempt its use elsewhere.
//



//
// Template that resolves to various string representations of the Message.
// Expects a message name followed by pairs of parameter type and parameter name.
//
template Message(string name, T...) {
    static assert(T.length > 0);
    static assert(!hasAliasing!(T[0]), "Cannot use " ~ T[0].stringof ~ " in a message");

    alias name msgName;

    static if (is(typeof(T[1]) : string)) {
        static if (T.length > 2) {
            alias Message!(name, T[2..$]) Next;

            enum string fieldStr = T[0].stringof ~ " " ~ T[1] ~ "; " ~ Next.fieldStr;
            enum string paramStr = T[0].stringof ~ " " ~ T[1] ~ ", " ~ Next.paramStr;
            enum string nameStr  = T[1] ~ ", " ~ Next.nameStr;
            enum string callStr  = "message." ~ name ~ "." ~ T[1] ~ ", " ~ Next.callStr;
        }
        else {
            enum string fieldStr = T[0].stringof ~ " " ~ T[1] ~";";
            enum string paramStr = T[0].stringof ~ " " ~ T[1];
            enum string nameStr = T[1];
            enum string callStr  = "message." ~ name ~ "." ~ T[1];
        }
    }
    else {
        static assert(false, "Message parameters must be named");
    }
}

//
// Helper to stamp out recursive string-defining templates
//
private template Paste(string name) {
    enum string Paste = "static if (T.length > 1) { enum " ~
        name ~ " = _" ~ name ~ " ~ Next." ~ name ~ "; } else { enum string " ~
        name ~ " = _" ~ name ~ "; }; ";
}

//
// Helper to return strings for use in mixins by Protocol template.
// All the strings roll out to include an entry for each name/Params pair in T...
//
private template ProtocolStrings(uint index, T...) {
    static if (is(typeof(T[0].msgName)  : string) &&
               is(typeof(T[0].fieldStr) : string) &&
               is(typeof(T[0].paramStr) : string) &&
               is(typeof(T[0].nameStr)  : string) &&
               is(typeof(T[0].callStr)  : string)) {

        static if (T.length > 1) {
            alias ProtocolStrings!(index+1, T[1..$]) Next;
        }

        // message struct: struct name { fields }
        enum string _msgStr = "\nstruct " ~ T[0].msgName ~ "Msg { " ~ T[0].fieldStr ~ " }";

        enum string _unionStr = "\n        " ~ T[0].msgName ~ "Msg " ~ T[0].msgName ~ ";";

        enum string _thisStr =
            "\n    this(ref " ~ T[0].msgName ~ "Msg msg) {" ~
            "\n        kind = " ~ to!string(index) ~ ";" ~
            "\n        " ~ T[0].msgName ~ " = msg;" ~
            "\n    }";

        enum string _interfaceStr = "\n    void " ~ T[0].msgName ~ "(" ~ T[0].paramStr ~ ");";

        enum string _sendStr =
            "\nvoid " ~ T[0].msgName ~ "(Chan channel, " ~ T[0].paramStr ~ ") {" ~
            "\n    channel.add(Message(" ~ T[0].msgName ~ "Msg(" ~ T[0].nameStr ~ ")));" ~
            "\n}";

        enum string _caseStr =
            "\n    case " ~ to!string(index) ~ ":" ~
            "\n        handler." ~ T[0].msgName ~ "(" ~ T[0].callStr ~ ");" ~
            "\n        break;";

        mixin(Paste!("msgStr"));
        mixin(Paste!("unionStr"));
        mixin(Paste!("thisStr"));
        mixin(Paste!("interfaceStr"));
        mixin(Paste!("sendStr"));
        mixin(Paste!("caseStr"));
    }
    else {
        static assert(false, "protocols must have pairs of names and Params");
    }
}

//
// Protocol - expects pairs of strings and Params, representing 
//            message names and message parameters.
// The strings used in mixins are available for inspection should you need them.
//
// Params do not need to be different from each other.
// The types used in Params fields must not refer to non-invariant data.
//
template Protocol(T...) {

    alias ProtocolStrings!(0, T) Strings;

    // Define the message types
    enum string msgStr = Strings.msgStr;

    // Define the discriminated union to hold any message
    // struct Message {
    //     uint kind;
    //     union {
    //         messages
    //     }
    //     this(params) {
    //         kind = index; // 0 for first message type, 1 for second, etc
    //         message = { params };
    //     }
    //     ...
    // }
    enum string unionStr =
        "\nstruct Message {" ~
        "\n    uint kind;" ~
        "\n    union { " ~
        Strings.unionStr ~
        "\n    }" ~
        Strings.thisStr ~
        "\n}";

    // Define the channel
    enum string channelStr = "alias Channel!(Message) Chan;";

    // Define the interface type: interface { void msg1_name(msg1_params); ... }
    enum string interfaceStr = "\ninterface IHandler { " ~ Strings.interfaceStr ~ "\n}";

    // Define the send functions:
    // void message_name(Chan channel, params) {
    //     channel.add(invariant(Message)(params))
    // }
    // ...
    enum string sendStr = Strings.sendStr;

    // Define the receive function:
    // void receive(Chan channel, IHandler handler) {
    //    auto message = channel.receive();
    //    switch (message.kind) {
    //        case 0:
    //            break;
    //        ...
    //        default
    //            assert(false, "unsupported message kind");
    //    }
    // }
    enum string receiveStr =
        "\nvoid receive(Chan channel, IHandler handler) {" ~
        "\n    auto message = channel.remove();" ~
        "\n    switch (message.kind) {" ~
        Strings.caseStr ~
        "\n    default:" ~
        "\n        assert(0, \"Received unsupported message kind\");" ~
        "\n    }" ~
        "\n}";

    enum string codeStr = msgStr ~ unionStr ~ channelStr ~ interfaceStr ~ sendStr ~ receiveStr;
    mixin(codeStr);
}


unittest {
    alias Protocol!(Message!("message1", string, "action"),
                    Message!("message2", int, "val1", int, "val2")) workProtocol;

    writefln("%s", workProtocol.codeStr);

    // static necessary to move to module scope so do_work can access
    static class Worker : workProtocol.IHandler {
        override void message1(string action) {
            writefln("got message1: action=%s", action);
        }
        override void message2(int val1, int val2) {
            writefln("got message2: val1=%s val2=%s", val1, val2);
        }
    }

    // static necessary to move to module scope so function ptr can be taken
    static void do_work(workProtocol.Chan channel, string name) {
        try {
            writefln("%s starting", name);
            scope worker = new Worker();
            for (;;) {
                workProtocol.receive(channel, worker);
            }
        }
        catch (ChannelFinalized ex) {}
        catch (Exception ex)        { assert(0, "unexpected exception"); }
    }

    auto channel = new workProtocol.Chan;
    spawn(&do_work, channel, "Sam");
    workProtocol.message1(channel, "Fred");
    workProtocol.message2(channel, 1, 2);
    channel.finalize;
}

Reply via email to