I don't know, do you have an example ?

For example taking a classname and a bunch of field types and names, and
turning it into a class definition complete with constructor from field
values, constructor from an input stream, a method to write to an output
stream, and const getters. Or maybe std.typecons.AutoImplement.

Could you post an example of how that mixin would be used and the code it would generate then I can see if I can translate it to my syntax. AutoImplement seems to just contain template mixins which is something else.


I have attached my concurrency framework, which relies heavily on mixins, plus its unit test to show how it is used. I haven't included the various dependencies because I assume you just want the example code. Let me know if you want something buildable, or perhaps something more cut-down.

What the code-generating template does is to create from something like this (you can have any number of Messages in a Protocol):


alias Protocol!("Requests", Message!("job", string, "name")).code jobCode;
mixin(jobCode);


this code:


class Requests {
    struct jobMsg {
        string name;
        this(string name) {
             this.name = name;
        }
        void read(InStream stream) {
             name = stream.get!string;
        }
        void write(OutStream stream) {
            stream(name);
        }
    }
    struct Message {
        uint kind;
        union {
        jobMsg job;
        }
        this(ref jobMsg msg) {
            kind = 0;
            job = msg;
        }
        this(InStream stream) {
            kind = stream.get!uint;
            switch(kind) {
            case 0: job.read(stream); break;
                default: assert(0, "Cannot read unsupported message kind");
            }
        }
        void write(OutStream stream) {
            stream(kind);
            switch(kind) {
            case 0: job.write(stream); break;
default: assert(0, "Cannot write unsupported message kind");
            }
        }
    }
    private alias Channel!(Message) _Chan;
    private alias shared _Chan Chan;

    private Chan channel;
    this() { channel = new Chan(); }
    ChannelSelectable newSelectable() { return channel.newSelectable(); }
    void finalize() { channel.finalize; }

    interface IHandler {
        void job(string name);
    }
    void job(string name) {
        channel.add(Message(jobMsg(name)));
    }
    void receive(IHandler handler) {
        auto message = channel.remove;
        switch (message.kind) {
            case 0: handler.job(message.job.name); break;
            default: assert(0, "Cannot dispatch unsupported message kind");
        }
    }
}



I use this for inter-thread communications, and I use the discriminated union to pass messages between processes. The manual mixin after the alias is a debugging aid - the compiler errors when doing mixins aren't helpful at all. I case you are wondering why I did all this, it was partly a learning experience, but mostly an attempt to do something properly thread-safe (!hasAliasing), using shared, const and immutable properly.

--
Graham St Jack

module bedrock.maxim.concurrency;

public import bedrock.maxim.io;
public import bedrock.maxim.stream;

import bedrock.maxim.logging;

import std.algorithm;
import std.stdio;
import std.conv;
import std.traits;
import std.typecons;
import std.typetuple;

import core.thread;

import core.stdc.stdlib;
import core.stdc.errno;
import core.stdc.config;

import core.sys.posix.pthread;
import core.sys.posix.signal;

static import linux = std.c.linux.linux;


//
// Provides a message-passing concurrency implementation.
// * Messages cannot contain references to mutable data.
// * Message queues (Channels) are explicitly created and passed to threads as
//   arguments to spawn().
// * Channels are synchronized (and thus shared).
// * The types of messages a Channel supports are well-defined.
// * Messages are processed in order.
// * Multiple threads can receive from the same Channel.
// * Channels have an internal pipe that can be used in a Selector to allow
//   a thread to receive input from multiple Channels and file-descriptor devices
//   such as sockets and pipes.
// * There is no automatic thread-termination behaviour.
//
// TODO - prevent non-shared aliased data from being passed to spawn().
// TODO - gathering non-fatal signals via signalfd, putting info on a Channel.
//


//---------------------------------------------------------------------
// Condition - Adapted from core.sync.condition to be linux-specific,
//             use an object's monitor and to be shared.
//---------------------------------------------------------------------

public class ConditionException : Exception {
    this(string text) { super(text); }
}

