PROTON-1047: go: improved ack handling in electron API, add to broker example
Sender interface changed - SendAsync: flexible and efficient server-side ack handling across multiple Senders. - SendSync, SendForget, SendWaitable: easy-to-use methods for common client cases. electron broker.go, send.go examples demonstrate async send handling. Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/cd8ad96f Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/cd8ad96f Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/cd8ad96f Branch: refs/heads/go1 Commit: cd8ad96f2b3022344198abe8b76f0798758dbe9d Parents: 41eac74 Author: Alan Conway <[email protected]> Authored: Wed Nov 11 15:03:07 2015 -0500 Committer: Alan Conway <[email protected]> Committed: Wed Nov 18 14:27:57 2015 -0500 ---------------------------------------------------------------------- examples/go/electron/broker.go | 133 +++++-- examples/go/electron/receive.go | 11 +- examples/go/electron/send.go | 25 +- examples/go/proton/broker.go | 9 +- examples/go/util/queue.go | 10 + .../go/src/qpid.apache.org/amqp/error.go | 2 +- .../src/qpid.apache.org/electron/connection.go | 27 +- .../go/src/qpid.apache.org/electron/doc.go | 3 +- .../go/src/qpid.apache.org/electron/endpoint.go | 31 +- .../go/src/qpid.apache.org/electron/handler.go | 16 +- .../go/src/qpid.apache.org/electron/link.go | 23 +- .../qpid.apache.org/electron/messaging_test.go | 71 ++-- .../go/src/qpid.apache.org/electron/receiver.go | 29 +- .../go/src/qpid.apache.org/electron/sender.go | 361 +++++++++---------- .../go/src/qpid.apache.org/electron/session.go | 21 +- .../go/src/qpid.apache.org/electron/time.go | 3 +- .../go/src/qpid.apache.org/proton/engine.go | 7 + 17 files changed, 430 insertions(+), 352 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cd8ad96f/examples/go/electron/broker.go 
---------------------------------------------------------------------- diff --git a/examples/go/electron/broker.go b/examples/go/electron/broker.go index f68deb1..66941b7 100644 --- a/examples/go/electron/broker.go +++ b/examples/go/electron/broker.go @@ -33,6 +33,7 @@ import ( "log" "net" "os" + "qpid.apache.org/amqp" "qpid.apache.org/electron" ) @@ -52,7 +53,12 @@ var qsize = flag.Int("qsize", 1000, "Max queue size") func main() { flag.Usage = usage flag.Parse() - b := &broker{util.MakeQueues(*qsize), electron.NewContainer("")} + b := &broker{ + queues: util.MakeQueues(*qsize), + container: electron.NewContainer(""), + acks: make(chan electron.Outcome), + sent: make(chan sentMessage), + } if err := b.run(); err != nil { log.Fatal(err) } @@ -60,18 +66,31 @@ func main() { // State for the broker type broker struct { - queues util.Queues - container electron.Container + queues util.Queues // A collection of queues. + container electron.Container // electron.Container manages AMQP connections. + sent chan sentMessage // Channel to record sent messages. + acks chan electron.Outcome // Channel to receive the Outcome of sent messages. +} + +// Record of a sent message and the queue it came from. +// If a message is rejected or not acknowledged due to a failure, we will put it back on the queue. +type sentMessage struct { + m amqp.Message + q util.Queue } -// Listens for connections and starts an electron.Connection for each one. +// run listens for incoming net.Conn connections and starts an electron.Connection for each one. func (b *broker) run() error { listener, err := net.Listen("tcp", *addr) if err != nil { return err } defer listener.Close() - fmt.Printf("Listening on %s\n", listener.Addr()) + fmt.Printf("Listening on %v\n", listener.Addr()) + + go b.acknowledgements() // Handles acknowledgements for all connections. 
+ + // Start a goroutine for each new connection for { conn, err := listener.Accept() if err != nil { @@ -83,69 +102,107 @@ func (b *broker) run() error { util.Debugf("Connection error: %v", err) continue } - go b.accept(c) // Goroutine to accept incoming sessions and links. + cc := &connection{b, c} + go cc.run() // Handle the connection util.Debugf("Accepted %v", c) } } -// accept remotely-opened endpoints (Session, Sender and Receiver) +// State for a broker connection +type connection struct { + broker *broker + connection electron.Connection +} + +// accept remotely-opened endpoints (Session, Sender and Receiver) on a connection // and start goroutines to service them. -func (b *broker) accept(c electron.Connection) { - for in := range c.Incoming() { +func (c *connection) run() { + for in := range c.connection.Incoming() { switch in := in.(type) { case *electron.IncomingSender: if in.Source() == "" { - util.Debugf("sender has no source: %s", in) - break + in.Reject(fmt.Errorf("no source")) + } else { + go c.sender(in.Accept().(electron.Sender)) } - go b.sender(in.Accept().(electron.Sender)) case *electron.IncomingReceiver: if in.Target() == "" { - util.Debugf("receiver has no target: %s", in) - break + in.Reject(fmt.Errorf("no target")) + } else { + in.SetPrefetch(true) + in.SetCapacity(*credit) // Pre-fetch up to credit window. + go c.receiver(in.Accept().(electron.Receiver)) } - in.SetPrefetch(true) - in.SetCapacity(*credit) // Pre-fetch up to credit window. - go b.receiver(in.Accept().(electron.Receiver)) default: in.Accept() // Accept sessions unconditionally } } - util.Debugf("incoming closed: %s", c) + util.Debugf("incoming closed: %v", c.connection) +} + +// receiver receives messages and pushes to a queue. 
+func (c *connection) receiver(receiver electron.Receiver) { + q := c.broker.queues.Get(receiver.Target()) + for { + if rm, err := receiver.Receive(); err == nil { + util.Debugf("%v: received %v", receiver, util.FormatMessage(rm.Message)) + q <- rm.Message + rm.Accept() + } else { + util.Debugf("%v error: %v", receiver, err) + break + } + } } // sender pops messages from a queue and sends them. -func (b *broker) sender(sender electron.Sender) { - q := b.queues.Get(sender.Source()) +func (c *connection) sender(sender electron.Sender) { + q := c.broker.queues.Get(sender.Source()) for { - m, ok := <-q - if !ok { // Queue closed + if sender.Error() != nil { + util.Debugf("%v closed: %v", sender, sender.Error()) return } - if err := sender.SendForget(m); err == nil { - util.Debugf("%s send: %s", sender, util.FormatMessage(m)) - } else { - util.Debugf("%s error: %s", sender, err) - q <- m // Put it back on the queue. - return + select { + + case m := <-q: + util.Debugf("%v: sent %v", sender, util.FormatMessage(m)) + sm := sentMessage{m, q} + c.broker.sent <- sm // Record sent message + sender.SendAsync(m, c.broker.acks, sm) // Receive outcome on c.broker.acks with Value sm + + case <-sender.Done(): // break if sender is closed + break } } } -// receiver receives messages and pushes to a queue. -func (b *broker) receiver(receiver electron.Receiver) { - q := b.queues.Get(receiver.Target()) +// acknowledgements keeps track of sent messages and receives outcomes. +// +// We could have handled outcomes separately per-connection, per-sender or even +// per-message. Message outcomes are returned via channels defined by the user +// so they can be grouped in any way that suits the application. 
+func (b *broker) acknowledgements() { + sentMap := make(map[sentMessage]bool) for { - if rm, err := receiver.Receive(); err == nil { - util.Debugf("%s: received %s", receiver, util.FormatMessage(rm.Message)) - q <- rm.Message - rm.Accept() - } else { - util.Debugf("%s error: %s", receiver, err) - break + select { + case sm, ok := <-b.sent: // A local sender records that it has sent a message. + if ok { + sentMap[sm] = true + } else { + return // Closed + } + case outcome := <-b.acks: // The message outcome is available + sm := outcome.Value.(sentMessage) + delete(sentMap, sm) + if outcome.Status != electron.Accepted { // Error, release or rejection + sm.q.PutBack(sm.m) // Put the message back on the queue. + util.Debugf("message %v put back, status %v, error %v", + util.FormatMessage(sm.m), outcome.Status, outcome.Error) + } } } } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cd8ad96f/examples/go/electron/receive.go ---------------------------------------------------------------------- diff --git a/examples/go/electron/receive.go b/examples/go/electron/receive.go index f7d41fa..d631726 100644 --- a/examples/go/electron/receive.go +++ b/examples/go/electron/receive.go @@ -85,15 +85,12 @@ func main() { // Loop receiving messages and sending them to the main() goroutine for { - rm, err := r.Receive() - switch err { - case electron.Closed: - util.Debugf("closed %s", urlStr) + if rm, err := r.Receive(); err != nil { + util.Debugf("closed %v: %v", urlStr, err) return - case nil: + } else { + rm.Accept() messages <- rm.Message - default: - log.Fatal(err) } } }(urlStr) http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cd8ad96f/examples/go/electron/send.go ---------------------------------------------------------------------- diff --git a/examples/go/electron/send.go b/examples/go/electron/send.go index c9bdbc9..4c9a0bf 100644 --- a/examples/go/electron/send.go +++ b/examples/go/electron/send.go @@ -42,11 +42,6 @@ Send messages to each URL concurrently with 
body "<url-path>-<n>" where n is the var count = flag.Int64("count", 1, "Send this may messages per address.") -type sent struct { - name string - sentMessage electron.SentMessage -} - func main() { flag.Usage = usage flag.Parse() @@ -58,9 +53,10 @@ func main() { os.Exit(1) } - sentChan := make(chan sent) // Channel to receive all the delivery receipts. - var wait sync.WaitGroup // Used by main() to wait for all goroutines to end. - wait.Add(len(urls)) // Wait for one goroutine per URL. + sentChan := make(chan electron.Outcome) // Channel to receive acknowledgements. + + var wait sync.WaitGroup + wait.Add(len(urls)) // Wait for one goroutine per URL. _, prog := path.Split(os.Args[0]) container := electron.NewContainer(fmt.Sprintf("%v:%v", prog, os.Getpid())) @@ -91,9 +87,7 @@ func main() { m := amqp.NewMessage() body := fmt.Sprintf("%v-%v", url.Path, i) m.Marshal(body) - sentMessage, err := s.Send(m) - util.ExitIf(err) - sentChan <- sent{body, sentMessage} + s.SendAsync(m, sentChan, body) // Outcome will be sent to sentChan } }(urlStr) } @@ -102,12 +96,11 @@ func main() { expect := int(*count) * len(urls) util.Debugf("Started senders, expect %v acknowledgements\n", expect) for i := 0; i < expect; i++ { - d := <-sentChan - disposition, err := d.sentMessage.Disposition() - if err != nil { - util.Debugf("acknowledgement[%v] %v error: %v\n", i, d.name, err) + out := <-sentChan // Outcome of async sends. 
+ if out.Error != nil { + util.Debugf("acknowledgement[%v] %v error: %v\n", i, out.Value, out.Error) } else { - util.Debugf("acknowledgement[%v] %v (%v)\n", i, d.name, disposition) + util.Debugf("acknowledgement[%v] %v (%v)\n", i, out.Value, out.Status) } } fmt.Printf("Received all %v acknowledgements\n", expect) http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cd8ad96f/examples/go/proton/broker.go ---------------------------------------------------------------------- diff --git a/examples/go/proton/broker.go b/examples/go/proton/broker.go index 3eb5880..6458665 100644 --- a/examples/go/proton/broker.go +++ b/examples/go/proton/broker.go @@ -24,6 +24,9 @@ under the License. // messages to queues or subscribe to receive messages from them. // +// TODO: show how to handle acknowledgements from receivers and put rejected or +// un-acknowledged messages back on their queues. + package main import ( @@ -277,7 +280,7 @@ func (s *sender) sendable() { // run runs in a separate goroutine. It monitors the queue for messages and injects // a function to send them when there is credit func (s *sender) run() { - var q chan amqp.Message // q is nil initially as we have no credit. + var q util.Queue // q is nil initially as we have no credit. for { select { case _, ok := <-s.credit: @@ -293,7 +296,7 @@ func (s *sender) run() { q = nil // Assume all credit will be used used, will be signaled otherwise. s.h.injecter.Inject(func() { // Inject handler function to actually send if s.h.senders[s.l] != s { // The sender has been closed by the remote end. - go func() { q <- m }() // Put the message back on the queue but don't block + q.PutBack(m) // Put the message back on the queue but don't block return } if s.sendOne(m) != nil { @@ -322,7 +325,7 @@ func (s *sender) sendOne(m amqp.Message) error { delivery.Settle() // Pre-settled, unreliable. 
util.Debugf("link %s sent %s", s.l, util.FormatMessage(m)) } else { - go func() { s.q <- m }() // Put the message back on the queue but don't block + s.q.PutBack(m) // Put the message back on the queue, don't block } return err } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cd8ad96f/examples/go/util/queue.go ---------------------------------------------------------------------- diff --git a/examples/go/util/queue.go b/examples/go/util/queue.go index d844c0d..2eaba72 100644 --- a/examples/go/util/queue.go +++ b/examples/go/util/queue.go @@ -27,6 +27,16 @@ import ( // Use a buffered channel as a very simple queue. type Queue chan amqp.Message +// Put a message back on the queue, does not block. +func (q Queue) PutBack(m amqp.Message) { + select { + case q <- m: + default: + // Not an efficient implementation but ensures we don't block the caller. + go func() { q <- m }() + } +} + // Concurrent-safe map of queues. type Queues struct { queueSize int http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cd8ad96f/proton-c/bindings/go/src/qpid.apache.org/amqp/error.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/amqp/error.go b/proton-c/bindings/go/src/qpid.apache.org/amqp/error.go index 4096cdc..349fc41 100644 --- a/proton-c/bindings/go/src/qpid.apache.org/amqp/error.go +++ b/proton-c/bindings/go/src/qpid.apache.org/amqp/error.go @@ -39,7 +39,7 @@ import ( type Error struct{ Name, Description string } // Error implements the Go error interface for AMQP error errors. 
-func (c Error) Error() string { return fmt.Sprintf("proton %s: %s", c.Name, c.Description) } +func (c Error) Error() string { return fmt.Sprintf("%s: %s", c.Name, c.Description) } // Errorf makes a Error with name and formatted description as per fmt.Sprintf func Errorf(name, format string, arg ...interface{}) Error { http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cd8ad96f/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go b/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go index 3462b92..8a9e6cd 100644 --- a/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go +++ b/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go @@ -69,10 +69,10 @@ type Connection interface { // to close it with an error. The specific Incoming types have additional // methods to configure the endpoint. // - // Delay in receiving from Incoming() or calling Accept/Reject will block - // proton. Normally you should have a dedicated goroutine receive from this - // channel and start a new goroutine to serve each endpoint accepted. The - // channel is closed when the Connection closes. + // Not receiving from Incoming() or not calling Accept/Reject will block the + // electron event loop. Normally you would have a dedicated goroutine receive + // from Incoming() and start new goroutines to serve each incoming endpoint. + // The channel is closed when the Connection closes. 
// Incoming() <-chan Incoming } @@ -103,15 +103,13 @@ type connection struct { incoming chan Incoming handler *handler engine *proton.Engine - err proton.ErrorHolder eConnection proton.Connection defaultSession Session - done chan struct{} } func newConnection(conn net.Conn, cont *container, setting ...ConnectionOption) (*connection, error) { - c := &connection{container: cont, conn: conn, done: make(chan struct{})} + c := &connection{container: cont, conn: conn} c.handler = newHandler(c) var err error c.engine, err = proton.NewEngine(c.conn, c.handler.delegator) @@ -121,12 +119,20 @@ func newConnection(conn net.Conn, cont *container, setting ...ConnectionOption) for _, set := range setting { set(c) } - c.str = c.engine.String() + c.endpoint = makeEndpoint(c.engine.String()) c.eConnection = c.engine.Connection() - go func() { c.engine.Run(); close(c.done) }() + go c.run() return c, nil } +func (c *connection) run() { + c.engine.Run() + if c.incoming != nil { + close(c.incoming) + } + c.closed(Closed) +} + func (c *connection) Close(err error) { c.err.Set(err); c.engine.Close(err) } func (c *connection) Disconnect(err error) { c.err.Set(err); c.engine.Disconnect(err) } @@ -134,6 +140,9 @@ func (c *connection) Disconnect(err error) { c.err.Set(err); c.engine.Disconnect func (c *connection) Session(setting ...SessionOption) (Session, error) { var s Session err := c.engine.InjectWait(func() error { + if c.Error() != nil { + return c.Error() + } eSession, err := c.engine.Connection().Session() if err == nil { eSession.Open() http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cd8ad96f/proton-c/bindings/go/src/qpid.apache.org/electron/doc.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/doc.go b/proton-c/bindings/go/src/qpid.apache.org/electron/doc.go index eaa6e7a..a484900 100644 --- a/proton-c/bindings/go/src/qpid.apache.org/electron/doc.go +++ 
b/proton-c/bindings/go/src/qpid.apache.org/electron/doc.go @@ -35,7 +35,8 @@ Send() and Receive() messages. You can create an AMQP server connection by calling Connection.Server() and Connection.Listen() before calling Connection.Open(). A server connection can negotiate protocol security details and can accept incoming links opened from -the remote end of the connection +the remote end of the connection. + */ package electron http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cd8ad96f/proton-c/bindings/go/src/qpid.apache.org/electron/endpoint.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/endpoint.go b/proton-c/bindings/go/src/qpid.apache.org/electron/endpoint.go index 057e572..f04b240 100644 --- a/proton-c/bindings/go/src/qpid.apache.org/electron/endpoint.go +++ b/proton-c/bindings/go/src/qpid.apache.org/electron/endpoint.go @@ -48,15 +48,31 @@ type Endpoint interface { // Connection containing the endpoint Connection() Connection + + // Done returns a channel that will close when the endpoint closes. + // Error() will contain the reason. + Done() <-chan struct{} } type endpoint struct { - err proton.ErrorHolder - str string // Must be set by the value that embeds endpoint. + err proton.ErrorHolder + str string // Must be set by the value that embeds endpoint. + done chan struct{} +} + +func makeEndpoint(s string) endpoint { return endpoint{str: s, done: make(chan struct{})} } + +func (e *endpoint) closed(err error) { + e.err.Set(err) + e.err.Set(Closed) + close(e.done) } func (e *endpoint) String() string { return e.str } -func (e *endpoint) Error() error { return e.err.Get() } + +func (e *endpoint) Error() error { return e.err.Get() } + +func (e *endpoint) Done() <-chan struct{} { return e.done } // Call in proton goroutine to close an endpoint locally // handler will complete the close when remote end closes. 
@@ -65,3 +81,12 @@ func localClose(ep proton.Endpoint, err error) { proton.CloseError(ep, err) } } + +// Used to indicate that a channel has closed which normally is because the endpoint is closed. +func errorOrClosed(e Endpoint) error { + if e.Error() != nil { + return e.Error() + } else { + return Closed + } +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cd8ad96f/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go b/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go index 1b1164c..1586026 100644 --- a/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go +++ b/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go @@ -30,7 +30,7 @@ type handler struct { delegator *proton.MessagingAdapter connection *connection links map[proton.Link]Link - sentMessages map[proton.Delivery]*sentMessage + sentMessages map[proton.Delivery]sentMessage sessions map[proton.Session]*session } @@ -38,7 +38,7 @@ func newHandler(c *connection) *handler { h := &handler{ connection: c, links: make(map[proton.Link]Link), - sentMessages: make(map[proton.Delivery]*sentMessage), + sentMessages: make(map[proton.Delivery]sentMessage), sessions: make(map[proton.Session]*session), } h.delegator = proton.NewMessagingAdapter(h) @@ -64,8 +64,10 @@ func (h *handler) HandleMessagingEvent(t proton.MessagingEvent, e proton.Event) } case proton.MSettled: - if sm := h.sentMessages[e.Delivery()]; sm != nil { - sm.settled(nil) + if sm, ok := h.sentMessages[e.Delivery()]; ok { + d := e.Delivery().Remote() + sm.ack <- Outcome{sentStatus(d.Type()), d.Condition().Error(), sm.value} + delete(h.sentMessages, e.Delivery()) } case proton.MSendable: @@ -123,15 +125,19 @@ func (h *handler) HandleMessagingEvent(t proton.MessagingEvent, e proton.Event) h.connection.err.Set(amqp.Errorf(amqp.IllegalState, "unexpected disconnect on %s", 
h.connection)) err := h.connection.Error() + for l, _ := range h.links { h.linkClosed(l, err) } + h.links = nil for _, s := range h.sessions { s.closed(err) } + h.sessions = nil for _, sm := range h.sentMessages { - sm.settled(err) + sm.ack <- Outcome{Unacknowledged, err, sm.value} } + h.sentMessages = nil } } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cd8ad96f/proton-c/bindings/go/src/qpid.apache.org/electron/link.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/link.go b/proton-c/bindings/go/src/qpid.apache.org/electron/link.go index cadc2c1..91efa8e 100644 --- a/proton-c/bindings/go/src/qpid.apache.org/electron/link.go +++ b/proton-c/bindings/go/src/qpid.apache.org/electron/link.go @@ -144,7 +144,6 @@ type link struct { session *session eLink proton.Link - done chan struct{} // Closed when link is closed } func (l *link) Source() string { return l.source } @@ -167,7 +166,6 @@ func localLink(sn *session, isSender bool, setting ...LinkOption) (link, error) isSender: isSender, capacity: 1, prefetch: false, - done: make(chan struct{}), } for _, set := range setting { set(&l) @@ -188,8 +186,8 @@ func localLink(sn *session, isSender bool, setting ...LinkOption) (link, error) l.eLink.Target().SetAddress(l.target) l.eLink.SetSndSettleMode(proton.SndSettleMode(l.sndSettle)) l.eLink.SetRcvSettleMode(proton.RcvSettleMode(l.rcvSettle)) - l.str = l.eLink.String() l.eLink.Open() + l.endpoint = makeEndpoint(l.eLink.String()) return l, nil } @@ -213,23 +211,18 @@ func makeIncomingLink(sn *session, eLink proton.Link) incomingLink { rcvSettle: RcvSettleMode(eLink.RemoteRcvSettleMode()), capacity: 1, prefetch: false, - done: make(chan struct{}), + endpoint: makeEndpoint(eLink.String()), }, } - l.str = eLink.String() return l } -// Called in proton goroutine on closed or disconnected -func (l *link) closed(err error) { - l.err.Set(err) - l.err.Set(Closed) // If no error set, mark as 
closed. - close(l.done) -} - // Not part of Link interface but use by Sender and Receiver. func (l *link) Credit() (credit int, err error) { err = l.engine().InjectWait(func() error { + if l.Error() != nil { + return l.Error() + } credit = l.eLink.Credit() return nil }) @@ -240,7 +233,11 @@ func (l *link) Credit() (credit int, err error) { func (l *link) Capacity() int { return l.capacity } func (l *link) Close(err error) { - l.engine().Inject(func() { localClose(l.eLink, err) }) + l.engine().Inject(func() { + if l.Error() == nil { + localClose(l.eLink, err) + } + }) } func (l *link) open() { http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cd8ad96f/proton-c/bindings/go/src/qpid.apache.org/electron/messaging_test.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/messaging_test.go b/proton-c/bindings/go/src/qpid.apache.org/electron/messaging_test.go index 3b5a062..5af57e8 100644 --- a/proton-c/bindings/go/src/qpid.apache.org/electron/messaging_test.go +++ b/proton-c/bindings/go/src/qpid.apache.org/electron/messaging_test.go @@ -80,7 +80,6 @@ func closeClientServer(client Session, server Connection) { // Send a message one way with a client sender and server receiver, verify ack. 
func TestClientSendServerReceive(t *testing.T) { - timeout := time.Second * 2 nLinks := 3 nMessages := 3 @@ -116,15 +115,14 @@ func TestClientSendServerReceive(t *testing.T) { for i := 0; i < nLinks; i++ { for j := 0; j < nMessages; j++ { - var sm SentMessage - // Client send + ack := make(chan Outcome, 1) sendDone := make(chan struct{}) go func() { defer close(sendDone) m := amqp.NewMessageWith(fmt.Sprintf("foobar%v-%v", i, j)) var err error - sm, err = s[i].Send(m) + s[i].SendAsync(m, ack, "testing") if err != nil { t.Fatal(err) } @@ -141,16 +139,19 @@ func TestClientSendServerReceive(t *testing.T) { // Should not be acknowledged on client yet <-sendDone - if d, err := sm.DispositionTimeout(0); err != Timeout || NoDisposition != d { - t.Errorf("want [no-disposition/timeout] got [%v/%v]", d, err) + select { + case <-ack: + t.Errorf("unexpected ack") + default: } - // Server ack - if err := rm.Acknowledge(Rejected); err != nil { + + // Server send ack + if err := rm.Reject(); err != nil { t.Error(err) } // Client get ack. - if d, err := sm.DispositionTimeout(timeout); err != nil || Rejected != d { - t.Errorf("want [rejected/nil] got [%v/%v]", d, err) + if a := <-ack; a.Value != "testing" || a.Error != nil || a.Status != Rejected { + t.Error("unexpected ack: ", a.Status, a.Error, a.Value) } } } @@ -166,12 +167,10 @@ func TestClientReceiver(t *testing.T) { s := in.Accept().(Sender) go func() { for i := int32(0); i < int32(nMessages); i++ { - sm, err := s.Send(amqp.NewMessageWith(i)) - if err != nil { - t.Error(err) + out := s.SendSync(amqp.NewMessageWith(i)) + if out.Error != nil { + t.Error(out.Error) return - } else { - sm.Disposition() // Sync send. } } s.Close(nil) @@ -235,10 +234,10 @@ func TestTimeouts(t *testing.T) { short := time.Millisecond long := time.Second m := amqp.NewMessage() - if _, err = snd.SendTimeout(m, 0); err != Timeout { // No credit, expect timeout. + if err := snd.SendSyncTimeout(m, 0).Error; err != Timeout { // No credit, expect timeout. 
t.Error("want Timeout got", err) } - if _, err = snd.SendTimeout(m, short); err != Timeout { // No credit, expect timeout. + if err := snd.SendSyncTimeout(m, short).Error; err != Timeout { // No credit, expect timeout. t.Error("want Timeout got", err) } // Test receive with timeout @@ -250,17 +249,15 @@ func TestTimeouts(t *testing.T) { t.Error("want Timeout got", err) } // There is now a credit on the link due to receive - sm, err := snd.SendTimeout(m, long) - if err != nil { - t.Fatal(err) - } + ack := make(chan Outcome) + snd.SendAsyncTimeout(m, ack, nil, short) // Disposition should timeout - if _, err = sm.DispositionTimeout(long); err != Timeout { - t.Error("want Timeout got", err) - } - if _, err = sm.DispositionTimeout(short); err != Timeout { - t.Error("want Timeout got", err) + select { + case <-ack: + t.Errorf("want Timeout got %#v", ack) + case <-time.After(short): } + // Receive and accept rm, err := rcv.ReceiveTimeout(long) if err != nil { @@ -268,9 +265,8 @@ func TestTimeouts(t *testing.T) { } rm.Accept() // Sender get ack - d, err := sm.DispositionTimeout(long) - if err != nil || d != Accepted { - t.Errorf("want (rejected, nil) got (%v, %v)", d, err) + if a := <-ack; a.Status != Accepted || a.Error != nil { + t.Errorf("want (accepted, nil) got %#v", a) } } @@ -329,7 +325,7 @@ type result struct { func (r result) String() string { return fmt.Sprintf("%s(%s)", r.err, r.label) } func doSend(snd Sender, results chan result) { - _, err := snd.Send(amqp.NewMessage()) + err := snd.SendSync(amqp.NewMessage()).Error results <- result{"send", err} } @@ -338,9 +334,8 @@ func doReceive(rcv Receiver, results chan result) { results <- result{"receive", err} } -func doDisposition(sm SentMessage, results chan result) { - _, err := sm.Disposition() - results <- result{"disposition", err} +func doDisposition(ack <-chan Outcome, results chan result) { + results <- result{"disposition", (<-ack).Error} } // Test that closing Links interrupts blocked link functions. 
@@ -391,9 +386,8 @@ func TestConnectionCloseInterrupt1(t *testing.T) { // Connection.Close() interrupts Send, Receive, Disposition. snd, rcv := pairs.senderReceiver() go doReceive(rcv, results) - sm, err := snd.Send(amqp.NewMessage()) - fatalIf(t, err) - go doDisposition(sm, results) + ack := snd.SendWaitable(amqp.NewMessage()) + go doDisposition(ack, results) snd, rcv = pairs.senderReceiver() go doSend(snd, results) rcv, snd = pairs.receiverSender() @@ -416,9 +410,8 @@ func TestConnectionCloseInterrupt2(t *testing.T) { // Connection.Close() interrupts Send, Receive, Disposition. snd, rcv := pairs.senderReceiver() go doReceive(rcv, results) - sm, err := snd.Send(amqp.NewMessage()) - fatalIf(t, err) - go doDisposition(sm, results) + ack := snd.SendWaitable(amqp.NewMessage()) + go doDisposition(ack, results) snd, rcv = pairs.senderReceiver() go doSend(snd, results) rcv, snd = pairs.receiverSender() http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cd8ad96f/proton-c/bindings/go/src/qpid.apache.org/electron/receiver.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/receiver.go b/proton-c/bindings/go/src/qpid.apache.org/electron/receiver.go index 4ff83b4..22bdc7e 100644 --- a/proton-c/bindings/go/src/qpid.apache.org/electron/receiver.go +++ b/proton-c/bindings/go/src/qpid.apache.org/electron/receiver.go @@ -77,6 +77,9 @@ type prefetchPolicy struct{} func (p prefetchPolicy) Flow(r *receiver) { r.engine().Inject(func() { + if r.Error() != nil { + return + } _, _, max := r.credit() if max > 0 { r.eLink.Flow(max) @@ -94,6 +97,9 @@ type noPrefetchPolicy struct{ waiting int } func (p noPrefetchPolicy) Flow(r *receiver) { // Not called in proton goroutine r.engine().Inject(func() { + if r.Error() != nil { + return + } len, credit, max := r.credit() add := p.waiting - (len + credit) if add > max { @@ -202,20 +208,23 @@ type ReceivedMessage struct { receiver Receiver } -// Acknowledge a 
ReceivedMessage with the given disposition code. -func (rm *ReceivedMessage) Acknowledge(disposition Disposition) error { - return rm.receiver.(*receiver).engine().InjectWait(func() error { - // Settle doesn't return an error but if the receiver is broken the settlement won't happen. - rm.eDelivery.SettleAs(uint64(disposition)) - return rm.receiver.Error() +// Acknowledge a ReceivedMessage with the given delivery status. +func (rm *ReceivedMessage) acknowledge(status uint64) error { + return rm.receiver.(*receiver).engine().Inject(func() { + // Deliveries are valid as long as the connection is, unless settled. + rm.eDelivery.SettleAs(uint64(status)) }) } -// Accept is short for Acknowledge(Accpeted) -func (rm *ReceivedMessage) Accept() error { return rm.Acknowledge(Accepted) } +// Accept tells the sender that we take responsibility for processing the message. +func (rm *ReceivedMessage) Accept() error { return rm.acknowledge(proton.Accepted) } + +// Reject tells the sender we consider the message invalid and unusable. +func (rm *ReceivedMessage) Reject() error { return rm.acknowledge(proton.Rejected) } -// Reject is short for Acknowledge(Rejected) -func (rm *ReceivedMessage) Reject() error { return rm.Acknowledge(Rejected) } +// Release tells the sender we will not process the message but some other +// receiver might. +func (rm *ReceivedMessage) Release() error { return rm.acknowledge(proton.Released) } // IncomingReceiver is sent on the Connection.Incoming() channel when there is // an incoming request to open a receiver link. 
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cd8ad96f/proton-c/bindings/go/src/qpid.apache.org/electron/sender.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/sender.go b/proton-c/bindings/go/src/qpid.apache.org/electron/sender.go index 11f019b..573e9da 100644 --- a/proton-c/bindings/go/src/qpid.apache.org/electron/sender.go +++ b/proton-c/bindings/go/src/qpid.apache.org/electron/sender.go @@ -25,278 +25,237 @@ import "C" import ( "qpid.apache.org/amqp" "qpid.apache.org/proton" - "reflect" "time" ) // Sender is a Link that sends messages. +// +// The result of sending a message is provided by an Outcome value. +// +// A sender can buffer messages up to the credit limit provided by the remote receiver. +// Send* methods will block if the buffer is full until there is space. +// Send*Timeout methods will give up after the timeout and set Timeout as Outcome.Error. +// type Sender interface { Link - // Send a message without waiting for acknowledgement. Returns a SentMessage. - // use SentMessage.Disposition() to wait for acknowledgement and get the - // disposition code. - // - // If the send buffer is full, send blocks until there is space in the buffer. - Send(m amqp.Message) (sm SentMessage, err error) + // SendSync sends a message and blocks until the message is acknowledged by the remote receiver. + // Returns an Outcome, which may contain an error if the message could not be sent. + SendSync(m amqp.Message) Outcome + + // SendWaitable puts a message in the send buffer and returns a channel that + // you can use to wait for the Outcome of just that message. The channel is + // buffered so you can receive from it whenever you want without blocking anything. + SendWaitable(m amqp.Message) <-chan Outcome + + // SendForget buffers a message for sending and returns, with no notification of the outcome. 
+ SendForget(m amqp.Message) - // SendTimeout is like send but only waits up to timeout for buffer space. + // SendAsync puts a message in the send buffer and returns immediately. An + // Outcome with Value = value will be sent to the ack channel when the remote + // receiver has acknowledged the message or if there is an error. // - // Returns Timeout error if the timeout expires and the message has not been sent. - SendTimeout(m amqp.Message, timeout time.Duration) (sm SentMessage, err error) + // You can use the same ack channel for many calls to SendAsync(), possibly on + // many Senders. The channel will receive the outcomes in the order they + // become available. The channel should be buffered and/or served by dedicated + // goroutines to avoid blocking the connection. + // + // If ack == nil no Outcome is sent. + SendAsync(m amqp.Message, ack chan<- Outcome, value interface{}) - // Send a message and forget it, there will be no acknowledgement. - // If the send buffer is full, send blocks until there is space in the buffer. - SendForget(m amqp.Message) error + SendAsyncTimeout(m amqp.Message, ack chan<- Outcome, value interface{}, timeout time.Duration) - // SendForgetTimeout is like send but only waits up to timeout for buffer space. - // Returns Timeout error if the timeout expires and the message has not been sent. - SendForgetTimeout(m amqp.Message, timeout time.Duration) error + SendWaitableTimeout(m amqp.Message, timeout time.Duration) <-chan Outcome - // Credit indicates how many messages the receiving end of the link can accept. - // - // On a Sender credit can be negative, meaning that messages in excess of the - // receiver's credit limit have been buffered locally till credit is available. 
- Credit() (int, error) -} + SendForgetTimeout(m amqp.Message, timeout time.Duration) -type sendMessage struct { - m amqp.Message - sm SentMessage + SendSyncTimeout(m amqp.Message, timeout time.Duration) Outcome } -type sender struct { - link - credit chan struct{} // Signal available credit. +// Outcome provides information about the outcome of sending a message. +type Outcome struct { + // Status of the message: was it sent, how was it acknowledged. + Status SentStatus + // Error is a local error if Status is Unsent or Unacknowledged, a remote error otherwise. + Error error + // Value provided by the application in SendAsync() + Value interface{} } -// Disposition indicates the outcome of a settled message delivery. -type Disposition uint64 +// SentStatus indicates the status of a sent message. +type SentStatus int const ( - // No disposition available: pre-settled, not yet acknowledged or an error occurred - NoDisposition Disposition = 0 + // Message was never sent + Unsent SentStatus = iota + // Message was sent but never acknowledged. It may or may not have been received. + Unacknowledged + // Message was sent pre-settled, no remote outcome is available. + Presettled // Message was accepted by the receiver - Accepted = proton.Accepted + Accepted // Message was rejected as invalid by the receiver - Rejected = proton.Rejected - // Message was not processed by the receiver but may be processed by some other receiver. - Released = proton.Released + Rejected + // Message was not processed by the receiver but may be valid for a different receiver + Released + // Receiver responded with an unrecognized status. + Unknown ) -// String human readable name for a Disposition. -func (d Disposition) String() string { - switch d { - case NoDisposition: - return "no-disposition" +// String human readable name for SentStatus. 
+func (s SentStatus) String() string { + switch s { + case Unsent: + return "unsent" + case Unacknowledged: + return "unacknowledged" case Accepted: return "accepted" case Rejected: return "rejected" case Released: return "released" - default: + case Unknown: return "unknown" + default: + return "invalid" } } -func (s *sender) Send(m amqp.Message) (SentMessage, error) { - return s.SendTimeout(m, Forever) -} - -func (s *sender) SendTimeout(m amqp.Message, timeout time.Duration) (SentMessage, error) { - var sm SentMessage - if s.sndSettle == SndSettled { - sm = nil - } else { - sm = newSentMessage(s.session.connection) +// Convert proton delivery state code to SentStatus value +func sentStatus(d uint64) SentStatus { + switch d { + case proton.Accepted: + return Accepted + case proton.Rejected: + return Rejected + case proton.Released, proton.Modified: + return Released + default: + return Unknown } - return s.sendInternal(sendMessage{m, sm}, timeout) -} - -func (s *sender) SendForget(m amqp.Message) error { - return s.SendForgetTimeout(m, Forever) } -func (s *sender) SendForgetTimeout(m amqp.Message, timeout time.Duration) error { - snd := sendMessage{m, nil} - _, err := s.sendInternal(snd, timeout) - return err +// Sender implementation, held by handler. +type sender struct { + link + credit chan struct{} // Signal available credit. 
} -func (s *sender) sendInternal(snd sendMessage, timeout time.Duration) (SentMessage, error) { - if _, err := timedReceive(s.credit, timeout); err != nil { // Wait for credit - if err == Closed { +func (s *sender) SendAsyncTimeout(m amqp.Message, ack chan<- Outcome, v interface{}, t time.Duration) { + // wait for credit + if _, err := timedReceive(s.credit, t); err != nil { + if err == Closed && s.Error != nil { err = s.Error() - assert(err != nil) } - return nil, err + ack <- Outcome{Unsent, err, v} + return } - if err := s.engine().Inject(func() { s.doSend(snd) }); err != nil { - return nil, err - } - return snd.sm, nil -} - -// Send a message. Handler goroutine -func (s *sender) doSend(snd sendMessage) { - delivery, err := s.eLink.Send(snd.m) - switch sm := snd.sm.(type) { - case nil: - delivery.Settle() - case *sentMessage: - sm.delivery = delivery - if err != nil { - sm.settled(err) - } else { - s.handler().sentMessages[delivery] = sm + // Send a message in handler goroutine + err := s.engine().Inject(func() { + if s.Error() != nil { + if ack != nil { + ack <- Outcome{Unsent, s.Error(), v} + } + return } - default: - assert(false, "bad SentMessage type %T", snd.sm) - } - if s.eLink.Credit() > 0 { - s.sendable() // Signal credit. + if delivery, err := s.eLink.Send(m); err == nil { + if ack != nil { // We must report an outcome + if s.SndSettle() == SndSettled { + delivery.Settle() // Pre-settle if required + ack <- Outcome{Presettled, nil, v} + } else { + s.handler().sentMessages[delivery] = sentMessage{ack, v} + } + } else { // ack == nil, can't report outcome + if s.SndSettle() != SndUnsettled { // Pre-settle unless we are forced not to. + delivery.Settle() + } + } + } else { // err != nil + if ack != nil { + ack <- Outcome{Unsent, err, v} + } + } + if s.eLink.Credit() > 0 { // Signal there is still credit + s.sendable() + } + }) + if err != nil && ack != nil { + ack <- Outcome{Unsent, err, v} } } -// Signal the sender has credit. Any goroutine. 
+// Set credit flag if not already set. Non-blocking, any goroutine func (s *sender) sendable() { select { // Non-blocking - case s.credit <- struct{}{}: // Set the flag if not already set. + case s.credit <- struct{}{}: default: } } -func (s *sender) closed(err error) { - s.link.closed(err) - close(s.credit) +func (s *sender) SendWaitableTimeout(m amqp.Message, t time.Duration) <-chan Outcome { + out := make(chan Outcome, 1) + s.SendAsyncTimeout(m, out, nil, t) + return out } -func newSender(l link) *sender { - s := &sender{link: l, credit: make(chan struct{}, 1)} - s.handler().addLink(s.eLink, s) - s.link.open() - return s -} - -// SentMessage represents a previously sent message. It allows you to wait for acknowledgement. -type SentMessage interface { - - // Disposition blocks till the message is acknowledged and returns the - // disposition state. - // - // NoDisposition with Error() != nil means the Connection was closed before - // the message was acknowledged. - // - // NoDisposition with Error() == nil means the message was pre-settled or - // Forget() was called. - Disposition() (Disposition, error) - - // DispositionTimeout is like Disposition but gives up after timeout, see Timeout. - DispositionTimeout(time.Duration) (Disposition, error) - - // Forget interrupts any call to Disposition on this SentMessage and tells the - // peer we are no longer interested in the disposition of this message. - Forget() - - // Error returns the error that closed the disposition, or nil if there was no error. - // If the disposition closed because the connection closed, it will return Closed. - Error() error - - // Value is an optional value you wish to associate with the SentMessage. It - // can be the message itself or some form of identifier. 
- Value() interface{} - SetValue(interface{}) -} - -// SentMessageSet is a concurrent-safe set of sent messages that can be checked -// to get the next completed sent message -type SentMessageSet struct { - cases []reflect.SelectCase - sm []SentMessage - done chan SentMessage -} - -func (s *SentMessageSet) Add(sm SentMessage) { - s.sm = append(s.sm, sm) - s.cases = append(s.cases, reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(sm.(*sentMessage).done)}) +func (s *sender) SendForgetTimeout(m amqp.Message, t time.Duration) { + s.SendAsyncTimeout(m, nil, nil, t) } -// Wait waits up to timeout and returns the next SentMessage that has a valid dispositionb -// or an error. -func (s *SentMessageSet) Wait(sm SentMessage, timeout time.Duration) (SentMessage, error) { - s.cases = s.cases[:len(s.sm)] // Remove previous timeout cases - if timeout == 0 { // Non-blocking - s.cases = append(s.cases, reflect.SelectCase{Dir: reflect.SelectDefault}) - } else { - s.cases = append(s.cases, - reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(After(timeout))}) +func (s *sender) SendSyncTimeout(m amqp.Message, t time.Duration) Outcome { + deadline := time.Now().Add(t) + ack := s.SendWaitableTimeout(m, t) + t = deadline.Sub(time.Now()) // Adjust for time already spent. + if t < 0 { + t = 0 } - chosen, _, _ := reflect.Select(s.cases) - if chosen > len(s.sm) { - return nil, Timeout + if out, err := timedReceive(ack, t); err == nil { + return out.(Outcome) } else { - sm := s.sm[chosen] - s.sm = append(s.sm[:chosen], s.sm[chosen+1:]...) 
- return sm, nil + if err == Closed && s.Error() != nil { + err = s.Error() + } + return Outcome{Unacknowledged, err, nil} } } -// SentMessage implementation -type sentMessage struct { - connection *connection - done chan struct{} - delivery proton.Delivery - disposition Disposition - err error - value interface{} +func (s *sender) SendAsync(m amqp.Message, ack chan<- Outcome, v interface{}) { + s.SendAsyncTimeout(m, ack, v, Forever) } -func newSentMessage(c *connection) *sentMessage { - return &sentMessage{connection: c, done: make(chan struct{})} +func (s *sender) SendWaitable(m amqp.Message) <-chan Outcome { + return s.SendWaitableTimeout(m, Forever) } -func (sm *sentMessage) SetValue(v interface{}) { sm.value = v } -func (sm *sentMessage) Value() interface{} { return sm.value } -func (sm *sentMessage) Disposition() (Disposition, error) { - <-sm.done - return sm.disposition, sm.err +func (s *sender) SendForget(m amqp.Message) { + s.SendForgetTimeout(m, Forever) } -func (sm *sentMessage) DispositionTimeout(timeout time.Duration) (Disposition, error) { - if _, err := timedReceive(sm.done, timeout); err == Timeout { - return sm.disposition, Timeout - } else { - return sm.disposition, sm.err - } +func (s *sender) SendSync(m amqp.Message) Outcome { + return <-s.SendWaitable(m) } -func (sm *sentMessage) Forget() { - sm.connection.engine.Inject(func() { - sm.delivery.Settle() - delete(sm.connection.handler.sentMessages, sm.delivery) - }) - sm.finish() +// handler goroutine +func (s *sender) closed(err error) { + s.link.closed(err) + close(s.credit) } -func (sm *sentMessage) settled(err error) { - if sm.delivery.Settled() { - sm.disposition = Disposition(sm.delivery.Remote().Type()) - } - sm.err = err - sm.finish() +func newSender(l link) *sender { + s := &sender{link: l, credit: make(chan struct{}, 1)} + s.handler().addLink(s.eLink, s) + s.link.open() + return s } -func (sm *sentMessage) finish() { - select { - case <-sm.done: // No-op if already closed - default: - 
close(sm.done) - } +// sentMessage records a sent message on the handler. +type sentMessage struct { + ack chan<- Outcome + value interface{} } -func (sm *sentMessage) Error() error { return sm.err } - // IncomingSender is sent on the Connection.Incoming() channel when there is // an incoming request to open a sender link. type IncomingSender struct { @@ -307,3 +266,9 @@ type IncomingSender struct { func (in *IncomingSender) Accept() Endpoint { return in.accept(func() Endpoint { return newSender(in.link) }) } + +// Call in injected functions to check if the sender is valid. +func (s *sender) valid() bool { + s2, ok := s.handler().links[s.eLink].(*sender) + return ok && s2 == s +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cd8ad96f/proton-c/bindings/go/src/qpid.apache.org/electron/session.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/session.go b/proton-c/bindings/go/src/qpid.apache.org/electron/session.go index d347c99..18d8bc8 100644 --- a/proton-c/bindings/go/src/qpid.apache.org/electron/session.go +++ b/proton-c/bindings/go/src/qpid.apache.org/electron/session.go @@ -53,7 +53,7 @@ func newSession(c *connection, es proton.Session, setting ...SessionOption) *ses s := &session{ connection: c, eSession: es, - endpoint: endpoint{str: es.String()}, + endpoint: makeEndpoint(es.String()), } for _, set := range setting { set(s) @@ -67,12 +67,20 @@ func newSession(c *connection, es proton.Session, setting ...SessionOption) *ses func (s *session) Connection() Connection { return s.connection } func (s *session) eEndpoint() proton.Endpoint { return s.eSession } func (s *session) engine() *proton.Engine { return s.connection.engine } + func (s *session) Close(err error) { - s.engine().Inject(func() { localClose(s.eSession, err) }) + s.engine().Inject(func() { + if s.Error() == nil { + localClose(s.eSession, err) + } + }) } func (s *session) Sender(setting ...LinkOption) 
(snd Sender, err error) { err = s.engine().InjectWait(func() error { + if s.Error() != nil { + return s.Error() + } l, err := localLink(s, true, setting...) if err == nil { snd = newSender(l) @@ -84,6 +92,9 @@ func (s *session) Sender(setting ...LinkOption) (snd Sender, err error) { func (s *session) Receiver(setting ...LinkOption) (rcv Receiver, err error) { err = s.engine().InjectWait(func() error { + if s.Error() != nil { + return s.Error() + } l, err := localLink(s, false, setting...) if err == nil { rcv = newReceiver(l) @@ -93,12 +104,6 @@ func (s *session) Receiver(setting ...LinkOption) (rcv Receiver, err error) { return } -// Called from handler on closed. -func (s *session) closed(err error) { - s.err.Set(err) - s.err.Set(Closed) -} - // IncomingSender is sent on the Connection.Incoming() channel when there is an // incoming request to open a session. type IncomingSession struct { http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cd8ad96f/proton-c/bindings/go/src/qpid.apache.org/electron/time.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/time.go b/proton-c/bindings/go/src/qpid.apache.org/electron/time.go index 0015185..51bfbc5 100644 --- a/proton-c/bindings/go/src/qpid.apache.org/electron/time.go +++ b/proton-c/bindings/go/src/qpid.apache.org/electron/time.go @@ -21,6 +21,7 @@ package electron import ( "fmt" + "math" "reflect" "time" ) @@ -41,7 +42,7 @@ import ( var Timeout = fmt.Errorf("timeout") // Forever can be used as a timeout parameter to indicate wait forever. -const Forever time.Duration = -1 +const Forever time.Duration = math.MaxInt64 // timedReceive receives on channel (which can be a chan of any type), waiting // up to timeout. 
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cd8ad96f/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go b/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go index 95d70e9..2e67ef7 100644 --- a/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go +++ b/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go @@ -315,6 +315,13 @@ func (eng *Engine) Run() error { wait.Wait() close(eng.running) // Signal goroutines have exited and Error is set. + // Execute any injected functions for side effects on application data structures. + inject := eng.inject + eng.inject = nil // Further calls to Inject() will return an error. + for f := range inject { + f() + } + if !eng.connection.IsNil() { eng.connection.Free() } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
