Repository: qpid-proton Updated Branches: refs/heads/master 3a48863c1 -> 43c5cff32
PROTON-827: Doc updates, minor API cleanup. Added DefaultSession, Sender()/Receiver() on Connection. Improved documentation and package README Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/43c5cff3 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/43c5cff3 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/43c5cff3 Branch: refs/heads/master Commit: 43c5cff3274f4d4e9150202255ee9cafea83ed47 Parents: 3a48863 Author: Alan Conway <[email protected]> Authored: Tue Sep 29 16:07:27 2015 -0400 Committer: Alan Conway <[email protected]> Committed: Tue Sep 29 16:12:14 2015 -0400 ---------------------------------------------------------------------- examples/go/broker.go | 2 +- examples/go/receive.go | 10 +--- examples/go/send.go | 10 +--- .../go/src/qpid.apache.org/proton/README.md | 12 ++++ .../proton/concurrent/connection.go | 62 +++++++++++++++++--- .../proton/concurrent/container.go | 4 +- .../qpid.apache.org/proton/concurrent/doc.go | 22 ++++--- .../proton/concurrent/endpoint.go | 3 +- .../proton/concurrent/messaging_test.go | 6 +- .../proton/concurrent/receiver.go | 1 - .../proton/concurrent/session.go | 5 +- .../go/src/qpid.apache.org/proton/doc.go | 50 +++++++++++----- .../go/src/qpid.apache.org/proton/engine.go | 7 +-- 13 files changed, 134 insertions(+), 60 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/43c5cff3/examples/go/broker.go ---------------------------------------------------------------------- diff --git a/examples/go/broker.go b/examples/go/broker.go index 3f85e9e..47d0a76 100644 --- a/examples/go/broker.go +++ b/examples/go/broker.go @@ -81,7 +81,7 @@ func (b *broker) listen(addr string) (err error) { if err != nil { return err } - c, err := b.container.NewConnection(conn) + c, err := b.container.Connection(conn) if err != nil { return err } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/43c5cff3/examples/go/receive.go ---------------------------------------------------------------------- diff --git a/examples/go/receive.go b/examples/go/receive.go index a45ffe3..86244d7 100644 --- a/examples/go/receive.go +++ b/examples/go/receive.go @@ -73,19 +73,13 @@ func main() { // Open a new connection conn, err := net.Dial("tcp", url.Host) // Note net.URL.Host is actually "host:port" util.ExitIf(err) - c, err := container.NewConnection(conn) + c, err := container.Connection(conn) util.ExitIf(err) util.ExitIf(c.Open()) connections <- c // Save connection so we can Close() when main() ends - // Create and open a session - ss, err := c.NewSession() - util.ExitIf(err) - err = ss.Open() - util.ExitIf(err) - // Create a Receiver using the path of the URL as the source address - r, err := ss.Receiver(url.Path) + r, err := c.Receiver(url.Path) util.ExitIf(err) // Loop receiving messages and sending them to the main() goroutine http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/43c5cff3/examples/go/send.go ---------------------------------------------------------------------- diff --git a/examples/go/send.go b/examples/go/send.go index 7fa5416..edac2ae 100644 --- a/examples/go/send.go +++ b/examples/go/send.go @@ -76,20 +76,14 @@ func main() { // Open a new connection conn, err := net.Dial("tcp", url.Host) // Note net.URL.Host is actually "host:port" util.ExitIf(err) - c, err := container.NewConnection(conn) + c, err := container.Connection(conn) util.ExitIf(err) err = c.Open() util.ExitIf(err) connections = append(connections, c) // Save connection so it will be closed when main() ends - // Create and open a session - ss, err := c.NewSession() - util.ExitIf(err) - err = ss.Open() - util.ExitIf(err) - // Create a Sender using the path of the URL as the AMQP address - s, err := ss.Sender(url.Path) + s, err := c.Sender(url.Path) util.ExitIf(err) // Loop sending messages. http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/43c5cff3/proton-c/bindings/go/src/qpid.apache.org/proton/README.md ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/README.md b/proton-c/bindings/go/src/qpid.apache.org/proton/README.md new file mode 100644 index 0000000..ad57b47 --- /dev/null +++ b/proton-c/bindings/go/src/qpid.apache.org/proton/README.md @@ -0,0 +1,12 @@ +# Go binding for proton + +This is a a [Go](http://golang.org) binding for proton. +Package documentation is available at: <http://godoc.org/qpid.apache.org/proton> + +See the [examples](https://github.com/apache/qpid-proton/blob/master/examples/cpp/README.md) +for working examples and practical instructions on how to get started. + +Feedback is encouraged at: + +- Email <[email protected]> +- Create issues <https://issues.apache.org/jira/browse/PROTON>, attach patches to an issue. http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/43c5cff3/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/connection.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/connection.go b/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/connection.go index 9e82760..63fd3fc 100644 --- a/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/connection.go +++ b/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/connection.go @@ -33,6 +33,19 @@ import ( type Connection interface { Endpoint + // Sender opens a new sender on the DefaultSession. + // + // v can be a string, which is used as the Target address, or a SenderSettings + // struct containing more details settings. + Sender(v interface{}) (Sender, error) + + // Receiver opens a new Receiver on the DefaultSession(). + // + // v can be a string, which is used as the + // Source address, or a ReceiverSettings struct containing more details + // settings. + Receiver(v interface{}) (Receiver, error) + // Server puts the connection in server mode, must be called before Open(). // // A server connection will do protocol negotiation to accept a incoming AMQP @@ -45,9 +58,12 @@ type Connection interface { // Must be called before Open(). Listen() - // NewSession creates a new local session, you must call Session.Open() - // to open it with the remote peer. - NewSession() (s Session, err error) + // DefaultSession() returns a default session for the connection. It is opened + // on the first call to DefaultSession and returned on subsequent calls. + DefaultSession() (Session, error) + + // Session opens a new session. + Session() (Session, error) // Accept returns the next Endpoint (Session, Sender or Receiver) opened by // the remote peer. It returns (nil, error) if the connection closes. @@ -70,13 +86,13 @@ type Connection interface { // Container for the connection. Container() Container - // Disconnect the connection abrubtly. + // Disconnect the connection abruptly with an error. Disconnect(error) } type connection struct { endpoint - listenOnce sync.Once + listenOnce, defaultSessionOnce sync.Once // Set before Open() container *container @@ -88,6 +104,8 @@ type connection struct { engine *proton.Engine err internal.FirstError eConnection proton.Connection + + defaultSession Session } func newConnection(conn net.Conn, cont *container) (*connection, error) { @@ -129,18 +147,20 @@ func (c *connection) Disconnect(err error) { } } -// FIXME aconway 2015-09-24: needed? func (c *connection) closed(err error) { // Call from another goroutine to initiate close without deadlock. go c.Close(err) } -func (c *connection) NewSession() (Session, error) { +func (c *connection) Session() (Session, error) { var s Session err := c.engine.InjectWait(func() error { eSession, err := c.engine.Connection().Session() if err == nil { - s = newSession(c, eSession) + eSession.Open() + if err == nil { + s = newSession(c, eSession) + } } return err }) @@ -165,3 +185,29 @@ func (c *connection) Accept() (Endpoint, error) { } func (c *connection) Container() Container { return c.container } + +func (c *connection) DefaultSession() (s Session, err error) { + c.defaultSessionOnce.Do(func() { + c.defaultSession, err = c.Session() + }) + if err == nil { + err = c.Error() + } + return c.defaultSession, err +} + +func (c *connection) Sender(v interface{}) (Sender, error) { + if s, err := c.DefaultSession(); err == nil { + return s.Sender(v) + } else { + return nil, err + } +} + +func (c *connection) Receiver(v interface{}) (Receiver, error) { + if s, err := c.DefaultSession(); err == nil { + return s.Receiver(v) + } else { + return nil, err + } +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/43c5cff3/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/container.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/container.go b/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/container.go index 5c090e3..5edecfc 100644 --- a/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/container.go +++ b/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/container.go @@ -39,7 +39,7 @@ type Container interface { // setting any Connection properties you need to set. Note the net.Conn // can be an outgoing connection (e.g. made with net.Dial) or an incoming // connection (e.g. made with net.Listener.Accept()) - NewConnection(conn net.Conn) (Connection, error) + Connection(conn net.Conn) (Connection, error) } type container struct { @@ -66,6 +66,6 @@ func (cont *container) nextLinkName() string { return cont.id + "@" + cont.linkNames.Next() } -func (cont *container) NewConnection(conn net.Conn) (Connection, error) { +func (cont *container) Connection(conn net.Conn) (Connection, error) { return newConnection(conn, cont) } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/43c5cff3/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/doc.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/doc.go b/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/doc.go index 3e7756c..810a5da 100644 --- a/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/doc.go +++ b/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/doc.go @@ -20,15 +20,23 @@ under the License. /* Package concurrent provides a procedural, concurrent Go API for exchanging AMQP -messages. +messages. You can write clients or servers using this API. -AMPQ defines a credit-based scheme for flow control of messages over a -link. Credit is the number of messages the receiver is willing to accept. The -receiver gives credit to the sender. The sender can send messages without -waiting for a response from the receiver until it runs out of credit, at which -point it must wait for more credit to send more messages. +Start by creating a Container with NewContainer. A Container represents a client +or server application that can contain incoming or outgoing connections. + +You can create connections with the standard Go 'net' package using net.Dial or +net.Listen. Create an AMQP connection over a net.Conn with +Container.Connection() and open it with Connection.Open(). + +AMQP sends messages over "links", each link has a Sender and Receiver +end. Connection.Sender() and Connection.Receiver() allow you to create links to +send and receive messages. + +You can also create an AMQP server connection by calling Connection.Listen() +before calling Open() on the connection. You can then call Connection.Accept() +after calling Connection.Open() to accept incoming sessions and links. -See the documentation of Sender and Receiver for details of how this API uses credit. */ package concurrent http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/43c5cff3/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/endpoint.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/endpoint.go b/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/endpoint.go index 717cac1..f647058 100644 --- a/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/endpoint.go +++ b/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/endpoint.go @@ -37,7 +37,8 @@ var Closed = io.EOF // Link, Sender and Receiver for details. // type Endpoint interface { - // Open the endpoint. + // Open the local end of a remotely-initiated endpoint. You must Open() + // endpoints returned by Connection.Accept() before using them. Open() error // Close an endpoint and signal an error to the remote end if error != nil. http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/43c5cff3/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/messaging_test.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/messaging_test.go b/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/messaging_test.go index aa806d7..0ee9f1a 100644 --- a/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/messaging_test.go +++ b/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/messaging_test.go @@ -41,7 +41,7 @@ func newServer(cont Container) (net.Addr, <-chan Connection) { ch := make(chan Connection) go func() { conn, err := listener.Accept() - c, err := cont.NewConnection(conn) + c, err := cont.Connection(conn) panicIf(err) c.Server() c.Listen() @@ -55,10 +55,10 @@ func newServer(cont Container) (net.Addr, <-chan Connection) { func newClient(cont Container, addr net.Addr) Session { conn, err := net.Dial(addr.Network(), addr.String()) panicIf(err) - c, err := cont.NewConnection(conn) + c, err := cont.Connection(conn) panicIf(err) c.Open() - sn, err := c.NewSession() + sn, err := c.Session() panicIf(err) panicIf(sn.Open()) return sn http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/43c5cff3/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/receiver.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/receiver.go b/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/receiver.go index ad033a6..5bcf9f2 100644 --- a/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/receiver.go +++ b/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/receiver.go @@ -190,7 +190,6 @@ func (r *receiver) ReceiveTimeout(timeout time.Duration) (rm ReceivedMessage, er // Called in proton goroutine func (r *receiver) handleDelivery(delivery proton.Delivery) { - // FIXME aconway 2015-09-24: how can this happen if we are remote closed? if r.eLink.State().RemoteClosed() { localClose(r.eLink, r.eLink.RemoteCondition().Error()) return http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/43c5cff3/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/session.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/session.go b/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/session.go index ba09690..2f609be 100644 --- a/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/session.go +++ b/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/session.go @@ -28,6 +28,8 @@ import ( // type Session interface { Endpoint + + // Connection owning this session. Connection() Connection // Sender opens a new sender. v can be a string, which is used as the Target @@ -63,8 +65,6 @@ func (s *session) Close(err error) { s.engine().Inject(func() { localClose(s.eSession, err) }) } -// NewSender create a link sending to target. -// You must call snd.Open() before calling snd.Send(). func (s *session) Sender(v interface{}) (snd Sender, err error) { var settings LinkSettings switch v := v.(type) { @@ -86,7 +86,6 @@ func (s *session) Sender(v interface{}) (snd Sender, err error) { return } -// Receiver opens a receiving link. func (s *session) Receiver(v interface{}) (rcv Receiver, err error) { var settings ReceiverSettings switch v := v.(type) { http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/43c5cff3/proton-c/bindings/go/src/qpid.apache.org/proton/doc.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/doc.go b/proton-c/bindings/go/src/qpid.apache.org/proton/doc.go index b175cf6..25b43af 100644 --- a/proton-c/bindings/go/src/qpid.apache.org/proton/doc.go +++ b/proton-c/bindings/go/src/qpid.apache.org/proton/doc.go @@ -18,24 +18,46 @@ under the License. */ /* -Package proton provides a Go binding for the Qpid proton AMQP library. -AMQP is an open standard for inter-operable message exchange, see <http://www.amqp.org/> -Proton is an event-driven, concurrent-unsafe AMQP protocol library that allows -you to send and receive messages using the standard AMQP concurrent protocol. +Package proton is a Go binding for the Qpid Proton AMQP messaging toolkit (see +http://qpid.apache.org/proton) It is a concurrent-unsafe, event-driven API that +closely follows the Proton C API. -For most tasks, consider using package `qpid.apache.org/proton/concurrent`. It -provides a concurrent-safe API that is easier and more natural to use in Go. +Package qpid.apache.org/proton/concurrent provides an alternative, +concurrent-safe, procedural API. Most applications will find the concurrent API +easier to use. -The raw proton API is event-driven and not concurrent-safe. You implement a -MessagingHandler event handler to react to AMQP protocol events. You must ensure -that all events are handled in a single goroutine or that you serialize all all -uses of the proton objects associated with a single connection using a lock. -You can use channels to communicate between application goroutines and the -event-handling goroutine, see type Event fro more detail. +If you need direct access to the underlying proton library for some reason, this +package provides it. The types in this package are simple wrappers for C +pointers. They provide access to C functions as Go methods and do some trivial +conversions, for example between Go string and C null-terminated char* strings. -Package `qpid.apache.org/proton/concurrent` does all this for you and presents -a simple concurrent-safe interface. +Consult the C API documentation at http://qpid.apache.org/proton for more +information about the types here. There is a 1-1 correspondence between C type +pn_foo_t and Go type proton.Foo, and between C function + + pn_foo_do_something(pn_foo_t*, ...) + +and Go method + + func (proton.Foo) DoSomething(...) + +The proton.Engine type pumps data between a Go net.Conn connection and a +proton.Connection goroutine that feeds events to a proton.MessagingHandler. See +the proton.Engine documentation for more detail. + +EventHandler and MessagingHandler define an event handling interfaces that you +can implement to react to protocol events. MessagingHandler provides a somewhat +simpler set of events and automates some common tasks for you. + +You must ensure that all events are handled in a single goroutine or that you +serialize all all uses of the proton objects associated with a single connection +using a lock. You can use channels to communicate between application +goroutines and the event-handling goroutine, see Engine documentation for more details. + +Package qpid.apache.org/proton/concurrent does all this for you and presents a +simple concurrent-safe interface, for most applications you should use that +instead. */ package proton http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/43c5cff3/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go b/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go index 3096280..63dc452 100644 --- a/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go +++ b/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go @@ -76,10 +76,9 @@ func (b *bufferChan) buffer() []byte { // functions (such as sending messages) are encoded and written to the // net.Conn. Create a engine with NewEngine() // -// The proton protocol engine is single threaded (per connection). The Engine runs -// proton in the goroutine that calls Engine.Run() and creates goroutines to feed -// data to/from a net.Conn. You can create multiple Engines to handle multiple -// connections concurrently. +// The Engine runs a proton event loop in the goroutine that calls Engine.Run() +// and creates goroutines to feed data to/from a net.Conn. You can create +// multiple Engines to handle multiple connections concurrently. // // Methods on proton values defined in this package (Sessions, Links etc.) can // only be called in the goroutine that executes the corresponding --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
