Merge branch 'master' into go1, pick up minor fixes.
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/5e6024bb Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/5e6024bb Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/5e6024bb Branch: refs/heads/go1 Commit: 5e6024bbff8c77f8734f99cdd808ffd34c36f719 Parents: c87499a 7486d5b Author: Alan Conway <[email protected]> Authored: Thu Jan 7 17:28:32 2016 -0500 Committer: Alan Conway <[email protected]> Committed: Thu Jan 7 17:28:32 2016 -0500 ---------------------------------------------------------------------- electron/connection.go | 12 +++- electron/endpoint.go | 5 +- electron/handler.go | 6 +- electron/link.go | 90 +++++++++++--------------- electron/messaging_test.go | 114 +++++++++++++++++++------------- electron/receiver.go | 139 +++++++++++++++++----------------------- electron/sender.go | 61 +++++++++--------- electron/session.go | 6 +- proton/engine.go | 74 ++++++++++++--------- proton/wrappers.go | 2 +- 10 files changed, 264 insertions(+), 245 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5e6024bb/electron/connection.go ---------------------------------------------------------------------- diff --cc electron/connection.go index 386875d,0000000..1f8bd40 mode 100644,000000..100644 --- a/electron/connection.go +++ b/electron/connection.go @@@ -1,240 -1,0 +1,246 @@@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package electron + +// #include <proton/disposition.h> +import "C" + +import ( + "fmt" + "net" + "qpid.apache.org/proton" + "sync" + "time" +) + +// Connection is an AMQP connection, created by a Container. +type Connection interface { + Endpoint + + // Sender opens a new sender on the DefaultSession. + Sender(...LinkOption) (Sender, error) + + // Receiver opens a new Receiver on the DefaultSession(). + Receiver(...LinkOption) (Receiver, error) + + // DefaultSession() returns a default session for the connection. It is opened + // on the first call to DefaultSession and returned on subsequent calls. + DefaultSession() (Session, error) + + // Session opens a new session. + Session(...SessionOption) (Session, error) + + // Container for the connection. + Container() Container + + // Disconnect the connection abruptly with an error. + Disconnect(error) + + // Wait waits for the connection to be disconnected. + Wait() error + + // WaitTimeout is like Wait but returns Timeout if the timeout expires. + WaitTimeout(time.Duration) error + + // Incoming returns a channel for incoming endpoints opened by the remote end. + // + // To enable, pass AllowIncoming() when creating the Connection. Otherwise all + // incoming endpoint requests are automatically rejected and Incoming() + // returns nil. + // + // An Incoming value can be an *IncomingSession, *IncomingSender or + // *IncomingReceiver. You must call Accept() to open the endpoint or Reject() + // to close it with an error. The specific Incoming types have additional + // methods to configure the endpoint. + // + // Not receiving from Incoming() or not calling Accept/Reject will block the + // electron event loop. Normally you would have a dedicated goroutine receive + // from Incoming() and start new goroutines to serve each incoming endpoint. + // The channel is closed when the Connection closes. + // + Incoming() <-chan Incoming +} + +// ConnectionOption can be passed when creating a connection to configure various options +type ConnectionOption func(*connection) + +// Server returns a ConnectionOption to put the connection in server mode. +// +// A server connection will do protocol negotiation to accept a incoming AMQP +// connection. Normally you would call this for a connection created by +// net.Listener.Accept() +// +func Server() ConnectionOption { return func(c *connection) { c.engine.Server() } } + +// AllowIncoming returns a ConnectionOption to enable incoming endpoint open requests. +// See Connection.Incoming() +func AllowIncoming() ConnectionOption { + return func(c *connection) { c.incoming = make(chan Incoming) } +} + +type connection struct { + endpoint + defaultSessionOnce, closeOnce sync.Once + + container *container + conn net.Conn + incoming chan Incoming + handler *handler + engine *proton.Engine + eConnection proton.Connection + + defaultSession Session +} + +func newConnection(conn net.Conn, cont *container, setting ...ConnectionOption) (*connection, error) { + c := &connection{container: cont, conn: conn} + c.handler = newHandler(c) + var err error + c.engine, err = proton.NewEngine(c.conn, c.handler.delegator) + if err != nil { + return nil, err + } + for _, set := range setting { + set(c) + } - c.endpoint = makeEndpoint(c.engine.String()) ++ c.endpoint.init(c.engine.String()) + c.eConnection = c.engine.Connection() + go c.run() + return c, nil +} + +func (c *connection) run() { + c.engine.Run() + if c.incoming != nil { + close(c.incoming) + } + c.closed(Closed) +} + - func (c *connection) Close(err error) { c.err.Set(err); c.engine.Close(err) } ++func (c *connection) Close(err error) { ++ c.err.Set(err) ++ c.engine.Close(err) ++} + - func (c *connection) Disconnect(err error) { c.err.Set(err); c.engine.Disconnect(err) } ++func (c *connection) Disconnect(err error) { ++ c.err.Set(err) ++ c.engine.Disconnect(err) ++} + +func (c *connection) Session(setting ...SessionOption) (Session, error) { + var s Session + err := c.engine.InjectWait(func() error { + if c.Error() != nil { + return c.Error() + } + eSession, err := c.engine.Connection().Session() + if err == nil { + eSession.Open() + if err == nil { + s = newSession(c, eSession, setting...) + } + } + return err + }) + return s, err +} + +func (c *connection) Container() Container { return c.container } + +func (c *connection) DefaultSession() (s Session, err error) { + c.defaultSessionOnce.Do(func() { + c.defaultSession, err = c.Session() + }) + if err == nil { + err = c.Error() + } + return c.defaultSession, err +} + +func (c *connection) Sender(setting ...LinkOption) (Sender, error) { + if s, err := c.DefaultSession(); err == nil { + return s.Sender(setting...) + } else { + return nil, err + } +} + +func (c *connection) Receiver(setting ...LinkOption) (Receiver, error) { + if s, err := c.DefaultSession(); err == nil { + return s.Receiver(setting...) + } else { + return nil, err + } +} + +func (c *connection) Connection() Connection { return c } + +func (c *connection) Wait() error { return c.WaitTimeout(Forever) } +func (c *connection) WaitTimeout(timeout time.Duration) error { + _, err := timedReceive(c.done, timeout) + if err == Timeout { + return Timeout + } + return c.Error() +} + +func (c *connection) Incoming() <-chan Incoming { return c.incoming } + +// Incoming is the interface for incoming requests to open an endpoint. +// Implementing types are IncomingSession, IncomingSender and IncomingReceiver. +type Incoming interface { + // Accept and open the endpoint. + Accept() Endpoint + + // Reject the endpoint with an error + Reject(error) + + // wait for and call the accept function, call in proton goroutine. + wait() error + pEndpoint() proton.Endpoint +} + +type incoming struct { + endpoint proton.Endpoint + acceptCh chan func() error +} + +func makeIncoming(e proton.Endpoint) incoming { + return incoming{endpoint: e, acceptCh: make(chan func() error)} +} + +func (in *incoming) String() string { return fmt.Sprintf("%s: %s", in.endpoint.Type(), in.endpoint) } +func (in *incoming) Reject(err error) { in.acceptCh <- func() error { return err } } + +// Call in proton goroutine, wait for and call the accept function fr +func (in *incoming) wait() error { return (<-in.acceptCh)() } + +func (in *incoming) pEndpoint() proton.Endpoint { return in.endpoint } + +// Called in app goroutine to send an accept function to proton and return the resulting endpoint. +func (in *incoming) accept(f func() Endpoint) Endpoint { + done := make(chan Endpoint) + in.acceptCh <- func() error { + ep := f() + done <- ep + return nil + } + return <-done +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5e6024bb/electron/endpoint.go ---------------------------------------------------------------------- diff --cc electron/endpoint.go index 2b1f62d,0000000..fc701c6 mode 100644,000000..100644 --- a/electron/endpoint.go +++ b/electron/endpoint.go @@@ -1,99 -1,0 +1,102 @@@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package electron + +import ( + "io" + "qpid.apache.org/proton" +) + +// Closed is an alias for io.EOF. It is returned as an error when an endpoint +// was closed cleanly. +var Closed = io.EOF + +// Endpoint is the common interface for Connection, Session, Link, Sender and Receiver. +// +// Endpoints can be created locally or by the remote peer. You must Open() an +// endpoint before you can use it. Some endpoints have additional Set*() methods +// that must be called before Open() to take effect, see Connection, Session, +// Link, Sender and Receiver for details. +// +type Endpoint interface { + // Close an endpoint and signal an error to the remote end if error != nil. + Close(error) + + // String is a human readable identifier, useful for debugging and logging. + String() string + + // Error returns nil if the endpoint is open, otherwise returns an error. + // Error() == Closed means the endpoint was closed without error. + Error() error + + // Connection containing the endpoint + Connection() Connection + + // Done returns a channel that will close when the endpoint closes. + // Error() will contain the reason. + Done() <-chan struct{} ++ ++ // Called in handler goroutine when endpoint is remotely closed. ++ closed(err error) error +} + +// DEVELOPER NOTES +// +// An electron.Endpoint corresponds to a proton.Endpoint, which can be invalidated +// +type endpoint struct { + err proton.ErrorHolder + str string // Must be set by the value that embeds endpoint. + done chan struct{} +} + - func makeEndpoint(s string) endpoint { return endpoint{str: s, done: make(chan struct{})} } ++func (e *endpoint) init(s string) { e.str = s; e.done = make(chan struct{}) } + +// Called in handler on a Closed event. Marks the endpoint as closed and the corresponding +// proton.Endpoint pointer as invalid. Injected functions should check Error() to ensure +// the pointer has not been invalidated. +// +// Returns the error stored on the endpoint, which may not be different to err if there was +// already a n error +func (e *endpoint) closed(err error) error { + select { + case <-e.done: + // Already closed + default: + e.err.Set(err) + e.err.Set(Closed) + close(e.done) + } + return e.err.Get() +} + +func (e *endpoint) String() string { return e.str } + +func (e *endpoint) Error() error { return e.err.Get() } + +func (e *endpoint) Done() <-chan struct{} { return e.done } + +// Call in proton goroutine to initiate closing an endpoint locally +// handler will complete the close when remote end closes. +func localClose(ep proton.Endpoint, err error) { + if ep.State().LocalActive() { + proton.CloseError(ep, err) + } +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5e6024bb/electron/handler.go ---------------------------------------------------------------------- diff --cc electron/handler.go index 0237156,0000000..eb53df3 mode 100644,000000..100644 --- a/electron/handler.go +++ b/electron/handler.go @@@ -1,187 -1,0 +1,187 @@@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package electron + +import ( + "qpid.apache.org/amqp" + "qpid.apache.org/proton" +) + +// NOTE: methods in this file are called only in the proton goroutine unless otherwise indicated. + +type handler struct { + delegator *proton.MessagingAdapter + connection *connection - links map[proton.Link]Link ++ links map[proton.Link]Endpoint + sentMessages map[proton.Delivery]sentMessage + sessions map[proton.Session]*session +} + +func newHandler(c *connection) *handler { + h := &handler{ + connection: c, - links: make(map[proton.Link]Link), ++ links: make(map[proton.Link]Endpoint), + sentMessages: make(map[proton.Delivery]sentMessage), + sessions: make(map[proton.Session]*session), + } + h.delegator = proton.NewMessagingAdapter(h) + // Disable auto features of MessagingAdapter, we do these ourselves. + h.delegator.Prefetch = 0 + h.delegator.AutoAccept = false + h.delegator.AutoSettle = false + h.delegator.AutoOpen = false + return h +} + +func (h *handler) linkError(l proton.Link, msg string) { + proton.CloseError(l, amqp.Errorf(amqp.InternalError, "%s for %s %s", msg, l.Type(), l)) +} + +func (h *handler) HandleMessagingEvent(t proton.MessagingEvent, e proton.Event) { + switch t { + + case proton.MMessage: + if r, ok := h.links[e.Link()].(*receiver); ok { + r.message(e.Delivery()) + } else { + h.linkError(e.Link(), "no receiver") + } + + case proton.MSettled: + if sm, ok := h.sentMessages[e.Delivery()]; ok { + d := e.Delivery().Remote() + sm.ack <- Outcome{sentStatus(d.Type()), d.Condition().Error(), sm.value} + delete(h.sentMessages, e.Delivery()) + } + + case proton.MSendable: + if s, ok := h.links[e.Link()].(*sender); ok { + s.sendable() + } else { + h.linkError(e.Link(), "no sender") + } + + case proton.MSessionOpening: + if e.Session().State().LocalUninit() { // Remotely opened + h.incoming(newIncomingSession(h, e.Session())) + } + + case proton.MSessionClosed: + h.sessionClosed(e.Session(), proton.EndpointError(e.Session())) + + case proton.MLinkOpening: + l := e.Link() + if l.State().LocalActive() { // Already opened locally. + break + } + ss := h.sessions[l.Session()] + if ss == nil { + h.linkError(e.Link(), "no session") + break + } + if l.IsReceiver() { + h.incoming(&IncomingReceiver{makeIncomingLink(ss, l)}) + } else { + h.incoming(&IncomingSender{makeIncomingLink(ss, l)}) + } + + case proton.MLinkClosing: + e.Link().Close() + + case proton.MLinkClosed: + h.linkClosed(e.Link(), proton.EndpointError(e.Link())) + + case proton.MConnectionClosing: + h.connection.err.Set(e.Connection().RemoteCondition().Error()) + + case proton.MConnectionClosed: + h.connectionClosed(proton.EndpointError(e.Connection())) + + case proton.MDisconnected: + h.connection.err.Set(e.Transport().Condition().Error()) + // If err not set at this point (e.g. to Closed) then this is unexpected. + h.connection.err.Set(amqp.Errorf(amqp.IllegalState, "unexpected disconnect on %s", h.connection)) + + err := h.connection.Error() + + for l, _ := range h.links { + h.linkClosed(l, err) + } + h.links = nil + for _, s := range h.sessions { + s.closed(err) + } + h.sessions = nil + for _, sm := range h.sentMessages { + sm.ack <- Outcome{Unacknowledged, err, sm.value} + } + h.sentMessages = nil + } +} + +func (h *handler) incoming(in Incoming) { + var err error + if h.connection.incoming != nil { + h.connection.incoming <- in + err = in.wait() + } else { + err = amqp.Errorf(amqp.NotAllowed, "rejected incoming %s %s", + in.pEndpoint().Type(), in.pEndpoint().String()) + } + if err == nil { + in.pEndpoint().Open() + } else { + proton.CloseError(in.pEndpoint(), err) + } +} + - func (h *handler) addLink(pl proton.Link, el Link) { ++func (h *handler) addLink(pl proton.Link, el Endpoint) { + h.links[pl] = el +} + +func (h *handler) linkClosed(l proton.Link, err error) { + if link, ok := h.links[l]; ok { + link.closed(err) + delete(h.links, l) + } +} + +func (h *handler) sessionClosed(ps proton.Session, err error) { + if s, ok := h.sessions[ps]; ok { + delete(h.sessions, ps) + err = s.closed(err) + for l, _ := range h.links { + if l.Session() == ps { + h.linkClosed(l, err) + } + } + } +} + +func (h *handler) connectionClosed(err error) { + err = h.connection.closed(err) + // Close links first to avoid repeated scans of the link list by sessions. + for l, _ := range h.links { + h.linkClosed(l, err) + } + for s, _ := range h.sessions { + h.sessionClosed(s, err) + } +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5e6024bb/electron/link.go ---------------------------------------------------------------------- diff --cc electron/link.go index 91efa8e,0000000..80b4d5c mode 100644,000000..100644 --- a/electron/link.go +++ b/electron/link.go @@@ -1,245 -1,0 +1,233 @@@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package electron + +import ( + "fmt" + "qpid.apache.org/proton" +) + - // Link is the common interface for AMQP links. Sender and Receiver provide - // more methods for the sending or receiving end of a link respectively. - type Link interface { - Endpoint - ++// Settings associated with a link ++type LinkSettings interface { + // Source address that messages are coming from. + Source() string + + // Target address that messages are going to. + Target() string + + // Name is a unique name for the link among links between the same + // containers in the same direction. By default generated automatically. + LinkName() string + + // IsSender is true if this is the sending end of the link. + IsSender() bool + + // IsReceiver is true if this is the receiving end of the link. + IsReceiver() bool + + // SndSettle defines when the sending end of the link settles message delivery. + SndSettle() SndSettleMode + + // RcvSettle defines when the sending end of the link settles message delivery. + RcvSettle() RcvSettleMode + + // Session containing the Link + Session() Session - - // Called in event loop on closed event. - closed(err error) - // Called to open a link (local or accepted incoming link) - open() +} + +// LinkOption can be passed when creating a sender or receiver link to set optional configuration. - type LinkOption func(*link) ++type LinkOption func(*linkSettings) + +// Source returns a LinkOption that sets address that messages are coming from. - func Source(s string) LinkOption { return func(l *link) { l.source = s } } ++func Source(s string) LinkOption { return func(l *linkSettings) { l.source = s } } + +// Target returns a LinkOption that sets address that messages are going to. - func Target(s string) LinkOption { return func(l *link) { l.target = s } } ++func Target(s string) LinkOption { return func(l *linkSettings) { l.target = s } } + +// LinkName returns a LinkOption that sets the link name. - func LinkName(s string) LinkOption { return func(l *link) { l.target = s } } ++func LinkName(s string) LinkOption { return func(l *linkSettings) { l.target = s } } + +// SndSettle returns a LinkOption that sets the send settle mode - func SndSettle(m SndSettleMode) LinkOption { return func(l *link) { l.sndSettle = m } } ++func SndSettle(m SndSettleMode) LinkOption { return func(l *linkSettings) { l.sndSettle = m } } + +// RcvSettle returns a LinkOption that sets the send settle mode - func RcvSettle(m RcvSettleMode) LinkOption { return func(l *link) { l.rcvSettle = m } } ++func RcvSettle(m RcvSettleMode) LinkOption { return func(l *linkSettings) { l.rcvSettle = m } } + +// SndSettleMode returns a LinkOption that defines when the sending end of the +// link settles message delivery. +type SndSettleMode proton.SndSettleMode + +// Capacity returns a LinkOption that sets the link capacity - func Capacity(n int) LinkOption { return func(l *link) { l.capacity = n } } ++func Capacity(n int) LinkOption { return func(l *linkSettings) { l.capacity = n } } + +// Prefetch returns a LinkOption that sets a receivers pre-fetch flag. Not relevant for a sender. - func Prefetch(p bool) LinkOption { return func(l *link) { l.prefetch = p } } ++func Prefetch(p bool) LinkOption { return func(l *linkSettings) { l.prefetch = p } } + +// AtMostOnce returns a LinkOption that sets "fire and forget" mode, messages +// are sent but no acknowledgment is received, messages can be lost if there is +// a network failure. Sets SndSettleMode=SendSettled and RcvSettleMode=RcvFirst +func AtMostOnce() LinkOption { - return func(l *link) { ++ return func(l *linkSettings) { + SndSettle(SndSettled)(l) + RcvSettle(RcvFirst)(l) + } +} + +// AtLeastOnce returns a LinkOption that requests acknowledgment for every +// message, acknowledgment indicates the message was definitely received. In the +// event of a failure, unacknowledged messages can be re-sent but there is a +// chance that the message will be received twice in this case. Sets +// SndSettleMode=SndUnsettled and RcvSettleMode=RcvFirst +func AtLeastOnce() LinkOption { - return func(l *link) { ++ return func(l *linkSettings) { + SndSettle(SndUnsettled)(l) + RcvSettle(RcvFirst)(l) + } +} + +const ( + // Messages are sent unsettled + SndUnsettled = SndSettleMode(proton.SndUnsettled) + // Messages are sent already settled + SndSettled = SndSettleMode(proton.SndSettled) + // Sender can send either unsettled or settled messages. + SendMixed = SndSettleMode(proton.SndMixed) +) + +// RcvSettleMode defines when the receiving end of the link settles message delivery. +type RcvSettleMode proton.RcvSettleMode + +const ( + // Receiver settles first. + RcvFirst = RcvSettleMode(proton.RcvFirst) + // Receiver waits for sender to settle before settling. + RcvSecond = RcvSettleMode(proton.RcvSecond) +) + - type link struct { - endpoint - - // Link settings. ++type linkSettings struct { + source string + target string + linkName string + isSender bool + sndSettle SndSettleMode + rcvSettle RcvSettleMode + capacity int + prefetch bool ++ session *session ++ eLink proton.Link ++} + - session *session - eLink proton.Link ++type link struct { ++ endpoint ++ linkSettings +} + - func (l *link) Source() string { return l.source } - func (l *link) Target() string { return l.target } - func (l *link) LinkName() string { return l.linkName } - func (l *link) IsSender() bool { return l.isSender } - func (l *link) IsReceiver() bool { return !l.isSender } - func (l *link) SndSettle() SndSettleMode { return l.sndSettle } - func (l *link) RcvSettle() RcvSettleMode { return l.rcvSettle } - func (l *link) Session() Session { return l.session } - func (l *link) Connection() Connection { return l.session.Connection() } ++func (l *linkSettings) Source() string { return l.source } ++func (l *linkSettings) Target() string { return l.target } ++func (l *linkSettings) LinkName() string { return l.linkName } ++func (l *linkSettings) IsSender() bool { return l.isSender } ++func (l *linkSettings) IsReceiver() bool { return !l.isSender } ++func (l *linkSettings) SndSettle() SndSettleMode { return l.sndSettle } ++func (l *linkSettings) RcvSettle() RcvSettleMode { return l.rcvSettle } + ++func (l *link) Session() Session { return l.session } ++func (l *link) Connection() Connection { return l.session.Connection() } +func (l *link) engine() *proton.Engine { return l.session.connection.engine } +func (l *link) handler() *handler { return l.session.connection.handler } + - // Set up link fields and open the proton.Link - func localLink(sn *session, isSender bool, setting ...LinkOption) (link, error) { - l := link{ - session: sn, ++// Open a link and return the linkSettings. ++func makeLocalLink(sn *session, isSender bool, setting ...LinkOption) (linkSettings, error) { ++ l := linkSettings{ + isSender: isSender, + capacity: 1, + prefetch: false, ++ session: sn, + } + for _, set := range setting { + set(&l) + } + if l.linkName == "" { + l.linkName = l.session.connection.container.nextLinkName() + } + if l.IsSender() { + l.eLink = l.session.eSession.Sender(l.linkName) + } else { + l.eLink = l.session.eSession.Receiver(l.linkName) + } + if l.eLink.IsNil() { - l.err.Set(fmt.Errorf("cannot create link %s", l)) - return l, l.err.Get() ++ return l, fmt.Errorf("cannot create link %s", l.eLink) + } + l.eLink.Source().SetAddress(l.source) + l.eLink.Target().SetAddress(l.target) + l.eLink.SetSndSettleMode(proton.SndSettleMode(l.sndSettle)) + l.eLink.SetRcvSettleMode(proton.RcvSettleMode(l.rcvSettle)) + l.eLink.Open() - l.endpoint = makeEndpoint(l.eLink.String()) + return l, nil +} + +type incomingLink struct { + incoming - link ++ linkSettings ++ eLink proton.Link ++ sn *session +} + +// Set up a link from an incoming proton.Link. +func makeIncomingLink(sn *session, eLink proton.Link) incomingLink { + l := incomingLink{ + incoming: makeIncoming(eLink), - link: link{ - session: sn, ++ linkSettings: linkSettings{ + isSender: eLink.IsSender(), - eLink: eLink, + source: eLink.RemoteSource().Address(), + target: eLink.RemoteTarget().Address(), + linkName: eLink.Name(), + sndSettle: SndSettleMode(eLink.RemoteSndSettleMode()), + rcvSettle: RcvSettleMode(eLink.RemoteRcvSettleMode()), + capacity: 1, + prefetch: false, - endpoint: makeEndpoint(eLink.String()), ++ eLink: eLink, ++ session: sn, + }, + } + return l +} + +// Not part of Link interface but use by Sender and Receiver. +func (l *link) Credit() (credit int, err error) { + err = l.engine().InjectWait(func() error { + if l.Error() != nil { + return l.Error() + } + credit = l.eLink.Credit() + return nil + }) + return +} + +// Not part of Link interface but use by Sender and Receiver. +func (l *link) Capacity() int { return l.capacity } + +func (l *link) Close(err error) { + l.engine().Inject(func() { + if l.Error() == nil { + localClose(l.eLink, err) + } + }) +} - - func (l *link) open() { - l.eLink.Open() - } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5e6024bb/electron/messaging_test.go ---------------------------------------------------------------------- diff --cc electron/messaging_test.go index 5af57e8,0000000..0de7d16 mode 100644,000000..100644 --- a/electron/messaging_test.go +++ b/electron/messaging_test.go @@@ -1,426 -1,0 +1,454 @@@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package electron + +import ( + "fmt" + "net" + "path" + "qpid.apache.org/amqp" + "runtime" + "testing" + "time" +) + +func fatalIf(t *testing.T, err error) { + if err != nil { + _, file, line, ok := runtime.Caller(1) // annotate with location of caller. + if ok { + _, file = path.Split(file) + } + t.Fatalf("(from %s:%d) %v", file, line, err) + } +} + +// Start a server, return listening addr and channel for incoming Connections. +func newServer(t *testing.T, cont Container) (net.Addr, <-chan Connection) { + listener, err := net.Listen("tcp", "") + fatalIf(t, err) + addr := listener.Addr() + ch := make(chan Connection) + go func() { + conn, err := listener.Accept() + c, err := cont.Connection(conn, Server(), AllowIncoming()) + fatalIf(t, err) + ch <- c + }() + return addr, ch +} + +// Open a client connection and session, return the session. +func newClient(t *testing.T, cont Container, addr net.Addr) Session { + conn, err := net.Dial(addr.Network(), addr.String()) + fatalIf(t, err) + c, err := cont.Connection(conn) + fatalIf(t, err) + sn, err := c.Session() + fatalIf(t, err) + return sn +} + +// Return client and server ends of the same connection. +func newClientServer(t *testing.T) (client Session, server Connection) { + addr, ch := newServer(t, NewContainer("test-server")) + client = newClient(t, NewContainer("test-client"), addr) + return client, <-ch +} + +// Close client and server +func closeClientServer(client Session, server Connection) { + client.Connection().Close(nil) + server.Close(nil) +} + +// Send a message one way with a client sender and server receiver, verify ack. +func TestClientSendServerReceive(t *testing.T) { + nLinks := 3 + nMessages := 3 + + rchan := make(chan Receiver, nLinks) + client, server := newClientServer(t) + go func() { + for in := range server.Incoming() { + switch in := in.(type) { + case *IncomingReceiver: + in.SetCapacity(1) + in.SetPrefetch(false) + rchan <- in.Accept().(Receiver) + default: + in.Accept() + } + } + }() + + defer func() { closeClientServer(client, server) }() + + s := make([]Sender, nLinks) + for i := 0; i < nLinks; i++ { + var err error + s[i], err = client.Sender(Target(fmt.Sprintf("foo%d", i))) + if err != nil { + t.Fatal(err) + } + } + r := make([]Receiver, nLinks) + for i := 0; i < nLinks; i++ { + r[i] = <-rchan + } + + for i := 0; i < nLinks; i++ { + for j := 0; j < nMessages; j++ { + // Client send + ack := make(chan Outcome, 1) + sendDone := make(chan struct{}) + go func() { + defer close(sendDone) + m := amqp.NewMessageWith(fmt.Sprintf("foobar%v-%v", i, j)) + var err error + s[i].SendAsync(m, ack, "testing") + if err != nil { + t.Fatal(err) + } + }() + + // Server recieve + rm, err := r[i].Receive() + if err != nil { + t.Fatal(err) + } + if want, got := interface{}(fmt.Sprintf("foobar%v-%v", i, j)), rm.Message.Body(); want != got { + t.Errorf("%#v != %#v", want, got) + } + + // Should not be acknowledged on client yet + <-sendDone + select { + case <-ack: + t.Errorf("unexpected ack") + default: + } + + // Server send ack + if err := rm.Reject(); err != nil { + t.Error(err) + } + // Client get ack. + if a := <-ack; a.Value != "testing" || a.Error != nil || a.Status != Rejected { + t.Error("unexpected ack: ", a.Status, a.Error, a.Value) + } + } + } +} + +func TestClientReceiver(t *testing.T) { + nMessages := 3 + client, server := newClientServer(t) + go func() { + for in := range server.Incoming() { + switch in := in.(type) { + case *IncomingSender: + s := in.Accept().(Sender) + go func() { + for i := int32(0); i < int32(nMessages); i++ { + out := s.SendSync(amqp.NewMessageWith(i)) + if out.Error != nil { + t.Error(out.Error) + return + } + } + s.Close(nil) + }() + default: + in.Accept() + } + } + }() + + r, err := client.Receiver(Source("foo")) + if err != nil { + t.Fatal(err) + } + for i := int32(0); i < int32(nMessages); i++ { + rm, err := r.Receive() + if err != nil { + if err != Closed { + t.Error(err) + } + break + } + if err := rm.Accept(); err != nil { + t.Error(err) + } + if b, ok := rm.Message.Body().(int32); !ok || b != i { + t.Errorf("want %v, true got %v, %v", i, b, ok) + } + } + server.Close(nil) + client.Connection().Close(nil) +} + +// Test timeout versions of waiting functions. +func TestTimeouts(t *testing.T) { + var err error + rchan := make(chan Receiver, 1) + client, server := newClientServer(t) + go func() { + for i := range server.Incoming() { + switch i := i.(type) { + case *IncomingReceiver: + i.SetCapacity(1) + i.SetPrefetch(false) + rchan <- i.Accept().(Receiver) // Issue credit only on receive + default: + i.Accept() + } + } + }() + defer func() { closeClientServer(client, server) }() + + // Open client sender + snd, err := client.Sender(Target("test")) + if err != nil { + t.Fatal(err) + } + rcv := <-rchan + + // Test send with timeout + short := time.Millisecond + long := time.Second + m := amqp.NewMessage() + if err := snd.SendSyncTimeout(m, 0).Error; err != Timeout { // No credit, expect timeout. + t.Error("want Timeout got", err) + } + if err := snd.SendSyncTimeout(m, short).Error; err != Timeout { // No credit, expect timeout. + t.Error("want Timeout got", err) + } + // Test receive with timeout + if _, err = rcv.ReceiveTimeout(0); err != Timeout { // No credit, expect timeout. + t.Error("want Timeout got", err) + } + // Test receive with timeout + if _, err = rcv.ReceiveTimeout(short); err != Timeout { // No credit, expect timeout. + t.Error("want Timeout got", err) + } + // There is now a credit on the link due to receive + ack := make(chan Outcome) + snd.SendAsyncTimeout(m, ack, nil, short) + // Disposition should timeout + select { + case <-ack: + t.Errorf("want Timeout got %#v", ack) + case <-time.After(short): + } + + // Receive and accept + rm, err := rcv.ReceiveTimeout(long) + if err != nil { + t.Fatal(err) + } + rm.Accept() + // Sender get ack + if a := <-ack; a.Status != Accepted || a.Error != nil { + t.Errorf("want (accepted, nil) got %#v", a) + } +} + - // clientServer that returns sender/receiver pairs at opposite ends of link. ++// A server that returns the opposite end of each client link via channels. +type pairs struct { - t *testing.T - client Session - server Connection - rchan chan Receiver - schan chan Sender ++ t *testing.T ++ client Session ++ server Connection ++ rchan chan Receiver ++ schan chan Sender ++ capacity int ++ prefetch bool +} + - func newPairs(t *testing.T) *pairs { ++func newPairs(t *testing.T, capacity int, prefetch bool) *pairs { + p := &pairs{t: t, rchan: make(chan Receiver, 1), schan: make(chan Sender, 1)} + p.client, p.server = newClientServer(t) + go func() { + for i := range p.server.Incoming() { + switch i := i.(type) { + case *IncomingReceiver: - i.SetCapacity(1) - i.SetPrefetch(false) ++ i.SetCapacity(capacity) ++ i.SetPrefetch(prefetch) + p.rchan <- i.Accept().(Receiver) + case *IncomingSender: + p.schan <- i.Accept().(Sender) + default: + i.Accept() + } + } + }() + return p +} + +func (p *pairs) close() { + closeClientServer(p.client, p.server) +} + ++// Return a client sender and server receiver +func (p *pairs) senderReceiver() (Sender, Receiver) { + snd, err := p.client.Sender() + fatalIf(p.t, err) + rcv := <-p.rchan + return snd, rcv +} + ++// Return a client receiver and server sender +func (p *pairs) receiverSender() (Receiver, Sender) { + rcv, err := p.client.Receiver() + fatalIf(p.t, err) + snd := <-p.schan + return rcv, snd +} + +type result struct { + label string + err error +} + - func (r result) String() string { return fmt.Sprintf("%s(%s)", r.err, r.label) } ++func (r result) String() string { return fmt.Sprintf("%v(%v)", r.err, r.label) } + +func doSend(snd Sender, results chan result) { + err := snd.SendSync(amqp.NewMessage()).Error + results <- result{"send", err} +} + +func doReceive(rcv Receiver, results chan result) { + _, err := rcv.Receive() + results <- result{"receive", err} +} + +func doDisposition(ack <-chan Outcome, results chan result) { + results <- result{"disposition", (<-ack).Error} +} + ++// Senders get credit immediately if receivers have prefetch set ++func TestSendReceivePrefetch(t *testing.T) { ++ pairs := newPairs(t, 1, true) ++ s, r := pairs.senderReceiver() ++ s.SendAsyncTimeout(amqp.NewMessage(), nil, nil, time.Second) // Should not block for credit. ++ if _, err := r.Receive(); err != nil { ++ t.Error(err) ++ } ++} ++ ++// Senders do not get credit till Receive() if receivers don't have prefetch ++func TestSendReceiveNoPrefetch(t *testing.T) { ++ pairs := newPairs(t, 1, false) ++ s, r := pairs.senderReceiver() ++ done := make(chan struct{}, 1) ++ go func() { ++ s.SendAsyncTimeout(amqp.NewMessage(), nil, nil, time.Second) // Should block for credit. ++ close(done) ++ }() ++ select { ++ case <-done: ++ t.Errorf("send should be blocked on credit") ++ default: ++ if _, err := r.Receive(); err != nil { ++ t.Error(err) ++ } else { ++ <-done ++ } // Should be unblocked now ++ } ++} ++ +// Test that closing Links interrupts blocked link functions. +func TestLinkCloseInterrupt(t *testing.T) { - want := amqp.Errorf("x", "all bad") - pairs := newPairs(t) ++ want := amqp.Error{Name: "x", Description: "all bad"} ++ pairs := newPairs(t, 1, false) + results := make(chan result) // Collect expected errors + - // Sender.Close() interrupts Send() - snd, rcv := pairs.senderReceiver() - go doSend(snd, results) - snd.Close(want) - if r := <-results; want != r.err { - t.Errorf("want %#v got %#v", want, r) - } - - // Remote Receiver.Close() interrupts Send() - snd, rcv = pairs.senderReceiver() - go doSend(snd, results) - rcv.Close(want) - if r := <-results; want != r.err { - t.Errorf("want %#v got %#v", want, r) - } ++ // Note closing the link does not interrupt Send() calls, the AMQP spec says ++ // that deliveries can be settled after the link is closed. + + // Receiver.Close() interrupts Receive() - snd, rcv = pairs.senderReceiver() ++ snd, rcv := pairs.senderReceiver() + go doReceive(rcv, results) + rcv.Close(want) + if r := <-results; want != r.err { + t.Errorf("want %#v got %#v", want, r) + } + + // Remote Sender.Close() interrupts Receive() + snd, rcv = pairs.senderReceiver() + go doReceive(rcv, results) + snd.Close(want) + if r := <-results; want != r.err { + t.Errorf("want %#v got %#v", want, r) + } +} + +// Test closing the server end of a connection. +func TestConnectionCloseInterrupt1(t *testing.T) { - want := amqp.Errorf("x", "bad") - pairs := newPairs(t) ++ want := amqp.Error{Name: "x", Description: "bad"} ++ pairs := newPairs(t, 1, true) + results := make(chan result) // Collect expected errors + + // Connection.Close() interrupts Send, Receive, Disposition. + snd, rcv := pairs.senderReceiver() - go doReceive(rcv, results) - ack := snd.SendWaitable(amqp.NewMessage()) - go doDisposition(ack, results) - snd, rcv = pairs.senderReceiver() + go doSend(snd, results) ++ ++ rcv.Receive() + rcv, snd = pairs.receiverSender() + go doReceive(rcv, results) ++ ++ snd, rcv = pairs.senderReceiver() ++ ack := snd.SendWaitable(amqp.NewMessage()) ++ rcv.Receive() ++ go doDisposition(ack, results) ++ + pairs.server.Close(want) + for i := 0; i < 3; i++ { + if r := <-results; want != r.err { - // TODO aconway 2015-10-06: Not propagating the correct error, seeing nil and EOF. - t.Logf("want %v got %v", want, r.err) ++ t.Logf("want %v got %v", want, r) + } + } +} + +// Test closing the client end of the connection. +func TestConnectionCloseInterrupt2(t *testing.T) { - want := amqp.Errorf("x", "bad") - pairs := newPairs(t) ++ want := amqp.Error{Name: "x", Description: "bad"} ++ pairs := newPairs(t, 1, true) + results := make(chan result) // Collect expected errors + + // Connection.Close() interrupts Send, Receive, Disposition. + snd, rcv := pairs.senderReceiver() - go doReceive(rcv, results) - ack := snd.SendWaitable(amqp.NewMessage()) - go doDisposition(ack, results) - snd, rcv = pairs.senderReceiver() + go doSend(snd, results) ++ rcv.Receive() ++ + rcv, snd = pairs.receiverSender() + go doReceive(rcv, results) - pairs.client.Close(want) ++ ++ snd, rcv = pairs.senderReceiver() ++ ack := snd.SendWaitable(amqp.NewMessage()) ++ go doDisposition(ack, results) ++ ++ pairs.client.Connection().Close(want) + for i := 0; i < 3; i++ { + if r := <-results; want != r.err { + // TODO aconway 2015-10-06: Not propagating the correct error, seeing nil. + t.Logf("want %v got %v", want, r.err) + } + } +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5e6024bb/electron/receiver.go ---------------------------------------------------------------------- diff --cc electron/receiver.go index 22bdc7e,0000000..f2b7a52 mode 100644,000000..100644 --- a/electron/receiver.go +++ b/electron/receiver.go @@@ -1,244 -1,0 +1,225 @@@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package electron + +import ( + "fmt" + "qpid.apache.org/amqp" + "qpid.apache.org/proton" + "time" +) + +// Receiver is a Link that receives messages. +// +type Receiver interface { - Link ++ Endpoint ++ LinkSettings + + // Receive blocks until a message is available or until the Receiver is closed + // and has no more buffered messages. + Receive() (ReceivedMessage, error) + + // ReceiveTimeout is like Receive but gives up after timeout, see Timeout. + // + // Note that that if Prefetch is false, after a Timeout the credit issued by + // Receive remains on the link. It will be used by the next call to Receive. + ReceiveTimeout(timeout time.Duration) (ReceivedMessage, error) + + // Prefetch==true means the Receiver will automatically issue credit to the + // remote sender to keep its buffer as full as possible, i.e. it will + // "pre-fetch" messages independently of the application calling + // Receive(). This gives good throughput for applications that handle a + // continuous stream of messages. Larger capacity may improve throughput, the + // optimal value depends on the characteristics of your application. + // + // Prefetch==false means the Receiver will issue only issue credit when you + // call Receive(), and will only issue enough credit to satisfy the calls + // actually made. This gives lower throughput but will not fetch any messages + // in advance. It is good for synchronous applications that need to evaluate + // each message before deciding whether to receive another. The + // request-response pattern is a typical example. If you make concurrent + // calls to Receive with pre-fetch disabled, you can improve performance by + // setting the capacity close to the expected number of concurrent calls. + // + Prefetch() bool + + // Capacity is the size (number of messages) of the local message buffer + // These are messages received but not yet returned to the application by a call to Receive() + Capacity() int +} + - // Flow control policy for a receiver. - type policy interface { - // Called at the start of Receive() to adjust credit before fetching a message. - Pre(*receiver) - // Called after Receive() has received a message from Buffer() before it returns. - // Non-nil error means no message was received because of an error. - Post(*receiver, error) - } - - type prefetchPolicy struct{} - - func (p prefetchPolicy) Flow(r *receiver) { - r.engine().Inject(func() { - if r.Error() != nil { - return - } - _, _, max := r.credit() - if max > 0 { - r.eLink.Flow(max) - } - }) - } - func (p prefetchPolicy) Pre(r *receiver) { p.Flow(r) } - func (p prefetchPolicy) Post(r *receiver, err error) { - if err == nil { - p.Flow(r) - } - } - - type noPrefetchPolicy struct{ waiting int } - - func (p noPrefetchPolicy) Flow(r *receiver) { // Not called in proton goroutine - r.engine().Inject(func() { - if r.Error() != nil { - return - } - len, credit, max := r.credit() - add := p.waiting - (len + credit) - if add > max { - add = max // Don't overflow - } - if add > 0 { - r.eLink.Flow(add) - } - }) - } - func (p noPrefetchPolicy) Pre(r *receiver) { p.waiting++; p.Flow(r) } - func (p noPrefetchPolicy) Post(r *receiver, err error) { - p.waiting-- - if err == nil { - p.Flow(r) - } - } - +// Receiver implementation +type receiver struct { + link - buffer chan ReceivedMessage - policy policy ++ buffer chan ReceivedMessage ++ callers int +} + ++func (r *receiver) Capacity() int { return cap(r.buffer) } ++func (r *receiver) Prefetch() bool { return r.prefetch } ++ +// Call in proton goroutine - func newReceiver(l link) *receiver { - r := &receiver{link: l} ++func newReceiver(ls linkSettings) *receiver { ++ r := &receiver{link: link{linkSettings: ls}} ++ r.endpoint.init(r.link.eLink.String()) + if r.capacity < 1 { + r.capacity = 1 + } - if r.prefetch { - r.policy = &prefetchPolicy{} - } else { - r.policy = &noPrefetchPolicy{} - } + r.buffer = make(chan ReceivedMessage, r.capacity) + r.handler().addLink(r.eLink, r) - r.link.open() ++ r.link.eLink.Open() ++ if r.prefetch { ++ r.flow(r.maxFlow()) ++ } + return r +} + - // call in proton goroutine. - func (r *receiver) credit() (buffered, credit, max int) { - return len(r.buffer), r.eLink.Credit(), cap(r.buffer) - len(r.buffer) ++// Call in proton gorotine. Max additional credit we can request. ++func (r *receiver) maxFlow() int { return cap(r.buffer) - len(r.buffer) - r.eLink.Credit() } ++ ++func (r *receiver) flow(credit int) { ++ if credit > 0 { ++ r.eLink.Flow(credit) ++ } +} + - func (r *receiver) Capacity() int { return cap(r.buffer) } - func (r *receiver) Prefetch() bool { return r.prefetch } ++// Inject flow check per-caller call when prefetch is off. ++// Called with inc=1 at start of call, inc = -1 at end ++func (r *receiver) caller(inc int) { ++ r.engine().Inject(func() { ++ r.callers += inc ++ need := r.callers - (len(r.buffer) + r.eLink.Credit()) ++ max := r.maxFlow() ++ if need > max { ++ need = max ++ } ++ r.flow(need) ++ }) ++} + ++// Inject flow top-up if prefetch is enabled ++func (r *receiver) flowTopUp() { ++ if r.prefetch { ++ r.engine().Inject(func() { r.flow(r.maxFlow()) }) ++ } ++} ++ ++// Not claled +func (r *receiver) Receive() (rm ReceivedMessage, err error) { + return r.ReceiveTimeout(Forever) +} + - func (r *receiver) ReceiveTimeout(timeout time.Duration) (rm ReceivedMessage, err error) { ++func (r *receiver) ReceiveTimeout(timeout time.Duration) (ReceivedMessage, error) { + assert(r.buffer != nil, "Receiver is not open: %s", r) - r.policy.Pre(r) - defer func() { r.policy.Post(r, err) }() ++ select { // Check for immediate availability ++ case rm := <-r.buffer: ++ r.flowTopUp() ++ return rm, nil ++ default: ++ } ++ if !r.prefetch { // Per-caller flow control ++ r.caller(+1) ++ defer r.caller(-1) ++ } + rmi, err := timedReceive(r.buffer, timeout) + switch err { - case Timeout: - return ReceivedMessage{}, Timeout ++ case nil: ++ r.flowTopUp() ++ return rmi.(ReceivedMessage), err + case Closed: + return ReceivedMessage{}, r.Error() + default: - return rmi.(ReceivedMessage), nil ++ return ReceivedMessage{}, err + } +} + +// Called in proton goroutine on MMessage event. +func (r *receiver) message(delivery proton.Delivery) { + if r.eLink.State().RemoteClosed() { + localClose(r.eLink, r.eLink.RemoteCondition().Error()) + return + } + if delivery.HasMessage() { + m, err := delivery.Message() + if err != nil { + localClose(r.eLink, err) + return + } + assert(m != nil) + r.eLink.Advance() + if r.eLink.Credit() < 0 { + localClose(r.eLink, fmt.Errorf("received message in excess of credit limit")) + } else { + // We never issue more credit than cap(buffer) so this will not block. + r.buffer <- ReceivedMessage{m, delivery, r} + } + } +} + - func (r *receiver) closed(err error) { - r.link.closed(err) ++func (r *receiver) closed(err error) error { + if r.buffer != nil { + close(r.buffer) + } ++ return r.link.closed(err) +} + +// ReceivedMessage contains an amqp.Message and allows the message to be acknowledged. +type ReceivedMessage struct { + // Message is the received message. + Message amqp.Message + + eDelivery proton.Delivery + receiver Receiver +} + +// Acknowledge a ReceivedMessage with the given delivery status. +func (rm *ReceivedMessage) acknowledge(status uint64) error { + return rm.receiver.(*receiver).engine().Inject(func() { + // Deliveries are valid as long as the connection is, unless settled. + rm.eDelivery.SettleAs(uint64(status)) + }) +} + +// Accept tells the sender that we take responsibility for processing the message. +func (rm *ReceivedMessage) Accept() error { return rm.acknowledge(proton.Accepted) } + +// Reject tells the sender we consider the message invalid and unusable. +func (rm *ReceivedMessage) Reject() error { return rm.acknowledge(proton.Rejected) } + +// Release tells the sender we will not process the message but some other +// receiver might. +func (rm *ReceivedMessage) Release() error { return rm.acknowledge(proton.Released) } + +// IncomingReceiver is sent on the Connection.Incoming() channel when there is +// an incoming request to open a receiver link. +type IncomingReceiver struct { + incomingLink +} + +// SetCapacity sets the capacity of the incoming receiver, call before Accept() +func (in *IncomingReceiver) SetCapacity(capacity int) { in.capacity = capacity } + +// SetPrefetch sets the pre-fetch mode of the incoming receiver, call before Accept() +func (in *IncomingReceiver) SetPrefetch(prefetch bool) { in.prefetch = prefetch } + +// Accept accepts an incoming receiver endpoint +func (in *IncomingReceiver) Accept() Endpoint { - return in.accept(func() Endpoint { return newReceiver(in.link) }) ++ return in.accept(func() Endpoint { return newReceiver(in.linkSettings) }) +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5e6024bb/electron/sender.go ---------------------------------------------------------------------- diff --cc electron/sender.go index 834eb75,0000000..2f0e965 mode 100644,000000..100644 --- a/electron/sender.go +++ b/electron/sender.go @@@ -1,273 -1,0 +1,274 @@@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package electron + +// #include <proton/disposition.h> +import "C" + +import ( + "fmt" + "qpid.apache.org/amqp" + "qpid.apache.org/proton" + "time" +) + +// Sender is a Link that sends messages. +// +// The result of sending a message is provided by an Outcome value. +// +// A sender can buffer messages up to the credit limit provided by the remote receiver. +// Send* methods will block if the buffer is full until there is space. +// Send*Timeout methods will give up after the timeout and set Timeout as Outcome.Error. +// +type Sender interface { - Link ++ Endpoint ++ LinkSettings + + // SendSync sends a message and blocks until the message is acknowledged by the remote receiver. + // Returns an Outcome, which may contain an error if the message could not be sent. + SendSync(m amqp.Message) Outcome + + // SendWaitable puts a message in the send buffer and returns a channel that + // you can use to wait for the Outcome of just that message. The channel is + // buffered so you can receive from it whenever you want without blocking anything. + SendWaitable(m amqp.Message) <-chan Outcome + + // SendForget buffers a message for sending and returns, with no notification of the outcome. + SendForget(m amqp.Message) + + // SendAsync puts a message in the send buffer and returns immediately. An + // Outcome with Value = value will be sent to the ack channel when the remote + // receiver has acknowledged the message or if there is an error. + // + // You can use the same ack channel for many calls to SendAsync(), possibly on + // many Senders. The channel will receive the outcomes in the order they + // become available. The channel should be buffered and/or served by dedicated + // goroutines to avoid blocking the connection. + // + // If ack == nil no Outcome is sent. + SendAsync(m amqp.Message, ack chan<- Outcome, value interface{}) + + SendAsyncTimeout(m amqp.Message, ack chan<- Outcome, value interface{}, timeout time.Duration) + + SendWaitableTimeout(m amqp.Message, timeout time.Duration) <-chan Outcome + + SendForgetTimeout(m amqp.Message, timeout time.Duration) + + SendSyncTimeout(m amqp.Message, timeout time.Duration) Outcome +} + +// Outcome provides information about the outcome of sending a message. +type Outcome struct { + // Status of the message: was it sent, how was it acknowledged. + Status SentStatus + // Error is a local error if Status is Unsent or Unacknowledged, a remote error otherwise. + Error error + // Value provided by the application in SendAsync() + Value interface{} +} + ++func (o Outcome) send(ack chan<- Outcome) { ++ if ack != nil { ++ ack <- o ++ } ++} ++ +// SentStatus indicates the status of a sent message. +type SentStatus int + +const ( + // Message was never sent + Unsent SentStatus = iota + // Message was sent but never acknowledged. It may or may not have been received. + Unacknowledged + // Message was accepted by the receiver (or was sent pre-settled, accept is assumed) + Accepted + // Message was rejected as invalid by the receiver + Rejected + // Message was not processed by the receiver but may be valid for a different receiver + Released + // Receiver responded with an unrecognized status. + Unknown +) + +// String human readable name for SentStatus. +func (s SentStatus) String() string { + switch s { + case Unsent: + return "unsent" + case Unacknowledged: + return "unacknowledged" + case Accepted: + return "accepted" + case Rejected: + return "rejected" + case Released: + return "released" + case Unknown: + return "unknown" + default: + return fmt.Sprintf("invalid(%d)", s) + } +} + +// Convert proton delivery state code to SentStatus value +func sentStatus(d uint64) SentStatus { + switch d { + case proton.Accepted: + return Accepted + case proton.Rejected: + return Rejected + case proton.Released, proton.Modified: + return Released + default: + return Unknown + } +} + +// Sender implementation, held by handler. +type sender struct { + link + credit chan struct{} // Signal available credit. +} + +func (s *sender) SendAsyncTimeout(m amqp.Message, ack chan<- Outcome, v interface{}, t time.Duration) { + // wait for credit + if _, err := timedReceive(s.credit, t); err != nil { - if err == Closed && s.Error != nil { ++ if err == Closed && s.Error() != nil { + err = s.Error() + } - ack <- Outcome{Unsent, err, v} ++ Outcome{Unsent, err, v}.send(ack) + return + } + // Send a message in handler goroutine + err := s.engine().Inject(func() { + if s.Error() != nil { - if ack != nil { - ack <- Outcome{Unsent, s.Error(), v} - } ++ Outcome{Unsent, s.Error(), v}.send(ack) + return + } - if delivery, err := s.eLink.Send(m); err == nil { - if ack != nil { // We must report an outcome - if s.SndSettle() == SndSettled { - delivery.Settle() // Pre-settle if required - ack <- Outcome{Accepted, nil, v} - } else { - s.handler().sentMessages[delivery] = sentMessage{ack, v} - } - } else { // ack == nil, can't report outcome - if s.SndSettle() != SndUnsettled { // Pre-settle unless we are forced not to. - delivery.Settle() - } - } - } else { // err != nil - if ack != nil { - ack <- Outcome{Unsent, err, v} ++ ++ delivery, err2 := s.eLink.Send(m) ++ switch { ++ case err2 != nil: ++ Outcome{Unsent, err2, v}.send(ack) ++ case ack == nil || s.SndSettle() == SndSettled: // Pre-settled ++ if s.SndSettle() != SndUnsettled { // Not forced to send unsettled by link policy ++ delivery.Settle() + } ++ Outcome{Accepted, nil, v}.send(ack) // Assume accepted ++ default: ++ s.handler().sentMessages[delivery] = sentMessage{ack, v} // Register with handler + } + if s.eLink.Credit() > 0 { // Signal there is still credit + s.sendable() + } + }) - if err != nil && ack != nil { - ack <- Outcome{Unsent, err, v} ++ if err != nil { ++ Outcome{Unsent, err, v}.send(ack) + } +} + +// Set credit flag if not already set. Non-blocking, any goroutine +func (s *sender) sendable() { + select { // Non-blocking + case s.credit <- struct{}{}: + default: + } +} + +func (s *sender) SendWaitableTimeout(m amqp.Message, t time.Duration) <-chan Outcome { + out := make(chan Outcome, 1) + s.SendAsyncTimeout(m, out, nil, t) + return out +} + +func (s *sender) SendForgetTimeout(m amqp.Message, t time.Duration) { + s.SendAsyncTimeout(m, nil, nil, t) +} + +func (s *sender) SendSyncTimeout(m amqp.Message, t time.Duration) Outcome { + deadline := time.Now().Add(t) + ack := s.SendWaitableTimeout(m, t) + t = deadline.Sub(time.Now()) // Adjust for time already spent. + if t < 0 { + t = 0 + } + if out, err := timedReceive(ack, t); err == nil { + return out.(Outcome) + } else { + if err == Closed && s.Error() != nil { + err = s.Error() + } + return Outcome{Unacknowledged, err, nil} + } +} + +func (s *sender) SendAsync(m amqp.Message, ack chan<- Outcome, v interface{}) { + s.SendAsyncTimeout(m, ack, v, Forever) +} + +func (s *sender) SendWaitable(m amqp.Message) <-chan Outcome { + return s.SendWaitableTimeout(m, Forever) +} + +func (s *sender) SendForget(m amqp.Message) { + s.SendForgetTimeout(m, Forever) +} + +func (s *sender) SendSync(m amqp.Message) Outcome { + return <-s.SendWaitable(m) +} + +// handler goroutine - func (s *sender) closed(err error) { - s.link.closed(err) ++func (s *sender) closed(err error) error { + close(s.credit) ++ return s.link.closed(err) +} + - func newSender(l link) *sender { - s := &sender{link: l, credit: make(chan struct{}, 1)} ++func newSender(ls linkSettings) *sender { ++ s := &sender{link: link{linkSettings: ls}, credit: make(chan struct{}, 1)} ++ s.endpoint.init(s.link.eLink.String()) + s.handler().addLink(s.eLink, s) - s.link.open() ++ s.link.eLink.Open() + return s +} + +// sentMessage records a sent message on the handler. +type sentMessage struct { + ack chan<- Outcome + value interface{} +} + +// IncomingSender is sent on the Connection.Incoming() channel when there is +// an incoming request to open a sender link. +type IncomingSender struct { + incomingLink +} + +// Accept accepts an incoming sender endpoint +func (in *IncomingSender) Accept() Endpoint { - return in.accept(func() Endpoint { return newSender(in.link) }) ++ return in.accept(func() Endpoint { return newSender(in.linkSettings) }) +} + +// Call in injected functions to check if the sender is valid. +func (s *sender) valid() bool { + s2, ok := s.handler().links[s.eLink].(*sender) + return ok && s2 == s +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5e6024bb/electron/session.go ---------------------------------------------------------------------- diff --cc electron/session.go index 18d8bc8,0000000..1bbc52c mode 100644,000000..100644 --- a/electron/session.go +++ b/electron/session.go @@@ -1,128 -1,0 +1,128 @@@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package electron + +import ( + "qpid.apache.org/proton" +) + +// Session is an AMQP session, it contains Senders and Receivers. +type Session interface { + Endpoint + + // Sender opens a new sender. + Sender(...LinkOption) (Sender, error) + + // Receiver opens a new Receiver. + Receiver(...LinkOption) (Receiver, error) +} + +type session struct { + endpoint + eSession proton.Session + connection *connection + capacity uint +} + +// SessionOption can be passed when creating a Session +type SessionOption func(*session) + +// IncomingCapacity returns a Session Option that sets the size (in bytes) of +// the sessions incoming data buffer.. +func IncomingCapacity(cap uint) SessionOption { return func(s *session) { s.capacity = cap } } + +// in proton goroutine +func newSession(c *connection, es proton.Session, setting ...SessionOption) *session { + s := &session{ + connection: c, + eSession: es, - endpoint: makeEndpoint(es.String()), + } ++ s.endpoint.init(es.String()) + for _, set := range setting { + set(s) + } + c.handler.sessions[s.eSession] = s + s.eSession.SetIncomingCapacity(s.capacity) + s.eSession.Open() + return s +} + +func (s *session) Connection() Connection { return s.connection } +func (s *session) eEndpoint() proton.Endpoint { return s.eSession } +func (s *session) engine() *proton.Engine { return s.connection.engine } + +func (s *session) Close(err error) { + s.engine().Inject(func() { + if s.Error() == nil { + localClose(s.eSession, err) + } + }) +} + +func (s *session) Sender(setting ...LinkOption) (snd Sender, err error) { + err = s.engine().InjectWait(func() error { + if s.Error() != nil { + return s.Error() + } - l, err := localLink(s, true, setting...) ++ l, err := makeLocalLink(s, true, setting...) + if err == nil { + snd = newSender(l) + } + return err + }) + return +} + +func (s *session) Receiver(setting ...LinkOption) (rcv Receiver, err error) { + err = s.engine().InjectWait(func() error { + if s.Error() != nil { + return s.Error() + } - l, err := localLink(s, false, setting...) ++ l, err := makeLocalLink(s, false, setting...) + if err == nil { + rcv = newReceiver(l) + } + return err + }) + return +} + +// IncomingSender is sent on the Connection.Incoming() channel when there is an +// incoming request to open a session. +type IncomingSession struct { + incoming + h *handler + pSession proton.Session + capacity uint +} + +func newIncomingSession(h *handler, ps proton.Session) *IncomingSession { + return &IncomingSession{incoming: makeIncoming(ps), h: h, pSession: ps} +} + +// SetCapacity sets the session buffer capacity of an incoming session in bytes. +func (in *IncomingSession) SetCapacity(bytes uint) { in.capacity = bytes } + +// Accept an incoming session endpoint. +func (in *IncomingSession) Accept() Endpoint { + return in.accept(func() Endpoint { + return newSession(in.h.connection, in.pSession, IncomingCapacity(in.capacity)) + }) +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5e6024bb/proton/engine.go ---------------------------------------------------------------------- diff --cc proton/engine.go index 13d44b8,0000000..eecda7a mode 100644,000000..100644 --- a/proton/engine.go +++ b/proton/engine.go @@@ -1,397 -1,0 +1,409 @@@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package proton + +// #include <proton/connection.h> +// #include <proton/event.h> +// #include <proton/error.h> +// #include <proton/handlers.h> +// #include <proton/session.h> +// #include <proton/transport.h> +// #include <memory.h> +// #include <stdlib.h> +// +// PN_HANDLE(REMOTE_ADDR) +import "C" + +import ( + "fmt" - "io" + "net" + "sync" ++ "time" + "unsafe" +) + +// Injecter allows functions to be "injected" into the event-processing loop, to +// be called in the same goroutine as event handlers. +type Injecter interface { + // Inject a function into the engine goroutine. + // + // f() will be called in the same goroutine as event handlers, so it can safely + // use values belonging to event handlers without synchronization. f() should + // not block, no further events or injected functions can be processed until + // f() returns. + // + // Returns a non-nil error if the function could not be injected and will + // never be called. Otherwise the function will eventually be called. + // + // Note that proton values (Link, Session, Connection etc.) that existed when + // Inject(f) was called may have become invalid by the time f() is executed. + // Handlers should handle keep track of Closed events to ensure proton values + // are not used after they become invalid. One technique is to have map from + // proton values to application values. Check that the map has the correct + // proton/application value pair at the start of the injected function and + // delete the value from the map when handling a Closed event. + Inject(f func()) error + + // InjectWait is like Inject but does not return till f() has completed. + // If f() cannot be injected it returns the error from Inject(), otherwise + // it returns the error from f() + InjectWait(f func() error) error +} + +// bufferChan manages a pair of ping-pong buffers to pass bytes through a channel. +type bufferChan struct { + buffers chan []byte + buf1, buf2 []byte +} + +func newBufferChan(size int) *bufferChan { + return &bufferChan{make(chan []byte), make([]byte, size), make([]byte, size)} +} + +func (b *bufferChan) buffer() []byte { + b.buf1, b.buf2 = b.buf2, b.buf1 // Alternate buffers. + return b.buf1[:cap(b.buf1)] +} + +// Engine reads from a net.Conn, decodes AMQP events and calls the appropriate +// Handler functions sequentially in a single goroutine. Actions taken by +// Handler functions (such as sending messages) are encoded and written to the +// net.Conn. You can create multiple Engines to handle multiple connections +// concurrently. +// +// You implement the EventHandler and/or MessagingHandler interfaces and provide +// those values to NewEngine(). Their HandleEvent method will be called in the +// event-handling goroutine. +// +// Handlers can pass values from an event (Connections, Links, Deliveries etc.) to +// other goroutines, store them, or use them as map indexes. Effectively they are +// just pointers. Other goroutines cannot call their methods directly but they can +// can create a function closure to call such methods and pass it to Engine.Inject() +// to have it evaluated in the engine goroutine. +// +// You are responsible for ensuring you don't use an event value after it is +// invalid. The handler methods will tell you when a value is no longer valid. For +// example after a LinkClosed event, that link is no longer valid. If you do +// Link.Close() yourself (in a handler or injected function) the link remains valid +// until the corresponing LinkClosed event is received by the handler. +// +// Engine.Close() will take care of cleaning up any remaining values when you are +// done with the Engine. All values associated with a engine become invalid when you +// call Engine.Close() +// +// The qpid.apache.org/proton/concurrent package will do all this for you, so it +// may be a better choice for some applications. +// +type Engine struct { + // Error is set on exit from Run() if there was an error. + err ErrorHolder + inject chan func() + + conn net.Conn + connection Connection + transport Transport + collector *C.pn_collector_t + read *bufferChan // Read buffers channel. + write *bufferChan // Write buffers channel. + handlers []EventHandler // Handlers for proton events. + running chan struct{} // This channel will be closed when the goroutines are done. + closeOnce sync.Once +} + +const bufferSize = 4096 + +// NewEngine initializes a engine with a connection and handlers. To start it running: +// eng := NewEngine(...) +// go run eng.Run() +// The goroutine will exit when the engine is closed or disconnected. +// You can check for errors on Engine.Error. +// +func NewEngine(conn net.Conn, handlers ...EventHandler) (*Engine, error) { + // Save the connection ID for Connection.String() + eng := &Engine{ + inject: make(chan func()), + conn: conn, + transport: Transport{C.pn_transport()}, + connection: Connection{C.pn_connection()}, + collector: C.pn_collector(), + handlers: handlers, + read: newBufferChan(bufferSize), + write: newBufferChan(bufferSize), + running: make(chan struct{}), + } + if eng.transport.IsNil() || eng.connection.IsNil() || eng.collector == nil { + return nil, fmt.Errorf("failed to allocate engine") + } + + // TODO aconway 2015-06-25: connection settings for user, password, container etc. + // before transport.Bind() Set up connection before Engine, allow Engine or Reactor + // to run connection. + + // Unique container-id by default. + eng.connection.SetContainer(UUID4().String()) + pnErr := eng.transport.Bind(eng.connection) + if pnErr != 0 { + return nil, fmt.Errorf("cannot setup engine: %s", PnErrorCode(pnErr)) + } + C.pn_connection_collect(eng.connection.pn, eng.collector) + eng.connection.Open() + return eng, nil +} + +func (eng *Engine) String() string { + return fmt.Sprintf("%s-%s", eng.conn.LocalAddr(), eng.conn.RemoteAddr()) +} + +func (eng *Engine) Id() string { - return fmt.Sprintf("%eng", &eng) ++ return fmt.Sprintf("%p", eng) +} + +func (eng *Engine) Error() error { + return eng.err.Get() +} + +// Inject a function into the Engine's event loop. +// +// f() will be called in the same event-processing goroutine that calls Handler +// methods. f() can safely call methods on values that belong to this engine +// (Sessions, Links etc) +// +// The injected function has no parameters or return values. It is normally a +// closure and can use channels to communicate with the injecting goroutine if +// necessary. +// +// Returns a non-nil error if the engine is closed before the function could be +// injected. +func (eng *Engine) Inject(f func()) error { + select { + case eng.inject <- f: + return nil + case <-eng.running: + return eng.Error() + } +} + +// InjectWait is like Inject but does not return till f() has completed or the +// engine is closed, and returns an error value from f() +func (eng *Engine) InjectWait(f func() error) error { + done := make(chan error) + defer close(done) + err := eng.Inject(func() { done <- f() }) + if err != nil { + return err + } + select { + case <-eng.running: + return eng.Error() + case err := <-done: + return err + } +} + +// Server puts the Engine in server mode, meaning it will auto-detect security settings on +// the incoming connnection such as use of SASL and SSL. +// Must be called before Run() +// +func (eng *Engine) Server() { eng.transport.SetServer() } + - // Close the engine's connection, returns when the engine has exited. ++func (eng *Engine) disconnect() { ++ eng.transport.CloseHead() ++ eng.transport.CloseTail() ++ eng.conn.Close() ++ eng.dispatch() ++} ++ ++// Close the engine's connection. ++// If err != nil pass it to the remote end as the close condition. ++// Returns when the remote end closes or disconnects. +func (eng *Engine) Close(err error) { - eng.err.Set(err) - eng.Inject(func() { - CloseError(eng.connection, err) - }) ++ eng.Inject(func() { CloseError(eng.connection, err) }) + <-eng.running +} + - // Disconnect the engine's connection without and AMQP close, returns when the engine has exited. ++// CloseTimeout like Close but disconnect if the remote end doesn't close within timeout. ++func (eng *Engine) CloseTimeout(err error, timeout time.Duration) { ++ eng.Inject(func() { CloseError(eng.connection, err) }) ++ select { ++ case <-eng.running: ++ case <-time.After(timeout): ++ eng.Disconnect(err) ++ } ++} ++ ++// Disconnect the engine's connection immediately without an AMQP close. ++// Process any termination events before returning. +func (eng *Engine) Disconnect(err error) { - eng.err.Set(err) - eng.conn.Close() ++ eng.Inject(func() { eng.transport.Condition().SetError(err); eng.disconnect() }) + <-eng.running +} + +// Run the engine. Engine.Run() will exit when the engine is closed or +// disconnected. You can check for errors after exit with Engine.Error(). +// +func (eng *Engine) Run() error { + wait := sync.WaitGroup{} + wait.Add(2) // Read and write goroutines + + readErr := make(chan error, 1) // Don't block + go func() { // Read goroutine + defer wait.Done() + for { + rbuf := eng.read.buffer() + n, err := eng.conn.Read(rbuf) + if n > 0 { + eng.read.buffers <- rbuf[:n] + } + if err != nil { + readErr <- err + close(readErr) + close(eng.read.buffers) + return + } + } + }() + + writeErr := make(chan error, 1) // Don't block + go func() { // Write goroutine + defer wait.Done() + for { + wbuf, ok := <-eng.write.buffers + if !ok { + return + } + _, err := eng.conn.Write(wbuf) + if err != nil { + writeErr <- err + close(writeErr) + return + } + } + }() + + wbuf := eng.write.buffer()[:0] + - for eng.err.Get() == nil { ++ for !eng.transport.Closed() { + if len(wbuf) == 0 { + eng.pop(&wbuf) + } + // Don't set wchan unless there is something to write. + var wchan chan []byte + if len(wbuf) > 0 { + wchan = eng.write.buffers + } + + select { + case buf, ok := <-eng.read.buffers: // Read a buffer + if ok { + eng.push(buf) + } + case wchan <- wbuf: // Write a buffer + wbuf = eng.write.buffer()[:0] + case f, ok := <-eng.inject: // Function injected from another goroutine + if ok { + f() + } + case err := <-readErr: - eng.netError(err) ++ eng.transport.Condition().SetError(err) ++ eng.transport.CloseTail() + case err := <-writeErr: - eng.netError(err) ++ eng.transport.Condition().SetError(err) ++ eng.transport.CloseHead() ++ } ++ eng.dispatch() ++ if eng.connection.State().RemoteClosed() && eng.connection.State().LocalClosed() { ++ eng.disconnect() + } - eng.process() + } ++ eng.err.Set(EndpointError(eng.connection)) ++ eng.err.Set(eng.transport.Condition().Error()) + close(eng.write.buffers) + eng.conn.Close() // Make sure connection is closed + wait.Wait() + close(eng.running) // Signal goroutines have exited and Error is set. + + if !eng.connection.IsNil() { + eng.connection.Free() + } + if !eng.transport.IsNil() { + eng.transport.Free() + } + if eng.collector != nil { + C.pn_collector_free(eng.collector) + } + for _, h := range eng.handlers { + switch h := h.(type) { + case cHandler: + C.pn_handler_free(h.pn) + } + } + return eng.err.Get() +} + - func (eng *Engine) netError(err error) { - eng.err.Set(err) - eng.transport.CloseHead() - eng.transport.CloseTail() - } - +func minInt(a, b int) int { + if a < b { + return a + } else { + return b + } +} + +func (eng *Engine) pop(buf *[]byte) { + pending := int(eng.transport.Pending()) + switch { + case pending == int(C.PN_EOS): + *buf = (*buf)[:] + return + case pending < 0: + panic(fmt.Errorf("%s", PnErrorCode(pending))) + } + size := minInt(pending, cap(*buf)) + *buf = (*buf)[:size] + if size == 0 { + return + } + C.memcpy(unsafe.Pointer(&(*buf)[0]), eng.transport.Head(), C.size_t(size)) + assert(size > 0) + eng.transport.Pop(uint(size)) +} + +func (eng *Engine) push(buf []byte) { + buf2 := buf + for len(buf2) > 0 { + n := eng.transport.Push(buf2) + if n <= 0 { + panic(fmt.Errorf("error in transport: %s", PnErrorCode(n))) + } + buf2 = buf2[n:] + } +} + - func (eng *Engine) handle(e Event) { - for _, h := range eng.handlers { - h.HandleEvent(e) - } - if e.Type() == ETransportClosed { - eng.err.Set(io.EOF) - } - } ++func (eng *Engine) peek() *C.pn_event_t { return C.pn_collector_peek(eng.collector) } + - func (eng *Engine) process() { - for ce := C.pn_collector_peek(eng.collector); ce != nil; ce = C.pn_collector_peek(eng.collector) { - eng.handle(makeEvent(ce, eng)) ++func (eng *Engine) dispatch() { ++ for ce := eng.peek(); ce != nil; ce = eng.peek() { ++ for _, h := range eng.handlers { ++ h.HandleEvent(makeEvent(ce, eng)) ++ } + C.pn_collector_pop(eng.collector) + } +} + +func (eng *Engine) Connection() Connection { return eng.connection } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