// clock_gettime definitions copied from commented-out and scattered
// definitions in druntime
private {
    enum int CLOCK_REALTIME = 0;
    enum int TIMER_ABSTIME  = 0x01;

    alias int clockid_t;

    extern (C) int clock_gettime(clockid_t, timespec*);
}


shared class Condition {
    private {
        pthread_cond_t _handle;
        shared Object  _obj;
    }

    // Initialise the Condition, which uses the supplied Object's monitor
    // for its mutex.
    this(shared Object o) {
        assert(o, "Object must not be null");
        _obj = o;
        int rc = pthread_cond_init(cast(pthread_cond_t*)&_handle, null);
        assert(!rc, "Unable to create condition");
    }

    ~this() {
        pthread_cond_destroy(cast(pthread_cond_t*)&_handle);
    }

    // Wait until someone calls notify() or notifyAll()
    // Note that signals do not interrupt a wait.
    void wait() {
        int rc = pthread_cond_wait(cast(pthread_cond_t*)&_handle, mutexPtr);
        if (rc == 0)      return;
        if (rc == EINVAL) throw new ConditionException("Invalid parameters");
        if (rc == EPERM)  throw new ConditionException("Mutex not owned by calling thread");
        throw new ConditionException("Unknown error=" ~ to!string(rc));
    }

    // Wait with a timeout in microseconds, returning false on timeout, and true otherwise
    // Note that signals do not interrupt a wait.
    bool wait(long microseconds) {
        enum long NANOS_PER_SEC   = 1_000_000_000;
        enum long NANOS_PER_MICRO = 1_000;

        timespec t = void;
        clock_gettime(CLOCK_REALTIME, &t);
        long nanos = t.tv_sec * NANOS_PER_SEC + t.tv_nsec + microseconds * NANOS_PER_MICRO;
        t.tv_sec   = cast(time_t) (nanos / NANOS_PER_SEC);
        t.tv_nsec  = cast(c_long) (nanos % NANOS_PER_SEC);

        int rc = pthread_cond_timedwait(cast(pthread_cond_t*)&_handle, mutexPtr, &t);
        if (rc == 0)         return true;
        if (rc == ETIMEDOUT) return false;
        if (rc == EINVAL)    throw new ConditionException("Invalid parameters");
        if (rc == EPERM)     throw new ConditionException("Mutex not owned by calling thread");
        throw new ConditionException("Unknown error=" ~ to!string(rc));
    }

    // Notify one waiter
    void notify() {
        pthread_cond_signal(cast(pthread_cond_t*)&_handle);
    }

    // Notify all waiters
    void notifyAll() {
        pthread_cond_broadcast(cast(pthread_cond_t*)&_handle);
    }

    // Return a pointer to the object's monitor's mutex.
    // Note - can't stash it on construction because the monitor isn't 
    // created until it is needed for a synchronized call.
    private pthread_mutex_t* mutexPtr() {
        struct Monitor { // from monitor.c
            void*           impl;
            size_t          length;
            void*           ptr;
            size_t          refs; // added in dmd 2.048
            pthread_mutex_t mon;
        }
        Monitor * monitor = cast(Monitor*) (cast(void**)_obj)[1]; // from object_.d
        return cast(pthread_mutex_t*)&monitor.mon;
    }
}


//-----------------------------------------------------------------------------------
// A Pipe for transferring raw data between threads. Usually it is better to use
// a Channel with a defined protocol (see below), however Pipes can be useful
// for tests and simulations where you want to emulate (say) a socket or serial port.
//-----------------------------------------------------------------------------------

class PipeReadable : ISelectableReadable {
    private int _fd;

    this(int fd) {
        _fd = linux.dup(fd);
    }

    ~this() {
        linux.close(_fd);
    }

    override uint read(void* buffer, uint count) {
        return fdRead(_fd, buffer, count);
    }

    override protected int read_descriptor() {
        return _fd;
    }
}

class PipeWriteable : ISelectableWriteable {
    private int _fd;

