Merge branch 'master' into go1 - examples and doc
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/14f7ca56 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/14f7ca56 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/14f7ca56 Branch: refs/heads/go1 Commit: 14f7ca56f863f45393ced61f75861cf1e1a8e5be Parents: 3d8368b 97815c3 Author: Alan Conway <acon...@redhat.com> Authored: Thu Oct 19 13:39:16 2017 +0100 Committer: Alan Conway <acon...@redhat.com> Committed: Thu Oct 19 13:39:16 2017 +0100 ---------------------------------------------------------------------- README.md | 20 ++++++- electron/connection.go | 24 +++++--- electron/container.go | 3 +- electron/ex_client_server_test.go | 81 --------------------------- electron/example_client_server_test.go | 85 +++++++++++++++++++++++++++++ 5 files changed, 120 insertions(+), 93 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/14f7ca56/README.md ---------------------------------------------------------------------- diff --cc README.md index 7415929,8b34f63..0977787 --- a/README.md +++ b/README.md @@@ -1,102 -1,43 +1,116 @@@ -Qpid Proton - AMQP messaging toolkit -==================================== +# Qpid Go packages for AMQP -Linux Build | Windows Build -------------|-------------- -[](https://travis-ci.org/apache/qpid-proton) | [](https://ci.appveyor.com/project/ke4qqq/qpid-proton/branch/master) +These packages provide [Go](http://golang.org) support for sending and receiving +AMQP messages in client or server applications. Reference documentation is +available at: <http://godoc.org/?q=qpid.apache.org> - They require the [proton-C library](http://qpid.apache.org/proton) to be installed. - On many platforms it is avaialable pre-packaged, for example on Fedora -Qpid Proton is a high-performance, lightweight messaging library. 
It can be -used in the widest range of messaging applications, including brokers, client -libraries, routers, bridges, proxies, and more. Proton makes it trivial to -integrate with the AMQP 1.0 ecosystem from any platform, environment, or -language ++They require the ++[proton-C library and header files](http://qpid.apache.org/proton) to be ++installed. On many platforms it is available pre-packaged, for example on ++Fedora - yum install qpid-proton-c-devel -Features -------- ++ dnf install qpid-proton-c-devel + - - A flexible and capable reactive messaging API - - Full control of AMQP 1.0 protocol semantics - - Portable C implementation with bindings to popular languages - - Peer-to-peer and brokered messaging - - Secure communication via SSL and SASL ++If you built proton from source, you can set environment variables to find the ++built libraries and headers as follows: + -Universal - Proton is designed to scale both up and down. Equally suitable for -simple clients or high-powered servers, it can be deployed in simple -peer-to-peer configurations or as part of a global federated messaging network. ++ source <build-directory>/config.sh + -Embeddable - Proton is carefully written to be portable and cross platform. It -has minimal dependencies, and it is architected to be usable with any threading -model, as well as with non-threaded applications. These features make it -uniquely suited for embedding messaging capabilities into existing software. ++If you have installed the library and headers in non-standard directories, then ++add them to the following environment variables: + -Standard - Built around the AMQP 1.0 messaging standard, Proton is not only -ideal for building out your own messaging applications but also for connecting -them to the broader ecosystem of AMQP 1.0-based messaging applications. 
++ LD_LIBRARY_PATH # directory containing the library ++ LIBRARY_PATH # directory containing the library ++ C_INCLUDE_PATH # directory containing the proton/ subdirectory with header files -Getting Started ---------------- +There are 3 packages: -See the included INSTALL.md file for build and install instructions and the -DEVELOPERS file for information on how to modify and test the library code -itself. +[qpid.apache.org/amqp](http://godoc.org/qpid.apache.org/amqp) provides functions +to convert AMQP messages and data types to and from Go data types. Used by both +the proton and electron packages to manage AMQP data. -Please see http://qpid.apache.org/proton for a more info. +[qpid.apache.org/electron](http://godoc.org/qpid.apache.org/electron) is a +simple, concurrent-safe API for sending and receiving messages. It can be used +with goroutines and channels to build concurrent AMQP clients and servers. + +[qpid.apache.org/proton](http://godoc.org/qpid.apache.org/proton) is an +event-driven, concurrent-unsafe package that closely follows the proton C +API. Most Go programmers will find the +[electron](http://godoc.org/qpid.apache.org/electron) package easier to use. + +See the [examples](https://github.com/apache/qpid-proton/blob/master/examples/go/README.md) +to help you get started. + +Feedback is encouraged at: + +- Email <pro...@qpid.apache.org> +- Create issues <https://issues.apache.org/jira/browse/PROTON>, attach patches to an issue. + +### Why two APIs? + +The `proton` API is a direct mapping of the proton C library into Go. It is +usable but not very natural for a Go programmer because it takes an +*event-driven* approach and has no built-in support for concurrent +use. `electron` uses `proton` internally but provides a more Go-like API that is +safe to use from multiple concurrent goroutines. + +Go encourages programs to be structured as concurrent *goroutines* that +communicate via *channels*. 
Go literature distinguishes between: + +- *concurrency*: "keeping track of things that could be done in parallel" +- *parallelism*: "actually doing things in parallel on multiple CPUs or cores" + +A Go program expresses concurrency by starting goroutines for potentially +concurrent tasks. The Go runtime schedules the activity of goroutines onto a +small number (possibly one) of actual parallel executions. + +Even with no hardware parallelism, goroutine concurrency lets the Go runtime +order unpredictable events like file descriptors being readable/writable, +channels having data, timers firing etc. Go automatically takes care of +switching out goroutines that block or sleep so it is normal to write code in +terms of blocking calls. + +By contrast, event-driven programming is based on polling mechanisms like +`select`, `poll` or `epoll`. These also dispatch unpredictably ordered events to +a single thread or a small thread pool. However this requires a different style +of programming: "event-driven" or "reactive" programming. Go developers call it +"inside-out" programming. In an event-driven program blocking is a big problem +as it consumes a scarce thread of execution, so actions that take time to +complete have to be re-structured in terms of multiple events. + +The promise of Go is that you can express your program in concurrent, sequential +terms and the Go runtime will turn it inside-out for you. You can start +goroutines for all concurrent activities. They can loop forever or block for as +long as they need waiting for timers, IO or any unpredictable event. Go will +interleave and schedule them efficiently onto the available parallel hardware. + +For example: in the `electron` API, you can send a message and wait for it to be +acknowledged in a single function. All the information about the message, why +you sent it, and what to do when it is acknowledged can be held in local +variables, all the code is in a simple sequence. 
Other goroutines in your +program can be sending and receiving messages concurrently, they are not +blocked. + +In the `proton` API, an event handler that sends a message must return +*immediately*, it cannot block the event loop to wait for +acknowledgement. Acknowledgement is a separate event, so the code for handling +it is in a different event handler. Context information about the message has to +be stored in some non-local variable that both functions can find. This makes +the code harder to follow. + +The `proton` API is important because it is the foundation for the `electron` +API, and may be useful for programs that need to be close to the original C +library for some reason. However the `electron` API hides the event-driven +details behind simple, sequential, concurrent-safe methods that can be called +from arbitrary goroutines. Under the covers, data is passed through channels to +dedicated `proton` goroutines so user goroutines can work concurrently with the +proton event-loop. + +## New to Go? + +If you are new to Go then these are a good place to start: + +- [A Tour of Go](http://tour.golang.org) +- [Effective Go](http://golang.org/doc/effective_go.html) + +Then look at the tools and docs at <http://golang.org> as you need them. http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/14f7ca56/electron/connection.go ---------------------------------------------------------------------- diff --cc electron/connection.go index 8f62491,0000000..267ee1e mode 100644,000000..100644 --- a/electron/connection.go +++ b/electron/connection.go @@@ -1,405 -1,0 +1,413 @@@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package electron + +// #include <proton/disposition.h> +import "C" + +import ( + "net" + "qpid.apache.org/proton" + "sync" + "time" +) + +// Settings associated with a Connection. +type ConnectionSettings interface { + // Authenticated user name associated with the connection. + User() string + + // The AMQP virtual host name for the connection. + // + // Optional, useful when the server has multiple names and provides different + // service based on the name the client uses to connect. + // + // By default it is set to the DNS host name that the client uses to connect, + // but it can be set to something different at the client side with the + // VirtualHost() option. + // + // Returns error if the connection fails to authenticate. + VirtualHost() string + + // Heartbeat is the maximum delay between sending frames that the remote peer + // has requested of us. If the interval expires an empty "heartbeat" frame + // will be sent automatically to keep the connection open. + Heartbeat() time.Duration +} + +// Connection is an AMQP connection, created by a Container. +type Connection interface { + Endpoint + ConnectionSettings + + // Sender opens a new sender on the DefaultSession. + Sender(...LinkOption) (Sender, error) + + // Receiver opens a new Receiver on the DefaultSession(). + Receiver(...LinkOption) (Receiver, error) + + // DefaultSession() returns a default session for the connection. It is opened + // on the first call to DefaultSession and returned on subsequent calls. + DefaultSession() (Session, error) + + // Session opens a new session. 
+ Session(...SessionOption) (Session, error) + + // Container for the connection. + Container() Container + + // Disconnect the connection abruptly with an error. + Disconnect(error) + + // Wait waits for the connection to be disconnected. + Wait() error + + // WaitTimeout is like Wait but returns Timeout if the timeout expires. + WaitTimeout(time.Duration) error + + // Incoming returns a channel for incoming endpoints opened by the remote peer. - // See the Incoming interface for more. ++ // See the Incoming interface for more detail. + // - // Not receiving from Incoming() and calling Accept/Reject will block the - // electron event loop. You should run a loop to handle the types that - // interest you in a switch{} and and Accept() all others. ++ // Note: this channel will first return an *IncomingConnection for the ++ // connection itself which allows you to look at security information and ++ // decide whether to Accept() or Reject() the connection. Then it will return ++ // *IncomingSession, *IncomingSender and *IncomingReceiver as they are opened ++ // by the remote end. ++ // ++ // Note 2: you must receive from Incoming() and call Accept/Reject to avoid ++ // blocking the electron event loop. Normally you would run a loop in a goroutine ++ // to handle the incoming types that interest you and Accept() those that don't. 
+ Incoming() <-chan Incoming +} + +type connectionSettings struct { + user, virtualHost string + heartbeat time.Duration +} + +func (c connectionSettings) User() string { return c.user } +func (c connectionSettings) VirtualHost() string { return c.virtualHost } +func (c connectionSettings) Heartbeat() time.Duration { return c.heartbeat } + +// ConnectionOption can be passed when creating a connection to configure various options +type ConnectionOption func(*connection) + +// User returns a ConnectionOption that sets the user name for a connection +func User(user string) ConnectionOption { + return func(c *connection) { + c.user = user + c.pConnection.SetUser(user) + } +} + +// VirtualHost returns a ConnectionOption to set the AMQP virtual host for the connection. +// Only applies to outbound client connections. +func VirtualHost(virtualHost string) ConnectionOption { + return func(c *connection) { + c.virtualHost = virtualHost + c.pConnection.SetHostname(virtualHost) + } +} + +// Password returns a ConnectionOption to set the password used to establish a +// connection. Only applies to outbound client connections. +// +// The connection will erase its copy of the password from memory as soon as it +// has been used to authenticate. If you are concerned about passwords staying in +// memory you should never store them as strings, and should overwrite your +// copy as soon as you are done with it. +// +func Password(password []byte) ConnectionOption { + return func(c *connection) { c.pConnection.SetPassword(password) } +} + +// Server returns a ConnectionOption to put the connection in server mode for incoming connections. +// +// A server connection will do protocol negotiation to accept an incoming AMQP +// connection. 
Normally you would call this for a connection created by +// net.Listener.Accept() +// +func Server() ConnectionOption { + return func(c *connection) { c.engine.Server(); c.server = true; AllowIncoming()(c) } +} + +// AllowIncoming returns a ConnectionOption to enable incoming endpoints, see +// Connection.Incoming(). This is automatically set for Server() connections. +func AllowIncoming() ConnectionOption { + return func(c *connection) { c.incoming = make(chan Incoming) } +} + +// Parent returns a ConnectionOption that associates the Connection with its Container. +// If not set a connection will create its own default container. +func Parent(cont Container) ConnectionOption { + return func(c *connection) { c.container = cont.(*container) } +} + +type connection struct { + endpoint + connectionSettings + + defaultSessionOnce, closeOnce sync.Once + + container *container + conn net.Conn + server bool + incoming chan Incoming + handler *handler + engine *proton.Engine + pConnection proton.Connection + + defaultSession Session +} + +// NewConnection creates a connection with the given options. 
+func NewConnection(conn net.Conn, opts ...ConnectionOption) (*connection, error) { + c := &connection{ + conn: conn, + } + c.handler = newHandler(c) + var err error + c.engine, err = proton.NewEngine(c.conn, c.handler.delegator) + if err != nil { + return nil, err + } + c.pConnection = c.engine.Connection() + for _, set := range opts { + set(c) + } + if c.container == nil { + c.container = NewContainer("").(*container) + } + c.pConnection.SetContainer(c.container.Id()) + globalSASLInit(c.engine) + + c.endpoint.init(c.engine.String()) + go c.run() + return c, nil +} + +func (c *connection) run() { + if !c.server { + c.pConnection.Open() + } + _ = c.engine.Run() + if c.incoming != nil { + close(c.incoming) + } + _ = c.closed(Closed) +} + +func (c *connection) Close(err error) { + c.err.Set(err) + c.engine.Close(err) +} + +func (c *connection) Disconnect(err error) { + c.err.Set(err) + c.engine.Disconnect(err) +} + +func (c *connection) Session(opts ...SessionOption) (Session, error) { + var s Session + err := c.engine.InjectWait(func() error { + if c.Error() != nil { + return c.Error() + } + pSession, err := c.engine.Connection().Session() + if err == nil { + pSession.Open() + if err == nil { + s = newSession(c, pSession, opts...) + } + } + return err + }) + return s, err +} + +func (c *connection) Container() Container { return c.container } + +func (c *connection) DefaultSession() (s Session, err error) { + c.defaultSessionOnce.Do(func() { + c.defaultSession, err = c.Session() + }) + if err == nil { + err = c.Error() + } + return c.defaultSession, err +} + +func (c *connection) Sender(opts ...LinkOption) (Sender, error) { + if s, err := c.DefaultSession(); err == nil { + return s.Sender(opts...) + } else { + return nil, err + } +} + +func (c *connection) Receiver(opts ...LinkOption) (Receiver, error) { + if s, err := c.DefaultSession(); err == nil { + return s.Receiver(opts...) 
+ } else { + return nil, err + } +} + +func (c *connection) Connection() Connection { return c } + +func (c *connection) Wait() error { return c.WaitTimeout(Forever) } +func (c *connection) WaitTimeout(timeout time.Duration) error { + _, err := timedReceive(c.done, timeout) + if err == Timeout { + return Timeout + } + return c.Error() +} + +func (c *connection) Incoming() <-chan Incoming { + assert(c.incoming != nil, "Incoming() is only allowed for a Connection created with the Server() option: %s", c) + return c.incoming +} + +type IncomingConnection struct { + incoming + connectionSettings + c *connection +} + +func newIncomingConnection(c *connection) *IncomingConnection { + c.user = c.pConnection.Transport().User() + c.virtualHost = c.pConnection.RemoteHostname() + return &IncomingConnection{ + incoming: makeIncoming(c.pConnection), + connectionSettings: c.connectionSettings, + c: c} +} + +// AcceptConnection is like Accept() but takes ConnectionOption s +// For example you can set the Heartbeat() for the accepted connection. +func (in *IncomingConnection) AcceptConnection(opts ...ConnectionOption) Connection { + return in.accept(func() Endpoint { + for _, opt := range opts { + opt(in.c) + } + in.c.pConnection.Open() + return in.c + }).(Connection) +} + +func (in *IncomingConnection) Accept() Endpoint { + return in.AcceptConnection() +} + +func sasl(c *connection) proton.SASL { return c.engine.Transport().SASL() } + +// SASLEnable returns a ConnectionOption that enables SASL authentication. +// Only required if you don't set any other SASL options. +func SASLEnable() ConnectionOption { return func(c *connection) { sasl(c) } } + +// SASLAllowedMechs returns a ConnectionOption to set the list of allowed SASL +// mechanisms. +// +// Can be used on the client or the server to restrict the SASL for a connection. +// mechs is a space-separated list of mechanism names. 
+// +func SASLAllowedMechs(mechs string) ConnectionOption { + return func(c *connection) { sasl(c).AllowedMechs(mechs) } +} + +// SASLAllowInsecure returns a ConnectionOption that allows or disallows clear +// text SASL authentication mechanisms +// +// By default the SASL layer is configured not to allow mechanisms that disclose +// the clear text of the password over an unencrypted AMQP connection. This specifically +// will disallow the use of the PLAIN mechanism without using SSL encryption. +// +// This default is to avoid disclosing password information accidentally over an +// insecure network. +// +func SASLAllowInsecure(b bool) ConnectionOption { + return func(c *connection) { sasl(c).SetAllowInsecureMechs(b) } +} + +// Heartbeat returns a ConnectionOption that requests the maximum delay +// between sending frames for the remote peer. If we don't receive any frames +// within 2*delay we will close the connection. +// +func Heartbeat(delay time.Duration) ConnectionOption { + // Proton-C divides the idle-timeout by 2 before sending, so compensate. + return func(c *connection) { c.engine.Transport().SetIdleTimeout(2 * delay) } +} + +// GlobalSASLConfigDir sets the SASL configuration directory for every +// Connection created in this process. If not called, the default is determined +// by your SASL installation. +// +// You can set SASLAllowInsecure and SASLAllowedMechs on individual connections. +// +func GlobalSASLConfigDir(dir string) { globalSASLConfigDir = dir } + +// GlobalSASLConfigName sets the SASL configuration name for every Connection +// created in this process. If not called the default is "proton-server". +// +// The complete configuration file name is +// <sasl-config-dir>/<sasl-config-name>.conf +// +// You can set SASLAllowInsecure and SASLAllowedMechs on individual connections. 
+// +func GlobalSASLConfigName(dir string) { globalSASLConfigName = dir } + +var ( + globalSASLConfigName string + globalSASLConfigDir string +) + +// TODO aconway 2016-09-15: Current pn_sasl C impl config is broken, so all we +// can realistically offer is global configuration. Later if/when the pn_sasl C +// impl is fixed we can offer per connection over-rides. +func globalSASLInit(eng *proton.Engine) { + sasl := eng.Transport().SASL() + if globalSASLConfigName != "" { + sasl.ConfigName(globalSASLConfigName) + } + if globalSASLConfigDir != "" { + sasl.ConfigPath(globalSASLConfigDir) + } +} + +// Dial is shorthand for using net.Dial() then NewConnection() - func Dial(network, addr string, opts ...ConnectionOption) (c Connection, err error) { - conn, err := net.Dial(network, addr) ++// See net.Dial() for the meaning of the network, address arguments. ++func Dial(network, address string, opts ...ConnectionOption) (c Connection, err error) { ++ conn, err := net.Dial(network, address) + if err == nil { + c, err = NewConnection(conn, opts...) + } + return +} + +// DialWithDialer is shorthand for using dialer.Dial() then NewConnection() - func DialWithDialer(dialer *net.Dialer, network, addr string, opts ...ConnectionOption) (c Connection, err error) { - conn, err := dialer.Dial(network, addr) ++// See net.Dial() for the meaning of the network, address arguments. ++func DialWithDialer(dialer *net.Dialer, network, address string, opts ...ConnectionOption) (c Connection, err error) { ++ conn, err := dialer.Dial(network, address) + if err == nil { + c, err = NewConnection(conn, opts...) 
+ } + return +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/14f7ca56/electron/container.go ---------------------------------------------------------------------- diff --cc electron/container.go index efb24ff,0000000..7c19aa5 mode 100644,000000..100644 --- a/electron/container.go +++ b/electron/container.go @@@ -1,104 -1,0 +1,105 @@@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package electron + +import ( + "net" + "qpid.apache.org/proton" + "strconv" + "sync/atomic" +) + +// Container is an AMQP container, it represents a single AMQP "application" +// which can have multiple client or server connections. +// +// Each Container in a distributed AMQP application must have a unique +// container-id which is applied to its connections. +// +// Create with NewContainer() +// +type Container interface { + // Id is a unique identifier for the container in your distributed application. + Id() string + + // Connection creates a connection associated with this container. + Connection(conn net.Conn, opts ...ConnectionOption) (Connection, error) + + // Dial is shorthand for + // conn, err := net.Dial(); c, err := Connection(conn, opts...) 
- Dial(network string, addr string, opts ...ConnectionOption) (Connection, error) ++ // See net.Dial() for the meaning of the network, address arguments. ++ Dial(network string, address string, opts ...ConnectionOption) (Connection, error) + + // Accept is shorthand for: + // conn, err := l.Accept(); c, err := Connection(conn, append(opts, Server())...) + Accept(l net.Listener, opts ...ConnectionOption) (Connection, error) + + // String returns Id() + String() string +} + +type container struct { + id string + tagCounter uint64 +} + +func (cont *container) nextTag() string { + return strconv.FormatUint(atomic.AddUint64(&cont.tagCounter, 1), 32) +} + +// NewContainer creates a new container. The id must be unique in your +// distributed application, all connections created by the container +// will have this container-id. +// +// If id == "" a random UUID will be generated for the id. +func NewContainer(id string) Container { + if id == "" { + id = proton.UUID4().String() + } + cont := &container{id: id} + return cont +} + +func (cont *container) Id() string { return cont.id } + +func (cont *container) String() string { return cont.Id() } + +func (cont *container) nextLinkName() string { + return cont.id + "@" + cont.nextTag() +} + +func (cont *container) Connection(conn net.Conn, opts ...ConnectionOption) (Connection, error) { + return NewConnection(conn, append(opts, Parent(cont))...) +} + +func (cont *container) Dial(network, address string, opts ...ConnectionOption) (c Connection, err error) { + conn, err := net.Dial(network, address) + if err == nil { + c, err = cont.Connection(conn, opts...) + } + return +} + +func (cont *container) Accept(l net.Listener, opts ...ConnectionOption) (c Connection, err error) { + conn, err := l.Accept() + if err == nil { + c, err = cont.Connection(conn, append(opts, Server())...) 
+ } + return +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/14f7ca56/electron/example_client_server_test.go ---------------------------------------------------------------------- diff --cc electron/example_client_server_test.go index 0000000,0000000..3aa5892 new file mode 100644 --- /dev/null +++ b/electron/example_client_server_test.go @@@ -1,0 -1,0 +1,85 @@@ ++package electron_test ++ ++import ( ++ "fmt" ++ "net" ++ "qpid.apache.org/amqp" ++ "qpid.apache.org/electron" ++ "sync" ++) ++ ++// Example Server that accepts a single Connection, Session and Receiver link ++// and prints messages received until the link closes. ++func Server(l net.Listener) { ++ cont := electron.NewContainer("server") ++ c, _ := cont.Accept(l) // Ignoring error handling ++ l.Close() // This server only accepts one connection ++ // Process incoming endpoints till we get a Receiver link ++ var r electron.Receiver ++ for r == nil { ++ in := <-c.Incoming() ++ switch in := in.(type) { ++ case *electron.IncomingSession, *electron.IncomingConnection: ++ in.Accept() // Accept the incoming connection and session for the receiver ++ case *electron.IncomingReceiver: ++ in.SetCapacity(10) ++ in.SetPrefetch(true) // Automatic flow control for a buffer of 10 messages. 
++ r = in.Accept().(electron.Receiver) ++ case nil: ++ return // Connection is closed ++ default: ++ in.Reject(amqp.Errorf("example-server", "unexpected endpoint %v", in)) ++ } ++ } ++ go func() { // Reject any further incoming endpoints ++ for in := range c.Incoming() { ++ in.Reject(amqp.Errorf("example-server", "unexpected endpoint %v", in)) ++ } ++ }() ++ // Receive messages till the Receiver closes ++ rm, err := r.Receive() ++ for ; err == nil; rm, err = r.Receive() { ++ fmt.Printf("server received: %q\n", rm.Message.Body()) ++ rm.Accept() // Signal to the client that the message was accepted ++ } ++ fmt.Printf("server receiver closed: %v\n", err) ++} ++ ++// Example client sending messages to a server running in a goroutine. ++// ++// Normally client and server would be separate processes. For more realistic and detailed examples: ++// https://github.com/apache/qpid-proton/blob/master/examples/go/README.md ++// ++func Example_clientServer() { ++ // NOTE: We are ignoring error handling in this example ++ l, _ := net.Listen("tcp", "") // Open a listening port for the server; the client connects to this port ++ ++ // SERVER: start the server running in a separate goroutine ++ var waitServer sync.WaitGroup // We will wait for the server goroutine to finish before exiting ++ waitServer.Add(1) ++ go func() { // Run the server in the background ++ defer waitServer.Done() ++ Server(l) ++ }() ++ ++ // CLIENT: Send messages to the server ++ addr := l.Addr() ++ c, _ := electron.Dial(addr.Network(), addr.String()) ++ s, _ := c.Sender() ++ for i := 0; i < 3; i++ { ++ msg := fmt.Sprintf("hello %v", i) ++ // Send and wait for the Outcome from the server. ++ // Note: For higher throughput, use SendAsync() to send a stream of messages ++ // and process the returning stream of Outcomes concurrently. 
++ s.SendSync(amqp.NewMessageWith(msg)) ++ } ++ c.Close(nil) // Closing the connection will stop the server ++ ++ waitServer.Wait() // Let the server finish ++ ++ // Output: ++ // server received: "hello 0" ++ // server received: "hello 1" ++ // server received: "hello 2" ++ // server receiver closed: EOF ++}