Repository: qpid-proton Updated Branches: refs/heads/master d221b2744 -> 380f81d05
NO-JIRA: Go: Fix cleanup of sessions by connection More consistent handling for endpoint close. Add endpoint valid check in all injected functions to catch injected functions executed after an endpoint close. Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/380f81d0 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/380f81d0 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/380f81d0 Branch: refs/heads/master Commit: 380f81d054d983bbf30fd116b9ee86d80ef66788 Parents: d221b27 Author: Alan Conway <[email protected]> Authored: Fri Nov 20 10:01:55 2015 -0500 Committer: Alan Conway <[email protected]> Committed: Mon Nov 23 10:05:57 2015 -0500 ---------------------------------------------------------------------- .../go/src/qpid.apache.org/electron/doc.go | 11 ++++-- .../go/src/qpid.apache.org/electron/endpoint.go | 24 ++++++------ .../go/src/qpid.apache.org/electron/handler.go | 40 ++++++++++++++------ .../go/src/qpid.apache.org/proton/wrappers.go | 4 +- 4 files changed, 52 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/380f81d0/proton-c/bindings/go/src/qpid.apache.org/electron/doc.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/doc.go b/proton-c/bindings/go/src/qpid.apache.org/electron/doc.go index a484900..46bde37 100644 --- a/proton-c/bindings/go/src/qpid.apache.org/electron/doc.go +++ b/proton-c/bindings/go/src/qpid.apache.org/electron/doc.go @@ -49,10 +49,15 @@ import "C" There is a single proton.Engine per connection, each driving it's own event-loop goroutine, and each with a 'handler'. Most state for a connection is maintained on the handler, and -only accessed in the event-loop goroutine, so no locks are required. 
+only accessed in the event-loop goroutine, so no locks are required there. The handler sets up channels as needed to get or send data from user goroutines -using electron types like Sender or Receiver. We also use Engine.Inject to inject -actions into the event loop from user goroutines. +using electron types like Sender or Receiver. + +We also use Engine.Inject to inject actions into the event loop from user +goroutines. It is important to check at the start of an injected function that +required objects are still valid, for example a link may be remotely closed +between the time a Sender function calls Inject and the time the injected +function is executed by the handler goroutine. See comments in endpoint.go for more. */ http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/380f81d0/proton-c/bindings/go/src/qpid.apache.org/electron/endpoint.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/endpoint.go b/proton-c/bindings/go/src/qpid.apache.org/electron/endpoint.go index f04b240..8cbeadb 100644 --- a/proton-c/bindings/go/src/qpid.apache.org/electron/endpoint.go +++ b/proton-c/bindings/go/src/qpid.apache.org/electron/endpoint.go @@ -54,6 +54,10 @@ type Endpoint interface { Done() <-chan struct{} } +// DEVELOPER NOTES +// +// An electron.Endpoint corresponds to a proton.Endpoint, which can be invalidated +// type endpoint struct { err proton.ErrorHolder str string // Must be set by the value that embeds endpoint. @@ -62,10 +66,17 @@ type endpoint struct { func makeEndpoint(s string) endpoint { return endpoint{str: s, done: make(chan struct{})} } -func (e *endpoint) closed(err error) { +// Called in handler on a Closed event. Marks the endpoint as closed and the corresponding +// proton.Endpoint pointer as invalid. Injected functions should check Error() to ensure +// the pointer has not been invalidated. 
+// +// Returns the error stored on the endpoint, which may differ from err if there was +// already an error +func (e *endpoint) closed(err error) error { e.err.Set(err) e.err.Set(Closed) close(e.done) return e.err.Get() } func (e *endpoint) String() string { return e.str } @@ -74,19 +85,10 @@ func (e *endpoint) Error() error { return e.err.Get() } func (e *endpoint) Done() <-chan struct{} { return e.done } -// Call in proton goroutine to close an endpoint locally +// Call in proton goroutine to initiate closing an endpoint locally // handler will complete the close when remote end closes. func localClose(ep proton.Endpoint, err error) { if ep.State().LocalActive() { proton.CloseError(ep, err) } } - -// Used to indicate that a channel has closed which normally is because the endpoint is closed. -func errorOrClosed(e Endpoint) error { - if e.Error() != nil { - return e.Error() - } else { - return Closed - } -} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/380f81d0/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go b/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go index 1586026..0237156 100644 --- a/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go +++ b/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go @@ -49,6 +49,7 @@ func newHandler(c *connection) *handler { h.delegator.AutoOpen = false return h } + func (h *handler) linkError(l proton.Link, msg string) { proton.CloseError(l, amqp.Errorf(amqp.InternalError, "%s for %s %s", msg, l.Type(), l)) } @@ -83,13 +84,7 @@ func (h *handler) HandleMessagingEvent(t proton.MessagingEvent, e proton.Event) } case proton.MSessionClosed: - err := proton.EndpointError(e.Session()) - for l, _ := range h.links { - if l.Session() == e.Session() { - h.linkClosed(l, err) - } - } - delete(h.sessions, e.Session()) + 
h.sessionClosed(e.Session(), proton.EndpointError(e.Session())) case proton.MLinkOpening: l := e.Link() @@ -117,7 +112,7 @@ func (h *handler) HandleMessagingEvent(t proton.MessagingEvent, e proton.Event) h.connection.err.Set(e.Connection().RemoteCondition().Error()) case proton.MConnectionClosed: - h.connection.err.Set(Closed) // If no error already set, this is an orderly close. + h.connectionClosed(proton.EndpointError(e.Connection())) case proton.MDisconnected: h.connection.err.Set(e.Transport().Condition().Error()) @@ -157,13 +152,36 @@ func (h *handler) incoming(in Incoming) { } } +func (h *handler) addLink(pl proton.Link, el Link) { + h.links[pl] = el +} + func (h *handler) linkClosed(l proton.Link, err error) { - if link := h.links[l]; link != nil { + if link, ok := h.links[l]; ok { link.closed(err) delete(h.links, l) } } -func (h *handler) addLink(rl proton.Link, ll Link) { - h.links[rl] = ll +func (h *handler) sessionClosed(ps proton.Session, err error) { + if s, ok := h.sessions[ps]; ok { + delete(h.sessions, ps) + err = s.closed(err) + for l, _ := range h.links { + if l.Session() == ps { + h.linkClosed(l, err) + } + } + } +} + +func (h *handler) connectionClosed(err error) { + err = h.connection.closed(err) + // Close links first to avoid repeated scans of the link list by sessions. 
+ for l, _ := range h.links { + h.linkClosed(l, err) + } + for s, _ := range h.sessions { + h.sessionClosed(s, err) + } } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/380f81d0/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go b/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go index 0b881c1..a48aeab 100644 --- a/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go +++ b/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go @@ -172,8 +172,8 @@ type Endpoint interface { Type() string } -// CloseError sets an error condition on an endpoint and closes the endpoint -// if not already closed +// CloseError sets an error condition (if err != nil) on an endpoint and closes +// the endpoint if not already closed func CloseError(e Endpoint, err error) { if err != nil { e.Condition().SetError(err) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
