PROTON-1293: Go binding SASL support.

proton package
- added sasl.h to the Go proton wrapper generator.
- capitalized as "SASL" *not* "Sasl", consistent with Go conventions (e.g. URL)
- added SASL() accessor to proton.Transport.
- Engine.Run() does an extra dispatch and check, as
  pn_connection_engine_read/write_buffer() can generate events that close
  the transport on an auth failure.
- Engine.Id()/String() use transport address to match PN_TRACE_ logs.
- Drop auto-open of engine: let user set security first.

electron package:
- new ConnectionOptions: User, Password, VirtualHost, SASLAllowInsecure,
  SASLAllowedMechs
- global settings with GlobalSASLConfigDir, GlobalSASLConfigName
- IncomingConnection allows user to accept/reject authenticated incoming 
connections
- updated & improved documentation
- Endpoint.Sync(): synchronous check that the endpoint is open, to verify authentication.

Note: the Password option takes []byte, not string. There is no way to securely
erase a string from memory, whereas a byte slice can be overwritten.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/0e37353d
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/0e37353d
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/0e37353d

Branch: refs/heads/master
Commit: 0e37353d7410841ebb291b14e05b7dc9943cd1ce
Parents: 3d9fe62
Author: Alan Conway <acon...@redhat.com>
Authored: Tue Sep 6 13:11:09 2016 -0400
Committer: Alan Conway <acon...@redhat.com>
Committed: Tue Sep 20 17:39:15 2016 -0400

----------------------------------------------------------------------
 proton-c/bindings/go/genwrap.go                 |  42 +-
 .../src/qpid.apache.org/electron/auth_test.go   | 124 +++++
 .../src/qpid.apache.org/electron/connection.go  | 209 ++++++--
 .../src/qpid.apache.org/electron/container.go   |  15 +-
 .../go/src/qpid.apache.org/electron/doc.go      |  35 +-
 .../qpid.apache.org/electron/electron_test.go   | 477 +++++++++++++++++++
 .../go/src/qpid.apache.org/electron/endpoint.go | 122 ++++-
 .../go/src/qpid.apache.org/electron/handler.go  |  80 ++--
 .../go/src/qpid.apache.org/electron/link.go     |  36 +-
 .../qpid.apache.org/electron/messaging_test.go  | 454 ------------------
 .../go/src/qpid.apache.org/electron/receiver.go |  10 +-
 .../go/src/qpid.apache.org/electron/sender.go   |  10 +-
 .../go/src/qpid.apache.org/proton/engine.go     |  83 ++--
 .../go/src/qpid.apache.org/proton/handlers.go   |   2 +-
 .../go/src/qpid.apache.org/proton/wrappers.go   |  21 +-
 .../src/qpid.apache.org/proton/wrappers_gen.go  |  80 +++-
 proton-c/src/engine/engine.c                    |   4 +
 17 files changed, 1126 insertions(+), 678 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0e37353d/proton-c/bindings/go/genwrap.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/genwrap.go b/proton-c/bindings/go/genwrap.go
index c904638..f295a32 100644
--- a/proton-c/bindings/go/genwrap.go
+++ b/proton-c/bindings/go/genwrap.go
@@ -19,7 +19,9 @@ under the License.
 
 // Code generator to generate a thin Go wrapper API around the C proton API.
 //
