PROTON-827: go binding: Added endpoint Disconnected events.

On connection disconnect, the MessagingHandler receives LinkDisconnected,
SessionDisconnected and ConnectionDisconnected events for all disconnected
objects associated with the connection. Simplifies cleanup on disconnect.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/1e4118f6
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/1e4118f6
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/1e4118f6

Branch: refs/heads/rajith-codec
Commit: 1e4118f63b9e64c3390f47d5ee91b17dfc24ab5a
Parents: 49335b7
Author: Alan Conway <[email protected]>
Authored: Wed May 6 16:03:05 2015 -0400
Committer: Alan Conway <[email protected]>
Committed: Wed May 6 17:04:27 2015 -0400

----------------------------------------------------------------------
 examples/go/event/broker.go                     | 10 +-
 proton-c/bindings/go/src/genwrap.go             |  6 +-
 .../go/src/qpid.apache.org/proton/event/doc.go  |  2 +-
 .../qpid.apache.org/proton/event/handlers.go    | 98 ++++++++++++--------
 .../go/src/qpid.apache.org/proton/event/pump.go | 14 +--
 .../qpid.apache.org/proton/event/wrappers.go    | 29 ++++++
 .../proton/event/wrappers_gen.go                | 23 +----
 proton-c/bindings/python/proton/handlers.py     |  2 +-
 8 files changed, 103 insertions(+), 81 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1e4118f6/examples/go/event/broker.go
----------------------------------------------------------------------
diff --git a/examples/go/event/broker.go b/examples/go/event/broker.go
index 1c963e3..9720843 100644
--- a/examples/go/event/broker.go
+++ b/examples/go/event/broker.go
@@ -172,17 +172,9 @@ func (b *broker) HandleMessagingEvent(t 
event.MessagingEventType, e event.Event)
                        q.subscribe(e.Link())
                }
 
-       case event.MLinkClosing:
+       case event.MLinkDisconnected, event.MLinkClosing:
                b.unsubscribe(e.Link())
 