    this(int fd) {
        _fd = fd;
    }

    ~this() {
        linux.close(_fd);
    }

    override uint write(in void* buffer, uint count) {
        return fdWrite(_fd, buffer, count);
    }

    override protected int write_descriptor() {
        return _fd;
    }
}

synchronized class Pipe {
    private {
        int readFd, writeFd;
    }

    this() {
        int[2] fds;
        int rc = linux.pipe(fds);
        assert(!rc);
        readFd = fds[0];
        writeFd = fds[1];
    }

    ~this() {
        if (readFd  != -1) linux.close(readFd);
        if (writeFd != -1) linux.close(writeFd);
    }

    // one-shot return of a new reader
    PipeReadable readable() {
        assert(readFd != -1);
        auto r = new PipeReadable(readFd);
        readFd = -1;
        return r;
    }

    // one-shot return of a new writer
    PipeWriteable writeable() {
        assert(writeFd != -1);
        auto w = new PipeWriteable(writeFd);
        writeFd = -1;
        return w;
    }
}



//---------------------------------------------------------------------------
// A Channel for sending messages between threads.
// An internal eventfd provides a file-descriptor to select on for removal.
// Multiple threads may add and remove messages from the queue, but usually
// one thread adds and one other thread removes.
//---------------------------------------------------------------------------


public class ChannelFull : Exception {
    this() { super("channel full"); }
}

public class ChannelFinalized: Exception {
    this() { super("channel finalized"); }
}


// An ISelectableReader returned from a Channel for use in a Selector
// The Channel itself cannot be used because it is shared.
class ChannelSelectable : ISelectableReadable {
    private int _fd;

    this(int fd) {
        _fd = fd;
    }

    // ISelectableReadable implementation:
    override uint read(void * buffer, uint count) {
        assert(false, "read is not supported");
    }
    override protected int read_descriptor() {
        return _fd;
    }
}

extern (C) int eventfd(int initval, int flags);

synchronized class Channel(T) if (!hasAliasing!T) {
    private {

        struct Node {
            T     payload;
            Node* next;

            this(T payload_) {
                payload = payload_;
            }
        }

        Node*     _front;
        Node*     _back;
        uint      _capacity;
        uint      _count;
        int       _fd;
        bool      _finalized;
        Condition _readCondition;
        Condition _writeCondition;
    }

    this(uint capacity = 0) {
        assert(capacity >= 0, "Capacity must not be negative");
        _capacity = capacity;

        _fd = eventfd(0, 0);
        assert(_fd != -1, "Failed to open eventfd");

        _readCondition  = new shared(Condition)(this);
        _writeCondition = new shared(Condition)(this);
    }

    ~this() {
        linux.close(_fd);
    }

    // Return a new Selectable that can be used in a Selector to discover
    // when the Channel is not empty.
    ChannelSelectable newSelectable() {
        return new ChannelSelectable(_fd);
    }

    // Finalise the channel, causing it to throw ChannelFinalised on remove() when empty.
    final void finalize() {
        _finalized = true;
        if (!_count) {
            ulong one = 1;
            linux.write(_fd, &one, one.sizeof);
            _readCondition.notifyAll;
        }
    }

    // Add a message to the back of the channel, throwing if full
    final void add(T msg) {
        if (_capacity && _count == _capacity) {
            throw new ChannelFull();
        }
        auto n = cast(shared) new Node(msg);
        n.next = null;
        if (!_front) _front = n;
        if (!_back)  _back = n;
        else         _back.next = n;
        _back = n;
        if (!_count) {
            ulong one = 1;
            linux.write(_fd, &one, one.sizeof);
            _readCondition.notifyAll;
        }
        ++_count;
    }

    // Blocking version of add
    final void addBlocking(T msg) {
        while (_capacity && _count == _capacity) {
            _writeCondition.wait;
        }
        add(msg);
    }

    // Remove and return the front message, blocking until one is available
    // or throwing if finalized and empty
    T remove() {
        while (!_count) {
            if (_finalized) {
                //log_trace("throwing");
                throw new ChannelFinalized();
            }
            else {
                //log_trace("waiting");
                _readCondition.wait;
            }
        }
        if (_capacity && _count == _capacity) {
            _writeCondition.notifyAll;
        }
        T msg = _front.payload;
        assert(_front);
        _front = _front.next;
        if (!_front) _back = null;
        --_count;
        if (!_count && !_finalized) {
            ulong val = void;
            linux.read(_fd, &val, val.sizeof);
        }
        return msg;
    }
}


