PROTON-1047: go: improved ack handling in electron API, add to broker example

Sender interface changed
- SendAsync: flexible and efficient server-side ack handling across multiple 
Senders.
- SendSync, SendForget, SendWaitable: easy-to-use methods for common client 
cases.

electron broker.go, send.go examples demonstrate async send handling.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/cd8ad96f
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/cd8ad96f
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/cd8ad96f

Branch: refs/heads/go1
Commit: cd8ad96f2b3022344198abe8b76f0798758dbe9d
Parents: 41eac74
Author: Alan Conway <[email protected]>
Authored: Wed Nov 11 15:03:07 2015 -0500
Committer: Alan Conway <[email protected]>
Committed: Wed Nov 18 14:27:57 2015 -0500

----------------------------------------------------------------------
 examples/go/electron/broker.go                  | 133 +++++--
 examples/go/electron/receive.go                 |  11 +-
 examples/go/electron/send.go                    |  25 +-
 examples/go/proton/broker.go                    |   9 +-
 examples/go/util/queue.go                       |  10 +
 .../go/src/qpid.apache.org/amqp/error.go        |   2 +-
 .../src/qpid.apache.org/electron/connection.go  |  27 +-
 .../go/src/qpid.apache.org/electron/doc.go      |   3 +-
 .../go/src/qpid.apache.org/electron/endpoint.go |  31 +-
 .../go/src/qpid.apache.org/electron/handler.go  |  16 +-
 .../go/src/qpid.apache.org/electron/link.go     |  23 +-
 .../qpid.apache.org/electron/messaging_test.go  |  71 ++--
 .../go/src/qpid.apache.org/electron/receiver.go |  29 +-
 .../go/src/qpid.apache.org/electron/sender.go   | 361 +++++++++----------
 .../go/src/qpid.apache.org/electron/session.go  |  21 +-
 .../go/src/qpid.apache.org/electron/time.go     |   3 +-
 .../go/src/qpid.apache.org/proton/engine.go     |   7 +
 17 files changed, 430 insertions(+), 352 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cd8ad96f/examples/go/electron/broker.go
----------------------------------------------------------------------
diff --git a/examples/go/electron/broker.go b/examples/go/electron/broker.go
index f68deb1..66941b7 100644
--- a/examples/go/electron/broker.go
+++ b/examples/go/electron/broker.go
@@ -33,6 +33,7 @@ import (
        "log"
        "net"
        "os"
+       "qpid.apache.org/amqp"
        "qpid.apache.org/electron"
 )
 
