Graham St Jack wrote:

> On Mon, 07 Dec 2009 22:45:17 -0500, Jason House wrote:
>> I've backed out most of my pro-shared changes and will try again in a
>> few months :(
> 
> I have also given up on shared and am also adopting a waiting strategy.

Yeah, it's sad.  I have been successful in converting part of my code base 
to use shared, and suspect I can probably get another chunk working if I try 
really hard.  I partially suspect that the viral nature of shared makes any 
conversion of a code base tough.  It may be much easier to get something 
working if shared is used from the beginning while developing.


> I would love to get some tips from anyone (like Walter, for example) who
> thinks they have a way of using shared successfully.

Here's the one module that I was able to completely convert to use shared 
(and converted all uses of it to use shared).  Maybe it'd help you figure 
out how to get things to work?  It's a relatively brain-dead message queue.  
It can't hold more than one message at a time (adding a second message will 
block until the first message has been received by all threads).  It's 
intended for one master thread to send a message to all recipients in a 
thread group.

module hb.io.ipc;

import std.cstream;
import core.thread;
import tango.core.Atomic;

/**
 * Single-slot broadcast queue: one `sender` hands a delegate to a fixed
 * number of `receiver`s, each of which invokes it on its own `target`.
 *
 * Params:
 *      target   = type of the object passed to every broadcast delegate
 *      sleepSec = polling interval, in seconds, used while the sender waits
 *                 for the current message to be consumed
 */
template broadcastMessageQueue(target, double sleepSec=0.1){
        // druntime's Thread.sleep(long) documents its argument as a count of
        // 100-nanosecond intervals, so one second is 10_000_000 ticks.
        // Stored as long to match that parameter and to avoid overflowing int
        // for large polling intervals.
        private enum long sleepTicks = cast(long) (sleepSec*10_000_000);
        /// Broadcasts a delegate to a bunch of identical recipients
        class sender{
                /// Signature of a broadcast message: a delegate taking the recipient's target.
                alias void delegate(target) messageType;
                private int id;         // sequence number of the most recently stored message
                private int max;        // number of recipients, fixed by the constructor
                private int pending;    // recipients that have not yet run the current message
                private messageType msg;
                /// Params: numberOfRecipients = how many receivers must consume each message
                this(int numberOfRecipients){ max = numberOfRecipients; }
                /// Only blocks if queue is full
                void send(shared messageType message) shared{
                        waitForQueueToEmpty;
                        id++;
                        msg = message;
                        // pending is written last: receive() returns false while it is 0,
                        // so id and msg are already stored once recipients see a non-zero count.
                        pending = max;
                }
                /// Blocks until every recipient got the message
                void push(shared messageType message) shared{
                        send(message);
                        waitForQueueToEmpty;
                }
                /**
                 * Runs the current message on `t` if there is one the caller has not seen.
                 *
                 * Params:
                 *      messageId = sequence number the caller expects next
                 *      t         = object handed to the message delegate
                 * Returns: false when no message is pending or `id` is still below
                 *          `messageId`; true after the delegate ran and `pending`
                 *          was decremented.
                 */
                private bool receive(int messageId, target t) shared{
                        if (pending == 0 || id < messageId)
                                return false;
                        msg(t);
                        atomicDecrement!(msync.raw)(pending);
                        return true;
                }
                /// Polls `pending`, sleeping `sleepTicks` between checks, until it is no longer positive.
                private void waitForQueueToEmpty() shared{
                        while(pending > 0)
                                Thread.sleep(sleepTicks);
                }
        }
        
        /// Receives delegates from the specified sender. Never blocks.
        class receiver{
                private target parent;          // object passed to each received delegate
                private shared sender source;   // queue this receiver reads from
                private int nextMessageId = 1;  // sender.id is incremented before its first message is stored
                /// Params: t = object passed to each received delegate; s = sender to read from
                this(target t, shared sender s){ parent = t; source = s; }
                /// Returns: true if a new message was run on `parent`, false if none was available.
                bool receive(){
                        // The original comment here described a cast as a workaround for
                        // bugzilla issue #3089; no cast appears in this call.
                        if (source.receive(nextMessageId, parent)){
                                nextMessageId++;
                                return true;
                        }
                        return false;
                }
        }
}

version(test)
unittest{
        derr.writefln("Testing broadcast message queue");

        // Minimal payload type: each delivered message bumps x once.
        class dummy{ int x; }
        auto firstTarget = new dummy;
        auto secondTarget = new dummy;

        // Extra parentheses as hack to circumvent dmd bugzilla issue #3091
        auto tx = new shared(broadcastMessageQueue!(dummy).sender)(2);
        auto firstRx = new broadcastMessageQueue!(dummy).receiver(firstTarget, tx);
        auto secondRx = new broadcastMessageQueue!(dummy).receiver(secondTarget, tx);

        // Nothing has been sent yet, so neither receiver gets a message.
        assert(!firstRx.receive());
        assert(!secondRx.receive());

        tx.send( cast(shared void delegate(dummy)) (dummy d){d.x++;});

        // Each receiver consumes the single broadcast exactly once...
        assert(firstRx.receive());
        assert(secondRx.receive());

        // ...and a second attempt finds nothing new.
        assert(!firstRx.receive());
        assert(!secondRx.receive());

        // The delegate ran once against each target.
        assert(firstTarget.x == 1);
        assert(secondTarget.x == 1);
}

Reply via email to