//
// Spawn and start a thread on the given run function.
// FIXME - insist on arguments being shared or !hasAliasing.
//
void spawn(T...)(void function(T) run, string name, T args) {
    void exec() {
        myName = name; // myName is defined in logging
        try {
            run(args);
        }
        catch (Throwable ex) {
            log_error("Died with: %s", ex);
            exit(1);
        }
    }
    auto t = new Thread(&exec);
    t.start();
}


//=========== Code-generating templates ================

//
// Helper template to stamp out recursive string-defining templates
//
private template Paste(string name, string joiner) {
    enum string Paste =
        "\nstatic if (more) {" ~
        "\n    enum string " ~ name ~ " = _" ~ name ~ " ~ \"" ~ joiner ~ "\" ~ Next." ~ name ~ ";" ~
        "\n}" ~
        "\nelse {" ~
        "\n    enum string " ~ name ~ " = _" ~ name ~ ";" ~
        "\n}";
}


//-------------------------------------------------------------------------------------
// Message - code-generating template that defines a streamable struct
//           and a bunch of strings used by higher-level templates.
//
// Expects a message name followed by pairs of parameter type and parameter name.
// The parameter types must not have aliasing - ie cannot refer to non-immutable data.
//-------------------------------------------------------------------------------------

// generate strings used for Message code
private template MessageStrings(string msgName, T...) {
    static assert(T.length > 1, "Message parameters must be in pairs");
    enum more = T.length > 2;
    static if (more) {
        alias MessageStrings!(msgName, T[2..$]) Next;
    }
    static if (T.length > 0) {
        static assert(!hasAliasing!(T[0]), "Cannot use type " ~ T[0].stringof ~ " in a message");
        static assert(is(typeof(T[1]) : string), "Message parameters must be named");

        enum string _fieldStr = T[0].stringof ~ " " ~ T[1] ~ ";";
        enum string _initStr  = "this." ~ T[1] ~ " = " ~ T[1] ~ ";";
        enum string _readStr  = T[1] ~ " = stream.get!" ~ T[0].stringof ~ ";";
        enum string _writeStr = "(" ~ T[1] ~ ")";

        enum string _paramStr = T[0].stringof ~ " " ~ T[1];
        enum string _nameStr  = T[1];
        enum string _callStr  = "message." ~ msgName ~ "." ~ T[1];

        mixin(Paste!("fieldStr", "\n    "));     // field types and names in definition
        mixin(Paste!("initStr",  "\n        ")); // field assignments in constructor
        mixin(Paste!("readStr",  "\n        ")); // fields read from an InStream
        mixin(Paste!("writeStr", ""));           // fields written to an OutStream

        mixin(Paste!("paramStr", ", "));         // field types and names, comma-separated
        mixin(Paste!("nameStr",  ", "));         // field names, comma separated
        mixin(Paste!("callStr",  ", "));         // field values accessed in an enclosing union, comma separated
    }
}

template Message(string name, T...) {
    static assert(T.length > 0, "Messages have to contain fields");

    alias name msgName;

    alias MessageStrings!(name, T) strings;

    enum string code =
        "\n    struct " ~ name ~ "Msg {" ~
        "\n        " ~ strings.fieldStr ~
        "\n        this(" ~ strings.paramStr ~ ") {" ~
        "\n             " ~ strings.initStr ~
        "\n        }" ~
        "\n        void read(InStream stream) {" ~
        "\n             " ~ strings.readStr ~
        "\n        }" ~
        "\n        void write(OutStream stream) {" ~
        "\n            stream" ~ strings.writeStr ~ ";" ~
        "\n        }" ~
        "\n    }";
}


