PROTON-1045: Use of callbacks to handle accepted endpoints violates design goals.
Get rid of use of callback functions that run in proton goroutine. Use a channel instead for Incoming requests, all user code runs in user goroutines. Safer and more consistent. Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/91fe6e0f Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/91fe6e0f Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/91fe6e0f Branch: refs/heads/go1 Commit: 91fe6e0fda07efa07f10a97d906eee114c86d6bf Parents: f3edf57 Author: Alan Conway <[email protected]> Authored: Wed Nov 11 10:22:01 2015 -0500 Committer: Alan Conway <[email protected]> Committed: Wed Nov 11 21:54:25 2015 -0500 ---------------------------------------------------------------------- examples/go/electron/broker.go | 35 ++++-- .../src/qpid.apache.org/electron/connection.go | 85 ++++++++------ .../go/src/qpid.apache.org/electron/handler.go | 43 ++++--- .../go/src/qpid.apache.org/electron/link.go | 36 +++--- .../qpid.apache.org/electron/messaging_test.go | 116 +++++++++++-------- .../go/src/qpid.apache.org/electron/receiver.go | 23 ++-- .../go/src/qpid.apache.org/electron/sender.go | 15 +-- .../go/src/qpid.apache.org/electron/session.go | 26 +++-- .../go/src/qpid.apache.org/proton/wrappers.go | 17 +++ 9 files changed, 233 insertions(+), 163 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/91fe6e0f/examples/go/electron/broker.go ---------------------------------------------------------------------- diff --git a/examples/go/electron/broker.go b/examples/go/electron/broker.go index f1dce17..f68deb1 100644 --- a/examples/go/electron/broker.go +++ b/examples/go/electron/broker.go @@ -78,26 +78,43 @@ func (b *broker) run() error { util.Debugf("Accept error: %v", err) continue } - c, err := b.container.Connection(conn, electron.Server(), electron.Accepter(b.accept)) + c, err := b.container.Connection(conn, electron.Server(), electron.AllowIncoming()) if err != nil { util.Debugf("Connection error: %v", err) continue } + go b.accept(c) // Goroutine to accept incoming sessions and links. util.Debugf("Accepted %v", c) } } // accept remotely-opened endpoints (Session, Sender and Receiver) // and start goroutines to service them. -func (b *broker) accept(i electron.Incoming) { - switch i := i.(type) { - case *electron.IncomingSender: - go b.sender(i.AcceptSender()) - case *electron.IncomingReceiver: - go b.receiver(i.AcceptReceiver(100, true)) // Pre-fetch 100 messages - default: - i.Accept() +func (b *broker) accept(c electron.Connection) { + for in := range c.Incoming() { + switch in := in.(type) { + + case *electron.IncomingSender: + if in.Source() == "" { + util.Debugf("sender has no source: %s", in) + break + } + go b.sender(in.Accept().(electron.Sender)) + + case *electron.IncomingReceiver: + if in.Target() == "" { + util.Debugf("receiver has no target: %s", in) + break + } + in.SetPrefetch(true) + in.SetCapacity(*credit) // Pre-fetch up to credit window. + go b.receiver(in.Accept().(electron.Receiver)) + + default: + in.Accept() // Accept sessions unconditionally + } } + util.Debugf("incoming closed: %s", c) } // sender pops messages from a queue and sends them. http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/91fe6e0f/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go b/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go index 8573f28..3462b92 100644 --- a/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go +++ b/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go @@ -24,7 +24,6 @@ import "C" import ( "net" - "qpid.apache.org/amqp" "qpid.apache.org/proton" "sync" "time" @@ -58,12 +57,30 @@ type Connection interface { // WaitTimeout is like Wait but returns Timeout if the timeout expires. WaitTimeout(time.Duration) error + + // Incoming returns a channel for incoming endpoints opened by the remote end. + // + // To enable, pass AllowIncoming() when creating the Connection. Otherwise all + // incoming endpoint requests are automatically rejected and Incoming() + // returns nil. + // + // An Incoming value can be an *IncomingSession, *IncomingSender or + // *IncomingReceiver. You must call Accept() to open the endpoint or Reject() + // to close it with an error. The specific Incoming types have additional + // methods to configure the endpoint. + // + // Delay in receiving from Incoming() or calling Accept/Reject will block + // proton. Normally you should have a dedicated goroutine receive from this + // channel and start a new goroutine to serve each endpoint accepted. The + // channel is closed when the Connection closes. + // + Incoming() <-chan Incoming } -// ConnectionOption can be passed when creating a connection. +// ConnectionOption can be passed when creating a connection to configure various options type ConnectionOption func(*connection) -// Server setting puts the connection in server mode. +// Server returns a ConnectionOption to put the connection in server mode. // // A server connection will do protocol negotiation to accept a incoming AMQP // connection. Normally you would call this for a connection created by @@ -71,24 +88,19 @@ type ConnectionOption func(*connection) // func Server() ConnectionOption { return func(c *connection) { c.engine.Server() } } -// Accepter provides a function to be called when a connection receives an incoming -// request to open an endpoint, one of IncomingSession, IncomingSender or IncomingReceiver. -// -// The accept() function must not block or use the accepted endpoint. -// It can pass the endpoint to another goroutine for processing. -// -// By default all incoming endpoints are rejected. -func Accepter(accept func(Incoming)) ConnectionOption { - return func(c *connection) { c.accept = accept } +// AllowIncoming returns a ConnectionOption to enable incoming endpoint open requests. +// See Connection.Incoming() +func AllowIncoming() ConnectionOption { + return func(c *connection) { c.incoming = make(chan Incoming) } } type connection struct { endpoint - listenOnce, defaultSessionOnce, closeOnce sync.Once + defaultSessionOnce, closeOnce sync.Once container *container conn net.Conn - accept func(Incoming) + incoming chan Incoming handler *handler engine *proton.Engine err proton.ErrorHolder @@ -99,7 +111,7 @@ type connection struct { } func newConnection(conn net.Conn, cont *container, setting ...ConnectionOption) (*connection, error) { - c := &connection{container: cont, conn: conn, accept: func(Incoming) {}, done: make(chan struct{})} + c := &connection{container: cont, conn: conn, done: make(chan struct{})} c.handler = newHandler(c) var err error c.engine, err = proton.NewEngine(c.conn, c.handler.delegator) @@ -173,38 +185,45 @@ func (c *connection) WaitTimeout(timeout time.Duration) error { return c.Error() } +func (c *connection) Incoming() <-chan Incoming { return c.incoming } + // Incoming is the interface for incoming requests to open an endpoint. // Implementing types are IncomingSession, IncomingSender and IncomingReceiver. type Incoming interface { - // Accept the endpoint with default settings. - // - // You must not use the returned endpoint in the accept() function that - // receives the Incoming value, but you can pass it to other goroutines. - // - // Implementing types provide type-specific Accept functions that take additional settings. + // Accept and open the endpoint. Accept() Endpoint // Reject the endpoint with an error Reject(error) - error(string, ...interface{}) error + // wait for and call the accept function, call in proton goroutine. + wait() error + pEndpoint() proton.Endpoint } -// Common state for incoming endpoints, record accept or reject error. type incoming struct { - err error - accepted bool + endpoint proton.Endpoint + acceptCh chan func() error } -func (i *incoming) Reject(err error) { i.err = err } +func makeIncoming(e proton.Endpoint) incoming { + return incoming{endpoint: e, acceptCh: make(chan func() error)} +} + +func (in *incoming) Reject(err error) { in.acceptCh <- func() error { return err } } + +// Call in proton goroutine, wait for and call the accept function fr +func (in *incoming) wait() error { return (<-in.acceptCh)() } + +func (in *incoming) pEndpoint() proton.Endpoint { return in.endpoint } -func (i *incoming) error(fmt string, arg ...interface{}) error { - switch { - case i.err != nil: - return i.err - case !i.accepted: - return amqp.Errorf(amqp.NotAllowed, fmt, arg...) - default: +// Called in app goroutine to send an accept function to proton and return the resulting endpoint. +func (in *incoming) accept(f func() Endpoint) Endpoint { + done := make(chan Endpoint) + in.acceptCh <- func() error { + ep := f() + done <- ep return nil } + return <-done } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/91fe6e0f/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go b/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go index 754a221..1b1164c 100644 --- a/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go +++ b/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go @@ -49,9 +49,8 @@ func newHandler(c *connection) *handler { h.delegator.AutoOpen = false return h } - -func (h *handler) internalError(ep proton.Endpoint, msg string) { - proton.CloseError(ep, amqp.Errorf(amqp.InternalError, "%s %s", msg, ep)) +func (h *handler) linkError(l proton.Link, msg string) { + proton.CloseError(l, amqp.Errorf(amqp.InternalError, "%s for %s %s", msg, l.Type(), l)) } func (h *handler) HandleMessagingEvent(t proton.MessagingEvent, e proton.Event) { @@ -61,7 +60,7 @@ func (h *handler) HandleMessagingEvent(t proton.MessagingEvent, e proton.Event) if r, ok := h.links[e.Link()].(*receiver); ok { r.message(e.Delivery()) } else { - h.internalError(e.Link(), "no receiver for link") + h.linkError(e.Link(), "no receiver") } case proton.MSettled: @@ -73,16 +72,12 @@ func (h *handler) HandleMessagingEvent(t proton.MessagingEvent, e proton.Event) if s, ok := h.links[e.Link()].(*sender); ok { s.sendable() } else { - h.internalError(e.Link(), "no sender for link") + h.linkError(e.Link(), "no sender") } case proton.MSessionOpening: if e.Session().State().LocalUninit() { // Remotely opened - incoming := &IncomingSession{h: h, pSession: e.Session()} - h.connection.accept(incoming) - if err := incoming.error("rejected session %s", e.Session()); err != nil { - proton.CloseError(e.Session(), err) - } + h.incoming(newIncomingSession(h, e.Session())) } case proton.MSessionClosed: @@ -101,19 +96,13 @@ func (h *handler) HandleMessagingEvent(t proton.MessagingEvent, e proton.Event) } ss := h.sessions[l.Session()] if ss == nil { - h.internalError(e.Link(), "no session for link") + h.linkError(e.Link(), "no session") break } - var incoming Incoming if l.IsReceiver() { - incoming = &IncomingReceiver{makeIncomingLink(ss, l)} + h.incoming(&IncomingReceiver{makeIncomingLink(ss, l)}) } else { - incoming = &IncomingSender{makeIncomingLink(ss, l)} - } - h.connection.accept(incoming) - if err := incoming.error("rejected link %s", e.Link()); err != nil { - proton.CloseError(l, err) - break + h.incoming(&IncomingSender{makeIncomingLink(ss, l)}) } case proton.MLinkClosing: @@ -146,6 +135,22 @@ func (h *handler) HandleMessagingEvent(t proton.MessagingEvent, e proton.Event) } } +func (h *handler) incoming(in Incoming) { + var err error + if h.connection.incoming != nil { + h.connection.incoming <- in + err = in.wait() + } else { + err = amqp.Errorf(amqp.NotAllowed, "rejected incoming %s %s", + in.pEndpoint().Type(), in.pEndpoint().String()) + } + if err == nil { + in.pEndpoint().Open() + } else { + proton.CloseError(in.pEndpoint(), err) + } +} + func (h *handler) linkClosed(l proton.Link, err error) { if link := h.links[l]; link != nil { link.closed(err) http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/91fe6e0f/proton-c/bindings/go/src/qpid.apache.org/electron/link.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/link.go b/proton-c/bindings/go/src/qpid.apache.org/electron/link.go index 4835cb9..cadc2c1 100644 --- a/proton-c/bindings/go/src/qpid.apache.org/electron/link.go +++ b/proton-c/bindings/go/src/qpid.apache.org/electron/link.go @@ -60,36 +60,37 @@ type Link interface { open() } -// LinkOption can be passed when creating a sender or receiver link. +// LinkOption can be passed when creating a sender or receiver link to set optional configuration. type LinkOption func(*link) -// Source sets address that messages are coming from. +// Source returns a LinkOption that sets address that messages are coming from. func Source(s string) LinkOption { return func(l *link) { l.source = s } } -// Target sets address that messages are going to. +// Target returns a LinkOption that sets address that messages are going to. func Target(s string) LinkOption { return func(l *link) { l.target = s } } -// LinkName sets the link name. +// LinkName returns a LinkOption that sets the link name. func LinkName(s string) LinkOption { return func(l *link) { l.target = s } } -// SndSettle sets the send settle mode +// SndSettle returns a LinkOption that sets the send settle mode func SndSettle(m SndSettleMode) LinkOption { return func(l *link) { l.sndSettle = m } } -// RcvSettle sets the send settle mode +// RcvSettle returns a LinkOption that sets the send settle mode func RcvSettle(m RcvSettleMode) LinkOption { return func(l *link) { l.rcvSettle = m } } -// SndSettleMode defines when the sending end of the link settles message delivery. +// SndSettleMode returns a LinkOption that defines when the sending end of the +// link settles message delivery. type SndSettleMode proton.SndSettleMode -// Capacity sets the link capacity +// Capacity returns a LinkOption that sets the link capacity func Capacity(n int) LinkOption { return func(l *link) { l.capacity = n } } -// Prefetch sets a receivers pre-fetch flag. Not relevant for a sender. +// Prefetch returns a LinkOption that sets a receivers pre-fetch flag. Not relevant for a sender. func Prefetch(p bool) LinkOption { return func(l *link) { l.prefetch = p } } -// AtMostOnce sets "fire and forget" mode, messages are sent but no -// acknowledgment is received, messages can be lost if there is a network -// failure. Sets SndSettleMode=SendSettled and RcvSettleMode=RcvFirst +// AtMostOnce returns a LinkOption that sets "fire and forget" mode, messages +// are sent but no acknowledgment is received, messages can be lost if there is +// a network failure. Sets SndSettleMode=SendSettled and RcvSettleMode=RcvFirst func AtMostOnce() LinkOption { return func(l *link) { SndSettle(SndSettled)(l) @@ -97,11 +98,11 @@ func AtMostOnce() LinkOption { } } -// AtLeastOnce requests acknowledgment for every message, acknowledgment -// indicates the message was definitely received. In the event of a -// failure, unacknowledged messages can be re-sent but there is a chance -// that the message will be received twice in this case. -// Sets SndSettleMode=SndUnsettled and RcvSettleMode=RcvFirst +// AtLeastOnce returns a LinkOption that requests acknowledgment for every +// message, acknowledgment indicates the message was definitely received. In the +// event of a failure, unacknowledged messages can be re-sent but there is a +// chance that the message will be received twice in this case. Sets +// SndSettleMode=SndUnsettled and RcvSettleMode=RcvFirst func AtLeastOnce() LinkOption { return func(l *link) { SndSettle(SndUnsettled)(l) @@ -200,6 +201,7 @@ type incomingLink struct { // Set up a link from an incoming proton.Link. func makeIncomingLink(sn *session, eLink proton.Link) incomingLink { l := incomingLink{ + incoming: makeIncoming(eLink), link: link{ session: sn, isSender: eLink.IsSender(), http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/91fe6e0f/proton-c/bindings/go/src/qpid.apache.org/electron/messaging_test.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/messaging_test.go b/proton-c/bindings/go/src/qpid.apache.org/electron/messaging_test.go index c7ff290..3b5a062 100644 --- a/proton-c/bindings/go/src/qpid.apache.org/electron/messaging_test.go +++ b/proton-c/bindings/go/src/qpid.apache.org/electron/messaging_test.go @@ -39,22 +39,22 @@ func fatalIf(t *testing.T, err error) { } } -// Start a server, return listening addr and channel for incoming Connection. -func newServer(t *testing.T, cont Container, accept func(Incoming)) (net.Addr, <-chan Connection) { +// Start a server, return listening addr and channel for incoming Connections. +func newServer(t *testing.T, cont Container) (net.Addr, <-chan Connection) { listener, err := net.Listen("tcp", "") fatalIf(t, err) addr := listener.Addr() ch := make(chan Connection) go func() { conn, err := listener.Accept() - c, err := cont.Connection(conn, Server(), Accepter(accept)) + c, err := cont.Connection(conn, Server(), AllowIncoming()) fatalIf(t, err) ch <- c }() return addr, ch } -// Return open an client connection and session, return the session. +// Open a client connection and session, return the session. func newClient(t *testing.T, cont Container, addr net.Addr) Session { conn, err := net.Dial(addr.Network(), addr.String()) fatalIf(t, err) @@ -66,8 +66,8 @@ func newClient(t *testing.T, cont Container, addr net.Addr) Session { } // Return client and server ends of the same connection. -func newClientServer(t *testing.T, accept func(Incoming)) (client Session, server Connection) { - addr, ch := newServer(t, NewContainer("test-server"), accept) +func newClientServer(t *testing.T) (client Session, server Connection) { + addr, ch := newServer(t, NewContainer("test-server")) client = newClient(t, NewContainer("test-client"), addr) return client, <-ch } @@ -85,19 +85,22 @@ func TestClientSendServerReceive(t *testing.T) { nMessages := 3 rchan := make(chan Receiver, nLinks) - client, server := newClientServer(t, func(i Incoming) { - switch i := i.(type) { - case *IncomingReceiver: - rchan <- i.AcceptReceiver(1, false) - default: - i.Accept() + client, server := newClientServer(t) + go func() { + for in := range server.Incoming() { + switch in := in.(type) { + case *IncomingReceiver: + in.SetCapacity(1) + in.SetPrefetch(false) + rchan <- in.Accept().(Receiver) + default: + in.Accept() + } } - }) - - defer func() { - closeClientServer(client, server) }() + defer func() { closeClientServer(client, server) }() + s := make([]Sender, nLinks) for i := 0; i < nLinks; i++ { var err error @@ -155,26 +158,29 @@ func TestClientSendServerReceive(t *testing.T) { func TestClientReceiver(t *testing.T) { nMessages := 3 - client, server := newClientServer(t, func(i Incoming) { - switch i := i.(type) { - case *IncomingSender: - s := i.AcceptSender() - go func() { - for i := int32(0); i < int32(nMessages); i++ { - sm, err := s.Send(amqp.NewMessageWith(i)) - if err != nil { - t.Error(err) - return - } else { - sm.Disposition() // Sync send. + client, server := newClientServer(t) + go func() { + for in := range server.Incoming() { + switch in := in.(type) { + case *IncomingSender: + s := in.Accept().(Sender) + go func() { + for i := int32(0); i < int32(nMessages); i++ { + sm, err := s.Send(amqp.NewMessageWith(i)) + if err != nil { + t.Error(err) + return + } else { + sm.Disposition() // Sync send. + } } - } - s.Close(nil) - }() - default: - i.Accept() + s.Close(nil) + }() + default: + in.Accept() + } } - }) + }() r, err := client.Receiver(Source("foo")) if err != nil { @@ -203,14 +209,19 @@ func TestClientReceiver(t *testing.T) { func TestTimeouts(t *testing.T) { var err error rchan := make(chan Receiver, 1) - client, server := newClientServer(t, func(i Incoming) { - switch i := i.(type) { - case *IncomingReceiver: - rchan <- i.AcceptReceiver(1, false) // Issue credit only on receive - default: - i.Accept() + client, server := newClientServer(t) + go func() { + for i := range server.Incoming() { + switch i := i.(type) { + case *IncomingReceiver: + i.SetCapacity(1) + i.SetPrefetch(false) + rchan <- i.Accept().(Receiver) // Issue credit only on receive + default: + i.Accept() + } } - }) + }() defer func() { closeClientServer(client, server) }() // Open client sender @@ -274,16 +285,21 @@ type pairs struct { func newPairs(t *testing.T) *pairs { p := &pairs{t: t, rchan: make(chan Receiver, 1), schan: make(chan Sender, 1)} - p.client, p.server = newClientServer(t, func(i Incoming) { - switch i := i.(type) { - case *IncomingReceiver: - p.rchan <- i.AcceptReceiver(1, false) - case *IncomingSender: - p.schan <- i.AcceptSender() - default: - i.Accept() + p.client, p.server = newClientServer(t) + go func() { + for i := range p.server.Incoming() { + switch i := i.(type) { + case *IncomingReceiver: + i.SetCapacity(1) + i.SetPrefetch(false) + p.rchan <- i.Accept().(Receiver) + case *IncomingSender: + p.schan <- i.Accept().(Sender) + default: + i.Accept() + } } - }) + }() return p } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/91fe6e0f/proton-c/bindings/go/src/qpid.apache.org/electron/receiver.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/receiver.go b/proton-c/bindings/go/src/qpid.apache.org/electron/receiver.go index 9f854cf..4ff83b4 100644 --- a/proton-c/bindings/go/src/qpid.apache.org/electron/receiver.go +++ b/proton-c/bindings/go/src/qpid.apache.org/electron/receiver.go @@ -119,6 +119,7 @@ type receiver struct { policy policy } +// Call in proton goroutine func newReceiver(l link) *receiver { r := &receiver{link: l} if r.capacity < 1 { @@ -216,23 +217,19 @@ func (rm *ReceivedMessage) Accept() error { return rm.Acknowledge(Accepted) } // Reject is short for Acknowledge(Rejected) func (rm *ReceivedMessage) Reject() error { return rm.Acknowledge(Rejected) } -// IncomingReceiver is passed to the accept() function given to Connection.Listen() -// when there is an incoming request for a receiver link. +// IncomingReceiver is sent on the Connection.Incoming() channel when there is +// an incoming request to open a receiver link. type IncomingReceiver struct { incomingLink } -// Link provides information about the incoming link. -func (i *IncomingReceiver) Link() Link { return i } +// SetCapacity sets the capacity of the incoming receiver, call before Accept() +func (in *IncomingReceiver) SetCapacity(capacity int) { in.capacity = capacity } -// AcceptReceiver sets Capacity and Prefetch of the accepted Receiver. -func (i *IncomingReceiver) AcceptReceiver(capacity int, prefetch bool) Receiver { - i.capacity = capacity - i.prefetch = prefetch - return i.Accept().(Receiver) -} +// SetPrefetch sets the pre-fetch mode of the incoming receiver, call before Accept() +func (in *IncomingReceiver) SetPrefetch(prefetch bool) { in.prefetch = prefetch } -func (i *IncomingReceiver) Accept() Endpoint { - i.accepted = true - return newReceiver(i.link) +// Accept accepts an incoming receiver endpoint +func (in *IncomingReceiver) Accept() Endpoint { + return in.accept(func() Endpoint { return newReceiver(in.link) }) } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/91fe6e0f/proton-c/bindings/go/src/qpid.apache.org/electron/sender.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/sender.go b/proton-c/bindings/go/src/qpid.apache.org/electron/sender.go index 98304c1..11f019b 100644 --- a/proton-c/bindings/go/src/qpid.apache.org/electron/sender.go +++ b/proton-c/bindings/go/src/qpid.apache.org/electron/sender.go @@ -297,18 +297,13 @@ func (sm *sentMessage) finish() { func (sm *sentMessage) Error() error { return sm.err } -// IncomingSender is passed to the accept() function given to Connection.Listen() -// when there is an incoming request for a sender link. +// IncomingSender is sent on the Connection.Incoming() channel when there is +// an incoming request to open a sender link. type IncomingSender struct { incomingLink } -// Link provides information about the incoming link. -func (i *IncomingSender) Link() Link { return i } - -func (i *IncomingSender) AcceptSender() Sender { return i.Accept().(Sender) } - -func (i *IncomingSender) Accept() Endpoint { - i.accepted = true - return newSender(i.link) +// Accept accepts an incoming sender endpoint +func (in *IncomingSender) Accept() Endpoint { + return in.accept(func() Endpoint { return newSender(in.link) }) } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/91fe6e0f/proton-c/bindings/go/src/qpid.apache.org/electron/session.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/session.go b/proton-c/bindings/go/src/qpid.apache.org/electron/session.go index 785a582..d347c99 100644 --- a/proton-c/bindings/go/src/qpid.apache.org/electron/session.go +++ b/proton-c/bindings/go/src/qpid.apache.org/electron/session.go @@ -44,7 +44,8 @@ type session struct { // SessionOption can be passed when creating a Session type SessionOption func(*session) -// IncomingCapacity sets the size (in bytes) of the sessions incoming data buffer.. +// IncomingCapacity returns a Session Option that sets the size (in bytes) of +// the sessions incoming data buffer.. func IncomingCapacity(cap uint) SessionOption { return func(s *session) { s.capacity = cap } } // in proton goroutine @@ -70,8 +71,6 @@ func (s *session) Close(err error) { s.engine().Inject(func() { localClose(s.eSession, err) }) } -func (s *session) SetCapacity(bytes uint) { s.capacity = bytes } - func (s *session) Sender(setting ...LinkOption) (snd Sender, err error) { err = s.engine().InjectWait(func() error { l, err := localLink(s, true, setting...) @@ -100,8 +99,8 @@ func (s *session) closed(err error) { s.err.Set(Closed) } -// IncomingSession is passed to the accept() function given to Connection.Listen() -// when there is an incoming session request. +// IncomingSender is sent on the Connection.Incoming() channel when there is an +// incoming request to open a session. type IncomingSession struct { incoming h *handler @@ -109,13 +108,16 @@ type IncomingSession struct { capacity uint } -// AcceptCapacity sets the session buffer capacity of an incoming session in bytes. -func (i *IncomingSession) AcceptSession(bytes uint) Session { - i.capacity = bytes - return i.Accept().(Session) +func newIncomingSession(h *handler, ps proton.Session) *IncomingSession { + return &IncomingSession{incoming: makeIncoming(ps), h: h, pSession: ps} } -func (i *IncomingSession) Accept() Endpoint { - i.accepted = true - return newSession(i.h.connection, i.pSession, IncomingCapacity(i.capacity)) +// SetCapacity sets the session buffer capacity of an incoming session in bytes. +func (in *IncomingSession) SetCapacity(bytes uint) { in.capacity = bytes } + +// Accept an incoming session endpoint. +func (in *IncomingSession) Accept() Endpoint { + return in.accept(func() Endpoint { + return newSession(in.h.connection, in.pSession, IncomingCapacity(in.capacity)) + }) } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/91fe6e0f/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go b/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go index 1a1fd96..0b881c1 100644 --- a/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go +++ b/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go @@ -168,6 +168,8 @@ type Endpoint interface { RemoteCondition() Condition // Human readable name String() string + // Human readable endpoint type "link", "session" etc. + Type() string } // CloseError sets an error condition on an endpoint and closes the endpoint @@ -257,6 +259,15 @@ func (l Link) String() string { } } +func (l Link) Type() string { + if l.IsSender() { + return "sender-link" + } else { + return "receiver-link" + } + +} + func cPtr(b []byte) *C.char { if len(b) == 0 { return nil @@ -285,6 +296,10 @@ func (c Connection) String() string { return fmt.Sprintf("%x", c.pn) } +func (c Connection) Type() string { + return "connection" +} + // Head functions don't follow the normal naming conventions so missed by the generator. func (c Connection) LinkHead(s State) Link { @@ -313,6 +328,8 @@ func (s Session) String() string { return fmt.Sprintf("%s/%p", s.Connection(), s.pn) } +func (s Session) Type() string { return "session" } + // Error returns an instance of amqp.Error or nil. func (c Condition) Error() error { if c.IsNil() || !c.IsSet() { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