-       case event.MDisconnected:
-               fallthrough
-       case event.MConnectionClosing:
-               c := e.Connection()
-               for l := c.LinkHead(event.SRemoteActive); !l.IsNil(); l = 
l.Next(event.SRemoteActive) {
-                       b.unsubscribe(l)
-               }
-
        case event.MSendable:
                q := b.getQueue(e.Link().RemoteSource().Address())
                q.popTo(e.Connection().Pump(), e.Link())

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1e4118f6/proton-c/bindings/go/src/genwrap.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/genwrap.go 
b/proton-c/bindings/go/src/genwrap.go
index e269367..e34c045 100644
--- a/proton-c/bindings/go/src/genwrap.go
+++ b/proton-c/bindings/go/src/genwrap.go
@@ -152,9 +152,7 @@ var (
 func event(out io.Writer) {
        event_h := readHeader("event")
 
-       // event.h API functions
-       apiWrapFns("event", event_h, out)
-       fmt.Fprintln(out, `func (e Event) String() string { return 
e.Type().String() }`)
+       // Event is implemented by hand in wrappers.go
 
        // Get all the pn_event_type_t enum values
        var etypes []eventType
@@ -206,6 +204,8 @@ func (g genType) goConvert(value string) string {
        switch g.Gotype {
        case "string":
                return fmt.Sprintf("C.GoString(%s)", value)
+       case "Event":
+               return fmt.Sprintf("makeEvent(%s)", value)
        default:
                return fmt.Sprintf("%s(%s)", g.Gotype, value)
        }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1e4118f6/proton-c/bindings/go/src/qpid.apache.org/proton/event/doc.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/event/doc.go 
b/proton-c/bindings/go/src/qpid.apache.org/proton/event/doc.go
index 6a1c8ac..a0d45d7 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/event/doc.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/proton/event/doc.go
@@ -23,7 +23,7 @@ Package event provides a low-level API to the proton AMQP 
engine.
 For most tasks, consider instead package qpid.apache.org/proton/messaging.
 It provides a higher-level, concurrent API that is easier to use.
 
-The API is event based. There are two alternative styles of handler. 
CoreHandler
+The API is event based. There are two alternative styles of handler. 
EventHandler
 provides the core proton events. MessagingHandler provides a slighly simplified
 view of the event stream and automates some common tasks.
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1e4118f6/proton-c/bindings/go/src/qpid.apache.org/proton/event/handlers.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/event/handlers.go 
b/proton-c/bindings/go/src/qpid.apache.org/proton/event/handlers.go
index 450a114..5fc679a 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/event/handlers.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/proton/event/handlers.go
@@ -27,8 +27,8 @@ import (
        "qpid.apache.org/proton/internal"
 )
 
-// CoreHandler handles core proton events.
-type CoreHandler interface {
+// EventHandler handles core proton events.
+type EventHandler interface {
        // HandleEvent is called with an event.
        // Typically HandleEvent() is implemented as a switch on e.Type()
        HandleEvent(e Event) error
@@ -44,11 +44,11 @@ func (h cHandler) HandleEvent(e Event) error {
        return nil // FIXME aconway 2015-03-31: error handling
 }
 
-// MessagingHandler provides an alternative interface to CoreHandler.
+// MessagingHandler provides an alternative interface to EventHandler.
 // it is easier to use for most applications that send and receive messages.
 //
 // Implement this interface and then wrap your value with a 
MessagingHandlerDelegator.
-// MessagingHandlerDelegator implements CoreHandler and can be registered with 
a Pump.
+// MessagingHandlerDelegator implements EventHandler and can be registered 
with a Pump.
 //
 type MessagingHandler interface {
        HandleMessagingEvent(MessagingEventType, Event) error
@@ -57,82 +57,80 @@ type MessagingHandler interface {
 // MessagingEventType provides a set of events that are easier to work with 
than the
 // core events defined by EventType
 //
+// There are 3 types of "endpoint": Connection, Session and Link.
+// For each endpoint there are 5 event types: Opening, Opened, Closing, Closed 
and Error.
+// The meaning of these events is as follows:
+//
+// Opening: The remote end opened, the local end will open automatically.
+//
+// Opened: Both ends are open, regardless of which end opened first.
+//
+// Closing: The remote end closed without error, the local end will close 
automatically.
+//
+// Error: The remote end closed with an error, the local end will close 
automatically.
+//
+// Closed: Both ends are closed, regardless of which end closed first or if 
there was an error.
+//
 type MessagingEventType int
 
 const (
        // The event loop starts.
        MStart MessagingEventType = iota
-
        // The peer closes the connection with an error condition.
        MConnectionError
-
        // The peer closes the session with an error condition.
        MSessionError
-
        // The peer closes the link with an error condition.
        MLinkError
-
        // The peer Initiates the opening of the connection.
        MConnectionOpening
-
        // The peer initiates the opening of the session.
        MSessionOpening
-
        // The peer initiates the opening of the link.
        MLinkOpening
-
        // The connection is opened.
        MConnectionOpened
-
        // The session is opened.
        MSessionOpened
-
        // The link is opened.
        MLinkOpened
-
        // The peer initiates the closing of the connection.
        MConnectionClosing
-
        // The peer initiates the closing of the session.
        MSessionClosing
-
        // The peer initiates the closing of the link.
        MLinkClosing
-
        // Both ends of the connection are closed.
        MConnectionClosed
-
        // Both ends of the session are closed.
        MSessionClosed
-
        // Both ends of the link are closed.
        MLinkClosed
-
-       // The socket is disconnected.
-       MDisconnected
-
+       // The connection is disconnected.
+       MConnectionDisconnected
+       // The session's connection was disconnected
+       MSessionDisconnected
+       // The link's connection was disconnected
+       MLinkDisconnected
        // The sender link has credit and messages can
        // therefore be transferred.
        MSendable
-
        // The remote peer accepts an outgoing message.
        MAccepted
-
        // The remote peer rejects an outgoing message.
        MRejected
-
        // The peer releases an outgoing message. Note that this may be in 
response to
        // either the RELEASE or MODIFIED state as defined by the AMQP 
specification.
        MReleased
-
        // The peer has settled the outgoing message. This is the point at 
which it
        // shouod never be retransmitted.
        MSettled
-
        // A message is received. Call proton.EventMessage(Event) to get the 
message.
        // To manage the outcome of this messages (e.g. to accept or reject the 
message)
        // use Event.Delivery().
        MMessage
+       // The event loop terminates, there are no more events to process.
+       MFinal
 )
 
 func (t MessagingEventType) String() string {
@@ -169,8 +167,12 @@ func (t MessagingEventType) String() string {
                return "SessionClosed"
        case MLinkClosed:
                return "LinkClosed"
-       case MDisconnected:
-               return "Disconnected"
+       case MConnectionDisconnected:
+               return "ConnectionDisconnected"
+       case MSessionDisconnected:
+               return "MSessionDisconnected"
+       case MLinkDisconnected:
+               return "MLinkDisconnected"
        case MSendable:
                return "Sendable"
        case MAccepted:
@@ -223,19 +225,20 @@ func (d endpointDelegator) HandleEvent(e Event) (err 
error) {
 
        case d.remoteClose:
                var err1 error
-               if endpoint.RemoteCondition().IsSet() {
+               if endpoint.RemoteCondition().IsSet() { // Closed with error
                        err1 = d.delegate.HandleMessagingEvent(d.error, e)
-                       if err1 == nil {
+                       if err1 == nil { // Don't overwrite an application 
error.
                                err1 = endpoint.RemoteCondition().Error()
                        }
+               } else {
+                       err1 = d.delegate.HandleMessagingEvent(d.closing, e)
                }
                if state.Is(SLocalClosed) {
                        err = d.delegate.HandleMessagingEvent(d.closed, e)
-               } else {
-                       err = d.delegate.HandleMessagingEvent(d.closing, e)
+               } else if state.Is(SLocalActive) {
                        endpoint.Close()
                }
-               if err1 != nil {
+               if err1 != nil { // Keep the first error.
                        err = err1
                }
 
@@ -252,13 +255,13 @@ func (d endpointDelegator) HandleEvent(e Event) (err 
error) {
        return err
 }
 
-// MessagingDelegator implments a CoreHandler and delegates to a 
MessagingHandler.
+// MessagingDelegator implements an EventHandler and delegates to a 
MessagingHandler.
 // You can modify the exported fields before you pass the MessagingDelegator to
 // a Pump.
 type MessagingDelegator struct {
        delegate                   MessagingHandler
        connection, session, link  endpointDelegator
-       handshaker, flowcontroller CoreHandler
+       handshaker, flowcontroller EventHandler
 
        // AutoSettle (default true) automatically pre-settle outgoing messages.
        AutoSettle bool
@@ -271,7 +274,7 @@ type MessagingDelegator struct {
        PeerCloseError bool
 }
 
-func NewMessagingDelegator(h MessagingHandler) CoreHandler {
+func NewMessagingDelegator(h MessagingHandler) EventHandler {
        return &MessagingDelegator{
                delegate: h,
                connection: endpointDelegator{
@@ -303,13 +306,15 @@ func NewMessagingDelegator(h MessagingHandler) 
CoreHandler {
        }
 }
 
-func handleIf(h CoreHandler, e Event) error {
+func handleIf(h EventHandler, e Event) error {
        if h != nil {
                return h.HandleEvent(e)
        }
        return nil
 }
 
+// Handle a proton event by passing the corresponding MessagingEvent(s) to
+// the MessagingHandler.
 func (d *MessagingDelegator) HandleEvent(e Event) error {
        handleIf(d.flowcontroller, e) // FIXME aconway 2015-03-31: error 
handling.
 
@@ -341,7 +346,20 @@ func (d *MessagingDelegator) HandleEvent(e Event) error {
                }
 
        case ETransportTailClosed:
-               d.delegate.HandleMessagingEvent(MDisconnected, e)
+               c := e.Connection()
+               for l := c.LinkHead(SRemoteActive); !l.IsNil(); l = 
l.Next(SRemoteActive) {
+                       e2 := e
+                       e2.link = l
+                       e2.session = l.Session()
+                       d.delegate.HandleMessagingEvent(MLinkDisconnected, e2)
+               }
+               for s := c.SessionHead(SRemoteActive); !s.IsNil(); s = 
s.Next(SRemoteActive) {
+                       e2 := e
+                       e2.session = s
+                       d.delegate.HandleMessagingEvent(MSessionDisconnected, 
e2)
+               }
+               d.delegate.HandleMessagingEvent(MConnectionDisconnected, e)
+               d.delegate.HandleMessagingEvent(MFinal, e)
        }
        return nil
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1e4118f6/proton-c/bindings/go/src/qpid.apache.org/proton/event/pump.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/event/pump.go 
b/proton-c/bindings/go/src/qpid.apache.org/proton/event/pump.go
index 564ca6c..c9c5ca3 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/event/pump.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/proton/event/pump.go
@@ -72,7 +72,7 @@ data to/from a net.Conn. You can create multiple Pumps to 
handle multiple
 connections concurrently.
 
 Methods in this package can only be called in the goroutine that executes the
-corresponding Pump.Run(). You implement the CoreHandler or MessagingHandler
+corresponding Pump.Run(). You implement the EventHandler or MessagingHandler
 interfaces and provide those values to NewPump(). Their HandleEvent method 
will be
 called in the Pump goroutine, in typical event-driven style.
 
@@ -109,10 +109,10 @@ type Pump struct {
        transport  *C.pn_transport_t
        connection *C.pn_connection_t
        collector  *C.pn_collector_t
-       read       *bufferChan   // Read buffers channel.
-       write      *bufferChan   // Write buffers channel.
-       handlers   []CoreHandler // Handlers for proton events.
-       running    chan struct{} // This channel will be closed when the 
goroutines are done.
+       read       *bufferChan    // Read buffers channel.
+       write      *bufferChan    // Write buffers channel.
+       handlers   []EventHandler // Handlers for proton events.
+       running    chan struct{}  // This channel will be closed when the 
goroutines are done.
 }
 
 const bufferSize = 4096
@@ -129,7 +129,7 @@ func init() {
 // The goroutine will exit when the pump is closed or disconnected.
 // You can check for errors on Pump.Error.
 //
-func NewPump(conn net.Conn, handlers ...CoreHandler) (*Pump, error) {
+func NewPump(conn net.Conn, handlers ...EventHandler) (*Pump, error) {
        // Save the connection ID for Connection.String()
        p := &Pump{
                Inject:     make(chan func(), 100), // FIXME aconway 
2015-05-04: blocking hack
@@ -344,7 +344,7 @@ func (p *Pump) handle(e Event) error {
 func (p *Pump) process() error {
        // FIXME aconway 2015-05-04: if a Handler returns error we should stop 
the pump
        for ce := C.pn_collector_peek(p.collector); ce != nil; ce = 
C.pn_collector_peek(p.collector) {
-               e := Event{ce}
+               e := makeEvent(ce)
                if err := p.handle(e); err != nil {
                        return err
                }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1e4118f6/proton-c/bindings/go/src/qpid.apache.org/proton/event/wrappers.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/event/wrappers.go 
b/proton-c/bindings/go/src/qpid.apache.org/proton/event/wrappers.go
index 3584311..d2c4e43 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/event/wrappers.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/proton/event/wrappers.go
@@ -24,6 +24,7 @@ package event
 //#include <proton/session.h>
 //#include <proton/session.h>
 //#include <proton/delivery.h>
+//#include <proton/link.h>
 //#include <proton/event.h>
 //#include <proton/transport.h>
 //#include <proton/link.h>
@@ -38,6 +39,34 @@ import (
 
 // FIXME aconway 2015-05-05: Documentation for generated types.
 
+// Event is an AMQP protocol event.
+type Event struct {
+       pn         *C.pn_event_t
+       eventType  EventType
+       connection Connection
+       session    Session
+       link       Link
+       delivery   Delivery
+}
+
+func makeEvent(pn *C.pn_event_t) Event {
+       return Event{
+               pn:         pn,
+               eventType:  EventType(C.pn_event_type(pn)),
+               connection: Connection{C.pn_event_connection(pn)},
+               session:    Session{C.pn_event_session(pn)},
+               link:       Link{C.pn_event_link(pn)},
+               delivery:   Delivery{C.pn_event_delivery(pn)},
+       }
+}
+func (e Event) IsNil() bool            { return e.eventType == EventType(0) }
+func (e Event) Type() EventType        { return e.eventType }
+func (e Event) Connection() Connection { return e.connection }
+func (e Event) Session() Session       { return e.session }
+func (e Event) Link() Link             { return e.link }
+func (e Event) Delivery() Delivery     { return e.delivery }
+func (e Event) String() string         { return e.Type().String() }
+
 // Data holds a pointer to decoded AMQP data.
 // Use proton.marshal/unmarshal to access it as Go data types.
 //

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1e4118f6/proton-c/bindings/go/src/qpid.apache.org/proton/event/wrappers_gen.go
----------------------------------------------------------------------
diff --git 
a/proton-c/bindings/go/src/qpid.apache.org/proton/event/wrappers_gen.go 
b/proton-c/bindings/go/src/qpid.apache.org/proton/event/wrappers_gen.go
index 8f678ca..f53e8bb 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/event/wrappers_gen.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/proton/event/wrappers_gen.go
@@ -41,26 +41,6 @@ import (
 // #include <proton/connection.h>
 import "C"
 
-type Event struct{ pn *C.pn_event_t }
-
-func (e Event) IsNil() bool { return e.pn == nil }
-func (e Event) Type() EventType {
-       return EventType(C.pn_event_type(e.pn))
-}
-func (e Event) Connection() Connection {
-       return Connection{C.pn_event_connection(e.pn)}
-}
-func (e Event) Session() Session {
-       return Session{C.pn_event_session(e.pn)}
-}
-func (e Event) Link() Link {
-       return Link{C.pn_event_link(e.pn)}
-}
-func (e Event) Delivery() Delivery {
-       return Delivery{C.pn_event_delivery(e.pn)}
-}
-func (e Event) String() string { return e.Type().String() }
-
 type EventType int
 
 const (
@@ -411,6 +391,9 @@ func (d Delivery) Update(state uint64) {
 func (d Delivery) Clear() {
        C.pn_delivery_clear(d.pn)
 }
+func (d Delivery) Current() bool {
+       return bool(C.pn_delivery_current(d.pn))
+}
 func (d Delivery) Settle() {
        C.pn_delivery_settle(d.pn)
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1e4118f6/proton-c/bindings/python/proton/handlers.py
----------------------------------------------------------------------
diff --git a/proton-c/bindings/python/proton/handlers.py 
b/proton-c/bindings/python/proton/handlers.py
index 6836788..ac19e6f 100644
--- a/proton-c/bindings/python/proton/handlers.py
+++ b/proton-c/bindings/python/proton/handlers.py
@@ -254,7 +254,7 @@ class EndpointStateHandler(Handler):
         if event.connection.remote_condition:
             self.on_connection_error(event)
         elif self.is_local_closed(event.connection):
-            self.on_connection_closed(event)
+           self.on_connection_closed(event)
         else:
             self.on_connection_closing(event)
         event.connection.close()


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to