//----------------------------------------------------------------------------------
// Protocol - defines, for the given Messages:
// * a discriminated union to hold any of them,
// * a function that returns a channel to send them through,
// * functions to send the messages,
// * an interface to process dispatched messages with, and
// * a function to receive and dispatch incoming messages.
//--------------------------------------------------------------------------------


//
// Helper to return strings for use in mixins by Protocol template.
// All the strings roll out to include an entry for each Message in T...
//
private template ProtocolStrings(uint index, T...) {
    static assert(is(typeof(T[0].msgName) : string), "Protocols must comprise messages");
    static assert(is(typeof(T[0].code)    : string), "Trivial messages not supported yet");

    enum more = T.length > 1;
    static if (more) {
        alias ProtocolStrings!(index+1, T[1..$]) Next;
    }

    enum string _msgStr = T[0].code;

    enum string _unionStr = "\n        " ~ T[0].msgName ~ "Msg " ~ T[0].msgName ~ ";";

    enum string _thisStr =
        "\n        this(ref " ~ T[0].msgName ~ "Msg msg) {" ~
        "\n            kind = " ~ to!string(index) ~ ";" ~
        "\n            " ~ T[0].msgName ~ " = msg;" ~
        "\n        }";

    enum string _readStr = 
        "\n            case " ~ to!string(index) ~ ": " ~ T[0].msgName ~ ".read(stream); break;";

    enum string _writeStr = 
        "\n            case " ~ to!string(index) ~ ": " ~ T[0].msgName ~ ".write(stream); break;";

    enum string _interfaceStr = "\n        void " ~ T[0].msgName ~ "(" ~ T[0].strings.paramStr ~ ");";

    enum string _sendStr =
        "\n    void " ~ T[0].msgName ~ "(" ~ T[0].strings.paramStr ~ ") {" ~
        "\n        channel.add(Message(" ~ T[0].msgName ~ "Msg(" ~ T[0].strings.nameStr ~ ")));" ~
        "\n    }";

    enum string _caseStr =
        "\n            case " ~ to!string(index) ~ ":" ~
        " handler." ~ T[0].msgName ~ "(" ~ T[0].strings.callStr ~ ");" ~
        " break;";

    mixin(Paste!("msgStr",       ""));
    mixin(Paste!("unionStr",     ""));
    mixin(Paste!("thisStr",      ""));
    mixin(Paste!("readStr",      ""));
    mixin(Paste!("writeStr",     ""));
    mixin(Paste!("interfaceStr", ""));
    mixin(Paste!("sendStr",      ""));
    mixin(Paste!("caseStr",      ""));
}