-
+// Not run automatically, generated sources are checked in. To update the
+// generated sources run `go run genwrap.go` in this directory.
+//
 package main
 
 import (
@@ -44,7 +46,7 @@ func main() {
        panicIf(err)
        defer out.Close()
 
-       apis := []string{"session", "link", "delivery", "disposition", 
"condition", "terminus", "connection", "transport"}
+       apis := []string{"session", "link", "delivery", "disposition", 
"condition", "terminus", "connection", "transport", "sasl"}
        fmt.Fprintln(out, copyright)
        fmt.Fprint(out, `
 package proton
@@ -89,12 +91,18 @@ import (
        }
 }
 
+// Identify acronyms that should be uppercase not Mixedcase
+var acronym = regexp.MustCompile("(?i)SASL|AMQP")
+
 func mixedCase(s string) string {
        result := ""
        for _, w := range strings.Split(s, "_") {
-               if w != "" {
-                       result = result + strings.ToUpper(w[0:1]) + 
strings.ToLower(w[1:])
+               if acronym.MatchString(w) {
+                       w = strings.ToUpper(w)
+               } else {
+                       w = strings.ToUpper(w[0:1]) + strings.ToLower(w[1:])
                }
+               result = result + w
        }
        return result
 }
@@ -122,7 +130,13 @@ func findEnums(header string) (enums []enumType) {
        return enums
 }
 
+// Types that are integral, not wrappers. Enums are added automatically.
+var simpleType = map[string]bool{
+       "State": true, // integral typedef
+}
+
 func genEnum(out io.Writer, name string, values []string) {
+       simpleType[mixedCase(name)] = true
        doTemplate(out, []interface{}{name, values}, `
 {{$enumName := index . 0}}{{$values := index . 1}}
 type {{mixedCase $enumName}} C.pn_{{$enumName}}_t
@@ -140,10 +154,6 @@ func (e {{mixedCase $enumName}}) String() string {
 `)
 }
 
-var (
-       reSpace = regexp.MustCompile("\\s+")
-)
-
 func panicIf(err error) {
        if err != nil {
                panic(err)
@@ -202,7 +212,7 @@ var (
        enumDefRe   = regexp.MustCompile("typedef enum {([^}]*)} 
pn_([a-z_]+)_t;")
        enumValRe   = regexp.MustCompile("PN_[A-Z_]+")
        skipEventRe = regexp.MustCompile("EVENT_NONE|REACTOR|SELECTABLE|TIMER")
-       skipFnRe    = 
regexp.MustCompile("attach|context|class|collect|link_recv|link_send|transport_.*logf$|transport_.*trace|transport_head|transport_push")
+       skipFnRe    = 
regexp.MustCompile("attach|context|class|collect|link_recv|link_send|transport_.*logf$|transport_.*trace|transport_head|transport_push|connection_set_password")
 )
 
 // Generate event wrappers.
@@ -268,20 +278,10 @@ func (g genType) goConvert(value string) string {
        }
 }
 
-var notStruct = map[string]bool{
-       "EventType":        true,
-       "SndSettleMode":    true,
-       "RcvSettleMode":    true,
-       "TerminusType":     true,
-       "State":            true,
-       "Durability":       true,
-       "ExpiryPolicy":     true,
-       "DistributionMode": true,
-}
-
 func mapType(ctype string) (g genType) {
        g.Ctype = "C." + strings.Trim(ctype, " \n")
 
+       // Special-case mappings for C types, default: is the general wrapper 
type case.
        switch g.Ctype {
        case "C.void":
                g.Gotype = ""
@@ -331,7 +331,7 @@ func mapType(ctype string) (g genType) {
                        panic(fmt.Errorf("unknown C type %#v", g.Ctype))
                }
                g.Gotype = mixedCase(match[1])
-               if !notStruct[g.Gotype] {
+               if !simpleType[g.Gotype] {
                        g.ToGo = g.goLiteral
                        g.ToC = func(v string) string { return v + ".pn" }
                }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0e37353d/proton-c/bindings/go/src/qpid.apache.org/electron/auth_test.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/auth_test.go 
b/proton-c/bindings/go/src/qpid.apache.org/electron/auth_test.go
new file mode 100644
index 0000000..a090b78
--- /dev/null
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/auth_test.go
@@ -0,0 +1,124 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package electron
+
+import (
+       "fmt"
+       "io/ioutil"
+       "os"
+       "os/exec"
+       "path/filepath"
+       "strings"
+       "testing"
+)
+
+func testAuthClientServer(t *testing.T, copts []ConnectionOption, sopts 
[]ConnectionOption) (got connectionSettings, err error) {
+       client, server := newClientServerOpts(t, copts, sopts)
+       defer closeClientServer(client, server)
+
+       go func() {
+               for in := range server.Incoming() {
+                       switch in := in.(type) {
+                       case *IncomingConnection:
+                               got = connectionSettings{user: in.User(), 
virtualHost: in.VirtualHost()}
+                       }
+                       in.Accept()
+               }
+       }()
+
+       err = client.Sync()
+       return
+}
+
+func TestAuthAnonymous(t *testing.T) {
+       fatalIf(t, configureSASL())
+       got, err := testAuthClientServer(t,
+               []ConnectionOption{User("fred"), VirtualHost("vhost"), 
SASLAllowInsecure(true)},
+               []ConnectionOption{SASLAllowedMechs("ANONYMOUS"), 
SASLAllowInsecure(true)})
+       fatalIf(t, err)
+       errorIf(t, checkEqual(connectionSettings{"anonymous", "vhost"}, got))
+}
+
+func TestAuthPlain(t *testing.T) {
+       fatalIf(t, configureSASL())
+       got, err := testAuthClientServer(t,
+               []ConnectionOption{SASLAllowInsecure(true), 
SASLAllowedMechs("PLAIN"), User("fred@proton"), Password([]byte("xxx"))},
+               []ConnectionOption{SASLAllowInsecure(true), 
SASLAllowedMechs("PLAIN")})
+       fatalIf(t, err)
+       errorIf(t, checkEqual(connectionSettings{"fred@proton", ""}, got))
+}
+
+func TestAuthBadPass(t *testing.T) {
+       fatalIf(t, configureSASL())
+       _, err := testAuthClientServer(t,
+               []ConnectionOption{SASLAllowInsecure(true), 
SASLAllowedMechs("PLAIN"), User("fred@proton"), Password([]byte("yyy"))},
+               []ConnectionOption{SASLAllowInsecure(true), 
SASLAllowedMechs("PLAIN")})
+       if err == nil {
+               t.Error("Expected auth failure for bad pass")
+       }
+}
+
+func TestAuthBadUser(t *testing.T) {
+       fatalIf(t, configureSASL())
+       _, err := testAuthClientServer(t,
+               []ConnectionOption{SASLAllowInsecure(true), 
SASLAllowedMechs("PLAIN"), User("foo@bar"), Password([]byte("yyy"))},
+               []ConnectionOption{SASLAllowInsecure(true), 
SASLAllowedMechs("PLAIN")})
+       if err == nil {
+               t.Error("Expected auth failure for bad user")
+       }
+}
+
+var confDir string
+var confErr error
+
+func configureSASL() error {
+       if confDir != "" || confErr != nil {
+               return confErr
+       }
+       confDir, confErr = ioutil.TempDir("", "")
+       if confErr != nil {
+               return confErr
+       }
+
+       GlobalSASLConfigDir(confDir)
+       GlobalSASLConfigName("test")
+       conf := filepath.Join(confDir, "test.conf")
+
+       db := filepath.Join(confDir, "proton.sasldb")
+       cmd := exec.Command("saslpasswd2", "-c", "-p", "-f", db, "-u", 
"proton", "fred")
+       cmd.Stdin = strings.NewReader("xxx") // Password
+       if out, err := cmd.CombinedOutput(); err != nil {
+               confErr = fmt.Errorf("saslpasswd2 failed: %s\n%s", err, out)
+               return confErr
+       }
+       confStr := "sasldb_path: " + db + "\nmech_list: EXTERNAL DIGEST-MD5 
SCRAM-SHA-1 CRAM-MD5 PLAIN ANONYMOUS\n"
+       if err := ioutil.WriteFile(conf, []byte(confStr), os.ModePerm); err != 
nil {
+               confErr = fmt.Errorf("write conf file %s failed: %s", conf, err)
+       }
+       return confErr
+}
+
+func TestMain(m *testing.M) {
+       status := m.Run()
+       if confDir != "" {
+               os.RemoveAll(confDir)
+       }
+       os.Exit(status)
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0e37353d/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go 
b/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go
index 96feb1f..3bc5dcf 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go
@@ -23,16 +23,34 @@ package electron
 import "C"
 
 import (
-       "fmt"
        "net"
        "qpid.apache.org/proton"
        "sync"
        "time"
 )
 
+// Settings associated with a Connection.
+type ConnectionSettings interface {
+       // Authenticated user name associated with the connection.
+       User() string
+
+       // The AMQP virtual host name for the connection.
+       //
+       // Optional, useful when the server has multiple names and provides 
different
+       // service based on the name the client uses to connect.
+       //
+       // By default it is set to the DNS host name that the client uses to 
connect,
+       // but it can be set to something different at the client side with the
+       // VirtualHost() option.
+       //
+       // Returns error if the connection fails to authenticate.
+       VirtualHost() string
+}
+
 // Connection is an AMQP connection, created by a Container.
 type Connection interface {
        Endpoint
+       ConnectionSettings
 
        // Sender opens a new sender on the DefaultSession.
        Sender(...LinkOption) (Sender, error)
@@ -59,48 +77,79 @@ type Connection interface {
        // WaitTimeout is like Wait but returns Timeout if the timeout expires.
        WaitTimeout(time.Duration) error
 
-       // Incoming returns a channel for incoming endpoints opened by the 
remote end.
-       //
-       // To enable, pass AllowIncoming() when creating the Connection. 
Otherwise all
-       // incoming endpoint requests are automatically rejected and Incoming()
-       // returns nil.
-       //
-       // An Incoming value can be an *IncomingSession, *IncomingSender or
-       // *IncomingReceiver.  You must call Accept() to open the endpoint or 
Reject()
-       // to close it with an error. The specific Incoming types have 
additional
-       // methods to configure the endpoint.
-       //
-       // Not receiving from Incoming() or not calling Accept/Reject will 
block the
-       // electron event loop. Normally you would have a dedicated goroutine 
receive
-       // from Incoming() and start new goroutines to serve each incoming 
endpoint.
-       // The channel is closed when the Connection closes.
+       // Incoming returns a channel for incoming endpoints opened by the 
remote peer.
+       // See the Incoming interface for more.
        //
+       // Not receiving from Incoming() or not calling Accept/Reject will block 
the
+       // electron event loop. You should run a loop to handle the types that
+       // interest you in a switch{} and Accept() all others.
        Incoming() <-chan Incoming
 }
 
+type connectionSettings struct {
+       user, virtualHost string
+}
+
+func (c connectionSettings) User() string        { return c.user }
+func (c connectionSettings) VirtualHost() string { return c.virtualHost }
+
 // ConnectionOption can be passed when creating a connection to configure 
various options
 type ConnectionOption func(*connection)
 
-// Server returns a ConnectionOption to put the connection in server mode.
+// User returns a ConnectionOption that sets the user name for a connection.
+func User(user string) ConnectionOption {
+       return func(c *connection) {
+               c.user = user
+               c.pConnection.SetUser(user)
+       }
+}
+
+// VirtualHost returns a ConnectionOption to set the AMQP virtual host for the 
connection.
+// Only applies to outbound client connection.
+func VirtualHost(virtualHost string) ConnectionOption {
+       return func(c *connection) {
+               c.virtualHost = virtualHost
+               c.pConnection.SetHostname(virtualHost)
+       }
+}
+
+// Password returns a ConnectionOption to set the password used to establish a
+// connection.  Only applies to outbound client connection.
+//
+// The connection will erase its copy of the password from memory as soon as it
+// has been used to authenticate. If you are concerned about passwords staying 
in
+// memory you should never store them as strings, and should overwrite your
+// copy as soon as you are done with it.
+//
+func Password(password []byte) ConnectionOption {
+       return func(c *connection) { c.pConnection.SetPassword(password) }
+}
+
+// Server returns a ConnectionOption to put the connection in server mode for 
incoming connections.
 //
 // A server connection will do protocol negotiation to accept a incoming AMQP
 // connection. Normally you would call this for a connection created by
 // net.Listener.Accept()
 //
-func Server() ConnectionOption { return func(c *connection) { 
c.engine.Server() } }
+func Server() ConnectionOption {
+       return func(c *connection) { c.engine.Server(); c.server = true; 
AllowIncoming()(c) }
+}
 
-// AllowIncoming returns a ConnectionOption to enable incoming endpoint open 
requests.
-// See Connection.Incoming()
+// AllowIncoming returns a ConnectionOption to enable incoming endpoints, see
+// Connection.Incoming(). This is automatically set for Server() connections.
 func AllowIncoming() ConnectionOption {
        return func(c *connection) { c.incoming = make(chan Incoming) }
 }
 
 type connection struct {
        endpoint
+       connectionSettings
+
        defaultSessionOnce, closeOnce sync.Once
 
        container   *container
        conn        net.Conn
+       server      bool
        incoming    chan Incoming
        handler     *handler
        engine      *proton.Engine
@@ -110,23 +159,32 @@ type connection struct {
 }
 
 func newConnection(conn net.Conn, cont *container, setting 
...ConnectionOption) (*connection, error) {
-       c := &connection{container: cont, conn: conn}
+       c := &connection{
+               container: cont,
+               conn:      conn,
+       }
        c.handler = newHandler(c)
        var err error
        c.engine, err = proton.NewEngine(c.conn, c.handler.delegator)
        if err != nil {
                return nil, err
        }
+       c.pConnection = c.engine.Connection()
+       c.pConnection.SetContainer(cont.Id())
        for _, set := range setting {
                set(c)
        }
+       globalSASLInit(c.engine)
+
        c.endpoint.init(c.engine.String())
-       c.pConnection = c.engine.Connection()
        go c.run()
        return c, nil
 }
 
 func (c *connection) run() {
+       if !c.server {
+               c.pConnection.Open()
+       }
        c.engine.Run()
        if c.incoming != nil {
                close(c.incoming)
@@ -201,46 +259,95 @@ func (c *connection) WaitTimeout(timeout time.Duration) 
error {
        return c.Error()
 }
 
-func (c *connection) Incoming() <-chan Incoming { return c.incoming }
+func (c *connection) Incoming() <-chan Incoming {
+       assert(c.incoming != nil, "electron.Connection.Incoming() disabled for 
%s", c)
+       return c.incoming
+}
 
-// Incoming is the interface for incoming requests to open an endpoint.
-// Implementing types are IncomingSession, IncomingSender and IncomingReceiver.
-type Incoming interface {
-       // Accept and open the endpoint.
-       Accept() Endpoint
+type IncomingConnection struct {
+       incoming
+       connectionSettings
+       c *connection
+}
 
-       // Reject the endpoint with an error
-       Reject(error)
+func newIncomingConnection(c *connection) *IncomingConnection {
+       c.user = c.pConnection.Transport().User()
+       c.virtualHost = c.pConnection.RemoteHostname()
+       return &IncomingConnection{
+               incoming:           makeIncoming(c.pConnection),
+               connectionSettings: c.connectionSettings,
+               c:                  c}
+}
 
-       // wait for and call the accept function, call in proton goroutine.
-       wait() error
-       pEndpoint() proton.Endpoint
+func (in *IncomingConnection) Accept() Endpoint {
+       return in.accept(func() Endpoint {
+               in.c.pConnection.Open()
+               return in.c
+       })
 }
 
-type incoming struct {
-       pep      proton.Endpoint
-       acceptCh chan func() error
+func sasl(c *connection) proton.SASL { return c.engine.Transport().SASL() }
+
+// SASLEnable returns a ConnectionOption that enables SASL authentication.
+// Only required if you don't set any other SASL options.
+func SASLEnable() ConnectionOption { return func(c *connection) { sasl(c) } }
+
+// SASLAllowedMechs returns a ConnectionOption to set the list of allowed SASL
+// mechanisms.
+//
+// Can be used on the client or the server to restrict the SASL for a 
connection.
+// mechs is a space-separated list of mechanism names.
+//
+func SASLAllowedMechs(mechs string) ConnectionOption {
+       return func(c *connection) { sasl(c).AllowedMechs(mechs) }
 }
 
-func makeIncoming(e proton.Endpoint) incoming {
-       return incoming{pep: e, acceptCh: make(chan func() error)}
+// SASLAllowInsecure returns a ConnectionOption that allows or disallows clear
+// text SASL authentication mechanisms
+//
+// By default the SASL layer is configured not to allow mechanisms that 
disclose
+// the clear text of the password over an unencrypted AMQP connection. This 
specifically
+// will disallow the use of the PLAIN mechanism without using SSL encryption.
+//
+// This default is to avoid disclosing password information accidentally over 
an
+// insecure network.
+//
+func SASLAllowInsecure(b bool) ConnectionOption {
+       return func(c *connection) { sasl(c).SetAllowInsecureMechs(b) }
 }
 
-func (in *incoming) String() string   { return fmt.Sprintf("%s: %s", 
in.pep.Type(), in.pep) }
-func (in *incoming) Reject(err error) { in.acceptCh <- func() error { return 
err } }
+// GlobalSASLConfigDir sets the SASL configuration directory for every
+// Connection created in this process. If not called, the default is determined
+// by your SASL installation.
+//
+// You can set SASLAllowInsecure and SASLAllowedMechs on individual 
connections.
+//
+func GlobalSASLConfigDir(dir string) { globalSASLConfigDir = dir }
 
-// Call in proton goroutine, wait for and call the accept function fr
-func (in *incoming) wait() error { return (<-in.acceptCh)() }
+// GlobalSASLConfigName sets the SASL configuration name for every Connection
+// created in this process. If not called the default is "proton-server".
+//
+// The complete configuration file name is
+//     <sasl-config-dir>/<sasl-config-name>.conf
+//
+// You can set SASLAllowInsecure and SASLAllowedMechs on individual 
connections.
+//
+func GlobalSASLConfigName(dir string) { globalSASLConfigName = dir }
 
-func (in *incoming) pEndpoint() proton.Endpoint { return in.pep }
+var (
+       globalSASLConfigName string
+       globalSASLConfigDir  string
+)
 
-// Called in app goroutine to send an accept function to proton and return the 
resulting endpoint.
-func (in *incoming) accept(f func() Endpoint) Endpoint {
-       done := make(chan Endpoint)
-       in.acceptCh <- func() error {
-               ep := f()
-               done <- ep
-               return nil
+// TODO aconway 2016-09-15: Current pn_sasl C impl config is broken, so all we
+// can realistically offer is global configuration. Later if/when the pn_sasl C
+// impl is fixed we can offer per connection over-rides.
+func globalSASLInit(eng *proton.Engine) {
+       sasl := eng.Transport().SASL()
+       if globalSASLConfigName != "" {
+               sasl.ConfigName(globalSASLConfigName)
+       }
+       if globalSASLConfigDir != "" {
+               sasl.ConfigPath(globalSASLConfigDir)
        }
-       return <-done
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0e37353d/proton-c/bindings/go/src/qpid.apache.org/electron/container.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/container.go 
b/proton-c/bindings/go/src/qpid.apache.org/electron/container.go
index b5ce6c0..1ab4df2 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/container.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/container.go
@@ -35,12 +35,17 @@ type Container interface {
        // Id is a unique identifier for the container in your distributed 
application.
        Id() string
 
-       // Create a new AMQP Connection over the supplied net.Conn connection.
+       // Enable AMQP over the supplied net.Conn. Returns a Connection 
endpoint.
        //
-       // You must call Connection.Open() on the returned Connection, after
-       // setting any Connection properties you need to set. Note the net.Conn
-       // can be an outgoing connection (e.g. made with net.Dial) or an 
incoming
-       // connection (e.g. made with net.Listener.Accept())
+       // For client connections (e.g. established with net.Dial()), you can 
start
+       // using the connection immediately. Connection.Incoming() is disabled 
by
+       // default for clients, pass an AllowIncoming() option to enable 
incoming
+       // sessions and links.
+       //
+       // For server connections (e.g. established with net.Listener.Accept()) 
you
+       // must pass the Server() option and receive from the 
Connection.Incoming()
+       // channel. The first Incoming value will be an *IncomingConnection 
that lets
+       // you examine the connection properties before Accept() or Reject()
        Connection(net.Conn, ...ConnectionOption) (Connection, error)
 }
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0e37353d/proton-c/bindings/go/src/qpid.apache.org/electron/doc.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/doc.go 
b/proton-c/bindings/go/src/qpid.apache.org/electron/doc.go
index 46bde37..207d8ba 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/doc.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/doc.go
@@ -21,21 +21,20 @@ under the License.
 Package electron is a procedural, concurrent-safe Go library for AMQP 
messaging.
 You can write clients and servers using this library.
 
-Start by creating a Container with NewContainer. A Container represents a 
client
-or server application that can contain many incoming or outgoing connections.
+Start by creating a Container with NewContainer. An AMQP Container represents a
+single AMQP "application" and can contain client and server connections.
 
-Create connections with the standard Go 'net' package using net.Dial or
-net.Listen. Create an AMQP connection over a net.Conn with
-Container.Connection() and open it with Connection.Open().
+You can enable AMQP over any connection that implements the standard net.Conn
+interface. Typically you can connect with net.Dial() or listen for server
+connections with net.Listen.  Enable AMQP by passing the net.Conn to
+Container.Connection().
 
-AMQP sends messages over "links". Each link has a Sender end and a Receiver
-end. Connection.Sender() and Connection.Receiver() allow you to create links to
-Send() and Receive() messages.
-
-You can create an AMQP server connection by calling Connection.Server() and
-Connection.Listen() before calling Connection.Open(). A server connection can
-negotiate protocol security details and can accept incoming links opened from
-the remote end of the connection.
+AMQP allows bi-directional peer-to-peer message exchange as well as
+client-to-broker. Messages are sent over "links". Each link is one-way and has 
a
+Sender and Receiver end. Connection.Sender() and Connection.Receiver() open
+links to Send() and Receive() messages. Connection.Incoming() lets you accept
+incoming links opened by the remote peer. You can open and accept multiple 
links
+in both directions on a single Connection.
 
 */
 package electron
@@ -54,10 +53,10 @@ only accessed in the event-loop goroutine, so no locks are 
required there.
 The handler sets up channels as needed to get or send data from user goroutines
 using electron types like Sender or Receiver.
 
-We also use Engine.Inject to inject actions into the event loop from user
-goroutines. It is important to check at the start of an injected function that
-required objects are still valid, for example a link may be remotely closed
-between the time a Sender function calls Inject and the time the injected
-function is execute by the handler goroutine. See comments in endpoint.go for 
more.
+Engine.Inject injects actions into the event loop from user goroutines. It is
+important to check at the start of an injected function that required objects
+are still valid, for example a link may be remotely closed between the time a
+Sender function calls Inject and the time the injected function is executed by
+the handler goroutine.
 
 */

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0e37353d/proton-c/bindings/go/src/qpid.apache.org/electron/electron_test.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/electron_test.go 
b/proton-c/bindings/go/src/qpid.apache.org/electron/electron_test.go
new file mode 100644
index 0000000..56b91bf
--- /dev/null
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/electron_test.go
@@ -0,0 +1,477 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package electron
+
+import (
+       "fmt"
+       "net"
+       "path"
+       "qpid.apache.org/amqp"
+       "reflect"
+       "runtime"
+       "testing"
+       "time"
+)
+
+func fatalIf(t *testing.T, err error) {
+       if err != nil {
+               _, file, line, ok := runtime.Caller(1) // annotate with 
location of caller.
+               if ok {
+                       _, file = path.Split(file)
+               }
+               t.Fatalf("(from %s:%d) %v", file, line, err)
+       }
+}
+
+func errorIf(t *testing.T, err error) {
+       if err != nil {
+               _, file, line, ok := runtime.Caller(1) // annotate with 
location of caller.
+               if ok {
+                       _, file = path.Split(file)
+               }
+               t.Errorf("(from %s:%d) %v", file, line, err)
+       }
+}
+
+func checkEqual(want interface{}, got interface{}) error {
+       if !reflect.DeepEqual(want, got) {
+               return fmt.Errorf("%#v != %#v", want, got)
+       }
+       return nil
+}
+
+// Start a server, return listening addr and channel for incoming Connections.
+func newServer(t *testing.T, cont Container, opts ...ConnectionOption) 
(net.Addr, <-chan Connection) {
+       listener, err := net.Listen("tcp", "")
+       fatalIf(t, err)
+       addr := listener.Addr()
+       ch := make(chan Connection)
+       go func() {
+               conn, err := listener.Accept()
+               c, err := cont.Connection(conn, 
append([]ConnectionOption{Server()}, opts...)...)
+               fatalIf(t, err)
+               ch <- c
+       }()
+       return addr, ch
+}
+
+// Open a client connection and session, return the session.
+func newClient(t *testing.T, cont Container, addr net.Addr, opts 
...ConnectionOption) Session {
+       conn, err := net.Dial(addr.Network(), addr.String())
+       fatalIf(t, err)
+       c, err := cont.Connection(conn, opts...)
+       fatalIf(t, err)
+       sn, err := c.Session()
+       fatalIf(t, err)
+       return sn
+}
+
+// Return client and server ends of the same connection.
+func newClientServerOpts(t *testing.T, copts []ConnectionOption, sopts 
[]ConnectionOption) (client Session, server Connection) {
+       addr, ch := newServer(t, NewContainer("test-server"), sopts...)
+       client = newClient(t, NewContainer("test-client"), addr, copts...)
+       return client, <-ch
+}
+
+// Return client and server ends of the same connection.
+func newClientServer(t *testing.T) (client Session, server Connection) {
+       return newClientServerOpts(t, nil, nil)
+}
+
+// Close client and server
+func closeClientServer(client Session, server Connection) {
+       client.Connection().Close(nil)
+       server.Close(nil)
+}
+
+// Send a message one way with a client sender and server receiver, verify ack.
+func TestClientSendServerReceive(t *testing.T) {
+       nLinks := 3
+       nMessages := 3
+
+       rchan := make(chan Receiver, nLinks)
+       client, server := newClientServer(t)
+       go func() {
+               for in := range server.Incoming() {
+                       switch in := in.(type) {
+                       case *IncomingReceiver:
+                               in.SetCapacity(1)
+                               in.SetPrefetch(false)
+                               rchan <- in.Accept().(Receiver)
+                       default:
+                               in.Accept()
+                       }
+               }
+       }()
+
+       defer func() { closeClientServer(client, server) }()
+
+       s := make([]Sender, nLinks)
+       for i := 0; i < nLinks; i++ {
+               var err error
+               s[i], err = client.Sender(Target(fmt.Sprintf("foo%d", i)))
+               if err != nil {
+                       t.Fatal(err)
+               }
+       }
+       r := make([]Receiver, nLinks)
+       for i := 0; i < nLinks; i++ {
+               r[i] = <-rchan
+       }
+
+       for i := 0; i < nLinks; i++ {
+               for j := 0; j < nMessages; j++ {
+                       // Client send
+                       ack := make(chan Outcome, 1)
+                       sendDone := make(chan struct{})
+                       go func() {
+                               defer close(sendDone)
+                               m := 
amqp.NewMessageWith(fmt.Sprintf("foobar%v-%v", i, j))
+                               var err error
+                               s[i].SendAsync(m, ack, "testing")
+                               if err != nil {
+                                       t.Fatal(err)
+                               }
+                       }()
+
+                       // Server receive
+                       rm, err := r[i].Receive()
+                       if err != nil {
+                               t.Fatal(err)
+                       }
+                       if want, got := interface{}(fmt.Sprintf("foobar%v-%v", 
i, j)), rm.Message.Body(); want != got {
+                               t.Errorf("%#v != %#v", want, got)
+                       }
+
+                       // Should not be acknowledged on client yet
+                       <-sendDone
+                       select {
+                       case <-ack:
+                               t.Errorf("unexpected ack")
+                       default:
+                       }
+
+                       // Server send ack
+                       if err := rm.Reject(); err != nil {
+                               t.Error(err)
+                       }
+                       // Client get ack.
+                       if a := <-ack; a.Value != "testing" || a.Error != nil 
|| a.Status != Rejected {
+                               t.Error("unexpected ack: ", a.Status, a.Error, 
a.Value)
+                       }
+               }
+       }
+}
+
+func TestClientReceiver(t *testing.T) {
+       nMessages := 3
+       client, server := newClientServer(t)
+       go func() {
+               for in := range server.Incoming() {
+                       switch in := in.(type) {
+                       case *IncomingSender:
+                               s := in.Accept().(Sender)
+                               go func() {
+                                       for i := int32(0); i < 
int32(nMessages); i++ {
+                                               out := 
s.SendSync(amqp.NewMessageWith(i))
+                                               if out.Error != nil {
+                                                       t.Error(out.Error)
+                                                       return
+                                               }
+                                       }
+                                       s.Close(nil)
+                               }()
+                       default:
+                               in.Accept()
+                       }
+               }
+       }()
+
+       r, err := client.Receiver(Source("foo"))
+       if err != nil {
+               t.Fatal(err)
+       }
+       for i := int32(0); i < int32(nMessages); i++ {
+               rm, err := r.Receive()
+               if err != nil {
+                       if err != Closed {
+                               t.Error(err)
+                       }
+                       break
+               }
+               if err := rm.Accept(); err != nil {
+                       t.Error(err)
+               }
+               if b, ok := rm.Message.Body().(int32); !ok || b != i {
+                       t.Errorf("want %v, true got %v, %v", i, b, ok)
+               }
+       }
+       server.Close(nil)
+       client.Connection().Close(nil)
+}
+
+// Test timeout versions of waiting functions.
+func TestTimeouts(t *testing.T) {
+       var err error
+       rchan := make(chan Receiver, 1)
+       client, server := newClientServer(t)
+       go func() {
+               for i := range server.Incoming() {
+                       switch i := i.(type) {
+                       case *IncomingReceiver:
+                               i.SetCapacity(1)
+                               i.SetPrefetch(false)
+                               rchan <- i.Accept().(Receiver) // Issue credit 
only on receive
+                       default:
+                               i.Accept()
+                       }
+               }
+       }()
+       defer func() { closeClientServer(client, server) }()
+
+       // Open client sender
+       snd, err := client.Sender(Target("test"))
+       if err != nil {
+               t.Fatal(err)
+       }
+       rcv := <-rchan
+
+       // Test send with timeout
+       short := time.Millisecond
+       long := time.Second
+       m := amqp.NewMessage()
+       if err := snd.SendSyncTimeout(m, 0).Error; err != Timeout { // No 
credit, expect timeout.
+               t.Error("want Timeout got", err)
+       }
+       if err := snd.SendSyncTimeout(m, short).Error; err != Timeout { // No 
credit, expect timeout.
+               t.Error("want Timeout got", err)
+       }
+       // Test receive with timeout
+       if _, err = rcv.ReceiveTimeout(0); err != Timeout { // No credit, 
expect timeout.
+               t.Error("want Timeout got", err)
+       }
+       // Test receive with timeout
+       if _, err = rcv.ReceiveTimeout(short); err != Timeout { // No credit, 
expect timeout.
+               t.Error("want Timeout got", err)
+       }
+       // There is now a credit on the link due to receive
+       ack := make(chan Outcome)
+       snd.SendAsyncTimeout(m, ack, nil, short)
+       // Disposition should timeout
+       select {
+       case <-ack:
+               t.Errorf("want Timeout got %#v", ack)
+       case <-time.After(short):
+       }
+
+       // Receive and accept
+       rm, err := rcv.ReceiveTimeout(long)
+       if err != nil {
+               t.Fatal(err)
+       }
+       rm.Accept()
+       // Sender get ack
+       if a := <-ack; a.Status != Accepted || a.Error != nil {
+               t.Errorf("want (accepted, nil) got %#v", a)
+       }
+}
+
+// A server that returns the opposite end of each client link via channels.
+type pairs struct {
+       t        *testing.T
+       client   Session
+       server   Connection
+       rchan    chan Receiver
+       schan    chan Sender
+       capacity int
+       prefetch bool
+}
+
+func newPairs(t *testing.T, capacity int, prefetch bool) *pairs {
+       p := &pairs{t: t, rchan: make(chan Receiver, 1), schan: make(chan 
Sender, 1)}
+       p.client, p.server = newClientServer(t)
+       go func() {
+               for i := range p.server.Incoming() {
+                       switch i := i.(type) {
+                       case *IncomingReceiver:
+                               i.SetCapacity(capacity)
+                               i.SetPrefetch(prefetch)
+                               p.rchan <- i.Accept().(Receiver)
+                       case *IncomingSender:
+                               p.schan <- i.Accept().(Sender)
+                       default:
+                               i.Accept()
+                       }
+               }
+       }()
+       return p
+}
+
+func (p *pairs) close() {
+       closeClientServer(p.client, p.server)
+}
+
+// Return a client sender and server receiver
+func (p *pairs) senderReceiver() (Sender, Receiver) {
+       snd, err := p.client.Sender()
+       fatalIf(p.t, err)
+       rcv := <-p.rchan
+       return snd, rcv
+}
+
+// Return a client receiver and server sender
+func (p *pairs) receiverSender() (Receiver, Sender) {
+       rcv, err := p.client.Receiver()
+       fatalIf(p.t, err)
+       snd := <-p.schan
+       return rcv, snd
+}
+
+type result struct {
+       label string
+       err   error
+       value interface{}
+}
+
+func (r result) String() string { return fmt.Sprintf("%v(%v)", r.err, r.label) 
}
+
+func doSend(snd Sender, results chan result) {
+       err := snd.SendSync(amqp.NewMessage()).Error
+       results <- result{"send", err, nil}
+}
+
+func doReceive(rcv Receiver, results chan result) {
+       msg, err := rcv.Receive()
+       results <- result{"receive", err, msg}
+}
+
+func doDisposition(ack <-chan Outcome, results chan result) {
+       results <- result{"disposition", (<-ack).Error, nil}
+}
+
+// Senders get credit immediately if receivers have prefetch set
+func TestSendReceivePrefetch(t *testing.T) {
+       pairs := newPairs(t, 1, true)
+       s, r := pairs.senderReceiver()
+       s.SendAsyncTimeout(amqp.NewMessage(), nil, nil, time.Second) // Should 
not block for credit.
+       if _, err := r.Receive(); err != nil {
+               t.Error(err)
+       }
+}
+
+// Senders do not get credit till Receive() if receivers don't have prefetch
+func TestSendReceiveNoPrefetch(t *testing.T) {
+       pairs := newPairs(t, 1, false)
+       s, r := pairs.senderReceiver()
+       done := make(chan struct{}, 1)
+       go func() {
+               s.SendAsyncTimeout(amqp.NewMessage(), nil, nil, time.Second) // 
Should block for credit.
+               close(done)
+       }()
+       select {
+       case <-done:
+               t.Errorf("send should be blocked on credit")
+       default:
+               if _, err := r.Receive(); err != nil {
+                       t.Error(err)
+               } else {
+                       <-done
+               } // Should be unblocked now
+       }
+}
+
+// Test that closing Links interrupts blocked link functions.
+func TestLinkCloseInterrupt(t *testing.T) {
+       want := amqp.Error{Name: "x", Description: "all bad"}
+       pairs := newPairs(t, 1, false)
+       results := make(chan result) // Collect expected errors
+
+       // Note closing the link does not interrupt Send() calls, the AMQP spec 
says
+       // that deliveries can be settled after the link is closed.
+
+       // Receiver.Close() interrupts Receive()
+       snd, rcv := pairs.senderReceiver()
+       go doReceive(rcv, results)
+       rcv.Close(want)
+       if r := <-results; want != r.err {
+               t.Errorf("want %#v got %#v", want, r)
+       }
+
+       // Remote Sender.Close() interrupts Receive()
+       snd, rcv = pairs.senderReceiver()
+       go doReceive(rcv, results)
+       snd.Close(want)
+       if r := <-results; want != r.err {
+               t.Errorf("want %#v got %#v", want, r)
+       }
+}
+
+// Test closing the server end of a connection.
+func TestConnectionCloseInterrupt1(t *testing.T) {
+       want := amqp.Error{Name: "x", Description: "bad"}
+       pairs := newPairs(t, 1, true)
+       results := make(chan result) // Collect expected errors
+
+       // Connection.Close() interrupts Send, Receive, Disposition.
+       snd, rcv := pairs.senderReceiver()
+       go doSend(snd, results)
+
+       rcv.Receive()
+       rcv, snd = pairs.receiverSender()
+       go doReceive(rcv, results)
+
+       snd, rcv = pairs.senderReceiver()
+       ack := snd.SendWaitable(amqp.NewMessage())
+       rcv.Receive()
+       go doDisposition(ack, results)
+
+       pairs.server.Close(want)
+       for i := 0; i < 3; i++ {
+               if r := <-results; want != r.err {
+                       t.Errorf("want %v got %v", want, r)
+               }
+       }
+}
+
+// Test closing the client end of the connection.
+func TestConnectionCloseInterrupt2(t *testing.T) {
+       want := amqp.Error{Name: "x", Description: "bad"}
+       pairs := newPairs(t, 1, true)
+       results := make(chan result) // Collect expected errors
+
+       // Connection.Close() interrupts Send, Receive, Disposition.
+       snd, rcv := pairs.senderReceiver()
+       go doSend(snd, results)
+       rcv.Receive()
+
+       rcv, snd = pairs.receiverSender()
+       go doReceive(rcv, results)
+
+       snd, rcv = pairs.senderReceiver()
+       ack := snd.SendWaitable(amqp.NewMessage())
+       go doDisposition(ack, results)
+
+       pairs.client.Connection().Close(want)
+       for i := 0; i < 3; i++ {
+               if r := <-results; want != r.err {
+                       t.Errorf("want %v got %v", want, r.err)
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0e37353d/proton-c/bindings/go/src/qpid.apache.org/electron/endpoint.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/endpoint.go 
b/proton-c/bindings/go/src/qpid.apache.org/electron/endpoint.go
index fc701c6..ca93e5b 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/endpoint.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/endpoint.go
@@ -20,6 +20,7 @@ under the License.
 package electron
 
 import (
+       "fmt"
        "io"
        "qpid.apache.org/proton"
 )
@@ -28,12 +29,13 @@ import (
 // was closed cleanly.
 var Closed = io.EOF
 
-// Endpoint is the common interface for Connection, Session, Link, Sender and 
Receiver.
+// Endpoint is the local end of a communications channel to the remote peer
+// process.  The following interfaces implement Endpoint: Connection, Session,
+// Sender and Receiver.
 //
-// Endpoints can be created locally or by the remote peer. You must Open() an
-// endpoint before you can use it. Some endpoints have additional Set*() 
methods
-// that must be called before Open() to take effect, see Connection, Session,
-// Link, Sender and Receiver for details.
+// You can create an endpoint with functions on Container, Connection and
+// Session. You can accept incoming endpoints from the remote peer using
+// Connection.Incoming()
 //
 type Endpoint interface {
        // Close an endpoint and signal an error to the remote end if error != 
nil.
@@ -46,35 +48,58 @@ type Endpoint interface {
        // Error() == Closed means the endpoint was closed without error.
        Error() error
 
-       // Connection containing the endpoint
+       // Connection is the connection associated with this endpoint.
        Connection() Connection
 
        // Done returns a channel that will close when the endpoint closes.
-       // Error() will contain the reason.
+       // After Done() has closed, Error() will return the reason for closing.
        Done() <-chan struct{}
 
+       // Sync() waits for the remote peer to confirm the endpoint is active or
+       // reject it with an error. You can call it immediately on new endpoints
+       // for more predictable error handling.
+       //
+       // AMQP is an asynchronous protocol. It is legal to create an endpoint 
and
+       // start using it without waiting for confirmation. This avoids a 
needless
+       // delay in the non-error case and improves throughput by "assuming the best".
+       //
+       // However if there *is* an error, these "optimistic" actions will 
fail. The
+       // endpoint and its children will be closed with an error. The error 
will only
+       // be detected when you try to use one of these endpoints or call Sync()
+       Sync() error
+
        // Called in handler goroutine when endpoint is remotely closed.
        closed(err error) error
+       wakeSync()
 }
 
-// DEVELOPER NOTES
-//
-// An electron.Endpoint corresponds to a proton.Endpoint, which can be 
invalidated
-//
+// Base implementation for Endpoint
 type endpoint struct {
-       err  proton.ErrorHolder
-       str  string // Must be set by the value that embeds endpoint.
-       done chan struct{}
+       err    proton.ErrorHolder
+       str    string // String() return value.
+       done   chan struct{}
+       active chan struct{}
 }
 
-func (e *endpoint) init(s string) { e.str = s; e.done = make(chan struct{}) }
+func (e *endpoint) init(s string) {
+       e.str = s
+       e.done = make(chan struct{})
+       e.active = make(chan struct{})
+}
+
+// Called in proton goroutine on remote open.
+func (e *endpoint) wakeSync() {
+       select { // Close active channel if not already closed.
+       case <-e.active:
+       default:
+               close(e.active)
+       }
+}
 
-// Called in handler on a Closed event. Marks the endpoint as closed and the 
corresponding
-// proton.Endpoint pointer as invalid. Injected functions should check Error() 
to ensure
-// the pointer has not been invalidated.
+// Called in proton goroutine (from handler) on a Closed or Disconnected event.
 //
-// Returns the error stored on the endpoint, which may not be different to err 
if there was
-// already a n error
+// Set err if there is not already an error on the endpoint.
+// Return Error()
 func (e *endpoint) closed(err error) error {
        select {
        case <-e.done:
@@ -82,9 +107,10 @@ func (e *endpoint) closed(err error) error {
        default:
                e.err.Set(err)
                e.err.Set(Closed)
+               e.wakeSync() // Make sure we wake up Sync()
                close(e.done)
        }
-       return e.err.Get()
+       return e.Error()
 }
 
 func (e *endpoint) String() string { return e.str }
@@ -93,6 +119,11 @@ func (e *endpoint) Error() error { return e.err.Get() }
 
 func (e *endpoint) Done() <-chan struct{} { return e.done }
 
+func (e *endpoint) Sync() error {
+       <-e.active
+       return e.Error()
+}
+
 // Call in proton goroutine to initiate closing an endpoint locally
 // handler will complete the close when remote end closes.
 func localClose(ep proton.Endpoint, err error) {
@@ -100,3 +131,52 @@ func localClose(ep proton.Endpoint, err error) {
                proton.CloseError(ep, err)
        }
 }
+
+// Incoming is the interface for incoming endpoints, see Connection.Incoming()
+//
+// Call Incoming.Accept() to open the endpoint or Incoming.Reject() to close it
+// with optional error
+//
+// Implementing types are *IncomingConnection, *IncomingSession, 
*IncomingSender
+// and *IncomingReceiver. Each type provides methods to examine the incoming
+// endpoint request and set configuration options for the local endpoint
+// before calling Accept() or Reject()
+type Incoming interface {
+       // Accept and open the endpoint.
+       Accept() Endpoint
+
+       // Reject the endpoint with an error
+       Reject(error)
+
+       // wait for and call the accept function, call in proton goroutine.
+       wait() error
+       pEndpoint() proton.Endpoint
+}
+
+type incoming struct {
+       pep      proton.Endpoint
+       acceptCh chan func() error
+}
+
+func makeIncoming(e proton.Endpoint) incoming {
+       return incoming{pep: e, acceptCh: make(chan func() error)}
+}
+
+func (in *incoming) String() string   { return fmt.Sprintf("%s: %s", 
in.pep.Type(), in.pep) }
+func (in *incoming) Reject(err error) { in.acceptCh <- func() error { return 
err } }
+
+// Call in proton goroutine, wait for and call the accept function.
+func (in *incoming) wait() error { return (<-in.acceptCh)() }
+
+func (in *incoming) pEndpoint() proton.Endpoint { return in.pep }
+
+// Called in app goroutine to send an accept function to proton and return the 
resulting endpoint.
+func (in *incoming) accept(f func() Endpoint) Endpoint {
+       done := make(chan Endpoint)
+       in.acceptCh <- func() error {
+               ep := f()
+               done <- ep
+               return nil
+       }
+       return <-done
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0e37353d/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go 
b/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go
index eb53df3..ede7b6c 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go
@@ -78,28 +78,38 @@ func (h *handler) HandleMessagingEvent(t 
proton.MessagingEvent, e proton.Event)
                        h.linkError(e.Link(), "no sender")
                }
 
+       case proton.MConnectionOpening:
+               if e.Connection().State().LocalUninit() { // Remotely opened
+                       h.incoming(newIncomingConnection(h.connection))
+               }
+               h.connection.wakeSync()
+
        case proton.MSessionOpening:
                if e.Session().State().LocalUninit() { // Remotely opened
                        h.incoming(newIncomingSession(h, e.Session()))
                }
+               h.sessions[e.Session()].wakeSync()
 
        case proton.MSessionClosed:
                h.sessionClosed(e.Session(), proton.EndpointError(e.Session()))
 
        case proton.MLinkOpening:
                l := e.Link()
-               if l.State().LocalActive() { // Already opened locally.
-                       break
-               }
-               ss := h.sessions[l.Session()]
-               if ss == nil {
-                       h.linkError(e.Link(), "no session")
-                       break
-               }
-               if l.IsReceiver() {
-                       h.incoming(&IncomingReceiver{makeIncomingLink(ss, l)})
+               if ss := h.sessions[l.Session()]; ss != nil {
+                       if l.State().LocalUninit() { // Remotely opened.
+                               if l.IsReceiver() {
+                                       h.incoming(newIncomingReceiver(ss, l))
+                               } else {
+                                       h.incoming(newIncomingSender(ss, l))
+                               }
+                       }
+                       if ep, ok := h.links[l]; ok {
+                               ep.wakeSync()
+                       } else {
+                               h.linkError(l, "no link")
+                       }
                } else {
-                       h.incoming(&IncomingSender{makeIncomingLink(ss, l)})
+                       h.linkError(l, "no session")
                }
 
        case proton.MLinkClosing:
@@ -112,27 +122,14 @@ func (h *handler) HandleMessagingEvent(t 
proton.MessagingEvent, e proton.Event)
                h.connection.err.Set(e.Connection().RemoteCondition().Error())
 
        case proton.MConnectionClosed:
-               h.connectionClosed(proton.EndpointError(e.Connection()))
+               h.shutdown(proton.EndpointError(e.Connection()))
 
        case proton.MDisconnected:
-               h.connection.err.Set(e.Transport().Condition().Error())
-               // If err not set at this point (e.g. to Closed) then this is 
unexpected.
-               h.connection.err.Set(amqp.Errorf(amqp.IllegalState, "unexpected 
disconnect on %s", h.connection))
-
-               err := h.connection.Error()
-
-               for l, _ := range h.links {
-                       h.linkClosed(l, err)
-               }
-               h.links = nil
-               for _, s := range h.sessions {
-                       s.closed(err)
-               }
-               h.sessions = nil
-               for _, sm := range h.sentMessages {
-                       sm.ack <- Outcome{Unacknowledged, err, sm.value}
+               err := e.Transport().Condition().Error()
+               if err == nil {
+                       err = amqp.Errorf(amqp.IllegalState, "unexpected 
disconnect on %s", h.connection)
                }
-               h.sentMessages = nil
+               h.shutdown(err)
        }
 }
 
@@ -175,13 +172,26 @@ func (h *handler) sessionClosed(ps proton.Session, err 
error) {
        }
 }
 
-func (h *handler) connectionClosed(err error) {
+func (h *handler) shutdown(err error) {
        err = h.connection.closed(err)
-       // Close links first to avoid repeated scans of the link list by 
sessions.
-       for l, _ := range h.links {
-               h.linkClosed(l, err)
+       for _, sm := range h.sentMessages {
+               // Don't block but ensure outcome is sent eventually.
+               if sm.ack != nil {
+                       o := Outcome{Unacknowledged, err, sm.value}
+                       select {
+                       case sm.ack <- o:
+                       default:
+                               go func(ack chan<- Outcome) { ack <- o 
}(sm.ack) // Deliver it eventually
+                       }
+               }
+       }
+       h.sentMessages = nil
+       for _, l := range h.links {
+               l.closed(err)
        }
-       for s, _ := range h.sessions {
-               h.sessionClosed(s, err)
+       h.links = nil
+       for _, s := range h.sessions {
+               s.closed(err)
        }
+       h.sessions = nil
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0e37353d/proton-c/bindings/go/src/qpid.apache.org/electron/link.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/link.go 
b/proton-c/bindings/go/src/qpid.apache.org/electron/link.go
index 5d78a14..e0f6cb4 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/link.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/link.go
@@ -182,31 +182,19 @@ func makeLocalLink(sn *session, isSender bool, setting 
...LinkOption) (linkSetti
        return l, nil
 }
 
-type incomingLink struct {
-       incoming
-       linkSettings
-       pLink proton.Link
-       sn    *session
-}
-
-// Set up a link from an incoming proton.Link.
-func makeIncomingLink(sn *session, pLink proton.Link) incomingLink {
-       l := incomingLink{
-               incoming: makeIncoming(pLink),
-               linkSettings: linkSettings{
-                       isSender:  pLink.IsSender(),
-                       source:    pLink.RemoteSource().Address(),
-                       target:    pLink.RemoteTarget().Address(),
-                       linkName:  pLink.Name(),
-                       sndSettle: SndSettleMode(pLink.RemoteSndSettleMode()),
-                       rcvSettle: RcvSettleMode(pLink.RemoteRcvSettleMode()),
-                       capacity:  1,
-                       prefetch:  false,
-                       pLink:     pLink,
-                       session:   sn,
-               },
+func makeIncomingLinkSettings(pLink proton.Link, sn *session) linkSettings {
+       return linkSettings{
+               isSender:  pLink.IsSender(),
+               source:    pLink.RemoteSource().Address(),
+               target:    pLink.RemoteTarget().Address(),
+               linkName:  pLink.Name(),
+               sndSettle: SndSettleMode(pLink.RemoteSndSettleMode()),
+               rcvSettle: RcvSettleMode(pLink.RemoteRcvSettleMode()),
+               capacity:  1,
+               prefetch:  false,
+               pLink:     pLink,
+               session:   sn,
        }
-       return l
 }
 
 // Not part of Link interface but use by Sender and Receiver.

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0e37353d/proton-c/bindings/go/src/qpid.apache.org/electron/messaging_test.go
----------------------------------------------------------------------
diff --git 
a/proton-c/bindings/go/src/qpid.apache.org/electron/messaging_test.go 
b/proton-c/bindings/go/src/qpid.apache.org/electron/messaging_test.go
deleted file mode 100644
index c5c351a..0000000
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/messaging_test.go
+++ /dev/null
@@ -1,454 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-package electron
-
-import (
-       "fmt"
-       "net"
-       "path"
-       "qpid.apache.org/amqp"
-       "runtime"
-       "testing"
-       "time"
-)
-
-func fatalIf(t *testing.T, err error) {
-       if err != nil {
-               _, file, line, ok := runtime.Caller(1) // annotate with location of caller.
-               if ok {
-                       _, file = path.Split(file)
-               }
-               t.Fatalf("(from %s:%d) %v", file, line, err)
-       }
-}
-
-// Start a server, return listening addr and channel for incoming Connections.
-func newServer(t *testing.T, cont Container) (net.Addr, <-chan Connection) {
-       listener, err := net.Listen("tcp", "")
-       fatalIf(t, err)
-       addr := listener.Addr()
-       ch := make(chan Connection)
-       go func() {
-               conn, err := listener.Accept()
-               c, err := cont.Connection(conn, Server(), AllowIncoming())
-               fatalIf(t, err)
-               ch <- c
-       }()
-       return addr, ch
-}
-
-// Open a client connection and session, return the session.
-func newClient(t *testing.T, cont Container, addr net.Addr) Session {
-       conn, err := net.Dial(addr.Network(), addr.String())
-       fatalIf(t, err)
-       c, err := cont.Connection(conn)
-       fatalIf(t, err)
-       sn, err := c.Session()
-       fatalIf(t, err)
-       return sn
-}
-
-// Return client and server ends of the same connection.
-func newClientServer(t *testing.T) (client Session, server Connection) {
-       addr, ch := newServer(t, NewContainer("test-server"))
-       client = newClient(t, NewContainer("test-client"), addr)
-       return client, <-ch
-}
-
-// Close client and server
-func closeClientServer(client Session, server Connection) {
-       client.Connection().Close(nil)
-       server.Close(nil)
-}
-
-// Send a message one way with a client sender and server receiver, verify ack.
-func TestClientSendServerReceive(t *testing.T) {
-       nLinks := 3
-       nMessages := 3
-
-       rchan := make(chan Receiver, nLinks)
-       client, server := newClientServer(t)
-       go func() {
-               for in := range server.Incoming() {
-                       switch in := in.(type) {
-                       case *IncomingReceiver:
-                               in.SetCapacity(1)
-                               in.SetPrefetch(false)
-                               rchan <- in.Accept().(Receiver)
-                       default:
-                               in.Accept()
-                       }
-               }
-       }()
-
-       defer func() { closeClientServer(client, server) }()
-
-       s := make([]Sender, nLinks)
-       for i := 0; i < nLinks; i++ {
-               var err error
-               s[i], err = client.Sender(Target(fmt.Sprintf("foo%d", i)))
-               if err != nil {
-                       t.Fatal(err)
-               }
-       }
-       r := make([]Receiver, nLinks)
-       for i := 0; i < nLinks; i++ {
-               r[i] = <-rchan
-       }
-
-       for i := 0; i < nLinks; i++ {
-               for j := 0; j < nMessages; j++ {
-                       // Client send
-                       ack := make(chan Outcome, 1)
-                       sendDone := make(chan struct{})
-                       go func() {
-                               defer close(sendDone)
-                               m := amqp.NewMessageWith(fmt.Sprintf("foobar%v-%v", i, j))
-                               var err error
-                               s[i].SendAsync(m, ack, "testing")
-                               if err != nil {
-                                       t.Fatal(err)
-                               }
-                       }()
-
-                       // Server recieve
-                       rm, err := r[i].Receive()
-                       if err != nil {
-                               t.Fatal(err)
-                       }
-                       if want, got := interface{}(fmt.Sprintf("foobar%v-%v", i, j)), rm.Message.Body(); want != got {
-                               t.Errorf("%#v != %#v", want, got)
-                       }
-
-                       // Should not be acknowledged on client yet
-                       <-sendDone
-                       select {
-                       case <-ack:
-                               t.Errorf("unexpected ack")
-                       default:
-                       }
-
-                       // Server send ack
-                       if err := rm.Reject(); err != nil {
-                               t.Error(err)
-                       }
-                       // Client get ack.
-                       if a := <-ack; a.Value != "testing" || a.Error != nil || a.Status != Rejected {
-                               t.Error("unexpected ack: ", a.Status, a.Error, a.Value)
-                       }
-               }
-       }
-}
-
-func TestClientReceiver(t *testing.T) {
-       nMessages := 3
-       client, server := newClientServer(t)
-       go func() {
-               for in := range server.Incoming() {
-                       switch in := in.(type) {
-                       case *IncomingSender:
-                               s := in.Accept().(Sender)
-                               go func() {
-                                       for i := int32(0); i < int32(nMessages); i++ {
-                                               out := s.SendSync(amqp.NewMessageWith(i))
-                                               if out.Error != nil {
-                                                       t.Error(out.Error)
-                                                       return
-                                               }
-                                       }
-                                       s.Close(nil)
-                               }()
-                       default:
-                               in.Accept()
-                       }
-               }
-       }()
-
-       r, err := client.Receiver(Source("foo"))
-       if err != nil {
-               t.Fatal(err)
-       }
-       for i := int32(0); i < int32(nMessages); i++ {
-               rm, err := r.Receive()
-               if err != nil {
-                       if err != Closed {
-                               t.Error(err)
-                       }
-                       break
-               }
-               if err := rm.Accept(); err != nil {
-                       t.Error(err)
-               }
-               if b, ok := rm.Message.Body().(int32); !ok || b != i {
-                       t.Errorf("want %v, true got %v, %v", i, b, ok)
-               }
-       }
-       server.Close(nil)
-       client.Connection().Close(nil)
-}
-
-// Test timeout versions of waiting functions.
-func TestTimeouts(t *testing.T) {
-       var err error
-       rchan := make(chan Receiver, 1)
-       client, server := newClientServer(t)
-       go func() {
-               for i := range server.Incoming() {
-                       switch i := i.(type) {
-                       case *IncomingReceiver:
-                               i.SetCapacity(1)
-                               i.SetPrefetch(false)
-                               rchan <- i.Accept().(Receiver) // Issue credit only on receive
-                       default:
-                               i.Accept()
-                       }
-               }
-       }()
-       defer func() { closeClientServer(client, server) }()
-
-       // Open client sender
-       snd, err := client.Sender(Target("test"))
-       if err != nil {
-               t.Fatal(err)
-       }
-       rcv := <-rchan
-
-       // Test send with timeout
-       short := time.Millisecond
-       long := time.Second
-       m := amqp.NewMessage()
-       if err := snd.SendSyncTimeout(m, 0).Error; err != Timeout { // No credit, expect timeout.
-               t.Error("want Timeout got", err)
-       }
-       if err := snd.SendSyncTimeout(m, short).Error; err != Timeout { // No credit, expect timeout.
-               t.Error("want Timeout got", err)
-       }
-       // Test receive with timeout
-       if _, err = rcv.ReceiveTimeout(0); err != Timeout { // No credit, expect timeout.
-               t.Error("want Timeout got", err)
-       }
-       // Test receive with timeout
-       if _, err = rcv.ReceiveTimeout(short); err != Timeout { // No credit, expect timeout.
-               t.Error("want Timeout got", err)
-       }
-       // There is now a credit on the link due to receive
-       ack := make(chan Outcome)
-       snd.SendAsyncTimeout(m, ack, nil, short)
-       // Disposition should timeout
-       select {
-       case <-ack:
-               t.Errorf("want Timeout got %#v", ack)
-       case <-time.After(short):
-       }
-
-       // Receive and accept
-       rm, err := rcv.ReceiveTimeout(long)
-       if err != nil {
-               t.Fatal(err)
-       }
-       rm.Accept()
-       // Sender get ack
-       if a := <-ack; a.Status != Accepted || a.Error != nil {
-               t.Errorf("want (accepted, nil) got %#v", a)
-       }
-}
-
-// A server that returns the opposite end of each client link via channels.
-type pairs struct {
-       t        *testing.T
-       client   Session
-       server   Connection
-       rchan    chan Receiver
-       schan    chan Sender
-       capacity int
-       prefetch bool
-}
-
-func newPairs(t *testing.T, capacity int, prefetch bool) *pairs {
-       p := &pairs{t: t, rchan: make(chan Receiver, 1), schan: make(chan Sender, 1)}
-       p.client, p.server = newClientServer(t)
-       go func() {
-               for i := range p.server.Incoming() {
-                       switch i := i.(type) {
-                       case *IncomingReceiver:
-                               i.SetCapacity(capacity)
-                               i.SetPrefetch(prefetch)
-                               p.rchan <- i.Accept().(Receiver)
-                       case *IncomingSender:
-                               p.schan <- i.Accept().(Sender)
-                       default:
-                               i.Accept()
-                       }
-               }
-       }()
-       return p
-}
-
-func (p *pairs) close() {
-       closeClientServer(p.client, p.server)
-}
-
-// Return a client sender and server receiver
-func (p *pairs) senderReceiver() (Sender, Receiver) {
-       snd, err := p.client.Sender()
-       fatalIf(p.t, err)
-       rcv := <-p.rchan
-       return snd, rcv
-}
-
-// Return a client receiver and server sender
-func (p *pairs) receiverSender() (Receiver, Sender) {
-       rcv, err := p.client.Receiver()
-       fatalIf(p.t, err)
-       snd := <-p.schan
-       return rcv, snd
-}
-
-type result struct {
-       label string
-       err   error
-       value interface{}
-}
-
-func (r result) String() string { return fmt.Sprintf("%v(%v)", r.err, r.label) }
-
-func doSend(snd Sender, results chan result) {
-       err := snd.SendSync(amqp.NewMessage()).Error
-       results <- result{"send", err, nil}
-}
-
-func doReceive(rcv Receiver, results chan result) {
-       msg, err := rcv.Receive()
-       results <- result{"receive", err, msg}
-}
-
-func doDisposition(ack <-chan Outcome, results chan result) {
-       results <- result{"disposition", (<-ack).Error, nil}
-}
-
-// Senders get credit immediately if receivers have prefetch set
-func TestSendReceivePrefetch(t *testing.T) {
-       pairs := newPairs(t, 1, true)
-       s, r := pairs.senderReceiver()
-       s.SendAsyncTimeout(amqp.NewMessage(), nil, nil, time.Second) // Should not block for credit.
-       if _, err := r.Receive(); err != nil {
-               t.Error(err)
-       }
-}
-
-// Senders do not get credit till Receive() if receivers don't have prefetch
-func TestSendReceiveNoPrefetch(t *testing.T) {
-       pairs := newPairs(t, 1, false)
-       s, r := pairs.senderReceiver()
-       done := make(chan struct{}, 1)
-       go func() {
-               s.SendAsyncTimeout(amqp.NewMessage(), nil, nil, time.Second) // Should block for credit.
-               close(done)
-       }()
-       select {
-       case <-done:
-               t.Errorf("send should be blocked on credit")
-       default:
-               if _, err := r.Receive(); err != nil {
-                       t.Error(err)
-               } else {
-                       <-done
-               } // Should be unblocked now
-       }
-}
-
-// Test that closing Links interrupts blocked link functions.
-func TestLinkCloseInterrupt(t *testing.T) {
-       want := amqp.Error{Name: "x", Description: "all bad"}
-       pairs := newPairs(t, 1, false)
-       results := make(chan result) // Collect expected errors
-
-       // Note closing the link does not interrupt Send() calls, the AMQP spec says
-       // that deliveries can be settled after the link is closed.
-
-       // Receiver.Close() interrupts Receive()
-       snd, rcv := pairs.senderReceiver()
-       go doReceive(rcv, results)
-       rcv.Close(want)
-       if r := <-results; want != r.err {
-               t.Errorf("want %#v got %#v", want, r)
-       }
-
-       // Remote Sender.Close() interrupts Receive()
-       snd, rcv = pairs.senderReceiver()
-       go doReceive(rcv, results)
-       snd.Close(want)
-       if r := <-results; want != r.err {
-               t.Errorf("want %#v got %#v", want, r)
-       }
-}
-
-// Test closing the server end of a connection.
-func TestConnectionCloseInterrupt1(t *testing.T) {
-       want := amqp.Error{Name: "x", Description: "bad"}
-       pairs := newPairs(t, 1, true)
-       results := make(chan result) // Collect expected errors
-
-       // Connection.Close() interrupts Send, Receive, Disposition.
-       snd, rcv := pairs.senderReceiver()
-       go doSend(snd, results)
-
-       rcv.Receive()
-       rcv, snd = pairs.receiverSender()
-       go doReceive(rcv, results)
-
-       snd, rcv = pairs.senderReceiver()
-       ack := snd.SendWaitable(amqp.NewMessage())
-       rcv.Receive()
-       go doDisposition(ack, results)
-
-       pairs.server.Close(want)
-       for i := 0; i < 3; i++ {
-               if r := <-results; want != r.err {
-                       t.Errorf("want %v got %v", want, r)
-               }
-       }
-}
-
-// Test closing the client end of the connection.
-func TestConnectionCloseInterrupt2(t *testing.T) {
-       want := amqp.Error{Name: "x", Description: "bad"}
-       pairs := newPairs(t, 1, true)
-       results := make(chan result) // Collect expected errors
-
-       // Connection.Close() interrupts Send, Receive, Disposition.
-       snd, rcv := pairs.senderReceiver()
-       go doSend(snd, results)
-       rcv.Receive()
-
-       rcv, snd = pairs.receiverSender()
-       go doReceive(rcv, results)
-
-       snd, rcv = pairs.senderReceiver()
-       ack := snd.SendWaitable(amqp.NewMessage())
-       go doDisposition(ack, results)
-
-       pairs.client.Connection().Close(want)
-       for i := 0; i < 3; i++ {
-               if r := <-results; want != r.err {
-                       t.Errorf("want %v got %v", want, r.err)
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0e37353d/proton-c/bindings/go/src/qpid.apache.org/electron/receiver.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/receiver.go b/proton-c/bindings/go/src/qpid.apache.org/electron/receiver.go
index 606e4d6..fb234e2 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/receiver.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/receiver.go
@@ -213,7 +213,15 @@ func (rm *ReceivedMessage) Release() error { return rm.acknowledge(proton.Releas
 // IncomingReceiver is sent on the Connection.Incoming() channel when there is
 // an incoming request to open a receiver link.
 type IncomingReceiver struct {
-       incomingLink
+       incoming
+       linkSettings
+}
+
+func newIncomingReceiver(sn *session, pLink proton.Link) *IncomingReceiver {
+       return &IncomingReceiver{
+               incoming:     makeIncoming(pLink),
+               linkSettings: makeIncomingLinkSettings(pLink, sn),
+       }
 }
 
 // SetCapacity sets the capacity of the incoming receiver, call before Accept()

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0e37353d/proton-c/bindings/go/src/qpid.apache.org/electron/sender.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/sender.go b/proton-c/bindings/go/src/qpid.apache.org/electron/sender.go
index 1d0fc60..8badf35 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/sender.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/sender.go
@@ -259,7 +259,15 @@ type sentMessage struct {
 // IncomingSender is sent on the Connection.Incoming() channel when there is
 // an incoming request to open a sender link.
 type IncomingSender struct {
-       incomingLink
+       incoming
+       linkSettings
+}
+
+func newIncomingSender(sn *session, pLink proton.Link) *IncomingSender {
+       return &IncomingSender{
+               incoming:     makeIncoming(pLink),
+               linkSettings: makeIncomingLinkSettings(pLink, sn),
+       }
 }
 
 // Accept accepts an incoming sender endpoint

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0e37353d/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go b/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go
index bfcb41c..d9dcefd 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go
@@ -127,9 +127,6 @@ func NewEngine(conn net.Conn, handlers ...EventHandler) (*Engine, error) {
        if pnErr := C.pn_connection_engine_init(&eng.engine); pnErr != 0 {
                 return nil, fmt.Errorf("cannot setup engine: %s", PnErrorCode(pnErr))
        }
-       // Unique container-id by default.
-       eng.Connection().SetContainer(UUID4().String()) // FIXME aconway 2016-06-21:
-       eng.Connection().Open()
        return eng, nil
 }
 
@@ -143,12 +140,6 @@ func byteSlice(data unsafe.Pointer, size C.size_t) []byte {
        }
 }
 
-func (eng *Engine) buffers() ([]byte, []byte) {
-       r := C.pn_connection_engine_read_buffer(&eng.engine)
-       w := C.pn_connection_engine_write_buffer(&eng.engine)
-       return byteSlice(unsafe.Pointer(r.start), r.size), byteSlice(unsafe.Pointer(w.start), w.size)
-}
-
 func (eng *Engine) Connection() Connection {
        return Connection{C.pn_connection_engine_connection(&eng.engine)}
 }
@@ -158,11 +149,12 @@ func (eng *Engine) Transport() Transport {
 }
 
 func (eng *Engine) String() string {
-       return fmt.Sprintf("%s-%s", eng.conn.LocalAddr(), eng.conn.RemoteAddr())
+       return fmt.Sprintf("[%s]%s-%s", eng.Id(), eng.conn.LocalAddr(), eng.conn.RemoteAddr())
 }
 
 func (eng *Engine) Id() string {
-       return fmt.Sprintf("%p", eng)
+       // Use transport address to match default PN_TRACE_FRM=1 output.
+       return fmt.Sprintf("%p", eng.Transport().CPtr())
 }
 
 func (eng *Engine) Error() error {
@@ -213,8 +205,7 @@ func (eng *Engine) InjectWait(f func() error) error {
 //
 func (eng *Engine) Server() { eng.Transport().SetServer() }
 
-// FIXME aconway 2016-06-21: rename
-func (eng *Engine) disconnect() { // FIXME aconway 2016-06-21: disconnected
+func (eng *Engine) disconnect() {
        eng.conn.Close()
        C.pn_connection_engine_disconnected(&eng.engine)
 }
@@ -244,12 +235,35 @@ func (eng *Engine) Disconnect(err error) {
        <-eng.running
 }
 
+func (eng *Engine) dispatch() bool {
+       for {
+               if cevent := C.pn_connection_engine_dispatch(&eng.engine); cevent != nil {
+                       event := makeEvent(cevent, eng)
+                       for _, h := range eng.handlers {
+                               h.HandleEvent(event)
+                       }
+               } else {
+                       break
+               }
+       }
+       return !bool(C.pn_connection_engine_finished(&eng.engine))
+}
+
+func (eng *Engine) writeBuffer() []byte {
+       w := C.pn_connection_engine_write_buffer(&eng.engine)
+       return byteSlice(unsafe.Pointer(w.start), w.size)
+}
+
+func (eng *Engine) readBuffer() []byte {
+       r := C.pn_connection_engine_read_buffer(&eng.engine)
+       return byteSlice(unsafe.Pointer(r.start), r.size)
+}
+
 // Run the engine. Engine.Run() will exit when the engine is closed or
 // disconnected.  You can check for errors after exit with Engine.Error().
 //
 func (eng *Engine) Run() error {
        C.pn_connection_engine_start(&eng.engine)
-
         // Channels for read and write buffers going in and out of the read/write goroutines.
         // The channels are unbuffered: we want to exchange buffers in seuquence.
        readsIn, writesIn := make(chan []byte), make(chan []byte)
@@ -298,9 +312,18 @@ func (eng *Engine) Run() error {
                }
        }()
 
-       for !C.pn_connection_engine_finished(&eng.engine) {
-               // Enable readIn/writeIn channles only if we have a buffer.
-               readBuf, writeBuf := eng.buffers()
+       for eng.dispatch() {
+               readBuf := eng.readBuffer()
+               writeBuf := eng.writeBuffer()
+               // Note that getting the buffers can generate events (eg. SASL events) that
+               // might close the transport. Check if we are already finished before
+               // blocking for IO.
+               if !eng.dispatch() {
+                       break
+               }
+
+               // sendReads/sendWrites are nil (not sendable in select) unless we have a
+               // buffer to read/write
                var sendReads, sendWrites chan []byte
                if readBuf != nil {
                        sendReads = readsIn
@@ -311,6 +334,7 @@ func (eng *Engine) Run() error {
 
                // Send buffers to the read/write goroutines if we have them.
                // Get buffers from the read/write goroutines and process them
+               // Check for injected functions
                select {
 
                case sendReads <- readBuf:
@@ -318,36 +342,18 @@ func (eng *Engine) Run() error {
                case sendWrites <- writeBuf:
 
                case buf := <-readsOut:
-                       if len(buf) > 0 {
-                               C.pn_connection_engine_read_done(&eng.engine, C.size_t(len(buf)))
-                       } else {
-                               panic(fmt.Sprintf("read buf %v", buf))
-                       }
+                       C.pn_connection_engine_read_done(&eng.engine, C.size_t(len(buf)))
 
                case buf := <-writesOut:
-                       if len(buf) > 0 {
-                               C.pn_connection_engine_write_done(&eng.engine, C.size_t(len(buf)))
-                       } else {
-                               panic(fmt.Sprintf("write buf %v", buf))
-                       }
+                       C.pn_connection_engine_write_done(&eng.engine, C.size_t(len(buf)))
 
                 case f, ok := <-eng.inject: // Function injected from another goroutine
                        if ok {
                                f()
                        }
                }
-
-               for {
-                       cevent := C.pn_connection_engine_dispatch(&eng.engine)
-                       if cevent == nil {
-                               break
-                       }
-                       event := makeEvent(cevent, eng)
-                       for _, h := range eng.handlers {
-                               h.HandleEvent(event)
-                       }
-               }
        }
+
        eng.err.Set(EndpointError(eng.Connection()))
        eng.err.Set(eng.Transport().Condition().Error())
        close(readsIn)
@@ -365,6 +371,5 @@ func (eng *Engine) Run() error {
                        C.pn_handler_free(h.pn)
                }
        }
-       // FIXME aconway 2016-06-21: consistent error handling
        return eng.err.Get()
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0e37353d/proton-c/bindings/go/src/qpid.apache.org/proton/handlers.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/handlers.go b/proton-c/bindings/go/src/qpid.apache.org/proton/handlers.go
index 2a96d81..0fd652c 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/handlers.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/proton/handlers.go
@@ -206,11 +206,11 @@ func (d endpointDelegator) HandleEvent(e Event) {
                }
 
        case d.remoteOpen:
+               d.delegator.mhandler.HandleMessagingEvent(d.opening, e)
                switch {
                case state.LocalActive():
                        d.delegator.mhandler.HandleMessagingEvent(d.opened, e)
                case state.LocalUninit():
-                       d.delegator.mhandler.HandleMessagingEvent(d.opening, e)
                        if d.delegator.AutoOpen {
                                endpoint.Open()
                        }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0e37353d/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go b/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go
index 70611d3..3303f0a 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go
@@ -29,6 +29,7 @@ package proton
 //#include <proton/link.h>
 //#include <proton/link.h>
 //#include <proton/object.h>
+//#include <proton/sasl.h>
 //#include <proton/session.h>
 //#include <proton/transport.h>
 //#include <stdlib.h>
@@ -292,7 +293,8 @@ func (s Session) Receiver(name string) Link {
 
 // Unique (per process) string identifier for a connection, useful for debugging.
 func (c Connection) String() string {
-       return fmt.Sprintf("%x", c.pn)
+       // Use the transport address to match the default transport logs from PN_TRACE.
+       return fmt.Sprintf("%p", c.Transport().CPtr())
 }
 
 func (c Connection) Type() string {
@@ -323,6 +325,18 @@ func (c Connection) Sessions(state State) (sessions []Session) {
        return
 }
 
+// SetPassword takes []byte not string because it is impossible to erase a string
+// from memory reliably. Proton will not keep the password in memory longer than
+// needed, the caller should overwrite their copy on return.
+//
+// The password must not contain embedded nul characters, a trailing nul is ignored.
+func (c Connection) SetPassword(password []byte) {
+       if len(password) == 0 || password[len(password)-1] != 0 {
+               password = append(password, 0) // Proton requires a terminating null.
+       }
+       C.pn_connection_set_password(c.pn, (*C.char)(unsafe.Pointer(&password[0])))
+}
+
 func (s Session) String() string {
        return fmt.Sprintf("%s/%p", s.Connection(), s.pn)
 }
@@ -384,3 +398,8 @@ func (t Transport) Head() unsafe.Pointer {
 func (t Transport) Push(bytes []byte) int {
         return int(C.pn_transport_push(t.pn, (*C.char)(unsafe.Pointer(&bytes[0])), C.size_t(len(bytes))))
 }
+
+// Get the SASL object for the transport.
+func (t Transport) SASL() SASL {
+       return SASL{C.pn_sasl(t.pn)}
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0e37353d/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers_gen.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers_gen.go b/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers_gen.go
index 183d6ec..38c76cc 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers_gen.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers_gen.go
@@ -42,6 +42,7 @@ import (
 // #include <proton/terminus.h>
 // #include <proton/connection.h>
 // #include <proton/transport.h>
+// #include <proton/sasl.h>
 import "C"
 
 type EventType int
@@ -704,12 +705,6 @@ func (c Connection) SetUser(user string) {
 
        C.pn_connection_set_user(c.pn, userC)
 }
-func (c Connection) SetPassword(password string) {
-       passwordC := C.CString(password)
-       defer C.free(unsafe.Pointer(passwordC))
-
-       C.pn_connection_set_password(c.pn, passwordC)
-}
 func (c Connection) User() string {
        return C.GoString(C.pn_connection_get_user(c.pn))
 }
@@ -873,3 +868,76 @@ func (t Transport) Tick(now time.Time) time.Time {
 func (t Transport) Connection() Connection {
        return Connection{C.pn_transport_connection(t.pn)}
 }
+
+// Wrappers for declarations in sasl.h
+
+type SASLOutcome C.pn_sasl_outcome_t
+
+const (
+       SASLNone SASLOutcome = C.PN_SASL_NONE
+       SASLOk   SASLOutcome = C.PN_SASL_OK
+       SASLAuth SASLOutcome = C.PN_SASL_AUTH
+       SASLSys  SASLOutcome = C.PN_SASL_SYS
+       SASLPerm SASLOutcome = C.PN_SASL_PERM
+       SASLTemp SASLOutcome = C.PN_SASL_TEMP
+)
+
+func (e SASLOutcome) String() string {
+       switch e {
+
+       case C.PN_SASL_NONE:
+               return "SASLNone"
+       case C.PN_SASL_OK:
+               return "SASLOk"
+       case C.PN_SASL_AUTH:
+               return "SASLAuth"
+       case C.PN_SASL_SYS:
+               return "SASLSys"
+       case C.PN_SASL_PERM:
+               return "SASLPerm"
+       case C.PN_SASL_TEMP:
+               return "SASLTemp"
+       }
+       return "unknown"
+}
+
+type SASL struct{ pn *C.pn_sasl_t }
+
+func (s SASL) IsNil() bool          { return s.pn == nil }
+func (s SASL) CPtr() unsafe.Pointer { return unsafe.Pointer(s.pn) }
+func (s SASL) Done(outcome SASLOutcome) {
+       C.pn_sasl_done(s.pn, C.pn_sasl_outcome_t(outcome))
+}
+func (s SASL) Outcome() SASLOutcome {
+       return SASLOutcome(C.pn_sasl_outcome(s.pn))
+}
+func (s SASL) User() string {
+       return C.GoString(C.pn_sasl_get_user(s.pn))
+}
+func (s SASL) Mech() string {
+       return C.GoString(C.pn_sasl_get_mech(s.pn))
+}
+func (s SASL) AllowedMechs(mechs string) {
+       mechsC := C.CString(mechs)
+       defer C.free(unsafe.Pointer(mechsC))
+
+       C.pn_sasl_allowed_mechs(s.pn, mechsC)
+}
+func (s SASL) SetAllowInsecureMechs(insecure bool) {
+       C.pn_sasl_set_allow_insecure_mechs(s.pn, C.bool(insecure))
+}
+func (s SASL) AllowInsecureMechs() bool {
+       return bool(C.pn_sasl_get_allow_insecure_mechs(s.pn))
+}
+func (s SASL) ConfigName(name string) {
+       nameC := C.CString(name)
+       defer C.free(unsafe.Pointer(nameC))
+
+       C.pn_sasl_config_name(s.pn, nameC)
+}
+func (s SASL) ConfigPath(path string) {
+       pathC := C.CString(path)
+       defer C.free(unsafe.Pointer(pathC))
+
+       C.pn_sasl_config_path(s.pn, pathC)
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0e37353d/proton-c/src/engine/engine.c
----------------------------------------------------------------------
diff --git a/proton-c/src/engine/engine.c b/proton-c/src/engine/engine.c
index ca7e4d8..cb1f479 100644
--- a/proton-c/src/engine/engine.c
+++ b/proton-c/src/engine/engine.c
@@ -594,6 +594,10 @@ void pn_connection_set_user(pn_connection_t *connection, const char *user)
 void pn_connection_set_password(pn_connection_t *connection, const char *password)
 {
     assert(connection);
+    // Make sure the previous password is erased, if there was one.
+    size_t n = pn_string_size(connection->auth_password);
+    const char* s = pn_string_get(connection->auth_password);
+    if (n > 0 && s) memset((void*)s, 0, n);
     pn_string_set(connection->auth_password, password);
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to