PROTON-1045: Use of callbacks to handle accepted endpoints violates design 
goals.

Get rid of the callback functions that ran in the proton goroutine.
Use a channel for Incoming requests instead, so that all user code runs in
user goroutines.
This is safer and more consistent.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/91fe6e0f
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/91fe6e0f
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/91fe6e0f

Branch: refs/heads/go1
Commit: 91fe6e0fda07efa07f10a97d906eee114c86d6bf
Parents: f3edf57
Author: Alan Conway <[email protected]>
Authored: Wed Nov 11 10:22:01 2015 -0500
Committer: Alan Conway <[email protected]>
Committed: Wed Nov 11 21:54:25 2015 -0500

----------------------------------------------------------------------
 examples/go/electron/broker.go                  |  35 ++++--
 .../src/qpid.apache.org/electron/connection.go  |  85 ++++++++------
 .../go/src/qpid.apache.org/electron/handler.go  |  43 ++++---
 .../go/src/qpid.apache.org/electron/link.go     |  36 +++---
 .../qpid.apache.org/electron/messaging_test.go  | 116 +++++++++++--------
 .../go/src/qpid.apache.org/electron/receiver.go |  23 ++--
 .../go/src/qpid.apache.org/electron/sender.go   |  15 +--
 .../go/src/qpid.apache.org/electron/session.go  |  26 +++--
 .../go/src/qpid.apache.org/proton/wrappers.go   |  17 +++
 9 files changed, 233 insertions(+), 163 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/91fe6e0f/examples/go/electron/broker.go
----------------------------------------------------------------------
diff --git a/examples/go/electron/broker.go b/examples/go/electron/broker.go
index f1dce17..f68deb1 100644
--- a/examples/go/electron/broker.go
+++ b/examples/go/electron/broker.go
@@ -78,26 +78,43 @@ func (b *broker) run() error {
                        util.Debugf("Accept error: %v", err)
                        continue
                }
-               c, err := b.container.Connection(conn, electron.Server(), 
electron.Accepter(b.accept))
+               c, err := b.container.Connection(conn, electron.Server(), 
electron.AllowIncoming())
                if err != nil {
                        util.Debugf("Connection error: %v", err)
                        continue
                }
+               go b.accept(c) // Goroutine to accept incoming sessions and 
links.
                util.Debugf("Accepted %v", c)
        }
 }
 
 // accept remotely-opened endpoints (Session, Sender and Receiver)
 // and start goroutines to service them.