//
// Protocol - expects Messages as its template parameters.
// The strings used in mixins are available for inspection should you need them.
//
template Protocol(string name, T...) {

    alias ProtocolStrings!(0, T) strings;

    alias strings.msgStr msgStr;

    // Define the discriminated union to hold any message
    enum string unionStr =
        "\n    struct Message {" ~
        "\n        uint kind;" ~
        "\n        union { " ~
        strings.unionStr ~
        "\n        }" ~
        strings.thisStr ~
        "\n        this(InStream stream) {" ~
        "\n            kind = stream.get!uint;" ~
        "\n            switch(kind) {" ~
        strings.readStr ~
        "\n                default: assert(0, \"Cannot read unsupported message kind\");" ~
        "\n            }" ~
        "\n        }" ~
        "\n        void write(OutStream stream) {" ~
        "\n            stream(kind);" ~
        "\n            switch(kind) {" ~
        strings.writeStr ~
        "\n                default: assert(0, \"Cannot write unsupported message kind\");" ~
        "\n            }" ~
        "\n        }" ~
        "\n    }";

    // Define the channel
    enum string channelStr =
        "\n    private alias Channel!(Message) _Chan;" ~
        "\n    private alias shared _Chan Chan;" ~
        "\n" ~
        "\n    private Chan channel;" ~
        "\n    this() { channel = new Chan(); }" ~
        "\n    ChannelSelectable newSelectable() { return channel.newSelectable(); }" ~
        "\n    void finalize() { channel.finalize; }" ~
        "\n";

    // Define the interface type: interface { void msg1_name(msg1_params); ... }
    enum string interfaceStr = "\n    interface IHandler { " ~ strings.interfaceStr ~ "\n    }";

    // Define the send functions
    alias strings.sendStr sendStr;

    // Define the receive function
    enum string receiveStr =
        "\n    void receive(IHandler handler) {" ~
        "\n        auto message = channel.remove;" ~
        "\n        switch (message.kind) {" ~
        strings.caseStr ~
        "\n            default: assert(0, \"Cannot dispatch unsupported message kind\");" ~
        "\n        }" ~
        "\n    }";

    enum string code =
        "\nclass " ~ name ~ " {" ~
        msgStr ~ unionStr ~ channelStr ~ interfaceStr ~ sendStr ~ receiveStr ~
        "\n}";
}
import bedrock.maxim.io;
import bedrock.maxim.concurrency;
import bedrock.maxim.logging;

import std.stdio;
import std.string;
import std.conv;


alias Protocol!("Requests",    Message!("job",    string, "name")).code jobCode;
alias Protocol!("Results",     Message!("result", string, "job", string, "desc")).code resultCode;
alias Protocol!("Completions", Message!("done",   string, "name")).code doneCode;

mixin(jobCode);
mixin(resultCode);
mixin(doneCode);


//--------------------------------------
// planner
//--------------------------------------

class ResultHandler : Results.IHandler {
    private bool[string] mOutstanding;

    void expect(string job) {
        mOutstanding[job] = true;
    }

    bool allDone() {
        return mOutstanding.length == 0;
    }

    override void result(string job, string desc) {
        log_info("got result %s: %s", job, desc);
        mOutstanding.remove(job);
    }
}

void doPlanning(Requests requests, Results results, Completions completions) {
    auto handler = new ResultHandler();

    foreach (i; 0..10) {
        string job = "job " ~ to!string(i);
        log_info("sending %s", job);
        requests.job(job);
        handler.expect(job);
    }
    while (!handler.allDone) {
        results.receive(handler);
    }
    log_info("all jobs done - terminating");
    requests.finalize;
    completions.done(myName);
}


//------------------------------------
// worker
//------------------------------------

class JobHandler : Requests.IHandler {
    private Results results;

    this(Results results_) {
        results = results_;
    }

    override void job(string job) {
        log_info("got job %s", job);
        results.result(job, job ~ " result");
    }
}

void doWork(Requests requests, Results results, Completions completions) {
    auto handler = new JobHandler(results);

    try {
        for (;;) {
            requests.receive(handler);
        }
    }
    catch (ChannelFinalized) {}
    log_info("%s: channel finalized - terminating", myName);
    completions.done(myName);
}


//------------------------------------
// main
//------------------------------------

class MainHandler : Completions.IHandler {
    private uint mRemaining;

    this(uint remaining) {
        mRemaining = remaining;
    }

    bool allDone() {
        return mRemaining == 0;
    }

    override void done(string name) {
        --mRemaining;
        log_info("%s: %s is done - %s remaining", myName, name, mRemaining);
    }
}

void main() {
    myName = "main";

    writefln("\n%s\n", jobCode);

    auto requests    = new Requests();
    auto results     = new Results();
    auto completions = new Completions();

    uint qty;

    spawn(&doPlanning, "planner", requests, results, completions);
    ++qty;
    foreach (i; 0..2) {
        spawn(&doWork, format("worker%s", i), requests, results, completions);
        ++qty;
    }

    auto handler = new MainHandler(qty);
    while (!handler.allDone) {
        completions.receive(handler);
    }
    log_info("%s: all child threads terminated - terminating", myName);
}

Reply via email to