@@ -52,7 +53,12 @@ var qsize = flag.Int("qsize", 1000, "Max queue size")
 func main() {
        flag.Usage = usage
        flag.Parse()
-       b := &broker{util.MakeQueues(*qsize), electron.NewContainer("")}
+       b := &broker{
+               queues:    util.MakeQueues(*qsize),
+               container: electron.NewContainer(""),
+               acks:      make(chan electron.Outcome),
+               sent:      make(chan sentMessage),
+       }
        if err := b.run(); err != nil {
                log.Fatal(err)
        }
@@ -60,18 +66,31 @@ func main() {
 
 // State for the broker
 type broker struct {
-       queues    util.Queues
-       container electron.Container
+       queues    util.Queues           // A collection of queues.
+       container electron.Container    // electron.Container manages AMQP 
connections.
+       sent      chan sentMessage      // Channel to record sent messages.
+       acks      chan electron.Outcome // Channel to receive the Outcome of 
sent messages.
+}
+
+// Record of a sent message and the queue it came from.
+// If a message is rejected or not acknowledged due to a failure, we will put 
it back on the queue.
+type sentMessage struct {
+       m amqp.Message
+       q util.Queue
 }
 
-// Listens for connections and starts an electron.Connection for each one.
+// run listens for incoming net.Conn connections and starts an 
electron.Connection for each one.
 func (b *broker) run() error {
        listener, err := net.Listen("tcp", *addr)
        if err != nil {
                return err
        }
        defer listener.Close()
-       fmt.Printf("Listening on %s\n", listener.Addr())
+       fmt.Printf("Listening on %v\n", listener.Addr())
+
+       go b.acknowledgements() // Handles acknowledgements for all connections.
+
+       // Start a goroutine for each new connection.
        for {
                conn, err := listener.Accept()
                if err != nil {
@@ -83,69 +102,107 @@ func (b *broker) run() error {
                        util.Debugf("Connection error: %v", err)
                        continue
                }
-               go b.accept(c) // Goroutine to accept incoming sessions and 
links.
+               cc := &connection{b, c}
+               go cc.run() // Handle the connection
                util.Debugf("Accepted %v", c)
        }
 }
 
-// accept remotely-opened endpoints (Session, Sender and Receiver)
+// State for a broker connection
+type connection struct {
+       broker     *broker
+       connection electron.Connection
+}
+
+// accept remotely-opened endpoints (Session, Sender and Receiver) on a 
connection
 // and start goroutines to service them.
-func (b *broker) accept(c electron.Connection) {
-       for in := range c.Incoming() {
+func (c *connection) run() {
+       for in := range c.connection.Incoming() {
                switch in := in.(type) {
 
                case *electron.IncomingSender:
                        if in.Source() == "" {
-                               util.Debugf("sender has no source: %s", in)
-                               break
+                               in.Reject(fmt.Errorf("no source"))
+                       } else {
+                               go c.sender(in.Accept().(electron.Sender))
                        }
-                       go b.sender(in.Accept().(electron.Sender))
 
                case *electron.IncomingReceiver:
                        if in.Target() == "" {
-                               util.Debugf("receiver has no target: %s", in)
-                               break
+                               in.Reject(fmt.Errorf("no target"))
+                       } else {
+                               in.SetPrefetch(true)
+                               in.SetCapacity(*credit) // Pre-fetch up to 
credit window.
+                               go c.receiver(in.Accept().(electron.Receiver))
                        }
-                       in.SetPrefetch(true)
-                       in.SetCapacity(*credit) // Pre-fetch up to credit 
window.
-                       go b.receiver(in.Accept().(electron.Receiver))
 
                default:
                        in.Accept() // Accept sessions unconditionally
                }
        }
-       util.Debugf("incoming closed: %s", c)
+       util.Debugf("incoming closed: %v", c.connection)
+}
+
+// receiver receives messages and pushes to a queue.
+func (c *connection) receiver(receiver electron.Receiver) {
+       q := c.broker.queues.Get(receiver.Target())
+       for {
+               if rm, err := receiver.Receive(); err == nil {
+                       util.Debugf("%v: received %v", receiver, 
util.FormatMessage(rm.Message))
+                       q <- rm.Message
+                       rm.Accept()
+               } else {
+                       util.Debugf("%v error: %v", receiver, err)
+                       break
+               }
+       }
 }
 
 // sender pops messages from a queue and sends them.
-func (b *broker) sender(sender electron.Sender) {
-       q := b.queues.Get(sender.Source())
+func (c *connection) sender(sender electron.Sender) {
+       q := c.broker.queues.Get(sender.Source())
        for {
-               m, ok := <-q
-               if !ok { // Queue closed
+               if sender.Error() != nil {
+                       util.Debugf("%v closed: %v", sender, sender.Error())
                        return
                }
-               if err := sender.SendForget(m); err == nil {
-                       util.Debugf("%s send: %s", sender, 
util.FormatMessage(m))
-               } else {
-                       util.Debugf("%s error: %s", sender, err)
-                       q <- m // Put it back on the queue.
-                       return
+               select {
+
+               case m := <-q:
+                       util.Debugf("%v: sent %v", sender, 
util.FormatMessage(m))
+                       sm := sentMessage{m, q}
+                       c.broker.sent <- sm                    // Record sent 
message
+                       sender.SendAsync(m, c.broker.acks, sm) // Receive 
outcome on c.broker.acks with Value sm
+
+               case <-sender.Done(): // break if sender is closed
+                       break
                }
        }
 }
 
-// receiver receives messages and pushes to a queue.
-func (b *broker) receiver(receiver electron.Receiver) {
-       q := b.queues.Get(receiver.Target())
+// acknowledgements keeps track of sent messages and receives outcomes.
+//
+// We could have handled outcomes separately per-connection, per-sender or even
+// per-message. Message outcomes are returned via channels defined by the user
+// so they can be grouped in any way that suits the application.
+func (b *broker) acknowledgements() {
+       sentMap := make(map[sentMessage]bool)
        for {
-               if rm, err := receiver.Receive(); err == nil {
-                       util.Debugf("%s: received %s", receiver, 
util.FormatMessage(rm.Message))
-                       q <- rm.Message
-                       rm.Accept()
-               } else {
-                       util.Debugf("%s error: %s", receiver, err)
-                       break
+               select {
+               case sm, ok := <-b.sent: // A local sender records that it has 
sent a message.
+                       if ok {
+                               sentMap[sm] = true
+                       } else {
+                               return // Closed
+                       }
+               case outcome := <-b.acks: // The message outcome is available
+                       sm := outcome.Value.(sentMessage)
+                       delete(sentMap, sm)
+                       if outcome.Status != electron.Accepted { // Error, 
release or rejection
+                               sm.q.PutBack(sm.m) // Put the message back on 
the queue.
+                               util.Debugf("message %v put back, status %v, 
error %v",
+                                       util.FormatMessage(sm.m), 
outcome.Status, outcome.Error)
+                       }
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cd8ad96f/examples/go/electron/receive.go
----------------------------------------------------------------------
diff --git a/examples/go/electron/receive.go b/examples/go/electron/receive.go
index f7d41fa..d631726 100644
--- a/examples/go/electron/receive.go
+++ b/examples/go/electron/receive.go
@@ -85,15 +85,12 @@ func main() {
 
                        // Loop receiving messages and sending them to the 
main() goroutine
                        for {
-                               rm, err := r.Receive()
-                               switch err {
-                               case electron.Closed:
-                                       util.Debugf("closed %s", urlStr)
+                               if rm, err := r.Receive(); err != nil {
+                                       util.Debugf("closed %v: %v", urlStr, 
err)
                                        return
-                               case nil:
+                               } else {
+                                       rm.Accept()
                                        messages <- rm.Message
-                               default:
-                                       log.Fatal(err)
                                }
                        }
                }(urlStr)

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cd8ad96f/examples/go/electron/send.go
----------------------------------------------------------------------
diff --git a/examples/go/electron/send.go b/examples/go/electron/send.go
index c9bdbc9..4c9a0bf 100644
--- a/examples/go/electron/send.go
+++ b/examples/go/electron/send.go
@@ -42,11 +42,6 @@ Send messages to each URL concurrently with body 
"<url-path>-<n>" where n is the
 
 var count = flag.Int64("count", 1, "Send this may messages per address.")
 
-type sent struct {
-       name        string
-       sentMessage electron.SentMessage
-}
-
 func main() {
        flag.Usage = usage
        flag.Parse()
@@ -58,9 +53,10 @@ func main() {
                os.Exit(1)
        }
 
-       sentChan := make(chan sent) // Channel to receive all the delivery 
receipts.
-       var wait sync.WaitGroup     // Used by main() to wait for all 
goroutines to end.
-       wait.Add(len(urls))         // Wait for one goroutine per URL.
+       sentChan := make(chan electron.Outcome) // Channel to receive 
acknowledgements.
+
+       var wait sync.WaitGroup
+       wait.Add(len(urls)) // Wait for one goroutine per URL.
 
        _, prog := path.Split(os.Args[0])
        container := electron.NewContainer(fmt.Sprintf("%v:%v", prog, 
os.Getpid()))
@@ -91,9 +87,7 @@ func main() {
                                m := amqp.NewMessage()
                                body := fmt.Sprintf("%v-%v", url.Path, i)
                                m.Marshal(body)
-                               sentMessage, err := s.Send(m)
-                               util.ExitIf(err)
-                               sentChan <- sent{body, sentMessage}
+                               s.SendAsync(m, sentChan, body) // Outcome will 
be sent to sentChan
                        }
                }(urlStr)
        }
@@ -102,12 +96,11 @@ func main() {
        expect := int(*count) * len(urls)
        util.Debugf("Started senders, expect %v acknowledgements\n", expect)
        for i := 0; i < expect; i++ {
-               d := <-sentChan
-               disposition, err := d.sentMessage.Disposition()
-               if err != nil {
-                       util.Debugf("acknowledgement[%v] %v error: %v\n", i, 
d.name, err)
+               out := <-sentChan // Outcome of async sends.
+               if out.Error != nil {
+                       util.Debugf("acknowledgement[%v] %v error: %v\n", i, 
out.Value, out.Error)
                } else {
-                       util.Debugf("acknowledgement[%v]  %v (%v)\n", i, 
d.name, disposition)
+                       util.Debugf("acknowledgement[%v]  %v (%v)\n", i, 
out.Value, out.Status)
                }
        }
        fmt.Printf("Received all %v acknowledgements\n", expect)

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cd8ad96f/examples/go/proton/broker.go
----------------------------------------------------------------------
diff --git a/examples/go/proton/broker.go b/examples/go/proton/broker.go
index 3eb5880..6458665 100644
--- a/examples/go/proton/broker.go
+++ b/examples/go/proton/broker.go
@@ -24,6 +24,9 @@ under the License.
 // messages to queues or subscribe to receive messages from them.
 //
 
+// TODO: show how to handle acknowledgements from receivers and put rejected 
or
+// un-acknowledged messages back on their queues.
+
 package main
 
 import (
@@ -277,7 +280,7 @@ func (s *sender) sendable() {
 // run runs in a separate goroutine. It monitors the queue for messages and 
injects
 // a function to send them when there is credit
 func (s *sender) run() {
-       var q chan amqp.Message // q is nil initially as we have no credit.
+       var q util.Queue // q is nil initially as we have no credit.
        for {
                select {
                case _, ok := <-s.credit:
@@ -293,7 +296,7 @@ func (s *sender) run() {
                        q = nil                      // Assume all credit will 
be used used, will be signaled otherwise.
                        s.h.injecter.Inject(func() { // Inject handler function 
to actually send
                                if s.h.senders[s.l] != s { // The sender has 
been closed by the remote end.
-                                       go func() { q <- m }() // Put the 
message back on the queue but don't block
+                                       q.PutBack(m) // Put the message back on 
the queue but don't block
                                        return
                                }
                                if s.sendOne(m) != nil {
@@ -322,7 +325,7 @@ func (s *sender) sendOne(m amqp.Message) error {
                delivery.Settle() // Pre-settled, unreliable.
                util.Debugf("link %s sent %s", s.l, util.FormatMessage(m))
        } else {
-               go func() { s.q <- m }() // Put the message back on the queue 
but don't block
+               s.q.PutBack(m) // Put the message back on the queue, don't block
        }
        return err
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cd8ad96f/examples/go/util/queue.go
----------------------------------------------------------------------
diff --git a/examples/go/util/queue.go b/examples/go/util/queue.go
index d844c0d..2eaba72 100644
--- a/examples/go/util/queue.go
+++ b/examples/go/util/queue.go
@@ -27,6 +27,16 @@ import (
 // Use a buffered channel as a very simple queue.
 type Queue chan amqp.Message
 
+// Put a message back on the queue, does not block.
+func (q Queue) PutBack(m amqp.Message) {
+       select {
+       case q <- m:
+       default:
+               // Not an efficient implementation but ensures we don't block 
the caller.
+               go func() { q <- m }()
+       }
+}
+
 // Concurrent-safe map of queues.
 type Queues struct {
        queueSize int

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cd8ad96f/proton-c/bindings/go/src/qpid.apache.org/amqp/error.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/amqp/error.go 
b/proton-c/bindings/go/src/qpid.apache.org/amqp/error.go
index 4096cdc..349fc41 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/amqp/error.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/amqp/error.go
@@ -39,7 +39,7 @@ import (
 type Error struct{ Name, Description string }
 
 // Error implements the Go error interface for AMQP error errors.
-func (c Error) Error() string { return fmt.Sprintf("proton %s: %s", c.Name, 
c.Description) }
+func (c Error) Error() string { return fmt.Sprintf("%s: %s", c.Name, 
c.Description) }
 
 // Errorf makes a Error with name and formatted description as per fmt.Sprintf
 func Errorf(name, format string, arg ...interface{}) Error {

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cd8ad96f/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go 
b/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go
index 3462b92..8a9e6cd 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go
@@ -69,10 +69,10 @@ type Connection interface {
        // to close it with an error. The specific Incoming types have 
additional
        // methods to configure the endpoint.
        //
-       // Delay in receiving from Incoming() or calling Accept/Reject will 
block
-       // proton. Normally you should have a dedicated goroutine receive from 
this
-       // channel and start a new goroutine to serve each endpoint accepted.  
The
-       // channel is closed when the Connection closes.
+       // Not receiving from Incoming() or not calling Accept/Reject will 
block the
+       // electron event loop. Normally you would have a dedicated goroutine 
receive
+       // from Incoming() and start new goroutines to serve each incoming 
endpoint.
+       // The channel is closed when the Connection closes.
        //
        Incoming() <-chan Incoming
 }
@@ -103,15 +103,13 @@ type connection struct {
        incoming    chan Incoming
        handler     *handler
        engine      *proton.Engine
-       err         proton.ErrorHolder
        eConnection proton.Connection
 
        defaultSession Session
-       done           chan struct{}
 }
 
 func newConnection(conn net.Conn, cont *container, setting 
...ConnectionOption) (*connection, error) {
-       c := &connection{container: cont, conn: conn, done: make(chan struct{})}
+       c := &connection{container: cont, conn: conn}
        c.handler = newHandler(c)
        var err error
        c.engine, err = proton.NewEngine(c.conn, c.handler.delegator)
@@ -121,12 +119,20 @@ func newConnection(conn net.Conn, cont *container, 
setting ...ConnectionOption)
        for _, set := range setting {
                set(c)
        }
-       c.str = c.engine.String()
+       c.endpoint = makeEndpoint(c.engine.String())
        c.eConnection = c.engine.Connection()
-       go func() { c.engine.Run(); close(c.done) }()
+       go c.run()
        return c, nil
 }
 
+func (c *connection) run() {
+       c.engine.Run()
+       if c.incoming != nil {
+               close(c.incoming)
+       }
+       c.closed(Closed)
+}
+
 func (c *connection) Close(err error) { c.err.Set(err); c.engine.Close(err) }
 
 func (c *connection) Disconnect(err error) { c.err.Set(err); 
c.engine.Disconnect(err) }
@@ -134,6 +140,9 @@ func (c *connection) Disconnect(err error) { 
c.err.Set(err); c.engine.Disconnect
 func (c *connection) Session(setting ...SessionOption) (Session, error) {
        var s Session
        err := c.engine.InjectWait(func() error {
+               if c.Error() != nil {
+                       return c.Error()
+               }
                eSession, err := c.engine.Connection().Session()
                if err == nil {
                        eSession.Open()

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cd8ad96f/proton-c/bindings/go/src/qpid.apache.org/electron/doc.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/doc.go 
b/proton-c/bindings/go/src/qpid.apache.org/electron/doc.go
index eaa6e7a..a484900 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/doc.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/doc.go
@@ -35,7 +35,8 @@ Send() and Receive() messages.
 You can create an AMQP server connection by calling Connection.Server() and
 Connection.Listen() before calling Connection.Open(). A server connection can
 negotiate protocol security details and can accept incoming links opened from
-the remote end of the connection
+the remote end of the connection.
+
 */
 package electron
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cd8ad96f/proton-c/bindings/go/src/qpid.apache.org/electron/endpoint.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/endpoint.go 
b/proton-c/bindings/go/src/qpid.apache.org/electron/endpoint.go
index 057e572..f04b240 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/endpoint.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/endpoint.go
@@ -48,15 +48,31 @@ type Endpoint interface {
 
        // Connection containing the endpoint
        Connection() Connection
+
+       // Done returns a channel that will close when the endpoint closes.
+       // Error() will contain the reason.
+       Done() <-chan struct{}
 }
 
 type endpoint struct {
-       err proton.ErrorHolder
-       str string // Must be set by the value that embeds endpoint.
+       err  proton.ErrorHolder
+       str  string // Must be set by the value that embeds endpoint.
+       done chan struct{}
+}
+
+func makeEndpoint(s string) endpoint { return endpoint{str: s, done: make(chan 
struct{})} }
+
+func (e *endpoint) closed(err error) {
+       e.err.Set(err)
+       e.err.Set(Closed)
+       close(e.done)
 }
 
 func (e *endpoint) String() string { return e.str }
-func (e *endpoint) Error() error   { return e.err.Get() }
+
+func (e *endpoint) Error() error { return e.err.Get() }
+
+func (e *endpoint) Done() <-chan struct{} { return e.done }
 
 // Call in proton goroutine to close an endpoint locally
 // handler will complete the close when remote end closes.
@@ -65,3 +81,12 @@ func localClose(ep proton.Endpoint, err error) {
                proton.CloseError(ep, err)
        }
 }
+
+// Used to indicate that a channel has closed which normally is because the 
endpoint is closed.
+func errorOrClosed(e Endpoint) error {
+       if e.Error() != nil {
+               return e.Error()
+       } else {
+               return Closed
+       }
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cd8ad96f/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go 
b/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go
index 1b1164c..1586026 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go
@@ -30,7 +30,7 @@ type handler struct {
        delegator    *proton.MessagingAdapter
        connection   *connection
        links        map[proton.Link]Link
-       sentMessages map[proton.Delivery]*sentMessage
+       sentMessages map[proton.Delivery]sentMessage
        sessions     map[proton.Session]*session
 }
 
@@ -38,7 +38,7 @@ func newHandler(c *connection) *handler {
        h := &handler{
                connection:   c,
                links:        make(map[proton.Link]Link),
-               sentMessages: make(map[proton.Delivery]*sentMessage),
+               sentMessages: make(map[proton.Delivery]sentMessage),
                sessions:     make(map[proton.Session]*session),
        }
        h.delegator = proton.NewMessagingAdapter(h)
@@ -64,8 +64,10 @@ func (h *handler) HandleMessagingEvent(t 
proton.MessagingEvent, e proton.Event)
                }
 
        case proton.MSettled:
-               if sm := h.sentMessages[e.Delivery()]; sm != nil {
-                       sm.settled(nil)
+               if sm, ok := h.sentMessages[e.Delivery()]; ok {
+                       d := e.Delivery().Remote()
+                       sm.ack <- Outcome{sentStatus(d.Type()), 
d.Condition().Error(), sm.value}
+                       delete(h.sentMessages, e.Delivery())
                }
 
        case proton.MSendable:
@@ -123,15 +125,19 @@ func (h *handler) HandleMessagingEvent(t 
proton.MessagingEvent, e proton.Event)
                h.connection.err.Set(amqp.Errorf(amqp.IllegalState, "unexpected 
disconnect on %s", h.connection))
 
                err := h.connection.Error()
+
                for l, _ := range h.links {
                        h.linkClosed(l, err)
                }
+               h.links = nil
                for _, s := range h.sessions {
                        s.closed(err)
                }
+               h.sessions = nil
                for _, sm := range h.sentMessages {
-                       sm.settled(err)
+                       sm.ack <- Outcome{Unacknowledged, err, sm.value}
                }
+               h.sentMessages = nil
        }
 }
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cd8ad96f/proton-c/bindings/go/src/qpid.apache.org/electron/link.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/link.go 
b/proton-c/bindings/go/src/qpid.apache.org/electron/link.go
index cadc2c1..91efa8e 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/link.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/link.go
@@ -144,7 +144,6 @@ type link struct {
 
        session *session
        eLink   proton.Link
-       done    chan struct{} // Closed when link is closed
 }
 
 func (l *link) Source() string           { return l.source }
@@ -167,7 +166,6 @@ func localLink(sn *session, isSender bool, setting 
...LinkOption) (link, error)
                isSender: isSender,
                capacity: 1,
                prefetch: false,
-               done:     make(chan struct{}),
        }
        for _, set := range setting {
                set(&l)
@@ -188,8 +186,8 @@ func localLink(sn *session, isSender bool, setting 
...LinkOption) (link, error)
        l.eLink.Target().SetAddress(l.target)
        l.eLink.SetSndSettleMode(proton.SndSettleMode(l.sndSettle))
        l.eLink.SetRcvSettleMode(proton.RcvSettleMode(l.rcvSettle))
-       l.str = l.eLink.String()
        l.eLink.Open()
+       l.endpoint = makeEndpoint(l.eLink.String())
        return l, nil
 }
 
@@ -213,23 +211,18 @@ func makeIncomingLink(sn *session, eLink proton.Link) 
incomingLink {
                        rcvSettle: RcvSettleMode(eLink.RemoteRcvSettleMode()),
                        capacity:  1,
                        prefetch:  false,
-                       done:      make(chan struct{}),
+                       endpoint:  makeEndpoint(eLink.String()),
                },
        }
-       l.str = eLink.String()
        return l
 }
 
-// Called in proton goroutine on closed or disconnected
-func (l *link) closed(err error) {
-       l.err.Set(err)
-       l.err.Set(Closed) // If no error set, mark as closed.
-       close(l.done)
-}
-
 // Not part of Link interface but use by Sender and Receiver.
 func (l *link) Credit() (credit int, err error) {
        err = l.engine().InjectWait(func() error {
+               if l.Error() != nil {
+                       return l.Error()
+               }
                credit = l.eLink.Credit()
                return nil
        })
@@ -240,7 +233,11 @@ func (l *link) Credit() (credit int, err error) {
 func (l *link) Capacity() int { return l.capacity }
 
 func (l *link) Close(err error) {
-       l.engine().Inject(func() { localClose(l.eLink, err) })
+       l.engine().Inject(func() {
+               if l.Error() == nil {
+                       localClose(l.eLink, err)
+               }
+       })
 }
 
 func (l *link) open() {

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cd8ad96f/proton-c/bindings/go/src/qpid.apache.org/electron/messaging_test.go
----------------------------------------------------------------------
diff --git 
a/proton-c/bindings/go/src/qpid.apache.org/electron/messaging_test.go 
b/proton-c/bindings/go/src/qpid.apache.org/electron/messaging_test.go
index 3b5a062..5af57e8 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/messaging_test.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/messaging_test.go
@@ -80,7 +80,6 @@ func closeClientServer(client Session, server Connection) {
 
 // Send a message one way with a client sender and server receiver, verify ack.
 func TestClientSendServerReceive(t *testing.T) {
-       timeout := time.Second * 2
        nLinks := 3
        nMessages := 3
 
@@ -116,15 +115,14 @@ func TestClientSendServerReceive(t *testing.T) {
 
        for i := 0; i < nLinks; i++ {
                for j := 0; j < nMessages; j++ {
-                       var sm SentMessage
-
                        // Client send
+                       ack := make(chan Outcome, 1)
                        sendDone := make(chan struct{})
                        go func() {
                                defer close(sendDone)
                                m := 
amqp.NewMessageWith(fmt.Sprintf("foobar%v-%v", i, j))
                                var err error
-                               sm, err = s[i].Send(m)
+                               s[i].SendAsync(m, ack, "testing")
                                if err != nil {
                                        t.Fatal(err)
                                }
@@ -141,16 +139,19 @@ func TestClientSendServerReceive(t *testing.T) {
 
                        // Should not be acknowledged on client yet
                        <-sendDone
-                       if d, err := sm.DispositionTimeout(0); err != Timeout 
|| NoDisposition != d {
-                               t.Errorf("want [no-disposition/timeout] got 
[%v/%v]", d, err)
+                       select {
+                       case <-ack:
+                               t.Errorf("unexpected ack")
+                       default:
                        }
-                       // Server ack
-                       if err := rm.Acknowledge(Rejected); err != nil {
+
+                       // Server send ack
+                       if err := rm.Reject(); err != nil {
                                t.Error(err)
                        }
                        // Client get ack.
-                       if d, err := sm.DispositionTimeout(timeout); err != nil 
|| Rejected != d {
-                               t.Errorf("want [rejected/nil] got [%v/%v]", d, 
err)
+                       if a := <-ack; a.Value != "testing" || a.Error != nil 
|| a.Status != Rejected {
+                               t.Error("unexpected ack: ", a.Status, a.Error, 
a.Value)
                        }
                }
        }
@@ -166,12 +167,10 @@ func TestClientReceiver(t *testing.T) {
                                s := in.Accept().(Sender)
                                go func() {
                                        for i := int32(0); i < 
int32(nMessages); i++ {
-                                               sm, err := 
s.Send(amqp.NewMessageWith(i))
-                                               if err != nil {
-                                                       t.Error(err)
+                                               out := 
s.SendSync(amqp.NewMessageWith(i))
+                                               if out.Error != nil {
+                                                       t.Error(out.Error)
                                                        return
-                                               } else {
-                                                       sm.Disposition() // 
Sync send.
                                                }
                                        }
                                        s.Close(nil)
@@ -235,10 +234,10 @@ func TestTimeouts(t *testing.T) {
        short := time.Millisecond
        long := time.Second
        m := amqp.NewMessage()
-       if _, err = snd.SendTimeout(m, 0); err != Timeout { // No credit, 
expect timeout.
+       if err := snd.SendSyncTimeout(m, 0).Error; err != Timeout { // No 
credit, expect timeout.
                t.Error("want Timeout got", err)
        }
-       if _, err = snd.SendTimeout(m, short); err != Timeout { // No credit, 
expect timeout.
+       if err := snd.SendSyncTimeout(m, short).Error; err != Timeout { // No 
credit, expect timeout.
                t.Error("want Timeout got", err)
        }
        // Test receive with timeout
@@ -250,17 +249,15 @@ func TestTimeouts(t *testing.T) {
                t.Error("want Timeout got", err)
        }
        // There is now a credit on the link due to receive
-       sm, err := snd.SendTimeout(m, long)
-       if err != nil {
-               t.Fatal(err)
-       }
+       ack := make(chan Outcome)
+       snd.SendAsyncTimeout(m, ack, nil, short)
        // Disposition should timeout
-       if _, err = sm.DispositionTimeout(long); err != Timeout {
-               t.Error("want Timeout got", err)
-       }
-       if _, err = sm.DispositionTimeout(short); err != Timeout {
-               t.Error("want Timeout got", err)
+       select {
+       case <-ack:
+               t.Errorf("want Timeout got %#v", ack)
+       case <-time.After(short):
        }
+
        // Receive and accept
        rm, err := rcv.ReceiveTimeout(long)
        if err != nil {
@@ -268,9 +265,8 @@ func TestTimeouts(t *testing.T) {
        }
        rm.Accept()
        // Sender get ack
-       d, err := sm.DispositionTimeout(long)
-       if err != nil || d != Accepted {
-               t.Errorf("want (rejected, nil) got (%v, %v)", d, err)
+       if a := <-ack; a.Status != Accepted || a.Error != nil {
+               t.Errorf("want (accepted, nil) got %#v", a)
        }
 }
 
@@ -329,7 +325,7 @@ type result struct {
 func (r result) String() string { return fmt.Sprintf("%s(%s)", r.err, r.label) 
}
 
 func doSend(snd Sender, results chan result) {
-       _, err := snd.Send(amqp.NewMessage())
+       err := snd.SendSync(amqp.NewMessage()).Error
        results <- result{"send", err}
 }
 
@@ -338,9 +334,8 @@ func doReceive(rcv Receiver, results chan result) {
        results <- result{"receive", err}
 }
 
-func doDisposition(sm SentMessage, results chan result) {
-       _, err := sm.Disposition()
-       results <- result{"disposition", err}
+func doDisposition(ack <-chan Outcome, results chan result) {
+       results <- result{"disposition", (<-ack).Error}
 }
 
 // Test that closing Links interrupts blocked link functions.
@@ -391,9 +386,8 @@ func TestConnectionCloseInterrupt1(t *testing.T) {
        // Connection.Close() interrupts Send, Receive, Disposition.
        snd, rcv := pairs.senderReceiver()
        go doReceive(rcv, results)
-       sm, err := snd.Send(amqp.NewMessage())
-       fatalIf(t, err)
-       go doDisposition(sm, results)
+       ack := snd.SendWaitable(amqp.NewMessage())
+       go doDisposition(ack, results)
        snd, rcv = pairs.senderReceiver()
        go doSend(snd, results)
        rcv, snd = pairs.receiverSender()
@@ -416,9 +410,8 @@ func TestConnectionCloseInterrupt2(t *testing.T) {
        // Connection.Close() interrupts Send, Receive, Disposition.
        snd, rcv := pairs.senderReceiver()
        go doReceive(rcv, results)
-       sm, err := snd.Send(amqp.NewMessage())
-       fatalIf(t, err)
-       go doDisposition(sm, results)
+       ack := snd.SendWaitable(amqp.NewMessage())
+       go doDisposition(ack, results)
        snd, rcv = pairs.senderReceiver()
        go doSend(snd, results)
        rcv, snd = pairs.receiverSender()

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cd8ad96f/proton-c/bindings/go/src/qpid.apache.org/electron/receiver.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/receiver.go b/proton-c/bindings/go/src/qpid.apache.org/electron/receiver.go
index 4ff83b4..22bdc7e 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/receiver.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/receiver.go
@@ -77,6 +77,9 @@ type prefetchPolicy struct{}
 
 func (p prefetchPolicy) Flow(r *receiver) {
        r.engine().Inject(func() {
+               if r.Error() != nil {
+                       return
+               }
                _, _, max := r.credit()
                if max > 0 {
                        r.eLink.Flow(max)
@@ -94,6 +97,9 @@ type noPrefetchPolicy struct{ waiting int }
 
 func (p noPrefetchPolicy) Flow(r *receiver) { // Not called in proton goroutine
        r.engine().Inject(func() {
+               if r.Error() != nil {
+                       return
+               }
                len, credit, max := r.credit()
                add := p.waiting - (len + credit)
                if add > max {
@@ -202,20 +208,23 @@ type ReceivedMessage struct {
        receiver  Receiver
 }
 
-// Acknowledge a ReceivedMessage with the given disposition code.
-func (rm *ReceivedMessage) Acknowledge(disposition Disposition) error {
-       return rm.receiver.(*receiver).engine().InjectWait(func() error {
-               // Settle doesn't return an error but if the receiver is broken 
the settlement won't happen.
-               rm.eDelivery.SettleAs(uint64(disposition))
-               return rm.receiver.Error()
+// Acknowledge a ReceivedMessage with the given delivery status.
+func (rm *ReceivedMessage) acknowledge(status uint64) error {
+       return rm.receiver.(*receiver).engine().Inject(func() {
+               // Deliveries are valid as long as the connection is, unless 
settled.
+               rm.eDelivery.SettleAs(uint64(status))
        })
 }
 
-// Accept is short for Acknowledge(Accpeted)
-func (rm *ReceivedMessage) Accept() error { return rm.Acknowledge(Accepted) }
+// Accept tells the sender that we take responsibility for processing the 
message.
+func (rm *ReceivedMessage) Accept() error { return 
rm.acknowledge(proton.Accepted) }
+
+// Reject tells the sender we consider the message invalid and unusable.
+func (rm *ReceivedMessage) Reject() error { return 
rm.acknowledge(proton.Rejected) }
 
-// Reject is short for Acknowledge(Rejected)
-func (rm *ReceivedMessage) Reject() error { return rm.Acknowledge(Rejected) }
+// Release tells the sender we will not process the message but some other
+// receiver might.
+func (rm *ReceivedMessage) Release() error { return 
rm.acknowledge(proton.Released) }
 
 // IncomingReceiver is sent on the Connection.Incoming() channel when there is
 // an incoming request to open a receiver link.

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cd8ad96f/proton-c/bindings/go/src/qpid.apache.org/electron/sender.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/sender.go b/proton-c/bindings/go/src/qpid.apache.org/electron/sender.go
index 11f019b..573e9da 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/sender.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/sender.go
@@ -25,278 +25,237 @@ import "C"
 import (
        "qpid.apache.org/amqp"
        "qpid.apache.org/proton"
-       "reflect"
        "time"
 )
 
 // Sender is a Link that sends messages.
+//
+// The result of sending a message is provided by an Outcome value.
+//
+// A sender can buffer messages up to the credit limit provided by the remote 
receiver.
+// Send* methods will block if the buffer is full until there is space.
+// Send*Timeout methods will give up after the timeout and set Timeout as 
Outcome.Error.
+//
 type Sender interface {
        Link
 
-       // Send a message without waiting for acknowledgement. Returns a 
SentMessage.
-       // use SentMessage.Disposition() to wait for acknowledgement and get the
-       // disposition code.
-       //
-       // If the send buffer is full, send blocks until there is space in the 
buffer.
-       Send(m amqp.Message) (sm SentMessage, err error)
+       // SendSync sends a message and blocks until the message is 
acknowledged by the remote receiver.
+       // Returns an Outcome, which may contain an error if the message could 
not be sent.
+       SendSync(m amqp.Message) Outcome
+
+       // SendWaitable puts a message in the send buffer and returns a channel 
that
+       // you can use to wait for the Outcome of just that message. The 
channel is
+       // buffered so you can receive from it whenever you want without 
blocking anything.
+       SendWaitable(m amqp.Message) <-chan Outcome
+
+       // SendForget buffers a message for sending and returns, with no 
notification of the outcome.
+       SendForget(m amqp.Message)
 
-       // SendTimeout is like send but only waits up to timeout for buffer 
space.
+       // SendAsync puts a message in the send buffer and returns immediately. 
 An
+       // Outcome with Value = value will be sent to the ack channel when the 
remote
+       // receiver has acknowledged the message or if there is an error.
        //
-       // Returns Timeout error if the timeout expires and the message has not 
been sent.
-       SendTimeout(m amqp.Message, timeout time.Duration) (sm SentMessage, err 
error)
+       // You can use the same ack channel for many calls to SendAsync(), 
possibly on
+       // many Senders. The channel will receive the outcomes in the order they
+       // become available. The channel should be buffered and/or served by 
dedicated
+       // goroutines to avoid blocking the connection.
+       //
+       // If ack == nil no Outcome is sent.
+       SendAsync(m amqp.Message, ack chan<- Outcome, value interface{})
 
-       // Send a message and forget it, there will be no acknowledgement.
-       // If the send buffer is full, send blocks until there is space in the 
buffer.
-       SendForget(m amqp.Message) error
+       SendAsyncTimeout(m amqp.Message, ack chan<- Outcome, value interface{}, 
timeout time.Duration)
 
-       // SendForgetTimeout is like send but only waits up to timeout for 
buffer space.
-       // Returns Timeout error if the timeout expires and the message has not 
been sent.
-       SendForgetTimeout(m amqp.Message, timeout time.Duration) error
+       SendWaitableTimeout(m amqp.Message, timeout time.Duration) <-chan 
Outcome
 
-       // Credit indicates how many messages the receiving end of the link can 
accept.
-       //
-       // On a Sender credit can be negative, meaning that messages in excess 
of the
-       // receiver's credit limit have been buffered locally till credit is 
available.
-       Credit() (int, error)
-}
+       SendForgetTimeout(m amqp.Message, timeout time.Duration)
 
-type sendMessage struct {
-       m  amqp.Message
-       sm SentMessage
+       SendSyncTimeout(m amqp.Message, timeout time.Duration) Outcome
 }
 
-type sender struct {
-       link
-       credit chan struct{} // Signal available credit.
+// Outcome provides information about the outcome of sending a message.
+type Outcome struct {
+       // Status of the message: was it sent, how was it acknowledged.
+       Status SentStatus
+       // Error is a local error if Status is Unsent or Unacknowledged, a 
remote error otherwise.
+       Error error
+       // Value provided by the application in SendAsync()
+       Value interface{}
 }
 
-// Disposition indicates the outcome of a settled message delivery.
-type Disposition uint64
+// SentStatus indicates the status of a sent message.
+type SentStatus int
 
 const (
-       // No disposition available: pre-settled, not yet acknowledged or an 
error occurred
-       NoDisposition Disposition = 0
+       // Message was never sent
+       Unsent SentStatus = iota
+       // Message was sent but never acknowledged. It may or may not have been 
received.
+       Unacknowledged
+       // Message was sent pre-settled, no remote outcome is available.
+       Presettled
        // Message was accepted by the receiver
-       Accepted = proton.Accepted
+       Accepted
        // Message was rejected as invalid by the receiver
-       Rejected = proton.Rejected
-       // Message was not processed by the receiver but may be processed by 
some other receiver.
-       Released = proton.Released
+       Rejected
+       // Message was not processed by the receiver but may be valid for a 
different receiver
+       Released
+       // Receiver responded with an unrecognized status.
+       Unknown
 )
 
-// String human readable name for a Disposition.
-func (d Disposition) String() string {
-       switch d {
-       case NoDisposition:
-               return "no-disposition"
+// String human readable name for SentStatus.
+func (s SentStatus) String() string {
+       switch s {
+       case Unsent:
+               return "unsent"
+       case Unacknowledged:
+               return "unacknowledged"
        case Accepted:
                return "accepted"
        case Rejected:
                return "rejected"
        case Released:
                return "released"
-       default:
+       case Unknown:
                return "unknown"
+       default:
+               return "invalid"
        }
 }
 
-func (s *sender) Send(m amqp.Message) (SentMessage, error) {
-       return s.SendTimeout(m, Forever)
-}
-
-func (s *sender) SendTimeout(m amqp.Message, timeout time.Duration) 
(SentMessage, error) {
-       var sm SentMessage
-       if s.sndSettle == SndSettled {
-               sm = nil
-       } else {
-               sm = newSentMessage(s.session.connection)
+// Convert proton delivery state code to SentStatus value
+func sentStatus(d uint64) SentStatus {
+       switch d {
+       case proton.Accepted:
+               return Accepted
+       case proton.Rejected:
+               return Rejected
+       case proton.Released, proton.Modified:
+               return Released
+       default:
+               return Unknown
        }
-       return s.sendInternal(sendMessage{m, sm}, timeout)
-}
-
-func (s *sender) SendForget(m amqp.Message) error {
-       return s.SendForgetTimeout(m, Forever)
 }
 
-func (s *sender) SendForgetTimeout(m amqp.Message, timeout time.Duration) 
error {
-       snd := sendMessage{m, nil}
-       _, err := s.sendInternal(snd, timeout)
-       return err
+// Sender implementation, held by handler.
+type sender struct {
+       link
+       credit chan struct{} // Signal available credit.
 }
 
-func (s *sender) sendInternal(snd sendMessage, timeout time.Duration) 
(SentMessage, error) {
-       if _, err := timedReceive(s.credit, timeout); err != nil { // Wait for 
credit
-               if err == Closed {
+func (s *sender) SendAsyncTimeout(m amqp.Message, ack chan<- Outcome, v 
interface{}, t time.Duration) {
+       // wait for credit
+       if _, err := timedReceive(s.credit, t); err != nil {
+               if err == Closed && s.Error != nil {
                        err = s.Error()
-                       assert(err != nil)
                }
-               return nil, err
+               ack <- Outcome{Unsent, err, v}
+               return
        }
-       if err := s.engine().Inject(func() { s.doSend(snd) }); err != nil {
-               return nil, err
-       }
-       return snd.sm, nil
-}
-
-// Send a message. Handler goroutine
-func (s *sender) doSend(snd sendMessage) {
-       delivery, err := s.eLink.Send(snd.m)
-       switch sm := snd.sm.(type) {
-       case nil:
-               delivery.Settle()
-       case *sentMessage:
-               sm.delivery = delivery
-               if err != nil {
-                       sm.settled(err)
-               } else {
-                       s.handler().sentMessages[delivery] = sm
+       // Send a message in handler goroutine
+       err := s.engine().Inject(func() {
+               if s.Error() != nil {
+                       if ack != nil {
+                               ack <- Outcome{Unsent, s.Error(), v}
+                       }
+                       return
                }
-       default:
-               assert(false, "bad SentMessage type %T", snd.sm)
-       }
-       if s.eLink.Credit() > 0 {
-               s.sendable() // Signal credit.
+               if delivery, err := s.eLink.Send(m); err == nil {
+                       if ack != nil { // We must report an outcome
+                               if s.SndSettle() == SndSettled {
+                                       delivery.Settle() // Pre-settle if 
required
+                                       ack <- Outcome{Presettled, nil, v}
+                               } else {
+                                       s.handler().sentMessages[delivery] = 
sentMessage{ack, v}
+                               }
+                       } else { // ack == nil, can't report outcome
+                               if s.SndSettle() != SndUnsettled { // 
Pre-settle unless we are forced not to.
+                                       delivery.Settle()
+                               }
+                       }
+               } else { // err != nil
+                       if ack != nil {
+                               ack <- Outcome{Unsent, err, v}
+                       }
+               }
+               if s.eLink.Credit() > 0 { // Signal there is still credit
+                       s.sendable()
+               }
+       })
+       if err != nil && ack != nil {
+               ack <- Outcome{Unsent, err, v}
        }
 }
 
-// Signal the sender has credit. Any goroutine.
+// Set credit flag if not already set. Non-blocking, any goroutine
 func (s *sender) sendable() {
        select { // Non-blocking
-       case s.credit <- struct{}{}: // Set the flag if not already set.
+       case s.credit <- struct{}{}:
        default:
        }
 }
 
-func (s *sender) closed(err error) {
-       s.link.closed(err)
-       close(s.credit)
+func (s *sender) SendWaitableTimeout(m amqp.Message, t time.Duration) <-chan 
Outcome {
+       out := make(chan Outcome, 1)
+       s.SendAsyncTimeout(m, out, nil, t)
+       return out
 }
 
-func newSender(l link) *sender {
-       s := &sender{link: l, credit: make(chan struct{}, 1)}
-       s.handler().addLink(s.eLink, s)
-       s.link.open()
-       return s
-}
-
-// SentMessage represents a previously sent message. It allows you to wait for 
acknowledgement.
-type SentMessage interface {
-
-       // Disposition blocks till the message is acknowledged and returns the
-       // disposition state.
-       //
-       // NoDisposition with Error() != nil means the Connection was closed 
before
-       // the message was acknowledged.
-       //
-       // NoDisposition with Error() == nil means the message was pre-settled 
or
-       // Forget() was called.
-       Disposition() (Disposition, error)
-
-       // DispositionTimeout is like Disposition but gives up after timeout, 
see Timeout.
-       DispositionTimeout(time.Duration) (Disposition, error)
-
-       // Forget interrupts any call to Disposition on this SentMessage and 
tells the
-       // peer we are no longer interested in the disposition of this message.
-       Forget()
-
-       // Error returns the error that closed the disposition, or nil if there 
was no error.
-       // If the disposition closed because the connection closed, it will 
return Closed.
-       Error() error
-
-       // Value is an optional value you wish to associate with the 
SentMessage. It
-       // can be the message itself or some form of identifier.
-       Value() interface{}
-       SetValue(interface{})
-}
-
-// SentMessageSet is a concurrent-safe set of sent messages that can be checked
-// to get the next completed sent message
-type SentMessageSet struct {
-       cases []reflect.SelectCase
-       sm    []SentMessage
-       done  chan SentMessage
-}
-
-func (s *SentMessageSet) Add(sm SentMessage) {
-       s.sm = append(s.sm, sm)
-       s.cases = append(s.cases, reflect.SelectCase{Dir: reflect.SelectRecv, 
Chan: reflect.ValueOf(sm.(*sentMessage).done)})
+func (s *sender) SendForgetTimeout(m amqp.Message, t time.Duration) {
+       s.SendAsyncTimeout(m, nil, nil, t)
 }
 
-// Wait waits up to timeout and returns the next SentMessage that has a valid 
dispositionb
-// or an error.
-func (s *SentMessageSet) Wait(sm SentMessage, timeout time.Duration) 
(SentMessage, error) {
-       s.cases = s.cases[:len(s.sm)] // Remove previous timeout cases
-       if timeout == 0 {             // Non-blocking
-               s.cases = append(s.cases, reflect.SelectCase{Dir: 
reflect.SelectDefault})
-       } else {
-               s.cases = append(s.cases,
-                       reflect.SelectCase{Dir: reflect.SelectRecv, Chan: 
reflect.ValueOf(After(timeout))})
+func (s *sender) SendSyncTimeout(m amqp.Message, t time.Duration) Outcome {
+       deadline := time.Now().Add(t)
+       ack := s.SendWaitableTimeout(m, t)
+       t = deadline.Sub(time.Now()) // Adjust for time already spent.
+       if t < 0 {
+               t = 0
        }
-       chosen, _, _ := reflect.Select(s.cases)
-       if chosen > len(s.sm) {
-               return nil, Timeout
+       if out, err := timedReceive(ack, t); err == nil {
+               return out.(Outcome)
        } else {
-               sm := s.sm[chosen]
-               s.sm = append(s.sm[:chosen], s.sm[chosen+1:]...)
-               return sm, nil
+               if err == Closed && s.Error() != nil {
+                       err = s.Error()
+               }
+               return Outcome{Unacknowledged, err, nil}
        }
 }
 
-// SentMessage implementation
-type sentMessage struct {
-       connection  *connection
-       done        chan struct{}
-       delivery    proton.Delivery
-       disposition Disposition
-       err         error
-       value       interface{}
+func (s *sender) SendAsync(m amqp.Message, ack chan<- Outcome, v interface{}) {
+       s.SendAsyncTimeout(m, ack, v, Forever)
 }
 
-func newSentMessage(c *connection) *sentMessage {
-       return &sentMessage{connection: c, done: make(chan struct{})}
+func (s *sender) SendWaitable(m amqp.Message) <-chan Outcome {
+       return s.SendWaitableTimeout(m, Forever)
 }
 
-func (sm *sentMessage) SetValue(v interface{}) { sm.value = v }
-func (sm *sentMessage) Value() interface{}     { return sm.value }
-func (sm *sentMessage) Disposition() (Disposition, error) {
-       <-sm.done
-       return sm.disposition, sm.err
+func (s *sender) SendForget(m amqp.Message) {
+       s.SendForgetTimeout(m, Forever)
 }
 
-func (sm *sentMessage) DispositionTimeout(timeout time.Duration) (Disposition, 
error) {
-       if _, err := timedReceive(sm.done, timeout); err == Timeout {
-               return sm.disposition, Timeout
-       } else {
-               return sm.disposition, sm.err
-       }
+func (s *sender) SendSync(m amqp.Message) Outcome {
+       return <-s.SendWaitable(m)
 }
 
-func (sm *sentMessage) Forget() {
-       sm.connection.engine.Inject(func() {
-               sm.delivery.Settle()
-               delete(sm.connection.handler.sentMessages, sm.delivery)
-       })
-       sm.finish()
+// handler goroutine
+func (s *sender) closed(err error) {
+       s.link.closed(err)
+       close(s.credit)
 }
 
-func (sm *sentMessage) settled(err error) {
-       if sm.delivery.Settled() {
-               sm.disposition = Disposition(sm.delivery.Remote().Type())
-       }
-       sm.err = err
-       sm.finish()
+func newSender(l link) *sender {
+       s := &sender{link: l, credit: make(chan struct{}, 1)}
+       s.handler().addLink(s.eLink, s)
+       s.link.open()
+       return s
 }
 
-func (sm *sentMessage) finish() {
-       select {
-       case <-sm.done: // No-op if already closed
-       default:
-               close(sm.done)
-       }
+// sentMessage records a sent message on the handler.
+type sentMessage struct {
+       ack   chan<- Outcome
+       value interface{}
 }
 
-func (sm *sentMessage) Error() error { return sm.err }
-
 // IncomingSender is sent on the Connection.Incoming() channel when there is
 // an incoming request to open a sender link.
 type IncomingSender struct {
@@ -307,3 +266,9 @@ type IncomingSender struct {
 func (in *IncomingSender) Accept() Endpoint {
        return in.accept(func() Endpoint { return newSender(in.link) })
 }
+
+// Call in injected functions to check if the sender is valid.
+func (s *sender) valid() bool {
+       s2, ok := s.handler().links[s.eLink].(*sender)
+       return ok && s2 == s
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cd8ad96f/proton-c/bindings/go/src/qpid.apache.org/electron/session.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/session.go b/proton-c/bindings/go/src/qpid.apache.org/electron/session.go
index d347c99..18d8bc8 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/session.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/session.go
@@ -53,7 +53,7 @@ func newSession(c *connection, es proton.Session, setting ...SessionOption) *ses
        s := &session{
                connection: c,
                eSession:   es,
-               endpoint:   endpoint{str: es.String()},
+               endpoint:   makeEndpoint(es.String()),
        }
        for _, set := range setting {
                set(s)
@@ -67,12 +67,20 @@ func newSession(c *connection, es proton.Session, setting ...SessionOption) *ses
 func (s *session) Connection() Connection     { return s.connection }
 func (s *session) eEndpoint() proton.Endpoint { return s.eSession }
 func (s *session) engine() *proton.Engine     { return s.connection.engine }
+
 func (s *session) Close(err error) {
-       s.engine().Inject(func() { localClose(s.eSession, err) })
+       s.engine().Inject(func() {
+               if s.Error() == nil {
+                       localClose(s.eSession, err)
+               }
+       })
 }
 
 func (s *session) Sender(setting ...LinkOption) (snd Sender, err error) {
        err = s.engine().InjectWait(func() error {
+               if s.Error() != nil {
+                       return s.Error()
+               }
                l, err := localLink(s, true, setting...)
                if err == nil {
                        snd = newSender(l)
@@ -84,6 +92,9 @@ func (s *session) Sender(setting ...LinkOption) (snd Sender, err error) {
 
 func (s *session) Receiver(setting ...LinkOption) (rcv Receiver, err error) {
        err = s.engine().InjectWait(func() error {
+               if s.Error() != nil {
+                       return s.Error()
+               }
                l, err := localLink(s, false, setting...)
                if err == nil {
                        rcv = newReceiver(l)
@@ -93,12 +104,6 @@ func (s *session) Receiver(setting ...LinkOption) (rcv Receiver, err error) {
        return
 }
 
-// Called from handler on closed.
-func (s *session) closed(err error) {
-       s.err.Set(err)
-       s.err.Set(Closed)
-}
-
 // IncomingSender is sent on the Connection.Incoming() channel when there is an
 // incoming request to open a session.
 type IncomingSession struct {

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cd8ad96f/proton-c/bindings/go/src/qpid.apache.org/electron/time.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/time.go b/proton-c/bindings/go/src/qpid.apache.org/electron/time.go
index 0015185..51bfbc5 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/time.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/time.go
@@ -21,6 +21,7 @@ package electron
 
 import (
        "fmt"
+       "math"
        "reflect"
        "time"
 )
@@ -41,7 +42,7 @@ import (
 var Timeout = fmt.Errorf("timeout")
 
 // Forever can be used as a timeout parameter to indicate wait forever.
-const Forever time.Duration = -1
+const Forever time.Duration = math.MaxInt64
 
 // timedReceive receives on channel (which can be a chan of any type), waiting
 // up to timeout.

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cd8ad96f/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go b/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go
index 95d70e9..2e67ef7 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go
@@ -315,6 +315,13 @@ func (eng *Engine) Run() error {
        wait.Wait()
        close(eng.running) // Signal goroutines have exited and Error is set.
 
+       // Execute any injected functions for side effects on application data 
structures.
+       inject := eng.inject
+       eng.inject = nil // Further calls to Inject() will return an error.
+       for f := range inject {
+               f()
+       }
+
        if !eng.connection.IsNil() {
                eng.connection.Free()
        }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to