-func (b *broker) accept(i electron.Incoming) {
-       switch i := i.(type) {
-       case *electron.IncomingSender:
-               go b.sender(i.AcceptSender())
-       case *electron.IncomingReceiver:
-               go b.receiver(i.AcceptReceiver(100, true)) // Pre-fetch 100 
messages
-       default:
-               i.Accept()
+func (b *broker) accept(c electron.Connection) {
+       for in := range c.Incoming() {
+               switch in := in.(type) {
+
+               case *electron.IncomingSender:
+                       if in.Source() == "" {
+                               util.Debugf("sender has no source: %s", in)
+                               break
+                       }
+                       go b.sender(in.Accept().(electron.Sender))
+
+               case *electron.IncomingReceiver:
+                       if in.Target() == "" {
+                               util.Debugf("receiver has no target: %s", in)
+                               break
+                       }
+                       in.SetPrefetch(true)
+                       in.SetCapacity(*credit) // Pre-fetch up to credit 
window.
+                       go b.receiver(in.Accept().(electron.Receiver))
+
+               default:
+                       in.Accept() // Accept sessions unconditionally
+               }
        }
+       util.Debugf("incoming closed: %s", c)
 }
 
 // sender pops messages from a queue and sends them.

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/91fe6e0f/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go 
b/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go
index 8573f28..3462b92 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go
@@ -24,7 +24,6 @@ import "C"
 
 import (
        "net"
-       "qpid.apache.org/amqp"
        "qpid.apache.org/proton"
        "sync"
        "time"
@@ -58,12 +57,30 @@ type Connection interface {
 
        // WaitTimeout is like Wait but returns Timeout if the timeout expires.
        WaitTimeout(time.Duration) error
+
+       // Incoming returns a channel for incoming endpoints opened by the 
remote end.
+       //
+       // To enable, pass AllowIncoming() when creating the Connection. 
Otherwise all
+       // incoming endpoint requests are automatically rejected and Incoming()
+       // returns nil.
+       //
+       // An Incoming value can be an *IncomingSession, *IncomingSender or
+       // *IncomingReceiver.  You must call Accept() to open the endpoint or 
Reject()
+       // to close it with an error. The specific Incoming types have 
additional
+       // methods to configure the endpoint.
+       //
+       // Delay in receiving from Incoming() or calling Accept/Reject will 
block
+       // proton. Normally you should have a dedicated goroutine receive from 
this
+       // channel and start a new goroutine to serve each endpoint accepted.  
The
+       // channel is closed when the Connection closes.
+       //
+       Incoming() <-chan Incoming
 }
 
-// ConnectionOption can be passed when creating a connection.
+// ConnectionOption can be passed when creating a connection to configure 
various options
 type ConnectionOption func(*connection)
 
-// Server setting puts the connection in server mode.
+// Server returns a ConnectionOption to put the connection in server mode.
 //
 // A server connection will do protocol negotiation to accept a incoming AMQP
 // connection. Normally you would call this for a connection created by
@@ -71,24 +88,19 @@ type ConnectionOption func(*connection)
 //
 func Server() ConnectionOption { return func(c *connection) { 
c.engine.Server() } }
 
-// Accepter provides a function to be called when a connection receives an 
incoming
-// request to open an endpoint, one of IncomingSession, IncomingSender or 
IncomingReceiver.
-//
-// The accept() function must not block or use the accepted endpoint.
-// It can pass the endpoint to another goroutine for processing.
-//
-// By default all incoming endpoints are rejected.
-func Accepter(accept func(Incoming)) ConnectionOption {
-       return func(c *connection) { c.accept = accept }
+// AllowIncoming returns a ConnectionOption to enable incoming endpoint open 
requests.
+// See Connection.Incoming()
+func AllowIncoming() ConnectionOption {
+       return func(c *connection) { c.incoming = make(chan Incoming) }
 }
 
 type connection struct {
        endpoint
-       listenOnce, defaultSessionOnce, closeOnce sync.Once
+       defaultSessionOnce, closeOnce sync.Once
 
        container   *container
        conn        net.Conn
-       accept      func(Incoming)
+       incoming    chan Incoming
        handler     *handler
        engine      *proton.Engine
        err         proton.ErrorHolder
@@ -99,7 +111,7 @@ type connection struct {
 }
 
 func newConnection(conn net.Conn, cont *container, setting 
...ConnectionOption) (*connection, error) {
-       c := &connection{container: cont, conn: conn, accept: func(Incoming) 
{}, done: make(chan struct{})}
+       c := &connection{container: cont, conn: conn, done: make(chan struct{})}
        c.handler = newHandler(c)
        var err error
        c.engine, err = proton.NewEngine(c.conn, c.handler.delegator)
@@ -173,38 +185,45 @@ func (c *connection) WaitTimeout(timeout time.Duration) 
error {
        return c.Error()
 }
 
+func (c *connection) Incoming() <-chan Incoming { return c.incoming }
+
 // Incoming is the interface for incoming requests to open an endpoint.
 // Implementing types are IncomingSession, IncomingSender and IncomingReceiver.
 type Incoming interface {
-       // Accept the endpoint with default settings.
-       //
-       // You must not use the returned endpoint in the accept() function that
-       // receives the Incoming value, but you can pass it to other goroutines.
-       //
-       // Implementing types provide type-specific Accept functions that take 
additional settings.
+       // Accept and open the endpoint.
        Accept() Endpoint
 
        // Reject the endpoint with an error
        Reject(error)
 
-       error(string, ...interface{}) error
+       // wait for and call the accept function, call in proton goroutine.
+       wait() error
+       pEndpoint() proton.Endpoint
 }
 
-// Common state for incoming endpoints, record accept or reject error.
 type incoming struct {
-       err      error
-       accepted bool
+       endpoint proton.Endpoint
+       acceptCh chan func() error
 }
 
-func (i *incoming) Reject(err error) { i.err = err }
+func makeIncoming(e proton.Endpoint) incoming {
+       return incoming{endpoint: e, acceptCh: make(chan func() error)}
+}
+
+func (in *incoming) Reject(err error) { in.acceptCh <- func() error { return 
err } }
+
+// Call in proton goroutine, wait for and call the accept function fr
+func (in *incoming) wait() error { return (<-in.acceptCh)() }
+
+func (in *incoming) pEndpoint() proton.Endpoint { return in.endpoint }
 
-func (i *incoming) error(fmt string, arg ...interface{}) error {
-       switch {
-       case i.err != nil:
-               return i.err
-       case !i.accepted:
-               return amqp.Errorf(amqp.NotAllowed, fmt, arg...)
-       default:
+// Called in app goroutine to send an accept function to proton and return the 
resulting endpoint.
+func (in *incoming) accept(f func() Endpoint) Endpoint {
+       done := make(chan Endpoint)
+       in.acceptCh <- func() error {
+               ep := f()
+               done <- ep
                return nil
        }
+       return <-done
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/91fe6e0f/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go 
b/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go
index 754a221..1b1164c 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go
@@ -49,9 +49,8 @@ func newHandler(c *connection) *handler {
        h.delegator.AutoOpen = false
        return h
 }
-
-func (h *handler) internalError(ep proton.Endpoint, msg string) {
-       proton.CloseError(ep, amqp.Errorf(amqp.InternalError, "%s %s", msg, ep))
+func (h *handler) linkError(l proton.Link, msg string) {
+       proton.CloseError(l, amqp.Errorf(amqp.InternalError, "%s for %s %s", 
msg, l.Type(), l))
 }
 
 func (h *handler) HandleMessagingEvent(t proton.MessagingEvent, e 
proton.Event) {
@@ -61,7 +60,7 @@ func (h *handler) HandleMessagingEvent(t 
proton.MessagingEvent, e proton.Event)
                if r, ok := h.links[e.Link()].(*receiver); ok {
                        r.message(e.Delivery())
                } else {
-                       h.internalError(e.Link(), "no receiver for link")
+                       h.linkError(e.Link(), "no receiver")
                }
 
        case proton.MSettled:
@@ -73,16 +72,12 @@ func (h *handler) HandleMessagingEvent(t 
proton.MessagingEvent, e proton.Event)
                if s, ok := h.links[e.Link()].(*sender); ok {
                        s.sendable()
                } else {
-                       h.internalError(e.Link(), "no sender for link")
+                       h.linkError(e.Link(), "no sender")
                }
 
        case proton.MSessionOpening:
                if e.Session().State().LocalUninit() { // Remotely opened
-                       incoming := &IncomingSession{h: h, pSession: 
e.Session()}
-                       h.connection.accept(incoming)
-                       if err := incoming.error("rejected session %s", 
e.Session()); err != nil {
-                               proton.CloseError(e.Session(), err)
-                       }
+                       h.incoming(newIncomingSession(h, e.Session()))
                }
 
        case proton.MSessionClosed:
@@ -101,19 +96,13 @@ func (h *handler) HandleMessagingEvent(t 
proton.MessagingEvent, e proton.Event)
                }
                ss := h.sessions[l.Session()]
                if ss == nil {
-                       h.internalError(e.Link(), "no session for link")
+                       h.linkError(e.Link(), "no session")
                        break
                }
-               var incoming Incoming
                if l.IsReceiver() {
-                       incoming = &IncomingReceiver{makeIncomingLink(ss, l)}
+                       h.incoming(&IncomingReceiver{makeIncomingLink(ss, l)})
                } else {
-                       incoming = &IncomingSender{makeIncomingLink(ss, l)}
-               }
-               h.connection.accept(incoming)
-               if err := incoming.error("rejected link %s", e.Link()); err != 
nil {
-                       proton.CloseError(l, err)
-                       break
+                       h.incoming(&IncomingSender{makeIncomingLink(ss, l)})
                }
 
        case proton.MLinkClosing:
@@ -146,6 +135,22 @@ func (h *handler) HandleMessagingEvent(t 
proton.MessagingEvent, e proton.Event)
        }
 }
 
+func (h *handler) incoming(in Incoming) {
+       var err error
+       if h.connection.incoming != nil {
+               h.connection.incoming <- in
+               err = in.wait()
+       } else {
+               err = amqp.Errorf(amqp.NotAllowed, "rejected incoming %s %s",
+                       in.pEndpoint().Type(), in.pEndpoint().String())
+       }
+       if err == nil {
+               in.pEndpoint().Open()
+       } else {
+               proton.CloseError(in.pEndpoint(), err)
+       }
+}
+
 func (h *handler) linkClosed(l proton.Link, err error) {
        if link := h.links[l]; link != nil {
                link.closed(err)

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/91fe6e0f/proton-c/bindings/go/src/qpid.apache.org/electron/link.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/link.go 
b/proton-c/bindings/go/src/qpid.apache.org/electron/link.go
index 4835cb9..cadc2c1 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/link.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/link.go
@@ -60,36 +60,37 @@ type Link interface {
        open()
 }
 
-// LinkOption can be passed when creating a sender or receiver link.
+// LinkOption can be passed when creating a sender or receiver link to set 
optional configuration.
 type LinkOption func(*link)
 
-// Source sets address that messages are coming from.
+// Source returns a LinkOption that sets address that messages are coming from.
 func Source(s string) LinkOption { return func(l *link) { l.source = s } }
 
-// Target sets address that messages are going to.
+// Target returns a LinkOption that sets address that messages are going to.
 func Target(s string) LinkOption { return func(l *link) { l.target = s } }
 
-// LinkName sets the link name.
+// LinkName returns a LinkOption that sets the link name.
 func LinkName(s string) LinkOption { return func(l *link) { l.target = s } }
 
-// SndSettle sets the send settle mode
+// SndSettle returns a LinkOption that sets the send settle mode
 func SndSettle(m SndSettleMode) LinkOption { return func(l *link) { 
l.sndSettle = m } }
 
-// RcvSettle sets the send settle mode
+// RcvSettle returns a LinkOption that sets the send settle mode
 func RcvSettle(m RcvSettleMode) LinkOption { return func(l *link) { 
l.rcvSettle = m } }
 
-// SndSettleMode defines when the sending end of the link settles message 
delivery.
+// SndSettleMode returns a LinkOption that defines when the sending end of the
+// link settles message delivery.
 type SndSettleMode proton.SndSettleMode
 
-// Capacity sets the link capacity
+// Capacity returns a LinkOption that sets the link capacity
 func Capacity(n int) LinkOption { return func(l *link) { l.capacity = n } }
 
-// Prefetch sets a receivers pre-fetch flag. Not relevant for a sender.
+// Prefetch returns a LinkOption that sets a receivers pre-fetch flag. Not 
relevant for a sender.
 func Prefetch(p bool) LinkOption { return func(l *link) { l.prefetch = p } }
 
-// AtMostOnce sets "fire and forget" mode, messages are sent but no
-// acknowledgment is received, messages can be lost if there is a network
-// failure. Sets SndSettleMode=SendSettled and RcvSettleMode=RcvFirst
+// AtMostOnce returns a LinkOption that sets "fire and forget" mode, messages
+// are sent but no acknowledgment is received, messages can be lost if there is
+// a network failure. Sets SndSettleMode=SendSettled and RcvSettleMode=RcvFirst
 func AtMostOnce() LinkOption {
        return func(l *link) {
                SndSettle(SndSettled)(l)
@@ -97,11 +98,11 @@ func AtMostOnce() LinkOption {
        }
 }
 
-// AtLeastOnce requests acknowledgment for every message, acknowledgment
-// indicates the message was definitely received. In the event of a
-// failure, unacknowledged messages can be re-sent but there is a chance
-// that the message will be received twice in this case.
-// Sets SndSettleMode=SndUnsettled and RcvSettleMode=RcvFirst
+// AtLeastOnce returns a LinkOption that requests acknowledgment for every
+// message, acknowledgment indicates the message was definitely received. In 
the
+// event of a failure, unacknowledged messages can be re-sent but there is a
+// chance that the message will be received twice in this case.  Sets
+// SndSettleMode=SndUnsettled and RcvSettleMode=RcvFirst
 func AtLeastOnce() LinkOption {
        return func(l *link) {
                SndSettle(SndUnsettled)(l)
@@ -200,6 +201,7 @@ type incomingLink struct {
 // Set up a link from an incoming proton.Link.
 func makeIncomingLink(sn *session, eLink proton.Link) incomingLink {
        l := incomingLink{
+               incoming: makeIncoming(eLink),
                link: link{
                        session:   sn,
                        isSender:  eLink.IsSender(),

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/91fe6e0f/proton-c/bindings/go/src/qpid.apache.org/electron/messaging_test.go
----------------------------------------------------------------------
diff --git 
a/proton-c/bindings/go/src/qpid.apache.org/electron/messaging_test.go 
b/proton-c/bindings/go/src/qpid.apache.org/electron/messaging_test.go
index c7ff290..3b5a062 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/messaging_test.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/messaging_test.go
@@ -39,22 +39,22 @@ func fatalIf(t *testing.T, err error) {
        }
 }
 
-// Start a server, return listening addr and channel for incoming Connection.
-func newServer(t *testing.T, cont Container, accept func(Incoming)) (net.Addr, 
<-chan Connection) {
+// Start a server, return listening addr and channel for incoming Connections.
+func newServer(t *testing.T, cont Container) (net.Addr, <-chan Connection) {
        listener, err := net.Listen("tcp", "")
        fatalIf(t, err)
        addr := listener.Addr()
        ch := make(chan Connection)
        go func() {
                conn, err := listener.Accept()
-               c, err := cont.Connection(conn, Server(), Accepter(accept))
+               c, err := cont.Connection(conn, Server(), AllowIncoming())
                fatalIf(t, err)
                ch <- c
        }()
        return addr, ch
 }
 
-// Return open an client connection and session, return the session.
+// Open a client connection and session, return the session.
 func newClient(t *testing.T, cont Container, addr net.Addr) Session {
        conn, err := net.Dial(addr.Network(), addr.String())
        fatalIf(t, err)
@@ -66,8 +66,8 @@ func newClient(t *testing.T, cont Container, addr net.Addr) 
Session {
 }
 
 // Return client and server ends of the same connection.
-func newClientServer(t *testing.T, accept func(Incoming)) (client Session, 
server Connection) {
-       addr, ch := newServer(t, NewContainer("test-server"), accept)
+func newClientServer(t *testing.T) (client Session, server Connection) {
+       addr, ch := newServer(t, NewContainer("test-server"))
        client = newClient(t, NewContainer("test-client"), addr)
        return client, <-ch
 }
@@ -85,19 +85,22 @@ func TestClientSendServerReceive(t *testing.T) {
        nMessages := 3
 
        rchan := make(chan Receiver, nLinks)
-       client, server := newClientServer(t, func(i Incoming) {
-               switch i := i.(type) {
-               case *IncomingReceiver:
-                       rchan <- i.AcceptReceiver(1, false)
-               default:
-                       i.Accept()
+       client, server := newClientServer(t)
+       go func() {
+               for in := range server.Incoming() {
+                       switch in := in.(type) {
+                       case *IncomingReceiver:
+                               in.SetCapacity(1)
+                               in.SetPrefetch(false)
+                               rchan <- in.Accept().(Receiver)
+                       default:
+                               in.Accept()
+                       }
                }
-       })
-
-       defer func() {
-               closeClientServer(client, server)
        }()
 
+       defer func() { closeClientServer(client, server) }()
+
        s := make([]Sender, nLinks)
        for i := 0; i < nLinks; i++ {
                var err error
@@ -155,26 +158,29 @@ func TestClientSendServerReceive(t *testing.T) {
 
 func TestClientReceiver(t *testing.T) {
        nMessages := 3
-       client, server := newClientServer(t, func(i Incoming) {
-               switch i := i.(type) {
-               case *IncomingSender:
-                       s := i.AcceptSender()
-                       go func() {
-                               for i := int32(0); i < int32(nMessages); i++ {
-                                       sm, err := 
s.Send(amqp.NewMessageWith(i))
-                                       if err != nil {
-                                               t.Error(err)
-                                               return
-                                       } else {
-                                               sm.Disposition() // Sync send.
+       client, server := newClientServer(t)
+       go func() {
+               for in := range server.Incoming() {
+                       switch in := in.(type) {
+                       case *IncomingSender:
+                               s := in.Accept().(Sender)
+                               go func() {
+                                       for i := int32(0); i < 
int32(nMessages); i++ {
+                                               sm, err := 
s.Send(amqp.NewMessageWith(i))
+                                               if err != nil {
+                                                       t.Error(err)
+                                                       return
+                                               } else {
+                                                       sm.Disposition() // 
Sync send.
+                                               }
                                        }
-                               }
-                               s.Close(nil)
-                       }()
-               default:
-                       i.Accept()
+                                       s.Close(nil)
+                               }()
+                       default:
+                               in.Accept()
+                       }
                }
-       })
+       }()
 
        r, err := client.Receiver(Source("foo"))
        if err != nil {
@@ -203,14 +209,19 @@ func TestClientReceiver(t *testing.T) {
 func TestTimeouts(t *testing.T) {
        var err error
        rchan := make(chan Receiver, 1)
-       client, server := newClientServer(t, func(i Incoming) {
-               switch i := i.(type) {
-               case *IncomingReceiver:
-                       rchan <- i.AcceptReceiver(1, false) // Issue credit 
only on receive
-               default:
-                       i.Accept()
+       client, server := newClientServer(t)
+       go func() {
+               for i := range server.Incoming() {
+                       switch i := i.(type) {
+                       case *IncomingReceiver:
+                               i.SetCapacity(1)
+                               i.SetPrefetch(false)
+                               rchan <- i.Accept().(Receiver) // Issue credit 
only on receive
+                       default:
+                               i.Accept()
+                       }
                }
-       })
+       }()
        defer func() { closeClientServer(client, server) }()
 
        // Open client sender
@@ -274,16 +285,21 @@ type pairs struct {
 
 func newPairs(t *testing.T) *pairs {
        p := &pairs{t: t, rchan: make(chan Receiver, 1), schan: make(chan 
Sender, 1)}
-       p.client, p.server = newClientServer(t, func(i Incoming) {
-               switch i := i.(type) {
-               case *IncomingReceiver:
-                       p.rchan <- i.AcceptReceiver(1, false)
-               case *IncomingSender:
-                       p.schan <- i.AcceptSender()
-               default:
-                       i.Accept()
+       p.client, p.server = newClientServer(t)
+       go func() {
+               for i := range p.server.Incoming() {
+                       switch i := i.(type) {
+                       case *IncomingReceiver:
+                               i.SetCapacity(1)
+                               i.SetPrefetch(false)
+                               p.rchan <- i.Accept().(Receiver)
+                       case *IncomingSender:
+                               p.schan <- i.Accept().(Sender)
+                       default:
+                               i.Accept()
+                       }
                }
-       })
+       }()
        return p
 }
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/91fe6e0f/proton-c/bindings/go/src/qpid.apache.org/electron/receiver.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/receiver.go 
b/proton-c/bindings/go/src/qpid.apache.org/electron/receiver.go
index 9f854cf..4ff83b4 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/receiver.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/receiver.go
@@ -119,6 +119,7 @@ type receiver struct {
        policy policy
 }
 
+// Call in proton goroutine
 func newReceiver(l link) *receiver {
        r := &receiver{link: l}
        if r.capacity < 1 {
@@ -216,23 +217,19 @@ func (rm *ReceivedMessage) Accept() error { return 
rm.Acknowledge(Accepted) }
 // Reject is short for Acknowledge(Rejected)
 func (rm *ReceivedMessage) Reject() error { return rm.Acknowledge(Rejected) }
 
-// IncomingReceiver is passed to the accept() function given to 
Connection.Listen()
-// when there is an incoming request for a receiver link.
+// IncomingReceiver is sent on the Connection.Incoming() channel when there is
+// an incoming request to open a receiver link.
 type IncomingReceiver struct {
        incomingLink
 }
 
-// Link provides information about the incoming link.
-func (i *IncomingReceiver) Link() Link { return i }
+// SetCapacity sets the capacity of the incoming receiver, call before Accept()
+func (in *IncomingReceiver) SetCapacity(capacity int) { in.capacity = capacity 
}
 
-// AcceptReceiver sets Capacity and Prefetch of the accepted Receiver.
-func (i *IncomingReceiver) AcceptReceiver(capacity int, prefetch bool) 
Receiver {
-       i.capacity = capacity
-       i.prefetch = prefetch
-       return i.Accept().(Receiver)
-}
+// SetPrefetch sets the pre-fetch mode of the incoming receiver, call before 
Accept()
+func (in *IncomingReceiver) SetPrefetch(prefetch bool) { in.prefetch = 
prefetch }
 
-func (i *IncomingReceiver) Accept() Endpoint {
-       i.accepted = true
-       return newReceiver(i.link)
+// Accept accepts an incoming receiver endpoint
+func (in *IncomingReceiver) Accept() Endpoint {
+       return in.accept(func() Endpoint { return newReceiver(in.link) })
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/91fe6e0f/proton-c/bindings/go/src/qpid.apache.org/electron/sender.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/sender.go 
b/proton-c/bindings/go/src/qpid.apache.org/electron/sender.go
index 98304c1..11f019b 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/sender.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/sender.go
@@ -297,18 +297,13 @@ func (sm *sentMessage) finish() {
 
 func (sm *sentMessage) Error() error { return sm.err }
 
-// IncomingSender is passed to the accept() function given to 
Connection.Listen()
-// when there is an incoming request for a sender link.
+// IncomingSender is sent on the Connection.Incoming() channel when there is
+// an incoming request to open a sender link.
 type IncomingSender struct {
        incomingLink
 }
 
-// Link provides information about the incoming link.
-func (i *IncomingSender) Link() Link { return i }
-
-func (i *IncomingSender) AcceptSender() Sender { return i.Accept().(Sender) }
-
-func (i *IncomingSender) Accept() Endpoint {
-       i.accepted = true
-       return newSender(i.link)
+// Accept accepts an incoming sender endpoint
+func (in *IncomingSender) Accept() Endpoint {
+       return in.accept(func() Endpoint { return newSender(in.link) })
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/91fe6e0f/proton-c/bindings/go/src/qpid.apache.org/electron/session.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/session.go 
b/proton-c/bindings/go/src/qpid.apache.org/electron/session.go
index 785a582..d347c99 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/session.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/session.go
@@ -44,7 +44,8 @@ type session struct {
 // SessionOption can be passed when creating a Session
 type SessionOption func(*session)
 
-// IncomingCapacity sets the size (in bytes) of the sessions incoming data 
buffer..
+// IncomingCapacity returns a Session Option that sets the size (in bytes) of
+// the sessions incoming data buffer..
 func IncomingCapacity(cap uint) SessionOption { return func(s *session) { 
s.capacity = cap } }
 
 // in proton goroutine
@@ -70,8 +71,6 @@ func (s *session) Close(err error) {
        s.engine().Inject(func() { localClose(s.eSession, err) })
 }
 
-func (s *session) SetCapacity(bytes uint) { s.capacity = bytes }
-
 func (s *session) Sender(setting ...LinkOption) (snd Sender, err error) {
        err = s.engine().InjectWait(func() error {
                l, err := localLink(s, true, setting...)
@@ -100,8 +99,8 @@ func (s *session) closed(err error) {
        s.err.Set(Closed)
 }
 
-// IncomingSession is passed to the accept() function given to 
Connection.Listen()
-// when there is an incoming session request.
+// IncomingSender is sent on the Connection.Incoming() channel when there is an
+// incoming request to open a session.
 type IncomingSession struct {
        incoming
        h        *handler
@@ -109,13 +108,16 @@ type IncomingSession struct {
        capacity uint
 }
 
-// AcceptCapacity sets the session buffer capacity of an incoming session in 
bytes.
-func (i *IncomingSession) AcceptSession(bytes uint) Session {
-       i.capacity = bytes
-       return i.Accept().(Session)
+func newIncomingSession(h *handler, ps proton.Session) *IncomingSession {
+       return &IncomingSession{incoming: makeIncoming(ps), h: h, pSession: ps}
 }
 
-func (i *IncomingSession) Accept() Endpoint {
-       i.accepted = true
-       return newSession(i.h.connection, i.pSession, 
IncomingCapacity(i.capacity))
+// SetCapacity sets the session buffer capacity of an incoming session in 
bytes.
+func (in *IncomingSession) SetCapacity(bytes uint) { in.capacity = bytes }
+
+// Accept an incoming session endpoint.
+func (in *IncomingSession) Accept() Endpoint {
+       return in.accept(func() Endpoint {
+               return newSession(in.h.connection, in.pSession, 
IncomingCapacity(in.capacity))
+       })
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/91fe6e0f/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go 
b/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go
index 1a1fd96..0b881c1 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go
@@ -168,6 +168,8 @@ type Endpoint interface {
        RemoteCondition() Condition
        // Human readable name
        String() string
+       // Human readable endpoint type "link", "session" etc.
+       Type() string
 }
 
 // CloseError sets an error condition on an endpoint and closes the endpoint
@@ -257,6 +259,15 @@ func (l Link) String() string {
        }
 }
 
+func (l Link) Type() string {
+       if l.IsSender() {
+               return "sender-link"
+       } else {
+               return "receiver-link"
+       }
+
+}
+
 func cPtr(b []byte) *C.char {
        if len(b) == 0 {
                return nil
@@ -285,6 +296,10 @@ func (c Connection) String() string {
        return fmt.Sprintf("%x", c.pn)
 }
 
+func (c Connection) Type() string {
+       return "connection"
+}
+
 // Head functions don't follow the normal naming conventions so missed by the 
generator.
 
 func (c Connection) LinkHead(s State) Link {
@@ -313,6 +328,8 @@ func (s Session) String() string {
        return fmt.Sprintf("%s/%p", s.Connection(), s.pn)
 }
 
+func (s Session) Type() string { return "session" }
+
 // Error returns an instance of amqp.Error or nil.
 func (c Condition) Error() error {
        if c.IsNil() || !c.IsSet() {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to