Repository: qpid-proton
Updated Branches:
  refs/heads/master d221b2744 -> 380f81d05


NO-JIRA: Go: Fix cleanup of sessions by connection

More consistent handling for endpoint close.

Add endpoint valid check in all injected functions to catch injected functions
executed after an endpoint close.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/380f81d0
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/380f81d0
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/380f81d0

Branch: refs/heads/master
Commit: 380f81d054d983bbf30fd116b9ee86d80ef66788
Parents: d221b27
Author: Alan Conway <[email protected]>
Authored: Fri Nov 20 10:01:55 2015 -0500
Committer: Alan Conway <[email protected]>
Committed: Mon Nov 23 10:05:57 2015 -0500

----------------------------------------------------------------------
 .../go/src/qpid.apache.org/electron/doc.go      | 11 ++++--
 .../go/src/qpid.apache.org/electron/endpoint.go | 24 ++++++------
 .../go/src/qpid.apache.org/electron/handler.go  | 40 ++++++++++++++------
 .../go/src/qpid.apache.org/proton/wrappers.go   |  4 +-
 4 files changed, 52 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/380f81d0/proton-c/bindings/go/src/qpid.apache.org/electron/doc.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/doc.go 
b/proton-c/bindings/go/src/qpid.apache.org/electron/doc.go
index a484900..46bde37 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/doc.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/doc.go
@@ -49,10 +49,15 @@ import "C"
 
 There is a single proton.Engine per connection, each driving it's own 
event-loop goroutine,
 and each with a 'handler'. Most state for a connection is maintained on the 
handler, and
-only accessed in the event-loop goroutine, so no locks are required.
+only accessed in the event-loop goroutine, so no locks are required there.
 
 The handler sets up channels as needed to get or send data from user goroutines
-using electron types like Sender or Receiver. We also use Engine.Inject to 
inject
-actions into the event loop from user goroutines.
+using electron types like Sender or Receiver.
+
+We also use Engine.Inject to inject actions into the event loop from user
+goroutines. It is important to check at the start of an injected function that
+required objects are still valid, for example a link may be remotely closed
+between the time a Sender function calls Inject and the time the injected
+function is execute by the handler goroutine. See comments in endpoint.go for 
more.
 
 */

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/380f81d0/proton-c/bindings/go/src/qpid.apache.org/electron/endpoint.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/endpoint.go 
b/proton-c/bindings/go/src/qpid.apache.org/electron/endpoint.go
index f04b240..8cbeadb 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/endpoint.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/endpoint.go
@@ -54,6 +54,10 @@ type Endpoint interface {
        Done() <-chan struct{}
 }
 
+// DEVELOPER NOTES
+//
+// An electron.Endpoint corresponds to a proton.Endpoint, which can be 
invalidated
+//
 type endpoint struct {
        err  proton.ErrorHolder
        str  string // Must be set by the value that embeds endpoint.
@@ -62,10 +66,17 @@ type endpoint struct {
 
 func makeEndpoint(s string) endpoint { return endpoint{str: s, done: make(chan 
struct{})} }
 
-func (e *endpoint) closed(err error) {
+// Called in handler on a Closed event. Marks the endpoint as closed and the 
corresponding
+// proton.Endpoint pointer as invalid. Injected functions should check Error() 
to ensure
+// the pointer has not been invalidated.
+//
+// Returns the error stored on the endpoint, which may not be different to err 
if there was
+// already a n error
+func (e *endpoint) closed(err error) error {
        e.err.Set(err)
        e.err.Set(Closed)
        close(e.done)
+       return e.err.Get()
 }
 
 func (e *endpoint) String() string { return e.str }
@@ -74,19 +85,10 @@ func (e *endpoint) Error() error { return e.err.Get() }
 
 func (e *endpoint) Done() <-chan struct{} { return e.done }
 
-// Call in proton goroutine to close an endpoint locally
+// Call in proton goroutine to initiate closing an endpoint locally
 // handler will complete the close when remote end closes.
 func localClose(ep proton.Endpoint, err error) {
        if ep.State().LocalActive() {
                proton.CloseError(ep, err)
        }
 }
-
-// Used to indicate that a channel has closed which normally is because the 
endpoint is closed.
-func errorOrClosed(e Endpoint) error {
-       if e.Error() != nil {
-               return e.Error()
-       } else {
-               return Closed
-       }
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/380f81d0/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go 
b/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go
index 1586026..0237156 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go
@@ -49,6 +49,7 @@ func newHandler(c *connection) *handler {
        h.delegator.AutoOpen = false
        return h
 }
+
 func (h *handler) linkError(l proton.Link, msg string) {
        proton.CloseError(l, amqp.Errorf(amqp.InternalError, "%s for %s %s", 
msg, l.Type(), l))
 }
@@ -83,13 +84,7 @@ func (h *handler) HandleMessagingEvent(t 
proton.MessagingEvent, e proton.Event)
                }
 
        case proton.MSessionClosed:
-               err := proton.EndpointError(e.Session())
-               for l, _ := range h.links {
-                       if l.Session() == e.Session() {
-                               h.linkClosed(l, err)
-                       }
-               }
-               delete(h.sessions, e.Session())
+               h.sessionClosed(e.Session(), proton.EndpointError(e.Session()))
 
        case proton.MLinkOpening:
                l := e.Link()
@@ -117,7 +112,7 @@ func (h *handler) HandleMessagingEvent(t 
proton.MessagingEvent, e proton.Event)
                h.connection.err.Set(e.Connection().RemoteCondition().Error())
 
        case proton.MConnectionClosed:
-               h.connection.err.Set(Closed) // If no error already set, this 
is an orderly close.
+               h.connectionClosed(proton.EndpointError(e.Connection()))
 
        case proton.MDisconnected:
                h.connection.err.Set(e.Transport().Condition().Error())
@@ -157,13 +152,36 @@ func (h *handler) incoming(in Incoming) {
        }
 }
 
+func (h *handler) addLink(pl proton.Link, el Link) {
+       h.links[pl] = el
+}
+
 func (h *handler) linkClosed(l proton.Link, err error) {
-       if link := h.links[l]; link != nil {
+       if link, ok := h.links[l]; ok {
                link.closed(err)
                delete(h.links, l)
        }
 }
 
-func (h *handler) addLink(rl proton.Link, ll Link) {
-       h.links[rl] = ll
+func (h *handler) sessionClosed(ps proton.Session, err error) {
+       if s, ok := h.sessions[ps]; ok {
+               delete(h.sessions, ps)
+               err = s.closed(err)
+               for l, _ := range h.links {
+                       if l.Session() == ps {
+                               h.linkClosed(l, err)
+                       }
+               }
+       }
+}
+
+func (h *handler) connectionClosed(err error) {
+       err = h.connection.closed(err)
+       // Close links first to avoid repeated scans of the link list by 
sessions.
+       for l, _ := range h.links {
+               h.linkClosed(l, err)
+       }
+       for s, _ := range h.sessions {
+               h.sessionClosed(s, err)
+       }
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/380f81d0/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go 
b/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go
index 0b881c1..a48aeab 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go
@@ -172,8 +172,8 @@ type Endpoint interface {
        Type() string
 }
 
-// CloseError sets an error condition on an endpoint and closes the endpoint
-// if not already closed
+// CloseError sets an error condition (if err != nil) on an endpoint and closes
+// the endpoint if not already closed
 func CloseError(e Endpoint, err error) {
        if err != nil {
                e.Condition().SetError(err)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to