Merge branch 'master' into go1 - examples and doc

Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/14f7ca56
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/14f7ca56
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/14f7ca56

Branch: refs/heads/go1
Commit: 14f7ca56f863f45393ced61f75861cf1e1a8e5be
Parents: 3d8368b 97815c3
Author: Alan Conway <acon...@redhat.com>
Authored: Thu Oct 19 13:39:16 2017 +0100
Committer: Alan Conway <acon...@redhat.com>
Committed: Thu Oct 19 13:39:16 2017 +0100

----------------------------------------------------------------------
 README.md                              | 20 ++++++-
 electron/connection.go                 | 24 +++++---
 electron/container.go                  |  3 +-
 electron/ex_client_server_test.go      | 81 ---------------------------
 electron/example_client_server_test.go | 85 +++++++++++++++++++++++++++++
 5 files changed, 120 insertions(+), 93 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/14f7ca56/README.md
----------------------------------------------------------------------
diff --cc README.md
index 7415929,8b34f63..0977787
--- a/README.md
+++ b/README.md
@@@ -1,102 -1,43 +1,116 @@@
 -Qpid Proton - AMQP messaging toolkit
 -====================================
 +# Qpid Go packages for AMQP
  
 -Linux Build | Windows Build
 -------------|--------------
 -[![Linux Build 
Status](https://travis-ci.org/apache/qpid-proton.svg?branch=master)](https://travis-ci.org/apache/qpid-proton)
 | [![Windows Build 
Status](https://ci.appveyor.com/api/projects/status/github/apache/qpid-proton?branch=master&svg=true)](https://ci.appveyor.com/project/ke4qqq/qpid-proton/branch/master)
 +These packages provide [Go](http://golang.org) support for sending and 
receiving
 +AMQP messages in client or server applications. Reference documentation is
 +available at: <http://godoc.org/?q=qpid.apache.org>
  
- They require the [proton-C library](http://qpid.apache.org/proton) to be 
installed.
- On many platforms it is avaialable pre-packaged, for example on Fedora
 -Qpid Proton is a high-performance, lightweight messaging library. It can be
 -used in the widest range of messaging applications, including brokers, client
 -libraries, routers, bridges, proxies, and more. Proton makes it trivial to
 -integrate with the AMQP 1.0 ecosystem from any platform, environment, or
 -language
++They require the
++[proton-C library and header files](http://qpid.apache.org/proton) to be
++installed.  On many platforms it is avaialable pre-packaged, for example on
++Fedora
  
-     yum install qpid-proton-c-devel
 -Features
 ---------
++    dnf install qpid-proton-c-devel
+ 
 -  - A flexible and capable reactive messaging API
 -  - Full control of AMQP 1.0 protocol semantics
 -  - Portable C implementation with bindings to popular languages
 -  - Peer-to-peer and brokered messaging
 -  - Secure communication via SSL and SASL
++If you built proton from source, you can set environment variables to find the
++built libraries and headers as follows:
+ 
 -Universal - Proton is designed to scale both up and down. Equally suitable for
 -simple clients or high-powered servers, it can be deployed in simple
 -peer-to-peer configurations or as part of a global federated messaging 
network.
++    source <build-directory>/config.sh
+ 
 -Embeddable - Proton is carefully written to be portable and cross platform. It
 -has minimal dependencies, and it is architected to be usable with any 
threading
 -model, as well as with non-threaded applications. These features make it
 -uniquely suited for embedding messaging capabilities into existing software.
++If you have installed the library and headers in non-standard directories, 
then
++add them to the following environment variables:
+ 
 -Standard - Built around the AMQP 1.0 messaging standard, Proton is not only
 -ideal for building out your own messaging applications but also for connecting
 -them to the broader ecosystem of AMQP 1.0-based messaging applications.
++    LD_LIBRARY_PATH  # directory containing the library
++    LIBRARY_PATH     # directory containing the library
++    C_INCLUDE_PATH   # directory containing the proton/ subdirectory with 
header files
  
 -Getting Started
 ----------------
 +There are 3 packages:
  
 -See the included INSTALL.md file for build and install instructions and the
 -DEVELOPERS file for information on how to modify and test the library code
 -itself.
 +[qpid.apache.org/amqp](http://godoc.org/qpid.apache.org/amqp) provides 
functions
 +to convert AMQP messages and data types to and from Go data types.  Used by 
both
 +the proton and electron packages to manage AMQP data.
  
 -Please see http://qpid.apache.org/proton for a more info.
 +[qpid.apache.org/electron](http://godoc.org/qpid.apache.org/electron) is a
 +simple, concurrent-safe API for sending and receiving messages. It can be used
 +with goroutines and channels to build concurrent AMQP clients and servers.
 +
 +[qpid.apache.org/proton](http://godoc.org/qpid.apache.org/proton) is an
 +event-driven, concurrent-unsafe package that closely follows the proton C
 +API. Most Go programmers will find the
 +[electron](http://godoc.org/qpid.apache.org/electron) package easier to use.
 +
 +See the 
[examples](https://github.com/apache/qpid-proton/blob/master/examples/go/README.md)
 +to help you get started.
 +
 +Feedback is encouraged at:
 +
 +- Email <pro...@qpid.apache.org>
 +- Create issues <https://issues.apache.org/jira/browse/PROTON>, attach 
patches to an issue.
 +
 +### Why two APIs?
 +
 +The `proton` API is a direct mapping of the proton C library into Go. It is
 +usable but not very natural for a Go programmer because it takes an
 +*event-driven* approach and has no built-in support for concurrent
 +use. `electron` uses `proton` internally but provides a more Go-like API that 
is
 +safe to use from multiple concurrent goroutines.
 +
 +Go encourages programs to be structured as concurrent *goroutines* that
 +communicate via *channels*. Go literature distinguishes between:
 +
 +- *concurrency*: "keeping track of things that could be done in parallel"
 +- *parallelism*: "actually doing things in parallel on multiple CPUs or cores"
 +
 +A Go program expresses concurrency by starting goroutines for potentially
 +concurrent tasks. The Go runtime schedules the activity of goroutines onto a
 +small number (possibly one) of actual parallel executions.
 +
 +Even with no hardware parallelism, goroutine concurrency lets the Go runtime
 +order unpredictable events like file descriptors being readable/writable,
 +channels having data, timers firing etc. Go automatically takes care of
 +switching out goroutines that block or sleep so it is normal to write code in
 +terms of blocking calls.
 +
 +By contrast, event-driven programming is based on polling mechanisms like
 +`select`, `poll` or `epoll`. These also dispatch unpredictably ordered events 
to
 +a single thread or a small thread pool. However this requires a different 
style
 +of programming: "event-driven" or "reactive" programming. Go developers call 
it
 +"inside-out" programming.  In an event-driven program blocking is a big 
problem
 +as it consumes a scarce thread of execution, so actions that take time to
 +complete have to be re-structured in terms of multiple events.
 +
 +The promise of Go is that you can express your program in concurrent, 
sequential
 +terms and the Go runtime will turn it inside-out for you. You can start
 +goroutines for all concurrent activities. They can loop forever or block for 
as
 +long as they need waiting for timers, IO or any unpredictable event. Go will
 +interleave and schedule them efficiently onto the available parallel hardware.
 +
 +For example: in the `electron` API, you can send a message and wait for it to 
be
 +acknowledged in a single function. All the information about the message, why
 +you sent it, and what to do when it is acknowledged can be held in local
 +variables, all the code is in a simple sequence. Other goroutines in your
 +program can be sending and receiving messages concurrently, they are not
 +blocked.
 +
 +In the `proton` API, an event handler that sends a message must return
 +*immediately*, it cannot block the event loop to wait for
 +acknowledgement. Acknowledgement is a separate event, so the code for handling
 +it is in a different event handler. Context information about the message has 
to
 +be stored in some non-local variable that both functions can find. This makes
 +the code harder to follow.
 +
 +The `proton` API is important because it is the foundation for the `electron`
 +API, and may be useful for programs that need to be close to the original C
 +library for some reason. However the `electron` API hides the event-driven
 +details behind simple, sequential, concurrent-safe methods that can be called
 +from arbitrary goroutines. Under the covers, data is passed through channels 
to
 +dedicated `proton` goroutines so user goroutines can work concurrently with 
the
 +proton event-loop.
 +
 +## New to Go?
 +
 +If you are new to Go then these are a good place to start:
 +
 +- [A Tour of Go](http://tour.golang.org)
 +- [Effective Go](http://golang.org/doc/effective_go.html)
 +
 +Then look at the tools and docs at <http://golang.org> as you need them.

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/14f7ca56/electron/connection.go
----------------------------------------------------------------------
diff --cc electron/connection.go
index 8f62491,0000000..267ee1e
mode 100644,000000..100644
--- a/electron/connection.go
+++ b/electron/connection.go
@@@ -1,405 -1,0 +1,413 @@@
 +/*
 +Licensed to the Apache Software Foundation (ASF) under one
 +or more contributor license agreements.  See the NOTICE file
 +distributed with this work for additional information
 +regarding copyright ownership.  The ASF licenses this file
 +to you under the Apache License, Version 2.0 (the
 +"License"); you may not use this file except in compliance
 +with the License.  You may obtain a copy of the License at
 +
 +  http://www.apache.org/licenses/LICENSE-2.0
 +
 +Unless required by applicable law or agreed to in writing,
 +software distributed under the License is distributed on an
 +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 +KIND, either express or implied.  See the License for the
 +specific language governing permissions and limitations
 +under the License.
 +*/
 +
 +package electron
 +
 +// #include <proton/disposition.h>
 +import "C"
 +
 +import (
 +      "net"
 +      "qpid.apache.org/proton"
 +      "sync"
 +      "time"
 +)
 +
 +// Settings associated with a Connection.
 +type ConnectionSettings interface {
 +      // Authenticated user name associated with the connection.
 +      User() string
 +
 +      // The AMQP virtual host name for the connection.
 +      //
 +      // Optional, useful when the server has multiple names and provides 
different
 +      // service based on the name the client uses to connect.
 +      //
 +      // By default it is set to the DNS host name that the client uses to 
connect,
 +      // but it can be set to something different at the client side with the
 +      // VirtualHost() option.
 +      //
 +      // Returns error if the connection fails to authenticate.
 +      VirtualHost() string
 +
 +      // Heartbeat is the maximum delay between sending frames that the 
remote peer
 +      // has requested of us. If the interval expires an empty "heartbeat" 
frame
 +      // will be sent automatically to keep the connection open.
 +      Heartbeat() time.Duration
 +}
 +
 +// Connection is an AMQP connection, created by a Container.
 +type Connection interface {
 +      Endpoint
 +      ConnectionSettings
 +
 +      // Sender opens a new sender on the DefaultSession.
 +      Sender(...LinkOption) (Sender, error)
 +
 +      // Receiver opens a new Receiver on the DefaultSession().
 +      Receiver(...LinkOption) (Receiver, error)
 +
 +      // DefaultSession() returns a default session for the connection. It is 
opened
 +      // on the first call to DefaultSession and returned on subsequent calls.
 +      DefaultSession() (Session, error)
 +
 +      // Session opens a new session.
 +      Session(...SessionOption) (Session, error)
 +
 +      // Container for the connection.
 +      Container() Container
 +
 +      // Disconnect the connection abruptly with an error.
 +      Disconnect(error)
 +
 +      // Wait waits for the connection to be disconnected.
 +      Wait() error
 +
 +      // WaitTimeout is like Wait but returns Timeout if the timeout expires.
 +      WaitTimeout(time.Duration) error
 +
 +      // Incoming returns a channel for incoming endpoints opened by the 
remote peer.
-       // See the Incoming interface for more.
++      // See the Incoming interface for more detail.
 +      //
-       // Not receiving from Incoming() and calling Accept/Reject will block 
the
-       // electron event loop. You should run a loop to handle the types that
-       // interest you in a switch{} and and Accept() all others.
++      // Note: this channel will first return an *IncomingConnection for the
++      // connection itself which allows you to look at security information 
and
++      // decide whether to Accept() or Reject() the connection. Then it will 
return
++      // *IncomingSession, *IncomingSender and *IncomingReceiver as they are 
opened
++      // by the remote end.
++      //
++      // Note 2: you must receiving from Incoming() and call Accept/Reject to 
avoid
++      // blocking electron event loop. Normally you would run a loop in a 
goroutine
++      // to handle incoming types that interest and Accept() those that don't.
 +      Incoming() <-chan Incoming
 +}
 +
 +type connectionSettings struct {
 +      user, virtualHost string
 +      heartbeat         time.Duration
 +}
 +
 +func (c connectionSettings) User() string             { return c.user }
 +func (c connectionSettings) VirtualHost() string      { return c.virtualHost }
 +func (c connectionSettings) Heartbeat() time.Duration { return c.heartbeat }
 +
 +// ConnectionOption can be passed when creating a connection to configure 
various options
 +type ConnectionOption func(*connection)
 +
 +// User returns a ConnectionOption sets the user name for a connection
 +func User(user string) ConnectionOption {
 +      return func(c *connection) {
 +              c.user = user
 +              c.pConnection.SetUser(user)
 +      }
 +}
 +
 +// VirtualHost returns a ConnectionOption to set the AMQP virtual host for 
the connection.
 +// Only applies to outbound client connection.
 +func VirtualHost(virtualHost string) ConnectionOption {
 +      return func(c *connection) {
 +              c.virtualHost = virtualHost
 +              c.pConnection.SetHostname(virtualHost)
 +      }
 +}
 +
 +// Password returns a ConnectionOption to set the password used to establish a
 +// connection.  Only applies to outbound client connection.
 +//
 +// The connection will erase its copy of the password from memory as soon as 
it
 +// has been used to authenticate. If you are concerned about paswords staying 
in
 +// memory you should never store them as strings, and should overwrite your
 +// copy as soon as you are done with it.
 +//
 +func Password(password []byte) ConnectionOption {
 +      return func(c *connection) { c.pConnection.SetPassword(password) }
 +}
 +
 +// Server returns a ConnectionOption to put the connection in server mode for 
incoming connections.
 +//
 +// A server connection will do protocol negotiation to accept a incoming AMQP
 +// connection. Normally you would call this for a connection created by
 +// net.Listener.Accept()
 +//
 +func Server() ConnectionOption {
 +      return func(c *connection) { c.engine.Server(); c.server = true; 
AllowIncoming()(c) }
 +}
 +
 +// AllowIncoming returns a ConnectionOption to enable incoming endpoints, see
 +// Connection.Incoming() This is automatically set for Server() connections.
 +func AllowIncoming() ConnectionOption {
 +      return func(c *connection) { c.incoming = make(chan Incoming) }
 +}
 +
 +// Parent returns a ConnectionOption that associates the Connection with it's 
Container
 +// If not set a connection will create its own default container.
 +func Parent(cont Container) ConnectionOption {
 +      return func(c *connection) { c.container = cont.(*container) }
 +}
 +
 +type connection struct {
 +      endpoint
 +      connectionSettings
 +
 +      defaultSessionOnce, closeOnce sync.Once
 +
 +      container   *container
 +      conn        net.Conn
 +      server      bool
 +      incoming    chan Incoming
 +      handler     *handler
 +      engine      *proton.Engine
 +      pConnection proton.Connection
 +
 +      defaultSession Session
 +}
 +
 +// NewConnection creates a connection with the given options.
 +func NewConnection(conn net.Conn, opts ...ConnectionOption) (*connection, 
error) {
 +      c := &connection{
 +              conn: conn,
 +      }
 +      c.handler = newHandler(c)
 +      var err error
 +      c.engine, err = proton.NewEngine(c.conn, c.handler.delegator)
 +      if err != nil {
 +              return nil, err
 +      }
 +      c.pConnection = c.engine.Connection()
 +      for _, set := range opts {
 +              set(c)
 +      }
 +      if c.container == nil {
 +              c.container = NewContainer("").(*container)
 +      }
 +      c.pConnection.SetContainer(c.container.Id())
 +      globalSASLInit(c.engine)
 +
 +      c.endpoint.init(c.engine.String())
 +      go c.run()
 +      return c, nil
 +}
 +
 +func (c *connection) run() {
 +      if !c.server {
 +              c.pConnection.Open()
 +      }
 +      _ = c.engine.Run()
 +      if c.incoming != nil {
 +              close(c.incoming)
 +      }
 +      _ = c.closed(Closed)
 +}
 +
 +func (c *connection) Close(err error) {
 +      c.err.Set(err)
 +      c.engine.Close(err)
 +}
 +
 +func (c *connection) Disconnect(err error) {
 +      c.err.Set(err)
 +      c.engine.Disconnect(err)
 +}
 +
 +func (c *connection) Session(opts ...SessionOption) (Session, error) {
 +      var s Session
 +      err := c.engine.InjectWait(func() error {
 +              if c.Error() != nil {
 +                      return c.Error()
 +              }
 +              pSession, err := c.engine.Connection().Session()
 +              if err == nil {
 +                      pSession.Open()
 +                      if err == nil {
 +                              s = newSession(c, pSession, opts...)
 +                      }
 +              }
 +              return err
 +      })
 +      return s, err
 +}
 +
 +func (c *connection) Container() Container { return c.container }
 +
 +func (c *connection) DefaultSession() (s Session, err error) {
 +      c.defaultSessionOnce.Do(func() {
 +              c.defaultSession, err = c.Session()
 +      })
 +      if err == nil {
 +              err = c.Error()
 +      }
 +      return c.defaultSession, err
 +}
 +
 +func (c *connection) Sender(opts ...LinkOption) (Sender, error) {
 +      if s, err := c.DefaultSession(); err == nil {
 +              return s.Sender(opts...)
 +      } else {
 +              return nil, err
 +      }
 +}
 +
 +func (c *connection) Receiver(opts ...LinkOption) (Receiver, error) {
 +      if s, err := c.DefaultSession(); err == nil {
 +              return s.Receiver(opts...)
 +      } else {
 +              return nil, err
 +      }
 +}
 +
 +func (c *connection) Connection() Connection { return c }
 +
 +func (c *connection) Wait() error { return c.WaitTimeout(Forever) }
 +func (c *connection) WaitTimeout(timeout time.Duration) error {
 +      _, err := timedReceive(c.done, timeout)
 +      if err == Timeout {
 +              return Timeout
 +      }
 +      return c.Error()
 +}
 +
 +func (c *connection) Incoming() <-chan Incoming {
 +      assert(c.incoming != nil, "Incoming() is only allowed for a Connection 
created with the Server() option: %s", c)
 +      return c.incoming
 +}
 +
 +type IncomingConnection struct {
 +      incoming
 +      connectionSettings
 +      c *connection
 +}
 +
 +func newIncomingConnection(c *connection) *IncomingConnection {
 +      c.user = c.pConnection.Transport().User()
 +      c.virtualHost = c.pConnection.RemoteHostname()
 +      return &IncomingConnection{
 +              incoming:           makeIncoming(c.pConnection),
 +              connectionSettings: c.connectionSettings,
 +              c:                  c}
 +}
 +
 +// AcceptConnection is like Accept() but takes ConnectionOption s
 +// For example you can set the Heartbeat() for the accepted connection.
 +func (in *IncomingConnection) AcceptConnection(opts ...ConnectionOption) 
Connection {
 +      return in.accept(func() Endpoint {
 +              for _, opt := range opts {
 +                      opt(in.c)
 +              }
 +              in.c.pConnection.Open()
 +              return in.c
 +      }).(Connection)
 +}
 +
 +func (in *IncomingConnection) Accept() Endpoint {
 +      return in.AcceptConnection()
 +}
 +
 +func sasl(c *connection) proton.SASL { return c.engine.Transport().SASL() }
 +
 +// SASLEnable returns a ConnectionOption that enables SASL authentication.
 +// Only required if you don't set any other SASL options.
 +func SASLEnable() ConnectionOption { return func(c *connection) { sasl(c) } }
 +
 +// SASLAllowedMechs returns a ConnectionOption to set the list of allowed SASL
 +// mechanisms.
 +//
 +// Can be used on the client or the server to restrict the SASL for a 
connection.
 +// mechs is a space-separated list of mechanism names.
 +//
 +func SASLAllowedMechs(mechs string) ConnectionOption {
 +      return func(c *connection) { sasl(c).AllowedMechs(mechs) }
 +}
 +
 +// SASLAllowInsecure returns a ConnectionOption that allows or disallows clear
 +// text SASL authentication mechanisms
 +//
 +// By default the SASL layer is configured not to allow mechanisms that 
disclose
 +// the clear text of the password over an unencrypted AMQP connection. This 
specifically
 +// will disallow the use of the PLAIN mechanism without using SSL encryption.
 +//
 +// This default is to avoid disclosing password information accidentally over 
an
 +// insecure network.
 +//
 +func SASLAllowInsecure(b bool) ConnectionOption {
 +      return func(c *connection) { sasl(c).SetAllowInsecureMechs(b) }
 +}
 +
 +// Heartbeat returns a ConnectionOption that requests the maximum delay
 +// between sending frames for the remote peer. If we don't receive any frames
 +// within 2*delay we will close the connection.
 +//
 +func Heartbeat(delay time.Duration) ConnectionOption {
 +      // Proton-C divides the idle-timeout by 2 before sending, so compensate.
 +      return func(c *connection) { c.engine.Transport().SetIdleTimeout(2 * 
delay) }
 +}
 +
 +// GlobalSASLConfigDir sets the SASL configuration directory for every
 +// Connection created in this process. If not called, the default is 
determined
 +// by your SASL installation.
 +//
 +// You can set SASLAllowInsecure and SASLAllowedMechs on individual 
connections.
 +//
 +func GlobalSASLConfigDir(dir string) { globalSASLConfigDir = dir }
 +
 +// GlobalSASLConfigName sets the SASL configuration name for every Connection
 +// created in this process. If not called the default is "proton-server".
 +//
 +// The complete configuration file name is
 +//     <sasl-config-dir>/<sasl-config-name>.conf
 +//
 +// You can set SASLAllowInsecure and SASLAllowedMechs on individual 
connections.
 +//
 +func GlobalSASLConfigName(dir string) { globalSASLConfigName = dir }
 +
 +var (
 +      globalSASLConfigName string
 +      globalSASLConfigDir  string
 +)
 +
 +// TODO aconway 2016-09-15: Current pn_sasl C impl config is broken, so all we
 +// can realistically offer is global configuration. Later if/when the pn_sasl 
C
 +// impl is fixed we can offer per connection over-rides.
 +func globalSASLInit(eng *proton.Engine) {
 +      sasl := eng.Transport().SASL()
 +      if globalSASLConfigName != "" {
 +              sasl.ConfigName(globalSASLConfigName)
 +      }
 +      if globalSASLConfigDir != "" {
 +              sasl.ConfigPath(globalSASLConfigDir)
 +      }
 +}
 +
 +// Dial is shorthand for using net.Dial() then NewConnection()
- func Dial(network, addr string, opts ...ConnectionOption) (c Connection, err 
error) {
-       conn, err := net.Dial(network, addr)
++// See net.Dial() for the meaning of the network, address arguments.
++func Dial(network, address string, opts ...ConnectionOption) (c Connection, 
err error) {
++      conn, err := net.Dial(network, address)
 +      if err == nil {
 +              c, err = NewConnection(conn, opts...)
 +      }
 +      return
 +}
 +
 +// DialWithDialer is shorthand for using dialer.Dial() then NewConnection()
- func DialWithDialer(dialer *net.Dialer, network, addr string, opts 
...ConnectionOption) (c Connection, err error) {
-       conn, err := dialer.Dial(network, addr)
++// See net.Dial() for the meaning of the network, address arguments.
++func DialWithDialer(dialer *net.Dialer, network, address string, opts 
...ConnectionOption) (c Connection, err error) {
++      conn, err := dialer.Dial(network, address)
 +      if err == nil {
 +              c, err = NewConnection(conn, opts...)
 +      }
 +      return
 +}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/14f7ca56/electron/container.go
----------------------------------------------------------------------
diff --cc electron/container.go
index efb24ff,0000000..7c19aa5
mode 100644,000000..100644
--- a/electron/container.go
+++ b/electron/container.go
@@@ -1,104 -1,0 +1,105 @@@
 +/*
 +Licensed to the Apache Software Foundation (ASF) under one
 +or more contributor license agreements.  See the NOTICE file
 +distributed with this work for additional information
 +regarding copyright ownership.  The ASF licenses this file
 +to you under the Apache License, Version 2.0 (the
 +"License"); you may not use this file except in compliance
 +with the License.  You may obtain a copy of the License at
 +
 +  http://www.apache.org/licenses/LICENSE-2.0
 +
 +Unless required by applicable law or agreed to in writing,
 +software distributed under the License is distributed on an
 +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 +KIND, either express or implied.  See the License for the
 +specific language governing permissions and limitations
 +under the License.
 +*/
 +
 +package electron
 +
 +import (
 +      "net"
 +      "qpid.apache.org/proton"
 +      "strconv"
 +      "sync/atomic"
 +)
 +
 +// Container is an AMQP container, it represents a single AMQP "application"
 +// which can have multiple client or server connections.
 +//
 +// Each Container in a distributed AMQP application must have a unique
 +// container-id which is applied to its connections.
 +//
 +// Create with NewContainer()
 +//
 +type Container interface {
 +      // Id is a unique identifier for the container in your distributed 
application.
 +      Id() string
 +
 +      // Connection creates a connection associated with this container.
 +      Connection(conn net.Conn, opts ...ConnectionOption) (Connection, error)
 +
 +      // Dial is shorthand for
 +      //     conn, err := net.Dial(); c, err := Connection(conn, opts...)
-       Dial(network string, addr string, opts ...ConnectionOption) 
(Connection, error)
++      // See net.Dial() for the meaning of the network, address arguments.
++      Dial(network string, address string, opts ...ConnectionOption) 
(Connection, error)
 +
 +      // Accept is shorthand for:
 +      //     conn, err := l.Accept(); c, err := Connection(conn, append(opts, 
Server()...)
 +      Accept(l net.Listener, opts ...ConnectionOption) (Connection, error)
 +
 +      // String returns Id()
 +      String() string
 +}
 +
 +type container struct {
 +      id         string
 +      tagCounter uint64
 +}
 +
 +func (cont *container) nextTag() string {
 +      return strconv.FormatUint(atomic.AddUint64(&cont.tagCounter, 1), 32)
 +}
 +
 +// NewContainer creates a new container. The id must be unique in your
 +// distributed application, all connections created by the container
 +// will have this container-id.
 +//
 +// If id == "" a random UUID will be generated for the id.
 +func NewContainer(id string) Container {
 +      if id == "" {
 +              id = proton.UUID4().String()
 +      }
 +      cont := &container{id: id}
 +      return cont
 +}
 +
 +func (cont *container) Id() string { return cont.id }
 +
 +func (cont *container) String() string { return cont.Id() }
 +
 +func (cont *container) nextLinkName() string {
 +      return cont.id + "@" + cont.nextTag()
 +}
 +
 +func (cont *container) Connection(conn net.Conn, opts ...ConnectionOption) 
(Connection, error) {
 +      return NewConnection(conn, append(opts, Parent(cont))...)
 +}
 +
 +func (cont *container) Dial(network, address string, opts 
...ConnectionOption) (c Connection, err error) {
 +      conn, err := net.Dial(network, address)
 +      if err == nil {
 +              c, err = cont.Connection(conn, opts...)
 +      }
 +      return
 +}
 +
 +func (cont *container) Accept(l net.Listener, opts ...ConnectionOption) (c 
Connection, err error) {
 +      conn, err := l.Accept()
 +      if err == nil {
 +              c, err = cont.Connection(conn, append(opts, Server())...)
 +      }
 +      return
 +}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/14f7ca56/electron/example_client_server_test.go
----------------------------------------------------------------------
diff --cc electron/example_client_server_test.go
index 0000000,0000000..3aa5892
new file mode 100644
--- /dev/null
+++ b/electron/example_client_server_test.go
@@@ -1,0 -1,0 +1,85 @@@
++package electron_test
++
++import (
++      "fmt"
++      "net"
++      "qpid.apache.org/amqp"
++      "qpid.apache.org/electron"
++      "sync"
++)
++
++// Example Server that accepts a single Connection, Session and Receiver link
++// and prints messages received until the link closes.
++func Server(l net.Listener) {
++      cont := electron.NewContainer("server")
++      c, _ := cont.Accept(l) // Ignoring error handling
++      l.Close()              // This server only accepts one connection
++      // Process incoming endpoints till we get a Receiver link
++      var r electron.Receiver
++      for r == nil {
++              in := <-c.Incoming()
++              switch in := in.(type) {
++              case *electron.IncomingSession, *electron.IncomingConnection:
++                      in.Accept() // Accept the incoming connection and 
session for the receiver
++              case *electron.IncomingReceiver:
++                      in.SetCapacity(10)
++                      in.SetPrefetch(true) // Automatic flow control for a 
buffer of 10 messages.
++                      r = in.Accept().(electron.Receiver)
++              case nil:
++                      return // Connection is closed
++              default:
++                      in.Reject(amqp.Errorf("example-server", "unexpected 
endpoint %v", in))
++              }
++      }
++      go func() { // Reject any further incoming endpoints
++              for in := range c.Incoming() {
++                      in.Reject(amqp.Errorf("example-server", "unexpected 
endpoint %v", in))
++              }
++      }()
++      // Receive messages till the Receiver closes
++      rm, err := r.Receive()
++      for ; err == nil; rm, err = r.Receive() {
++              fmt.Printf("server received: %q\n", rm.Message.Body())
++              rm.Accept() // Signal to the client that the message was 
accepted
++      }
++      fmt.Printf("server receiver closed: %v\n", err)
++}
++
++// Example client sending messages to a server running in a goroutine.
++//
++// Normally client and server would be separate processes. For more realistic 
and detailed examples:
++//     https://github.com/apache/qpid-proton/blob/master/examples/go/README.md
++//
++func Example_clientServer() {
++      // NOTE: We ignoring error handling in this example
++      l, _ := net.Listen("tcp", "") // Open a listening port for server, 
client connect to this port
++
++      // SERVER: start the server running in a separate goroutine
++      var waitServer sync.WaitGroup // We will wait for the server goroutine 
to finish before exiting
++      waitServer.Add(1)
++      go func() { // Run the server in the background
++              defer waitServer.Done()
++              Server(l)
++      }()
++
++      // CLIENT: Send messages to the server
++      addr := l.Addr()
++      c, _ := electron.Dial(addr.Network(), addr.String())
++      s, _ := c.Sender()
++      for i := 0; i < 3; i++ {
++              msg := fmt.Sprintf("hello %v", i)
++              // Send and wait for the Outcome from the server.
++              // Note: For higher throughput, use SendAsync() to send a 
stream of messages
++              // and process the returning stream of Outcomes concurrently.
++              s.SendSync(amqp.NewMessageWith(msg))
++      }
++      c.Close(nil) // Closing the connection will stop the server
++
++      waitServer.Wait() // Let the server finish
++
++      // Output:
++      // server received: "hello 0"
++      // server received: "hello 1"
++      // server received: "hello 2"
++      // server receiver closed: EOF
++}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to