Repository: qpid-proton Updated Branches: refs/heads/master 9f219cef0 -> 7486d5b85
NO-JIRA: go: Fix issues found by "go vet", minor simplification and bugfixing. - refactor to fix go vet errors about copying a Mutex. - hide Link endpoint interface, provide simpler LinkSettings common interface for Sender, Receiver. - simplified connection close logic - process all final events. - fix async sender blocking when no outcome requested. - simplify receiver flow control logic - simplify Send() logic Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/7486d5b8 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/7486d5b8 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/7486d5b8 Branch: refs/heads/master Commit: 7486d5b853ff26f707efe0ded73fba0d231d3733 Parents: 9f219ce Author: Alan Conway <[email protected]> Authored: Fri Dec 11 13:08:48 2015 -0500 Committer: Alan Conway <[email protected]> Committed: Thu Jan 7 17:26:45 2016 -0500 ---------------------------------------------------------------------- .../src/qpid.apache.org/electron/connection.go | 12 +- .../go/src/qpid.apache.org/electron/endpoint.go | 5 +- .../go/src/qpid.apache.org/electron/handler.go | 6 +- .../go/src/qpid.apache.org/electron/link.go | 90 ++++++------ .../qpid.apache.org/electron/messaging_test.go | 114 +++++++++------ .../go/src/qpid.apache.org/electron/receiver.go | 139 ++++++++----------- .../go/src/qpid.apache.org/electron/sender.go | 61 ++++---- .../go/src/qpid.apache.org/electron/session.go | 6 +- .../go/src/qpid.apache.org/proton/engine.go | 74 +++++----- .../go/src/qpid.apache.org/proton/wrappers.go | 2 +- 10 files changed, 264 insertions(+), 245 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/7486d5b8/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go ---------------------------------------------------------------------- diff --git 
a/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go b/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go index 386875d..1f8bd40 100644 --- a/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go +++ b/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go @@ -120,7 +120,7 @@ func newConnection(conn net.Conn, cont *container, setting ...ConnectionOption) for _, set := range setting { set(c) } - c.endpoint = makeEndpoint(c.engine.String()) + c.endpoint.init(c.engine.String()) c.eConnection = c.engine.Connection() go c.run() return c, nil @@ -134,9 +134,15 @@ func (c *connection) run() { c.closed(Closed) } -func (c *connection) Close(err error) { c.err.Set(err); c.engine.Close(err) } +func (c *connection) Close(err error) { + c.err.Set(err) + c.engine.Close(err) +} -func (c *connection) Disconnect(err error) { c.err.Set(err); c.engine.Disconnect(err) } +func (c *connection) Disconnect(err error) { + c.err.Set(err) + c.engine.Disconnect(err) +} func (c *connection) Session(setting ...SessionOption) (Session, error) { var s Session http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/7486d5b8/proton-c/bindings/go/src/qpid.apache.org/electron/endpoint.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/endpoint.go b/proton-c/bindings/go/src/qpid.apache.org/electron/endpoint.go index 2b1f62d..fc701c6 100644 --- a/proton-c/bindings/go/src/qpid.apache.org/electron/endpoint.go +++ b/proton-c/bindings/go/src/qpid.apache.org/electron/endpoint.go @@ -52,6 +52,9 @@ type Endpoint interface { // Done returns a channel that will close when the endpoint closes. // Error() will contain the reason. Done() <-chan struct{} + + // Called in handler goroutine when endpoint is remotely closed. 
+ closed(err error) error } // DEVELOPER NOTES @@ -64,7 +67,7 @@ type endpoint struct { done chan struct{} } -func makeEndpoint(s string) endpoint { return endpoint{str: s, done: make(chan struct{})} } +func (e *endpoint) init(s string) { e.str = s; e.done = make(chan struct{}) } // Called in handler on a Closed event. Marks the endpoint as closed and the corresponding // proton.Endpoint pointer as invalid. Injected functions should check Error() to ensure http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/7486d5b8/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go b/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go index 0237156..eb53df3 100644 --- a/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go +++ b/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go @@ -29,7 +29,7 @@ import ( type handler struct { delegator *proton.MessagingAdapter connection *connection - links map[proton.Link]Link + links map[proton.Link]Endpoint sentMessages map[proton.Delivery]sentMessage sessions map[proton.Session]*session } @@ -37,7 +37,7 @@ type handler struct { func newHandler(c *connection) *handler { h := &handler{ connection: c, - links: make(map[proton.Link]Link), + links: make(map[proton.Link]Endpoint), sentMessages: make(map[proton.Delivery]sentMessage), sessions: make(map[proton.Session]*session), } @@ -152,7 +152,7 @@ func (h *handler) incoming(in Incoming) { } } -func (h *handler) addLink(pl proton.Link, el Link) { +func (h *handler) addLink(pl proton.Link, el Endpoint) { h.links[pl] = el } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/7486d5b8/proton-c/bindings/go/src/qpid.apache.org/electron/link.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/link.go 
b/proton-c/bindings/go/src/qpid.apache.org/electron/link.go index 91efa8e..80b4d5c 100644 --- a/proton-c/bindings/go/src/qpid.apache.org/electron/link.go +++ b/proton-c/bindings/go/src/qpid.apache.org/electron/link.go @@ -24,11 +24,8 @@ import ( "qpid.apache.org/proton" ) -// Link is the common interface for AMQP links. Sender and Receiver provide -// more methods for the sending or receiving end of a link respectively. -type Link interface { - Endpoint - +// Settings associated with a link +type LinkSettings interface { // Source address that messages are coming from. Source() string @@ -53,46 +50,41 @@ type Link interface { // Session containing the Link Session() Session - - // Called in event loop on closed event. - closed(err error) - // Called to open a link (local or accepted incoming link) - open() } // LinkOption can be passed when creating a sender or receiver link to set optional configuration. -type LinkOption func(*link) +type LinkOption func(*linkSettings) // Source returns a LinkOption that sets address that messages are coming from. -func Source(s string) LinkOption { return func(l *link) { l.source = s } } +func Source(s string) LinkOption { return func(l *linkSettings) { l.source = s } } // Target returns a LinkOption that sets address that messages are going to. -func Target(s string) LinkOption { return func(l *link) { l.target = s } } +func Target(s string) LinkOption { return func(l *linkSettings) { l.target = s } } // LinkName returns a LinkOption that sets the link name. 
-func LinkName(s string) LinkOption { return func(l *link) { l.target = s } } +func LinkName(s string) LinkOption { return func(l *linkSettings) { l.target = s } } // SndSettle returns a LinkOption that sets the send settle mode -func SndSettle(m SndSettleMode) LinkOption { return func(l *link) { l.sndSettle = m } } +func SndSettle(m SndSettleMode) LinkOption { return func(l *linkSettings) { l.sndSettle = m } } // RcvSettle returns a LinkOption that sets the receive settle mode -func RcvSettle(m RcvSettleMode) LinkOption { return func(l *link) { l.rcvSettle = m } } +func RcvSettle(m RcvSettleMode) LinkOption { return func(l *linkSettings) { l.rcvSettle = m } } // SndSettleMode returns a LinkOption that defines when the sending end of the // link settles message delivery. type SndSettleMode proton.SndSettleMode // Capacity returns a LinkOption that sets the link capacity -func Capacity(n int) LinkOption { return func(l *link) { l.capacity = n } } +func Capacity(n int) LinkOption { return func(l *linkSettings) { l.capacity = n } } // Prefetch returns a LinkOption that sets a receiver's pre-fetch flag. Not relevant for a sender. -func Prefetch(p bool) LinkOption { return func(l *link) { l.prefetch = p } } +func Prefetch(p bool) LinkOption { return func(l *linkSettings) { l.prefetch = p } } // AtMostOnce returns a LinkOption that sets "fire and forget" mode, messages // are sent but no acknowledgment is received, messages can be lost if there is // a network failure. Sets SndSettleMode=SndSettled and RcvSettleMode=RcvFirst func AtMostOnce() LinkOption { - return func(l *link) { + return func(l *linkSettings) { SndSettle(SndSettled)(l) RcvSettle(RcvFirst)(l) } @@ -104,7 +96,7 @@ func AtMostOnce() LinkOption { // chance that the message will be received twice in this case. 
Sets // SndSettleMode=SndUnsettled and RcvSettleMode=RcvFirst func AtLeastOnce() LinkOption { - return func(l *link) { + return func(l *linkSettings) { SndSettle(SndUnsettled)(l) RcvSettle(RcvFirst)(l) } @@ -129,10 +121,7 @@ const ( RcvSecond = RcvSettleMode(proton.RcvSecond) ) -type link struct { - endpoint - - // Link settings. +type linkSettings struct { source string target string linkName string @@ -141,31 +130,35 @@ type link struct { rcvSettle RcvSettleMode capacity int prefetch bool + session *session + eLink proton.Link +} - session *session - eLink proton.Link +type link struct { + endpoint + linkSettings } -func (l *link) Source() string { return l.source } -func (l *link) Target() string { return l.target } -func (l *link) LinkName() string { return l.linkName } -func (l *link) IsSender() bool { return l.isSender } -func (l *link) IsReceiver() bool { return !l.isSender } -func (l *link) SndSettle() SndSettleMode { return l.sndSettle } -func (l *link) RcvSettle() RcvSettleMode { return l.rcvSettle } -func (l *link) Session() Session { return l.session } -func (l *link) Connection() Connection { return l.session.Connection() } +func (l *linkSettings) Source() string { return l.source } +func (l *linkSettings) Target() string { return l.target } +func (l *linkSettings) LinkName() string { return l.linkName } +func (l *linkSettings) IsSender() bool { return l.isSender } +func (l *linkSettings) IsReceiver() bool { return !l.isSender } +func (l *linkSettings) SndSettle() SndSettleMode { return l.sndSettle } +func (l *linkSettings) RcvSettle() RcvSettleMode { return l.rcvSettle } +func (l *link) Session() Session { return l.session } +func (l *link) Connection() Connection { return l.session.Connection() } func (l *link) engine() *proton.Engine { return l.session.connection.engine } func (l *link) handler() *handler { return l.session.connection.handler } -// Set up link fields and open the proton.Link -func localLink(sn *session, isSender bool, setting 
...LinkOption) (link, error) { - l := link{ - session: sn, +// Open a link and return the linkSettings. +func makeLocalLink(sn *session, isSender bool, setting ...LinkOption) (linkSettings, error) { + l := linkSettings{ isSender: isSender, capacity: 1, prefetch: false, + session: sn, } for _, set := range setting { set(&l) @@ -179,31 +172,29 @@ func localLink(sn *session, isSender bool, setting ...LinkOption) (link, error) l.eLink = l.session.eSession.Receiver(l.linkName) } if l.eLink.IsNil() { - l.err.Set(fmt.Errorf("cannot create link %s", l)) - return l, l.err.Get() + return l, fmt.Errorf("cannot create link %s", l.eLink) } l.eLink.Source().SetAddress(l.source) l.eLink.Target().SetAddress(l.target) l.eLink.SetSndSettleMode(proton.SndSettleMode(l.sndSettle)) l.eLink.SetRcvSettleMode(proton.RcvSettleMode(l.rcvSettle)) l.eLink.Open() - l.endpoint = makeEndpoint(l.eLink.String()) return l, nil } type incomingLink struct { incoming - link + linkSettings + eLink proton.Link + sn *session } // Set up a link from an incoming proton.Link. 
func makeIncomingLink(sn *session, eLink proton.Link) incomingLink { l := incomingLink{ incoming: makeIncoming(eLink), - link: link{ - session: sn, + linkSettings: linkSettings{ isSender: eLink.IsSender(), - eLink: eLink, source: eLink.RemoteSource().Address(), target: eLink.RemoteTarget().Address(), linkName: eLink.Name(), @@ -211,7 +202,8 @@ func makeIncomingLink(sn *session, eLink proton.Link) incomingLink { rcvSettle: RcvSettleMode(eLink.RemoteRcvSettleMode()), capacity: 1, prefetch: false, - endpoint: makeEndpoint(eLink.String()), + eLink: eLink, + session: sn, }, } return l @@ -239,7 +231,3 @@ func (l *link) Close(err error) { } }) } - -func (l *link) open() { - l.eLink.Open() -} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/7486d5b8/proton-c/bindings/go/src/qpid.apache.org/electron/messaging_test.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/messaging_test.go b/proton-c/bindings/go/src/qpid.apache.org/electron/messaging_test.go index 5af57e8..0de7d16 100644 --- a/proton-c/bindings/go/src/qpid.apache.org/electron/messaging_test.go +++ b/proton-c/bindings/go/src/qpid.apache.org/electron/messaging_test.go @@ -270,24 +270,26 @@ func TestTimeouts(t *testing.T) { } } -// clientServer that returns sender/receiver pairs at opposite ends of link. +// A server that returns the opposite end of each client link via channels. 
type pairs struct { - t *testing.T - client Session - server Connection - rchan chan Receiver - schan chan Sender + t *testing.T + client Session + server Connection + rchan chan Receiver + schan chan Sender + capacity int + prefetch bool } -func newPairs(t *testing.T) *pairs { +func newPairs(t *testing.T, capacity int, prefetch bool) *pairs { p := &pairs{t: t, rchan: make(chan Receiver, 1), schan: make(chan Sender, 1)} p.client, p.server = newClientServer(t) go func() { for i := range p.server.Incoming() { switch i := i.(type) { case *IncomingReceiver: - i.SetCapacity(1) - i.SetPrefetch(false) + i.SetCapacity(capacity) + i.SetPrefetch(prefetch) p.rchan <- i.Accept().(Receiver) case *IncomingSender: p.schan <- i.Accept().(Sender) @@ -303,6 +305,7 @@ func (p *pairs) close() { closeClientServer(p.client, p.server) } +// Return a client sender and server receiver func (p *pairs) senderReceiver() (Sender, Receiver) { snd, err := p.client.Sender() fatalIf(p.t, err) @@ -310,6 +313,7 @@ func (p *pairs) senderReceiver() (Sender, Receiver) { return snd, rcv } +// Return a client receiver and server sender func (p *pairs) receiverSender() (Receiver, Sender) { rcv, err := p.client.Receiver() fatalIf(p.t, err) @@ -322,7 +326,7 @@ type result struct { err error } -func (r result) String() string { return fmt.Sprintf("%s(%s)", r.err, r.label) } +func (r result) String() string { return fmt.Sprintf("%v(%v)", r.err, r.label) } func doSend(snd Sender, results chan result) { err := snd.SendSync(amqp.NewMessage()).Error @@ -338,30 +342,48 @@ func doDisposition(ack <-chan Outcome, results chan result) { results <- result{"disposition", (<-ack).Error} } +// Senders get credit immediately if receivers have prefetch set +func TestSendReceivePrefetch(t *testing.T) { + pairs := newPairs(t, 1, true) + s, r := pairs.senderReceiver() + s.SendAsyncTimeout(amqp.NewMessage(), nil, nil, time.Second) // Should not block for credit. 
+ if _, err := r.Receive(); err != nil { + t.Error(err) + } +} + +// Senders do not get credit till Receive() if receivers don't have prefetch +func TestSendReceiveNoPrefetch(t *testing.T) { + pairs := newPairs(t, 1, false) + s, r := pairs.senderReceiver() + done := make(chan struct{}, 1) + go func() { + s.SendAsyncTimeout(amqp.NewMessage(), nil, nil, time.Second) // Should block for credit. + close(done) + }() + select { + case <-done: + t.Errorf("send should be blocked on credit") + default: + if _, err := r.Receive(); err != nil { + t.Error(err) + } else { + <-done + } // Should be unblocked now + } +} + // Test that closing Links interrupts blocked link functions. func TestLinkCloseInterrupt(t *testing.T) { - want := amqp.Errorf("x", "all bad") - pairs := newPairs(t) + want := amqp.Error{Name: "x", Description: "all bad"} + pairs := newPairs(t, 1, false) results := make(chan result) // Collect expected errors - // Sender.Close() interrupts Send() - snd, rcv := pairs.senderReceiver() - go doSend(snd, results) - snd.Close(want) - if r := <-results; want != r.err { - t.Errorf("want %#v got %#v", want, r) - } - - // Remote Receiver.Close() interrupts Send() - snd, rcv = pairs.senderReceiver() - go doSend(snd, results) - rcv.Close(want) - if r := <-results; want != r.err { - t.Errorf("want %#v got %#v", want, r) - } + // Note closing the link does not interrupt Send() calls, the AMQP spec says + // that deliveries can be settled after the link is closed. // Receiver.Close() interrupts Receive() - snd, rcv = pairs.senderReceiver() + snd, rcv := pairs.senderReceiver() go doReceive(rcv, results) rcv.Close(want) if r := <-results; want != r.err { @@ -379,44 +401,50 @@ func TestLinkCloseInterrupt(t *testing.T) { // Test closing the server end of a connection. 
func TestConnectionCloseInterrupt1(t *testing.T) { - want := amqp.Errorf("x", "bad") - pairs := newPairs(t) + want := amqp.Error{Name: "x", Description: "bad"} + pairs := newPairs(t, 1, true) results := make(chan result) // Collect expected errors // Connection.Close() interrupts Send, Receive, Disposition. snd, rcv := pairs.senderReceiver() - go doReceive(rcv, results) - ack := snd.SendWaitable(amqp.NewMessage()) - go doDisposition(ack, results) - snd, rcv = pairs.senderReceiver() go doSend(snd, results) + + rcv.Receive() rcv, snd = pairs.receiverSender() go doReceive(rcv, results) + + snd, rcv = pairs.senderReceiver() + ack := snd.SendWaitable(amqp.NewMessage()) + rcv.Receive() + go doDisposition(ack, results) + pairs.server.Close(want) for i := 0; i < 3; i++ { if r := <-results; want != r.err { - // TODO aconway 2015-10-06: Not propagating the correct error, seeing nil and EOF. - t.Logf("want %v got %v", want, r.err) + t.Logf("want %v got %v", want, r) } } } // Test closing the client end of the connection. func TestConnectionCloseInterrupt2(t *testing.T) { - want := amqp.Errorf("x", "bad") - pairs := newPairs(t) + want := amqp.Error{Name: "x", Description: "bad"} + pairs := newPairs(t, 1, true) results := make(chan result) // Collect expected errors // Connection.Close() interrupts Send, Receive, Disposition. snd, rcv := pairs.senderReceiver() - go doReceive(rcv, results) - ack := snd.SendWaitable(amqp.NewMessage()) - go doDisposition(ack, results) - snd, rcv = pairs.senderReceiver() go doSend(snd, results) + rcv.Receive() + rcv, snd = pairs.receiverSender() go doReceive(rcv, results) - pairs.client.Close(want) + + snd, rcv = pairs.senderReceiver() + ack := snd.SendWaitable(amqp.NewMessage()) + go doDisposition(ack, results) + + pairs.client.Connection().Close(want) for i := 0; i < 3; i++ { if r := <-results; want != r.err { // TODO aconway 2015-10-06: Not propagating the correct error, seeing nil. 
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/7486d5b8/proton-c/bindings/go/src/qpid.apache.org/electron/receiver.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/receiver.go b/proton-c/bindings/go/src/qpid.apache.org/electron/receiver.go index 22bdc7e..f2b7a52 100644 --- a/proton-c/bindings/go/src/qpid.apache.org/electron/receiver.go +++ b/proton-c/bindings/go/src/qpid.apache.org/electron/receiver.go @@ -29,7 +29,8 @@ import ( // Receiver is a Link that receives messages. // type Receiver interface { - Link + Endpoint + LinkSettings // Receive blocks until a message is available or until the Receiver is closed // and has no more buffered messages. @@ -64,108 +65,88 @@ type Receiver interface { Capacity() int } -// Flow control policy for a receiver. -type policy interface { - // Called at the start of Receive() to adjust credit before fetching a message. - Pre(*receiver) - // Called after Receive() has received a message from Buffer() before it returns. - // Non-nil error means no message was received because of an error. 
- Post(*receiver, error) -} - -type prefetchPolicy struct{} - -func (p prefetchPolicy) Flow(r *receiver) { - r.engine().Inject(func() { - if r.Error() != nil { - return - } - _, _, max := r.credit() - if max > 0 { - r.eLink.Flow(max) - } - }) -} -func (p prefetchPolicy) Pre(r *receiver) { p.Flow(r) } -func (p prefetchPolicy) Post(r *receiver, err error) { - if err == nil { - p.Flow(r) - } -} - -type noPrefetchPolicy struct{ waiting int } - -func (p noPrefetchPolicy) Flow(r *receiver) { // Not called in proton goroutine - r.engine().Inject(func() { - if r.Error() != nil { - return - } - len, credit, max := r.credit() - add := p.waiting - (len + credit) - if add > max { - add = max // Don't overflow - } - if add > 0 { - r.eLink.Flow(add) - } - }) -} -func (p noPrefetchPolicy) Pre(r *receiver) { p.waiting++; p.Flow(r) } -func (p noPrefetchPolicy) Post(r *receiver, err error) { - p.waiting-- - if err == nil { - p.Flow(r) - } -} - // Receiver implementation type receiver struct { link - buffer chan ReceivedMessage - policy policy + buffer chan ReceivedMessage + callers int } +func (r *receiver) Capacity() int { return cap(r.buffer) } +func (r *receiver) Prefetch() bool { return r.prefetch } + // Call in proton goroutine -func newReceiver(l link) *receiver { - r := &receiver{link: l} +func newReceiver(ls linkSettings) *receiver { + r := &receiver{link: link{linkSettings: ls}} + r.endpoint.init(r.link.eLink.String()) if r.capacity < 1 { r.capacity = 1 } - if r.prefetch { - r.policy = &prefetchPolicy{} - } else { - r.policy = &noPrefetchPolicy{} - } r.buffer = make(chan ReceivedMessage, r.capacity) r.handler().addLink(r.eLink, r) - r.link.open() + r.link.eLink.Open() + if r.prefetch { + r.flow(r.maxFlow()) + } return r } -// call in proton goroutine. -func (r *receiver) credit() (buffered, credit, max int) { - return len(r.buffer), r.eLink.Credit(), cap(r.buffer) - len(r.buffer) +// Call in proton goroutine. Max additional credit we can request. 
+func (r *receiver) maxFlow() int { return cap(r.buffer) - len(r.buffer) - r.eLink.Credit() } + +func (r *receiver) flow(credit int) { + if credit > 0 { + r.eLink.Flow(credit) + } } -func (r *receiver) Capacity() int { return cap(r.buffer) } -func (r *receiver) Prefetch() bool { return r.prefetch } +// Inject flow check per-caller call when prefetch is off. +// Called with inc=1 at start of call, inc = -1 at end +func (r *receiver) caller(inc int) { + r.engine().Inject(func() { + r.callers += inc + need := r.callers - (len(r.buffer) + r.eLink.Credit()) + max := r.maxFlow() + if need > max { + need = max + } + r.flow(need) + }) +} +// Inject flow top-up if prefetch is enabled +func (r *receiver) flowTopUp() { + if r.prefetch { + r.engine().Inject(func() { r.flow(r.maxFlow()) }) + } +} + +// Not called func (r *receiver) Receive() (rm ReceivedMessage, err error) { return r.ReceiveTimeout(Forever) } -func (r *receiver) ReceiveTimeout(timeout time.Duration) (rm ReceivedMessage, err error) { +func (r *receiver) ReceiveTimeout(timeout time.Duration) (ReceivedMessage, error) { assert(r.buffer != nil, "Receiver is not open: %s", r) - r.policy.Pre(r) - defer func() { r.policy.Post(r, err) }() + select { // Check for immediate availability + case rm := <-r.buffer: + r.flowTopUp() + return rm, nil + default: + } + if !r.prefetch { // Per-caller flow control + r.caller(+1) + defer r.caller(-1) + } rmi, err := timedReceive(r.buffer, timeout) switch err { - case Timeout: - return ReceivedMessage{}, Timeout + case nil: + r.flowTopUp() + return rmi.(ReceivedMessage), err case Closed: return ReceivedMessage{}, r.Error() default: - return rmi.(ReceivedMessage), nil + return ReceivedMessage{}, err } } @@ -192,11 +173,11 @@ func (r *receiver) message(delivery proton.Delivery) { } } -func (r *receiver) closed(err error) { - r.link.closed(err) +func (r *receiver) closed(err error) error { if r.buffer != nil { close(r.buffer) } + return r.link.closed(err) } // ReceivedMessage contains an 
amqp.Message and allows the message to be acknowledged. @@ -240,5 +221,5 @@ func (in *IncomingReceiver) SetPrefetch(prefetch bool) { in.prefetch = prefetch // Accept accepts an incoming receiver endpoint func (in *IncomingReceiver) Accept() Endpoint { - return in.accept(func() Endpoint { return newReceiver(in.link) }) + return in.accept(func() Endpoint { return newReceiver(in.linkSettings) }) } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/7486d5b8/proton-c/bindings/go/src/qpid.apache.org/electron/sender.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/sender.go b/proton-c/bindings/go/src/qpid.apache.org/electron/sender.go index 834eb75..2f0e965 100644 --- a/proton-c/bindings/go/src/qpid.apache.org/electron/sender.go +++ b/proton-c/bindings/go/src/qpid.apache.org/electron/sender.go @@ -38,7 +38,8 @@ import ( // Send*Timeout methods will give up after the timeout and set Timeout as Outcome.Error. // type Sender interface { - Link + Endpoint + LinkSettings // SendSync sends a message and blocks until the message is acknowledged by the remote receiver. // Returns an Outcome, which may contain an error if the message could not be sent. @@ -83,6 +84,12 @@ type Outcome struct { Value interface{} } +func (o Outcome) send(ack chan<- Outcome) { + if ack != nil { + ack <- o + } +} + // SentStatus indicates the status of a sent message. 
type SentStatus int @@ -144,44 +151,37 @@ type sender struct { func (s *sender) SendAsyncTimeout(m amqp.Message, ack chan<- Outcome, v interface{}, t time.Duration) { // wait for credit if _, err := timedReceive(s.credit, t); err != nil { - if err == Closed && s.Error != nil { + if err == Closed && s.Error() != nil { err = s.Error() } - ack <- Outcome{Unsent, err, v} + Outcome{Unsent, err, v}.send(ack) return } // Send a message in handler goroutine err := s.engine().Inject(func() { if s.Error() != nil { - if ack != nil { - ack <- Outcome{Unsent, s.Error(), v} - } + Outcome{Unsent, s.Error(), v}.send(ack) return } - if delivery, err := s.eLink.Send(m); err == nil { - if ack != nil { // We must report an outcome - if s.SndSettle() == SndSettled { - delivery.Settle() // Pre-settle if required - ack <- Outcome{Accepted, nil, v} - } else { - s.handler().sentMessages[delivery] = sentMessage{ack, v} - } - } else { // ack == nil, can't report outcome - if s.SndSettle() != SndUnsettled { // Pre-settle unless we are forced not to. 
- delivery.Settle() - } - } - } else { // err != nil - if ack != nil { - ack <- Outcome{Unsent, err, v} + + delivery, err2 := s.eLink.Send(m) + switch { + case err2 != nil: + Outcome{Unsent, err2, v}.send(ack) + case ack == nil || s.SndSettle() == SndSettled: // Pre-settled + if s.SndSettle() != SndUnsettled { // Not forced to send unsettled by link policy + delivery.Settle() } + Outcome{Accepted, nil, v}.send(ack) // Assume accepted + default: + s.handler().sentMessages[delivery] = sentMessage{ack, v} // Register with handler } if s.eLink.Credit() > 0 { // Signal there is still credit s.sendable() } }) - if err != nil && ack != nil { - ack <- Outcome{Unsent, err, v} + if err != nil { + Outcome{Unsent, err, v}.send(ack) } } @@ -237,15 +237,16 @@ func (s *sender) SendSync(m amqp.Message) Outcome { } // handler goroutine -func (s *sender) closed(err error) { - s.link.closed(err) +func (s *sender) closed(err error) error { close(s.credit) + return s.link.closed(err) } -func newSender(l link) *sender { - s := &sender{link: l, credit: make(chan struct{}, 1)} +func newSender(ls linkSettings) *sender { + s := &sender{link: link{linkSettings: ls}, credit: make(chan struct{}, 1)} + s.endpoint.init(s.link.eLink.String()) s.handler().addLink(s.eLink, s) - s.link.open() + s.link.eLink.Open() return s } @@ -263,7 +264,7 @@ type IncomingSender struct { // Accept accepts an incoming sender endpoint func (in *IncomingSender) Accept() Endpoint { - return in.accept(func() Endpoint { return newSender(in.link) }) + return in.accept(func() Endpoint { return newSender(in.linkSettings) }) } // Call in injected functions to check if the sender is valid. 
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/7486d5b8/proton-c/bindings/go/src/qpid.apache.org/electron/session.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/session.go b/proton-c/bindings/go/src/qpid.apache.org/electron/session.go index 18d8bc8..1bbc52c 100644 --- a/proton-c/bindings/go/src/qpid.apache.org/electron/session.go +++ b/proton-c/bindings/go/src/qpid.apache.org/electron/session.go @@ -53,8 +53,8 @@ func newSession(c *connection, es proton.Session, setting ...SessionOption) *ses s := &session{ connection: c, eSession: es, - endpoint: makeEndpoint(es.String()), } + s.endpoint.init(es.String()) for _, set := range setting { set(s) } @@ -81,7 +81,7 @@ func (s *session) Sender(setting ...LinkOption) (snd Sender, err error) { if s.Error() != nil { return s.Error() } - l, err := localLink(s, true, setting...) + l, err := makeLocalLink(s, true, setting...) if err == nil { snd = newSender(l) } @@ -95,7 +95,7 @@ func (s *session) Receiver(setting ...LinkOption) (rcv Receiver, err error) { if s.Error() != nil { return s.Error() } - l, err := localLink(s, false, setting...) + l, err := makeLocalLink(s, false, setting...) 
if err == nil { rcv = newReceiver(l) } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/7486d5b8/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go b/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go index 13d44b8..eecda7a 100644 --- a/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go +++ b/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go @@ -33,9 +33,9 @@ import "C" import ( "fmt" - "io" "net" "sync" + "time" "unsafe" ) @@ -172,7 +172,7 @@ func (eng *Engine) String() string { } func (eng *Engine) Id() string { - return fmt.Sprintf("%eng", &eng) + return fmt.Sprintf("%p", eng) } func (eng *Engine) Error() error { @@ -223,19 +223,35 @@ func (eng *Engine) InjectWait(f func() error) error { // func (eng *Engine) Server() { eng.transport.SetServer() } -// Close the engine's connection, returns when the engine has exited. +func (eng *Engine) disconnect() { + eng.transport.CloseHead() + eng.transport.CloseTail() + eng.conn.Close() + eng.dispatch() +} + +// Close the engine's connection. +// If err != nil pass it to the remote end as the close condition. +// Returns when the remote end closes or disconnects. func (eng *Engine) Close(err error) { - eng.err.Set(err) - eng.Inject(func() { - CloseError(eng.connection, err) - }) + eng.Inject(func() { CloseError(eng.connection, err) }) <-eng.running } -// Disconnect the engine's connection without and AMQP close, returns when the engine has exited. +// CloseTimeout like Close but disconnect if the remote end doesn't close within timeout. +func (eng *Engine) CloseTimeout(err error, timeout time.Duration) { + eng.Inject(func() { CloseError(eng.connection, err) }) + select { + case <-eng.running: + case <-time.After(timeout): + eng.Disconnect(err) + } +} + +// Disconnect the engine's connection immediately without an AMQP close. 
+// Process any termination events before returning. func (eng *Engine) Disconnect(err error) { - eng.err.Set(err) - eng.conn.Close() + eng.Inject(func() { eng.transport.Condition().SetError(err); eng.disconnect() }) <-eng.running } @@ -283,7 +299,7 @@ func (eng *Engine) Run() error { wbuf := eng.write.buffer()[:0] - for eng.err.Get() == nil { + for !eng.transport.Closed() { if len(wbuf) == 0 { eng.pop(&wbuf) } @@ -305,12 +321,19 @@ func (eng *Engine) Run() error { f() } case err := <-readErr: - eng.netError(err) + eng.transport.Condition().SetError(err) + eng.transport.CloseTail() case err := <-writeErr: - eng.netError(err) + eng.transport.Condition().SetError(err) + eng.transport.CloseHead() + } + eng.dispatch() + if eng.connection.State().RemoteClosed() && eng.connection.State().LocalClosed() { + eng.disconnect() } - eng.process() } + eng.err.Set(EndpointError(eng.connection)) + eng.err.Set(eng.transport.Condition().Error()) close(eng.write.buffers) eng.conn.Close() // Make sure connection is closed wait.Wait() @@ -334,12 +357,6 @@ func (eng *Engine) Run() error { return eng.err.Get() } -func (eng *Engine) netError(err error) { - eng.err.Set(err) - eng.transport.CloseHead() - eng.transport.CloseTail() -} - func minInt(a, b int) int { if a < b { return a @@ -378,18 +395,13 @@ func (eng *Engine) push(buf []byte) { } } -func (eng *Engine) handle(e Event) { - for _, h := range eng.handlers { - h.HandleEvent(e) - } - if e.Type() == ETransportClosed { - eng.err.Set(io.EOF) - } -} +func (eng *Engine) peek() *C.pn_event_t { return C.pn_collector_peek(eng.collector) } -func (eng *Engine) process() { - for ce := C.pn_collector_peek(eng.collector); ce != nil; ce = C.pn_collector_peek(eng.collector) { - eng.handle(makeEvent(ce, eng)) +func (eng *Engine) dispatch() { + for ce := eng.peek(); ce != nil; ce = eng.peek() { + for _, h := range eng.handlers { + h.HandleEvent(makeEvent(ce, eng)) + } C.pn_collector_pop(eng.collector) } } 
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/7486d5b8/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go b/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go index cd547ed..70611d3 100644 --- a/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go +++ b/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go @@ -334,7 +334,7 @@ func (c Condition) Error() error { if c.IsNil() || !c.IsSet() { return nil } - return amqp.Error{c.Name(), c.Description()} + return amqp.Error{Name: c.Name(), Description: c.Description()} } // Set a Go error into a condition. --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
