Merge branch 'master' into go1, pick up minor fixes.

Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/5e6024bb
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/5e6024bb
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/5e6024bb

Branch: refs/heads/go1
Commit: 5e6024bbff8c77f8734f99cdd808ffd34c36f719
Parents: c87499a 7486d5b
Author: Alan Conway <[email protected]>
Authored: Thu Jan 7 17:28:32 2016 -0500
Committer: Alan Conway <[email protected]>
Committed: Thu Jan 7 17:28:32 2016 -0500

----------------------------------------------------------------------
 electron/connection.go     |  12 +++-
 electron/endpoint.go       |   5 +-
 electron/handler.go        |   6 +-
 electron/link.go           |  90 +++++++++++---------------
 electron/messaging_test.go | 114 +++++++++++++++++++-------------
 electron/receiver.go       | 139 +++++++++++++++++-----------------------
 electron/sender.go         |  61 +++++++++---------
 electron/session.go        |   6 +-
 proton/engine.go           |  74 ++++++++++++---------
 proton/wrappers.go         |   2 +-
 10 files changed, 264 insertions(+), 245 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5e6024bb/electron/connection.go
----------------------------------------------------------------------
diff --cc electron/connection.go
index 386875d,0000000..1f8bd40
mode 100644,000000..100644
--- a/electron/connection.go
+++ b/electron/connection.go
@@@ -1,240 -1,0 +1,246 @@@
 +/*
 +Licensed to the Apache Software Foundation (ASF) under one
 +or more contributor license agreements.  See the NOTICE file
 +distributed with this work for additional information
 +regarding copyright ownership.  The ASF licenses this file
 +to you under the Apache License, Version 2.0 (the
 +"License"); you may not use this file except in compliance
 +with the License.  You may obtain a copy of the License at
 +
 +  http://www.apache.org/licenses/LICENSE-2.0
 +
 +Unless required by applicable law or agreed to in writing,
 +software distributed under the License is distributed on an
 +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 +KIND, either express or implied.  See the License for the
 +specific language governing permissions and limitations
 +under the License.
 +*/
 +
 +package electron
 +
 +// #include <proton/disposition.h>
 +import "C"
 +
 +import (
 +      "fmt"
 +      "net"
 +      "qpid.apache.org/proton"
 +      "sync"
 +      "time"
 +)
 +
 +// Connection is an AMQP connection, created by a Container.
 +type Connection interface {
 +      Endpoint
 +
 +      // Sender opens a new sender on the DefaultSession.
 +      Sender(...LinkOption) (Sender, error)
 +
 +      // Receiver opens a new Receiver on the DefaultSession().
 +      Receiver(...LinkOption) (Receiver, error)
 +
 +      // DefaultSession() returns a default session for the connection. It is 
opened
 +      // on the first call to DefaultSession and returned on subsequent calls.
 +      DefaultSession() (Session, error)
 +
 +      // Session opens a new session.
 +      Session(...SessionOption) (Session, error)
 +
 +      // Container for the connection.
 +      Container() Container
 +
 +      // Disconnect the connection abruptly with an error.
 +      Disconnect(error)
 +
 +      // Wait waits for the connection to be disconnected.
 +      Wait() error
 +
 +      // WaitTimeout is like Wait but returns Timeout if the timeout expires.
 +      WaitTimeout(time.Duration) error
 +
 +      // Incoming returns a channel for incoming endpoints opened by the 
remote end.
 +      //
 +      // To enable, pass AllowIncoming() when creating the Connection. 
Otherwise all
 +      // incoming endpoint requests are automatically rejected and Incoming()
 +      // returns nil.
 +      //
 +      // An Incoming value can be an *IncomingSession, *IncomingSender or
 +      // *IncomingReceiver.  You must call Accept() to open the endpoint or 
Reject()
 +      // to close it with an error. The specific Incoming types have 
additional
 +      // methods to configure the endpoint.
 +      //
 +      // Not receiving from Incoming() or not calling Accept/Reject will 
block the
 +      // electron event loop. Normally you would have a dedicated goroutine 
receive
 +      // from Incoming() and start new goroutines to serve each incoming 
endpoint.
 +      // The channel is closed when the Connection closes.
 +      //
 +      Incoming() <-chan Incoming
 +}
 +
 +// ConnectionOption can be passed when creating a connection to configure 
various options
 +type ConnectionOption func(*connection)
 +
 +// Server returns a ConnectionOption to put the connection in server mode.
 +//
 +// A server connection will do protocol negotiation to accept an incoming AMQP
 +// connection. Normally you would call this for a connection created by
 +// net.Listener.Accept()
 +//
 +func Server() ConnectionOption { return func(c *connection) { 
c.engine.Server() } }
 +
 +// AllowIncoming returns a ConnectionOption to enable incoming endpoint open 
requests.
 +// See Connection.Incoming()
 +func AllowIncoming() ConnectionOption {
 +      return func(c *connection) { c.incoming = make(chan Incoming) }
 +}
 +
 +type connection struct {
 +      endpoint
 +      defaultSessionOnce, closeOnce sync.Once
 +
 +      container   *container
 +      conn        net.Conn
 +      incoming    chan Incoming
 +      handler     *handler
 +      engine      *proton.Engine
 +      eConnection proton.Connection
 +
 +      defaultSession Session
 +}
 +
 +func newConnection(conn net.Conn, cont *container, setting 
...ConnectionOption) (*connection, error) {
 +      c := &connection{container: cont, conn: conn}
 +      c.handler = newHandler(c)
 +      var err error
 +      c.engine, err = proton.NewEngine(c.conn, c.handler.delegator)
 +      if err != nil {
 +              return nil, err
 +      }
 +      for _, set := range setting {
 +              set(c)
 +      }
-       c.endpoint = makeEndpoint(c.engine.String())
++      c.endpoint.init(c.engine.String())
 +      c.eConnection = c.engine.Connection()
 +      go c.run()
 +      return c, nil
 +}
 +
 +func (c *connection) run() {
 +      c.engine.Run()
 +      if c.incoming != nil {
 +              close(c.incoming)
 +      }
 +      c.closed(Closed)
 +}
 +
- func (c *connection) Close(err error) { c.err.Set(err); c.engine.Close(err) }
++func (c *connection) Close(err error) {
++      c.err.Set(err)
++      c.engine.Close(err)
++}
 +
- func (c *connection) Disconnect(err error) { c.err.Set(err); 
c.engine.Disconnect(err) }
++func (c *connection) Disconnect(err error) {
++      c.err.Set(err)
++      c.engine.Disconnect(err)
++}
 +
 +func (c *connection) Session(setting ...SessionOption) (Session, error) {
 +      var s Session
 +      err := c.engine.InjectWait(func() error {
 +              if c.Error() != nil {
 +                      return c.Error()
 +              }
 +              eSession, err := c.engine.Connection().Session()
 +              if err == nil {
 +                      eSession.Open()
 +                      if err == nil {
 +                              s = newSession(c, eSession, setting...)
 +                      }
 +              }
 +              return err
 +      })
 +      return s, err
 +}
 +
 +func (c *connection) Container() Container { return c.container }
 +
 +func (c *connection) DefaultSession() (s Session, err error) {
 +      c.defaultSessionOnce.Do(func() {
 +              c.defaultSession, err = c.Session()
 +      })
 +      if err == nil {
 +              err = c.Error()
 +      }
 +      return c.defaultSession, err
 +}
 +
 +func (c *connection) Sender(setting ...LinkOption) (Sender, error) {
 +      if s, err := c.DefaultSession(); err == nil {
 +              return s.Sender(setting...)
 +      } else {
 +              return nil, err
 +      }
 +}
 +
 +func (c *connection) Receiver(setting ...LinkOption) (Receiver, error) {
 +      if s, err := c.DefaultSession(); err == nil {
 +              return s.Receiver(setting...)
 +      } else {
 +              return nil, err
 +      }
 +}
 +
 +func (c *connection) Connection() Connection { return c }
 +
 +func (c *connection) Wait() error { return c.WaitTimeout(Forever) }
 +func (c *connection) WaitTimeout(timeout time.Duration) error {
 +      _, err := timedReceive(c.done, timeout)
 +      if err == Timeout {
 +              return Timeout
 +      }
 +      return c.Error()
 +}
 +
 +func (c *connection) Incoming() <-chan Incoming { return c.incoming }
 +
 +// Incoming is the interface for incoming requests to open an endpoint.
 +// Implementing types are IncomingSession, IncomingSender and 
IncomingReceiver.
 +type Incoming interface {
 +      // Accept and open the endpoint.
 +      Accept() Endpoint
 +
 +      // Reject the endpoint with an error
 +      Reject(error)
 +
 +      // wait for and call the accept function, call in proton goroutine.
 +      wait() error
 +      pEndpoint() proton.Endpoint
 +}
 +
 +type incoming struct {
 +      endpoint proton.Endpoint
 +      acceptCh chan func() error
 +}
 +
 +func makeIncoming(e proton.Endpoint) incoming {
 +      return incoming{endpoint: e, acceptCh: make(chan func() error)}
 +}
 +
 +func (in *incoming) String() string   { return fmt.Sprintf("%s: %s", 
in.endpoint.Type(), in.endpoint) }
 +func (in *incoming) Reject(err error) { in.acceptCh <- func() error { return 
err } }
 +
 +// Call in proton goroutine, wait for and call the accept function.
 +func (in *incoming) wait() error { return (<-in.acceptCh)() }
 +
 +func (in *incoming) pEndpoint() proton.Endpoint { return in.endpoint }
 +
 +// Called in app goroutine to send an accept function to proton and return 
the resulting endpoint.
 +func (in *incoming) accept(f func() Endpoint) Endpoint {
 +      done := make(chan Endpoint)
 +      in.acceptCh <- func() error {
 +              ep := f()
 +              done <- ep
 +              return nil
 +      }
 +      return <-done
 +}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5e6024bb/electron/endpoint.go
----------------------------------------------------------------------
diff --cc electron/endpoint.go
index 2b1f62d,0000000..fc701c6
mode 100644,000000..100644
--- a/electron/endpoint.go
+++ b/electron/endpoint.go
@@@ -1,99 -1,0 +1,102 @@@
 +/*
 +Licensed to the Apache Software Foundation (ASF) under one
 +or more contributor license agreements.  See the NOTICE file
 +distributed with this work for additional information
 +regarding copyright ownership.  The ASF licenses this file
 +to you under the Apache License, Version 2.0 (the
 +"License"); you may not use this file except in compliance
 +with the License.  You may obtain a copy of the License at
 +
 +  http://www.apache.org/licenses/LICENSE-2.0
 +
 +Unless required by applicable law or agreed to in writing,
 +software distributed under the License is distributed on an
 +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 +KIND, either express or implied.  See the License for the
 +specific language governing permissions and limitations
 +under the License.
 +*/
 +
 +package electron
 +
 +import (
 +      "io"
 +      "qpid.apache.org/proton"
 +)
 +
 +// Closed is an alias for io.EOF. It is returned as an error when an endpoint
 +// was closed cleanly.
 +var Closed = io.EOF
 +
 +// Endpoint is the common interface for Connection, Session, Link, Sender and 
Receiver.
 +//
 +// Endpoints can be created locally or by the remote peer. You must Open() an
 +// endpoint before you can use it. Some endpoints have additional Set*() 
methods
 +// that must be called before Open() to take effect, see Connection, Session,
 +// Link, Sender and Receiver for details.
 +//
 +type Endpoint interface {
 +      // Close an endpoint and signal an error to the remote end if error != 
nil.
 +      Close(error)
 +
 +      // String is a human readable identifier, useful for debugging and 
logging.
 +      String() string
 +
 +      // Error returns nil if the endpoint is open, otherwise returns an 
error.
 +      // Error() == Closed means the endpoint was closed without error.
 +      Error() error
 +
 +      // Connection containing the endpoint
 +      Connection() Connection
 +
 +      // Done returns a channel that will close when the endpoint closes.
 +      // Error() will contain the reason.
 +      Done() <-chan struct{}
++
++      // Called in handler goroutine when endpoint is remotely closed.
++      closed(err error) error
 +}
 +
 +// DEVELOPER NOTES
 +//
 +// An electron.Endpoint corresponds to a proton.Endpoint, which can be 
invalidated
 +//
 +type endpoint struct {
 +      err  proton.ErrorHolder
 +      str  string // Must be set by the value that embeds endpoint.
 +      done chan struct{}
 +}
 +
- func makeEndpoint(s string) endpoint { return endpoint{str: s, done: 
make(chan struct{})} }
++func (e *endpoint) init(s string) { e.str = s; e.done = make(chan struct{}) }
 +
 +// Called in handler on a Closed event. Marks the endpoint as closed and the 
corresponding
 +// proton.Endpoint pointer as invalid. Injected functions should check 
Error() to ensure
 +// the pointer has not been invalidated.
 +//
 +// Returns the error stored on the endpoint, which may differ from err if
 +// there was already an error.
 +func (e *endpoint) closed(err error) error {
 +      select {
 +      case <-e.done:
 +              // Already closed
 +      default:
 +              e.err.Set(err)
 +              e.err.Set(Closed)
 +              close(e.done)
 +      }
 +      return e.err.Get()
 +}
 +
 +func (e *endpoint) String() string { return e.str }
 +
 +func (e *endpoint) Error() error { return e.err.Get() }
 +
 +func (e *endpoint) Done() <-chan struct{} { return e.done }
 +
 +// Call in proton goroutine to initiate closing an endpoint locally
 +// handler will complete the close when remote end closes.
 +func localClose(ep proton.Endpoint, err error) {
 +      if ep.State().LocalActive() {
 +              proton.CloseError(ep, err)
 +      }
 +}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5e6024bb/electron/handler.go
----------------------------------------------------------------------
diff --cc electron/handler.go
index 0237156,0000000..eb53df3
mode 100644,000000..100644
--- a/electron/handler.go
+++ b/electron/handler.go
@@@ -1,187 -1,0 +1,187 @@@
 +/*
 +Licensed to the Apache Software Foundation (ASF) under one
 +or more contributor license agreements.  See the NOTICE file
 +distributed with this work for additional information
 +regarding copyright ownership.  The ASF licenses this file
 +to you under the Apache License, Version 2.0 (the
 +"License"); you may not use this file except in compliance
 +with the License.  You may obtain a copy of the License at
 +
 +  http://www.apache.org/licenses/LICENSE-2.0
 +
 +Unless required by applicable law or agreed to in writing,
 +software distributed under the License is distributed on an
 +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 +KIND, either express or implied.  See the License for the
 +specific language governing permissions and limitations
 +under the License.
 +*/
 +
 +package electron
 +
 +import (
 +      "qpid.apache.org/amqp"
 +      "qpid.apache.org/proton"
 +)
 +
 +// NOTE: methods in this file are called only in the proton goroutine unless 
otherwise indicated.
 +
 +type handler struct {
 +      delegator    *proton.MessagingAdapter
 +      connection   *connection
-       links        map[proton.Link]Link
++      links        map[proton.Link]Endpoint
 +      sentMessages map[proton.Delivery]sentMessage
 +      sessions     map[proton.Session]*session
 +}
 +
 +func newHandler(c *connection) *handler {
 +      h := &handler{
 +              connection:   c,
-               links:        make(map[proton.Link]Link),
++              links:        make(map[proton.Link]Endpoint),
 +              sentMessages: make(map[proton.Delivery]sentMessage),
 +              sessions:     make(map[proton.Session]*session),
 +      }
 +      h.delegator = proton.NewMessagingAdapter(h)
 +      // Disable auto features of MessagingAdapter, we do these ourselves.
 +      h.delegator.Prefetch = 0
 +      h.delegator.AutoAccept = false
 +      h.delegator.AutoSettle = false
 +      h.delegator.AutoOpen = false
 +      return h
 +}
 +
 +func (h *handler) linkError(l proton.Link, msg string) {
 +      proton.CloseError(l, amqp.Errorf(amqp.InternalError, "%s for %s %s", 
msg, l.Type(), l))
 +}
 +
 +func (h *handler) HandleMessagingEvent(t proton.MessagingEvent, e 
proton.Event) {
 +      switch t {
 +
 +      case proton.MMessage:
 +              if r, ok := h.links[e.Link()].(*receiver); ok {
 +                      r.message(e.Delivery())
 +              } else {
 +                      h.linkError(e.Link(), "no receiver")
 +              }
 +
 +      case proton.MSettled:
 +              if sm, ok := h.sentMessages[e.Delivery()]; ok {
 +                      d := e.Delivery().Remote()
 +                      sm.ack <- Outcome{sentStatus(d.Type()), 
d.Condition().Error(), sm.value}
 +                      delete(h.sentMessages, e.Delivery())
 +              }
 +
 +      case proton.MSendable:
 +              if s, ok := h.links[e.Link()].(*sender); ok {
 +                      s.sendable()
 +              } else {
 +                      h.linkError(e.Link(), "no sender")
 +              }
 +
 +      case proton.MSessionOpening:
 +              if e.Session().State().LocalUninit() { // Remotely opened
 +                      h.incoming(newIncomingSession(h, e.Session()))
 +              }
 +
 +      case proton.MSessionClosed:
 +              h.sessionClosed(e.Session(), proton.EndpointError(e.Session()))
 +
 +      case proton.MLinkOpening:
 +              l := e.Link()
 +              if l.State().LocalActive() { // Already opened locally.
 +                      break
 +              }
 +              ss := h.sessions[l.Session()]
 +              if ss == nil {
 +                      h.linkError(e.Link(), "no session")
 +                      break
 +              }
 +              if l.IsReceiver() {
 +                      h.incoming(&IncomingReceiver{makeIncomingLink(ss, l)})
 +              } else {
 +                      h.incoming(&IncomingSender{makeIncomingLink(ss, l)})
 +              }
 +
 +      case proton.MLinkClosing:
 +              e.Link().Close()
 +
 +      case proton.MLinkClosed:
 +              h.linkClosed(e.Link(), proton.EndpointError(e.Link()))
 +
 +      case proton.MConnectionClosing:
 +              h.connection.err.Set(e.Connection().RemoteCondition().Error())
 +
 +      case proton.MConnectionClosed:
 +              h.connectionClosed(proton.EndpointError(e.Connection()))
 +
 +      case proton.MDisconnected:
 +              h.connection.err.Set(e.Transport().Condition().Error())
 +              // If err not set at this point (e.g. to Closed) then this is 
unexpected.
 +              h.connection.err.Set(amqp.Errorf(amqp.IllegalState, "unexpected 
disconnect on %s", h.connection))
 +
 +              err := h.connection.Error()
 +
 +              for l, _ := range h.links {
 +                      h.linkClosed(l, err)
 +              }
 +              h.links = nil
 +              for _, s := range h.sessions {
 +                      s.closed(err)
 +              }
 +              h.sessions = nil
 +              for _, sm := range h.sentMessages {
 +                      sm.ack <- Outcome{Unacknowledged, err, sm.value}
 +              }
 +              h.sentMessages = nil
 +      }
 +}
 +
 +func (h *handler) incoming(in Incoming) {
 +      var err error
 +      if h.connection.incoming != nil {
 +              h.connection.incoming <- in
 +              err = in.wait()
 +      } else {
 +              err = amqp.Errorf(amqp.NotAllowed, "rejected incoming %s %s",
 +                      in.pEndpoint().Type(), in.pEndpoint().String())
 +      }
 +      if err == nil {
 +              in.pEndpoint().Open()
 +      } else {
 +              proton.CloseError(in.pEndpoint(), err)
 +      }
 +}
 +
- func (h *handler) addLink(pl proton.Link, el Link) {
++func (h *handler) addLink(pl proton.Link, el Endpoint) {
 +      h.links[pl] = el
 +}
 +
 +func (h *handler) linkClosed(l proton.Link, err error) {
 +      if link, ok := h.links[l]; ok {
 +              link.closed(err)
 +              delete(h.links, l)
 +      }
 +}
 +
 +func (h *handler) sessionClosed(ps proton.Session, err error) {
 +      if s, ok := h.sessions[ps]; ok {
 +              delete(h.sessions, ps)
 +              err = s.closed(err)
 +              for l, _ := range h.links {
 +                      if l.Session() == ps {
 +                              h.linkClosed(l, err)
 +                      }
 +              }
 +      }
 +}
 +
 +func (h *handler) connectionClosed(err error) {
 +      err = h.connection.closed(err)
 +      // Close links first to avoid repeated scans of the link list by 
sessions.
 +      for l, _ := range h.links {
 +              h.linkClosed(l, err)
 +      }
 +      for s, _ := range h.sessions {
 +              h.sessionClosed(s, err)
 +      }
 +}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5e6024bb/electron/link.go
----------------------------------------------------------------------
diff --cc electron/link.go
index 91efa8e,0000000..80b4d5c
mode 100644,000000..100644
--- a/electron/link.go
+++ b/electron/link.go
@@@ -1,245 -1,0 +1,233 @@@
 +/*
 +Licensed to the Apache Software Foundation (ASF) under one
 +or more contributor license agreements.  See the NOTICE file
 +distributed with this work for additional information
 +regarding copyright ownership.  The ASF licenses this file
 +to you under the Apache License, Version 2.0 (the
 +"License"); you may not use this file except in compliance
 +with the License.  You may obtain a copy of the License at
 +
 +  http://www.apache.org/licenses/LICENSE-2.0
 +
 +Unless required by applicable law or agreed to in writing,
 +software distributed under the License is distributed on an
 +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 +KIND, either express or implied.  See the License for the
 +specific language governing permissions and limitations
 +under the License.
 +*/
 +
 +package electron
 +
 +import (
 +      "fmt"
 +      "qpid.apache.org/proton"
 +)
 +
- // Link is the common interface for AMQP links. Sender and Receiver provide
- // more methods for the sending or receiving end of a link respectively.
- type Link interface {
-       Endpoint
- 
++// Settings associated with a link
++type LinkSettings interface {
 +      // Source address that messages are coming from.
 +      Source() string
 +
 +      // Target address that messages are going to.
 +      Target() string
 +
 +      // LinkName is a unique name for the link among links between the same
 +      // containers in the same direction. By default generated automatically.
 +      LinkName() string
 +
 +      // IsSender is true if this is the sending end of the link.
 +      IsSender() bool
 +
 +      // IsReceiver is true if this is the receiving end of the link.
 +      IsReceiver() bool
 +
 +      // SndSettle defines when the sending end of the link settles message 
delivery.
 +      SndSettle() SndSettleMode
 +
 +      // RcvSettle defines when the receiving end of the link settles message delivery.
 +      RcvSettle() RcvSettleMode
 +
 +      // Session containing the Link
 +      Session() Session
- 
-       // Called in event loop on closed event.
-       closed(err error)
-       // Called to open a link (local or accepted incoming link)
-       open()
 +}
 +
 +// LinkOption can be passed when creating a sender or receiver link to set 
optional configuration.
- type LinkOption func(*link)
++type LinkOption func(*linkSettings)
 +
 +// Source returns a LinkOption that sets address that messages are coming 
from.
- func Source(s string) LinkOption { return func(l *link) { l.source = s } }
++func Source(s string) LinkOption { return func(l *linkSettings) { l.source = 
s } }
 +
 +// Target returns a LinkOption that sets address that messages are going to.
- func Target(s string) LinkOption { return func(l *link) { l.target = s } }
++func Target(s string) LinkOption { return func(l *linkSettings) { l.target = 
s } }
 +
 +// LinkName returns a LinkOption that sets the link name.
- func LinkName(s string) LinkOption { return func(l *link) { l.target = s } }
++func LinkName(s string) LinkOption {
++      // Store the name in linkName (not target): makeLocalLink only generates
++      // a name when linkName is empty, and target is set separately by Target().
++      return func(l *linkSettings) { l.linkName = s }
++}
 +
 +// SndSettle returns a LinkOption that sets the send settle mode
- func SndSettle(m SndSettleMode) LinkOption { return func(l *link) { 
l.sndSettle = m } }
++func SndSettle(m SndSettleMode) LinkOption { return func(l *linkSettings) { 
l.sndSettle = m } }
 +
 +// RcvSettle returns a LinkOption that sets the receive settle mode
- func RcvSettle(m RcvSettleMode) LinkOption { return func(l *link) { 
l.rcvSettle = m } }
++func RcvSettle(m RcvSettleMode) LinkOption { return func(l *linkSettings) { 
l.rcvSettle = m } }
 +
 +// SndSettleMode defines when the sending end of the link settles message
 +// delivery.
 +type SndSettleMode proton.SndSettleMode
 +
 +// Capacity returns a LinkOption that sets the link capacity
- func Capacity(n int) LinkOption { return func(l *link) { l.capacity = n } }
++func Capacity(n int) LinkOption { return func(l *linkSettings) { l.capacity = 
n } }
 +
 +// Prefetch returns a LinkOption that sets a receivers pre-fetch flag. Not 
relevant for a sender.
- func Prefetch(p bool) LinkOption { return func(l *link) { l.prefetch = p } }
++func Prefetch(p bool) LinkOption { return func(l *linkSettings) { l.prefetch 
= p } }
 +
 +// AtMostOnce returns a LinkOption that sets "fire and forget" mode, messages
 +// are sent but no acknowledgment is received, messages can be lost if there 
is
 +// a network failure. Sets SndSettleMode=SndSettled and RcvSettleMode=RcvFirst
 +func AtMostOnce() LinkOption {
-       return func(l *link) {
++      return func(l *linkSettings) {
 +              SndSettle(SndSettled)(l)
 +              RcvSettle(RcvFirst)(l)
 +      }
 +}
 +
 +// AtLeastOnce returns a LinkOption that requests acknowledgment for every
 +// message, acknowledgment indicates the message was definitely received. In 
the
 +// event of a failure, unacknowledged messages can be re-sent but there is a
 +// chance that the message will be received twice in this case.  Sets
 +// SndSettleMode=SndUnsettled and RcvSettleMode=RcvFirst
 +func AtLeastOnce() LinkOption {
-       return func(l *link) {
++      return func(l *linkSettings) {
 +              SndSettle(SndUnsettled)(l)
 +              RcvSettle(RcvFirst)(l)
 +      }
 +}
 +
 +const (
 +      // Messages are sent unsettled
 +      SndUnsettled = SndSettleMode(proton.SndUnsettled)
 +      // Messages are sent already settled
 +      SndSettled = SndSettleMode(proton.SndSettled)
 +      // Sender can send either unsettled or settled messages.
 +      SendMixed = SndSettleMode(proton.SndMixed)
 +)
 +
 +// RcvSettleMode defines when the receiving end of the link settles message 
delivery.
 +type RcvSettleMode proton.RcvSettleMode
 +
 +const (
 +      // Receiver settles first.
 +      RcvFirst = RcvSettleMode(proton.RcvFirst)
 +      // Receiver waits for sender to settle before settling.
 +      RcvSecond = RcvSettleMode(proton.RcvSecond)
 +)
 +
- type link struct {
-       endpoint
- 
-       // Link settings.
++type linkSettings struct {
 +      source    string
 +      target    string
 +      linkName  string
 +      isSender  bool
 +      sndSettle SndSettleMode
 +      rcvSettle RcvSettleMode
 +      capacity  int
 +      prefetch  bool
++      session   *session
++      eLink     proton.Link
++}
 +
-       session *session
-       eLink   proton.Link
++type link struct {
++      endpoint
++      linkSettings
 +}
 +
- func (l *link) Source() string           { return l.source }
- func (l *link) Target() string           { return l.target }
- func (l *link) LinkName() string         { return l.linkName }
- func (l *link) IsSender() bool           { return l.isSender }
- func (l *link) IsReceiver() bool         { return !l.isSender }
- func (l *link) SndSettle() SndSettleMode { return l.sndSettle }
- func (l *link) RcvSettle() RcvSettleMode { return l.rcvSettle }
- func (l *link) Session() Session         { return l.session }
- func (l *link) Connection() Connection   { return l.session.Connection() }
++func (l *linkSettings) Source() string           { return l.source }
++func (l *linkSettings) Target() string           { return l.target }
++func (l *linkSettings) LinkName() string         { return l.linkName }
++func (l *linkSettings) IsSender() bool           { return l.isSender }
++func (l *linkSettings) IsReceiver() bool         { return !l.isSender }
++func (l *linkSettings) SndSettle() SndSettleMode { return l.sndSettle }
++func (l *linkSettings) RcvSettle() RcvSettleMode { return l.rcvSettle }
 +
++func (l *link) Session() Session       { return l.session }
++func (l *link) Connection() Connection { return l.session.Connection() }
 +func (l *link) engine() *proton.Engine { return l.session.connection.engine }
 +func (l *link) handler() *handler      { return l.session.connection.handler }
 +
- // Set up link fields and open the proton.Link
- func localLink(sn *session, isSender bool, setting ...LinkOption) (link, 
error) {
-       l := link{
-               session:  sn,
++// Open a link and return the linkSettings.
++func makeLocalLink(sn *session, isSender bool, setting ...LinkOption) 
(linkSettings, error) {
++      l := linkSettings{
 +              isSender: isSender,
 +              capacity: 1,
 +              prefetch: false,
++              session:  sn,
 +      }
 +      for _, set := range setting {
 +              set(&l)
 +      }
 +      if l.linkName == "" {
 +              l.linkName = l.session.connection.container.nextLinkName()
 +      }
 +      if l.IsSender() {
 +              l.eLink = l.session.eSession.Sender(l.linkName)
 +      } else {
 +              l.eLink = l.session.eSession.Receiver(l.linkName)
 +      }
 +      if l.eLink.IsNil() {
-               l.err.Set(fmt.Errorf("cannot create link %s", l))
-               return l, l.err.Get()
++              return l, fmt.Errorf("cannot create link %s", l.eLink)
 +      }
 +      l.eLink.Source().SetAddress(l.source)
 +      l.eLink.Target().SetAddress(l.target)
 +      l.eLink.SetSndSettleMode(proton.SndSettleMode(l.sndSettle))
 +      l.eLink.SetRcvSettleMode(proton.RcvSettleMode(l.rcvSettle))
 +      l.eLink.Open()
-       l.endpoint = makeEndpoint(l.eLink.String())
 +      return l, nil
 +}
 +
 +type incomingLink struct {
 +      incoming
-       link
++      linkSettings
++      eLink proton.Link
++      sn    *session
 +}
 +
 +// Set up a link from an incoming proton.Link.
 +func makeIncomingLink(sn *session, eLink proton.Link) incomingLink {
 +      l := incomingLink{
 +              incoming: makeIncoming(eLink),
-               link: link{
-                       session:   sn,
++              linkSettings: linkSettings{
 +                      isSender:  eLink.IsSender(),
-                       eLink:     eLink,
 +                      source:    eLink.RemoteSource().Address(),
 +                      target:    eLink.RemoteTarget().Address(),
 +                      linkName:  eLink.Name(),
 +                      sndSettle: SndSettleMode(eLink.RemoteSndSettleMode()),
 +                      rcvSettle: RcvSettleMode(eLink.RemoteRcvSettleMode()),
 +                      capacity:  1,
 +                      prefetch:  false,
-                       endpoint:  makeEndpoint(eLink.String()),
++                      eLink:     eLink,
++                      session:   sn,
 +              },
 +      }
 +      return l
 +}
 +
 +// Not part of Link interface but used by Sender and Receiver.
 +func (l *link) Credit() (credit int, err error) {
 +      err = l.engine().InjectWait(func() error {
 +              if l.Error() != nil {
 +                      return l.Error()
 +              }
 +              credit = l.eLink.Credit()
 +              return nil
 +      })
 +      return
 +}
 +
 +// Not part of Link interface but used by Sender and Receiver.
 +func (l *link) Capacity() int { return l.capacity }
 +
 +func (l *link) Close(err error) {
 +      l.engine().Inject(func() {
 +              if l.Error() == nil {
 +                      localClose(l.eLink, err)
 +              }
 +      })
 +}
- 
- func (l *link) open() {
-       l.eLink.Open()
- }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5e6024bb/electron/messaging_test.go
----------------------------------------------------------------------
diff --cc electron/messaging_test.go
index 5af57e8,0000000..0de7d16
mode 100644,000000..100644
--- a/electron/messaging_test.go
+++ b/electron/messaging_test.go
@@@ -1,426 -1,0 +1,454 @@@
 +/*
 +Licensed to the Apache Software Foundation (ASF) under one
 +or more contributor license agreements.  See the NOTICE file
 +distributed with this work for additional information
 +regarding copyright ownership.  The ASF licenses this file
 +to you under the Apache License, Version 2.0 (the
 +"License"); you may not use this file except in compliance
 +with the License.  You may obtain a copy of the License at
 +
 +  http://www.apache.org/licenses/LICENSE-2.0
 +
 +Unless required by applicable law or agreed to in writing,
 +software distributed under the License is distributed on an
 +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 +KIND, either express or implied.  See the License for the
 +specific language governing permissions and limitations
 +under the License.
 +*/
 +
 +package electron
 +
 +import (
 +      "fmt"
 +      "net"
 +      "path"
 +      "qpid.apache.org/amqp"
 +      "runtime"
 +      "testing"
 +      "time"
 +)
 +
 +func fatalIf(t *testing.T, err error) {
 +      if err != nil {
 +              _, file, line, ok := runtime.Caller(1) // annotate with 
location of caller.
 +              if ok {
 +                      _, file = path.Split(file)
 +              }
 +              t.Fatalf("(from %s:%d) %v", file, line, err)
 +      }
 +}
 +
 +// Start a server, return listening addr and channel for incoming Connections.
 +func newServer(t *testing.T, cont Container) (net.Addr, <-chan Connection) {
 +      listener, err := net.Listen("tcp", "")
 +      fatalIf(t, err)
 +      addr := listener.Addr()
 +      ch := make(chan Connection)
 +      go func() {
 +              conn, err := listener.Accept()
 +              c, err := cont.Connection(conn, Server(), AllowIncoming())
 +              fatalIf(t, err)
 +              ch <- c
 +      }()
 +      return addr, ch
 +}
 +
 +// Open a client connection and session, return the session.
 +func newClient(t *testing.T, cont Container, addr net.Addr) Session {
 +      conn, err := net.Dial(addr.Network(), addr.String())
 +      fatalIf(t, err)
 +      c, err := cont.Connection(conn)
 +      fatalIf(t, err)
 +      sn, err := c.Session()
 +      fatalIf(t, err)
 +      return sn
 +}
 +
 +// Return client and server ends of the same connection.
 +func newClientServer(t *testing.T) (client Session, server Connection) {
 +      addr, ch := newServer(t, NewContainer("test-server"))
 +      client = newClient(t, NewContainer("test-client"), addr)
 +      return client, <-ch
 +}
 +
 +// Close client and server
 +func closeClientServer(client Session, server Connection) {
 +      client.Connection().Close(nil)
 +      server.Close(nil)
 +}
 +
 +// Send a message one way with a client sender and server receiver, verify 
ack.
 +func TestClientSendServerReceive(t *testing.T) {
 +      nLinks := 3
 +      nMessages := 3
 +
 +      rchan := make(chan Receiver, nLinks)
 +      client, server := newClientServer(t)
 +      go func() {
 +              for in := range server.Incoming() {
 +                      switch in := in.(type) {
 +                      case *IncomingReceiver:
 +                              in.SetCapacity(1)
 +                              in.SetPrefetch(false)
 +                              rchan <- in.Accept().(Receiver)
 +                      default:
 +                              in.Accept()
 +                      }
 +              }
 +      }()
 +
 +      defer func() { closeClientServer(client, server) }()
 +
 +      s := make([]Sender, nLinks)
 +      for i := 0; i < nLinks; i++ {
 +              var err error
 +              s[i], err = client.Sender(Target(fmt.Sprintf("foo%d", i)))
 +              if err != nil {
 +                      t.Fatal(err)
 +              }
 +      }
 +      r := make([]Receiver, nLinks)
 +      for i := 0; i < nLinks; i++ {
 +              r[i] = <-rchan
 +      }
 +
 +      for i := 0; i < nLinks; i++ {
 +              for j := 0; j < nMessages; j++ {
 +                      // Client send
 +                      ack := make(chan Outcome, 1)
 +                      sendDone := make(chan struct{})
 +                      go func() {
 +                              defer close(sendDone)
 +                              m := 
amqp.NewMessageWith(fmt.Sprintf("foobar%v-%v", i, j))
 +                              var err error
 +                              s[i].SendAsync(m, ack, "testing")
 +                              if err != nil {
 +                                      t.Fatal(err)
 +                              }
 +                      }()
 +
 +                      // Server receive
 +                      rm, err := r[i].Receive()
 +                      if err != nil {
 +                              t.Fatal(err)
 +                      }
 +                      if want, got := interface{}(fmt.Sprintf("foobar%v-%v", 
i, j)), rm.Message.Body(); want != got {
 +                              t.Errorf("%#v != %#v", want, got)
 +                      }
 +
 +                      // Should not be acknowledged on client yet
 +                      <-sendDone
 +                      select {
 +                      case <-ack:
 +                              t.Errorf("unexpected ack")
 +                      default:
 +                      }
 +
 +                      // Server send ack
 +                      if err := rm.Reject(); err != nil {
 +                              t.Error(err)
 +                      }
 +                      // Client get ack.
 +                      if a := <-ack; a.Value != "testing" || a.Error != nil 
|| a.Status != Rejected {
 +                              t.Error("unexpected ack: ", a.Status, a.Error, 
a.Value)
 +                      }
 +              }
 +      }
 +}
 +
 +func TestClientReceiver(t *testing.T) {
 +      nMessages := 3
 +      client, server := newClientServer(t)
 +      go func() {
 +              for in := range server.Incoming() {
 +                      switch in := in.(type) {
 +                      case *IncomingSender:
 +                              s := in.Accept().(Sender)
 +                              go func() {
 +                                      for i := int32(0); i < 
int32(nMessages); i++ {
 +                                              out := 
s.SendSync(amqp.NewMessageWith(i))
 +                                              if out.Error != nil {
 +                                                      t.Error(out.Error)
 +                                                      return
 +                                              }
 +                                      }
 +                                      s.Close(nil)
 +                              }()
 +                      default:
 +                              in.Accept()
 +                      }
 +              }
 +      }()
 +
 +      r, err := client.Receiver(Source("foo"))
 +      if err != nil {
 +              t.Fatal(err)
 +      }
 +      for i := int32(0); i < int32(nMessages); i++ {
 +              rm, err := r.Receive()
 +              if err != nil {
 +                      if err != Closed {
 +                              t.Error(err)
 +                      }
 +                      break
 +              }
 +              if err := rm.Accept(); err != nil {
 +                      t.Error(err)
 +              }
 +              if b, ok := rm.Message.Body().(int32); !ok || b != i {
 +                      t.Errorf("want %v, true got %v, %v", i, b, ok)
 +              }
 +      }
 +      server.Close(nil)
 +      client.Connection().Close(nil)
 +}
 +
 +// Test timeout versions of waiting functions.
 +func TestTimeouts(t *testing.T) {
 +      var err error
 +      rchan := make(chan Receiver, 1)
 +      client, server := newClientServer(t)
 +      go func() {
 +              for i := range server.Incoming() {
 +                      switch i := i.(type) {
 +                      case *IncomingReceiver:
 +                              i.SetCapacity(1)
 +                              i.SetPrefetch(false)
 +                              rchan <- i.Accept().(Receiver) // Issue credit 
only on receive
 +                      default:
 +                              i.Accept()
 +                      }
 +              }
 +      }()
 +      defer func() { closeClientServer(client, server) }()
 +
 +      // Open client sender
 +      snd, err := client.Sender(Target("test"))
 +      if err != nil {
 +              t.Fatal(err)
 +      }
 +      rcv := <-rchan
 +
 +      // Test send with timeout
 +      short := time.Millisecond
 +      long := time.Second
 +      m := amqp.NewMessage()
 +      if err := snd.SendSyncTimeout(m, 0).Error; err != Timeout { // No 
credit, expect timeout.
 +              t.Error("want Timeout got", err)
 +      }
 +      if err := snd.SendSyncTimeout(m, short).Error; err != Timeout { // No 
credit, expect timeout.
 +              t.Error("want Timeout got", err)
 +      }
 +      // Test receive with timeout
 +      if _, err = rcv.ReceiveTimeout(0); err != Timeout { // No credit, 
expect timeout.
 +              t.Error("want Timeout got", err)
 +      }
 +      // Test receive with timeout
 +      if _, err = rcv.ReceiveTimeout(short); err != Timeout { // No credit, 
expect timeout.
 +              t.Error("want Timeout got", err)
 +      }
 +      // There is now a credit on the link due to receive
 +      ack := make(chan Outcome)
 +      snd.SendAsyncTimeout(m, ack, nil, short)
 +      // Disposition should timeout
 +      select {
 +      case <-ack:
 +              t.Errorf("want Timeout got %#v", ack)
 +      case <-time.After(short):
 +      }
 +
 +      // Receive and accept
 +      rm, err := rcv.ReceiveTimeout(long)
 +      if err != nil {
 +              t.Fatal(err)
 +      }
 +      rm.Accept()
 +      // Sender get ack
 +      if a := <-ack; a.Status != Accepted || a.Error != nil {
 +              t.Errorf("want (accepted, nil) got %#v", a)
 +      }
 +}
 +
- // clientServer that returns sender/receiver pairs at opposite ends of link.
++// A server that returns the opposite end of each client link via channels.
 +type pairs struct {
-       t      *testing.T
-       client Session
-       server Connection
-       rchan  chan Receiver
-       schan  chan Sender
++      t        *testing.T
++      client   Session
++      server   Connection
++      rchan    chan Receiver
++      schan    chan Sender
++      capacity int
++      prefetch bool
 +}
 +
- func newPairs(t *testing.T) *pairs {
++func newPairs(t *testing.T, capacity int, prefetch bool) *pairs {
 +      p := &pairs{t: t, rchan: make(chan Receiver, 1), schan: make(chan 
Sender, 1)}
 +      p.client, p.server = newClientServer(t)
 +      go func() {
 +              for i := range p.server.Incoming() {
 +                      switch i := i.(type) {
 +                      case *IncomingReceiver:
-                               i.SetCapacity(1)
-                               i.SetPrefetch(false)
++                              i.SetCapacity(capacity)
++                              i.SetPrefetch(prefetch)
 +                              p.rchan <- i.Accept().(Receiver)
 +                      case *IncomingSender:
 +                              p.schan <- i.Accept().(Sender)
 +                      default:
 +                              i.Accept()
 +                      }
 +              }
 +      }()
 +      return p
 +}
 +
 +func (p *pairs) close() {
 +      closeClientServer(p.client, p.server)
 +}
 +
++// Return a client sender and server receiver
 +func (p *pairs) senderReceiver() (Sender, Receiver) {
 +      snd, err := p.client.Sender()
 +      fatalIf(p.t, err)
 +      rcv := <-p.rchan
 +      return snd, rcv
 +}
 +
++// Return a client receiver and server sender
 +func (p *pairs) receiverSender() (Receiver, Sender) {
 +      rcv, err := p.client.Receiver()
 +      fatalIf(p.t, err)
 +      snd := <-p.schan
 +      return rcv, snd
 +}
 +
 +type result struct {
 +      label string
 +      err   error
 +}
 +
- func (r result) String() string { return fmt.Sprintf("%s(%s)", r.err, 
r.label) }
++func (r result) String() string { return fmt.Sprintf("%v(%v)", r.err, 
r.label) }
 +
 +func doSend(snd Sender, results chan result) {
 +      err := snd.SendSync(amqp.NewMessage()).Error
 +      results <- result{"send", err}
 +}
 +
 +func doReceive(rcv Receiver, results chan result) {
 +      _, err := rcv.Receive()
 +      results <- result{"receive", err}
 +}
 +
 +func doDisposition(ack <-chan Outcome, results chan result) {
 +      results <- result{"disposition", (<-ack).Error}
 +}
 +
++// Senders get credit immediately if receivers have prefetch set
++func TestSendReceivePrefetch(t *testing.T) {
++      pairs := newPairs(t, 1, true)
++      s, r := pairs.senderReceiver()
++      s.SendAsyncTimeout(amqp.NewMessage(), nil, nil, time.Second) // Should 
not block for credit.
++      if _, err := r.Receive(); err != nil {
++              t.Error(err)
++      }
++}
++
++// Senders do not get credit till Receive() if receivers don't have prefetch
++func TestSendReceiveNoPrefetch(t *testing.T) {
++      pairs := newPairs(t, 1, false)
++      s, r := pairs.senderReceiver()
++      done := make(chan struct{}, 1)
++      go func() {
++              s.SendAsyncTimeout(amqp.NewMessage(), nil, nil, time.Second) // 
Should block for credit.
++              close(done)
++      }()
++      select {
++      case <-done:
++              t.Errorf("send should be blocked on credit")
++      default:
++              if _, err := r.Receive(); err != nil {
++                      t.Error(err)
++              } else {
++                      <-done
++              } // Should be unblocked now
++      }
++}
++
 +// Test that closing Links interrupts blocked link functions.
 +func TestLinkCloseInterrupt(t *testing.T) {
-       want := amqp.Errorf("x", "all bad")
-       pairs := newPairs(t)
++      want := amqp.Error{Name: "x", Description: "all bad"}
++      pairs := newPairs(t, 1, false)
 +      results := make(chan result) // Collect expected errors
 +
-       // Sender.Close() interrupts Send()
-       snd, rcv := pairs.senderReceiver()
-       go doSend(snd, results)
-       snd.Close(want)
-       if r := <-results; want != r.err {
-               t.Errorf("want %#v got %#v", want, r)
-       }
- 
-       // Remote Receiver.Close() interrupts Send()
-       snd, rcv = pairs.senderReceiver()
-       go doSend(snd, results)
-       rcv.Close(want)
-       if r := <-results; want != r.err {
-               t.Errorf("want %#v got %#v", want, r)
-       }
++      // Note closing the link does not interrupt Send() calls, the AMQP spec 
says
++      // that deliveries can be settled after the link is closed.
 +
 +      // Receiver.Close() interrupts Receive()
-       snd, rcv = pairs.senderReceiver()
++      snd, rcv := pairs.senderReceiver()
 +      go doReceive(rcv, results)
 +      rcv.Close(want)
 +      if r := <-results; want != r.err {
 +              t.Errorf("want %#v got %#v", want, r)
 +      }
 +
 +      // Remote Sender.Close() interrupts Receive()
 +      snd, rcv = pairs.senderReceiver()
 +      go doReceive(rcv, results)
 +      snd.Close(want)
 +      if r := <-results; want != r.err {
 +              t.Errorf("want %#v got %#v", want, r)
 +      }
 +}
 +
 +// Test closing the server end of a connection.
 +func TestConnectionCloseInterrupt1(t *testing.T) {
-       want := amqp.Errorf("x", "bad")
-       pairs := newPairs(t)
++      want := amqp.Error{Name: "x", Description: "bad"}
++      pairs := newPairs(t, 1, true)
 +      results := make(chan result) // Collect expected errors
 +
 +      // Connection.Close() interrupts Send, Receive, Disposition.
 +      snd, rcv := pairs.senderReceiver()
-       go doReceive(rcv, results)
-       ack := snd.SendWaitable(amqp.NewMessage())
-       go doDisposition(ack, results)
-       snd, rcv = pairs.senderReceiver()
 +      go doSend(snd, results)
++
++      rcv.Receive()
 +      rcv, snd = pairs.receiverSender()
 +      go doReceive(rcv, results)
++
++      snd, rcv = pairs.senderReceiver()
++      ack := snd.SendWaitable(amqp.NewMessage())
++      rcv.Receive()
++      go doDisposition(ack, results)
++
 +      pairs.server.Close(want)
 +      for i := 0; i < 3; i++ {
 +              if r := <-results; want != r.err {
-                       // TODO aconway 2015-10-06: Not propagating the correct 
error, seeing nil and EOF.
-                       t.Logf("want %v got %v", want, r.err)
++                      t.Logf("want %v got %v", want, r)
 +              }
 +      }
 +}
 +
 +// Test closing the client end of the connection.
 +func TestConnectionCloseInterrupt2(t *testing.T) {
-       want := amqp.Errorf("x", "bad")
-       pairs := newPairs(t)
++      want := amqp.Error{Name: "x", Description: "bad"}
++      pairs := newPairs(t, 1, true)
 +      results := make(chan result) // Collect expected errors
 +
 +      // Connection.Close() interrupts Send, Receive, Disposition.
 +      snd, rcv := pairs.senderReceiver()
-       go doReceive(rcv, results)
-       ack := snd.SendWaitable(amqp.NewMessage())
-       go doDisposition(ack, results)
-       snd, rcv = pairs.senderReceiver()
 +      go doSend(snd, results)
++      rcv.Receive()
++
 +      rcv, snd = pairs.receiverSender()
 +      go doReceive(rcv, results)
-       pairs.client.Close(want)
++
++      snd, rcv = pairs.senderReceiver()
++      ack := snd.SendWaitable(amqp.NewMessage())
++      go doDisposition(ack, results)
++
++      pairs.client.Connection().Close(want)
 +      for i := 0; i < 3; i++ {
 +              if r := <-results; want != r.err {
 +                      // TODO aconway 2015-10-06: Not propagating the correct 
error, seeing nil.
 +                      t.Logf("want %v got %v", want, r.err)
 +              }
 +      }
 +}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5e6024bb/electron/receiver.go
----------------------------------------------------------------------
diff --cc electron/receiver.go
index 22bdc7e,0000000..f2b7a52
mode 100644,000000..100644
--- a/electron/receiver.go
+++ b/electron/receiver.go
@@@ -1,244 -1,0 +1,225 @@@
 +/*
 +Licensed to the Apache Software Foundation (ASF) under one
 +or more contributor license agreements.  See the NOTICE file
 +distributed with this work for additional information
 +regarding copyright ownership.  The ASF licenses this file
 +to you under the Apache License, Version 2.0 (the
 +"License"); you may not use this file except in compliance
 +with the License.  You may obtain a copy of the License at
 +
 +  http://www.apache.org/licenses/LICENSE-2.0
 +
 +Unless required by applicable law or agreed to in writing,
 +software distributed under the License is distributed on an
 +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 +KIND, either express or implied.  See the License for the
 +specific language governing permissions and limitations
 +under the License.
 +*/
 +
 +package electron
 +
 +import (
 +      "fmt"
 +      "qpid.apache.org/amqp"
 +      "qpid.apache.org/proton"
 +      "time"
 +)
 +
 +// Receiver is a Link that receives messages.
 +//
 +type Receiver interface {
-       Link
++      Endpoint
++      LinkSettings
 +
 +      // Receive blocks until a message is available or until the Receiver is 
closed
 +      // and has no more buffered messages.
 +      Receive() (ReceivedMessage, error)
 +
 +      // ReceiveTimeout is like Receive but gives up after timeout, see 
Timeout.
 +      //
 +      // Note that if Prefetch is false, after a Timeout the credit 
issued by
 +      // Receive remains on the link. It will be used by the next call to 
Receive.
 +      ReceiveTimeout(timeout time.Duration) (ReceivedMessage, error)
 +
 +      // Prefetch==true means the Receiver will automatically issue credit to 
the
 +      // remote sender to keep its buffer as full as possible, i.e. it will
 +      // "pre-fetch" messages independently of the application calling
 +      // Receive(). This gives good throughput for applications that handle a
 +      // continuous stream of messages. Larger capacity may improve 
throughput, the
 +      // optimal value depends on the characteristics of your application.
 +      //
 +      // Prefetch==false means the Receiver will only issue credit when 
you
 +      // call Receive(), and will only issue enough credit to satisfy the 
calls
 +      // actually made. This gives lower throughput but will not fetch any 
messages
 +      // in advance. It is good for synchronous applications that need to 
evaluate
 +      // each message before deciding whether to receive another. The
 +      // request-response pattern is a typical example.  If you make 
concurrent
 +      // calls to Receive with pre-fetch disabled, you can improve 
performance by
 +      // setting the capacity close to the expected number of concurrent 
calls.
 +      //
 +      Prefetch() bool
 +
 +      // Capacity is the size (number of messages) of the local message buffer
 +      // These are messages received but not yet returned to the application 
by a call to Receive()
 +      Capacity() int
 +}
 +
- // Flow control policy for a receiver.
- type policy interface {
-       // Called at the start of Receive() to adjust credit before fetching a 
message.
-       Pre(*receiver)
-       // Called after Receive() has received a message from Buffer() before 
it returns.
-       // Non-nil error means no message was received because of an error.
-       Post(*receiver, error)
- }
- 
- type prefetchPolicy struct{}
- 
- func (p prefetchPolicy) Flow(r *receiver) {
-       r.engine().Inject(func() {
-               if r.Error() != nil {
-                       return
-               }
-               _, _, max := r.credit()
-               if max > 0 {
-                       r.eLink.Flow(max)
-               }
-       })
- }
- func (p prefetchPolicy) Pre(r *receiver) { p.Flow(r) }
- func (p prefetchPolicy) Post(r *receiver, err error) {
-       if err == nil {
-               p.Flow(r)
-       }
- }
- 
- type noPrefetchPolicy struct{ waiting int }
- 
- func (p noPrefetchPolicy) Flow(r *receiver) { // Not called in proton 
goroutine
-       r.engine().Inject(func() {
-               if r.Error() != nil {
-                       return
-               }
-               len, credit, max := r.credit()
-               add := p.waiting - (len + credit)
-               if add > max {
-                       add = max // Don't overflow
-               }
-               if add > 0 {
-                       r.eLink.Flow(add)
-               }
-       })
- }
- func (p noPrefetchPolicy) Pre(r *receiver) { p.waiting++; p.Flow(r) }
- func (p noPrefetchPolicy) Post(r *receiver, err error) {
-       p.waiting--
-       if err == nil {
-               p.Flow(r)
-       }
- }
- 
 +// Receiver implementation
 +type receiver struct {
 +      link
-       buffer chan ReceivedMessage
-       policy policy
++      buffer  chan ReceivedMessage
++      callers int
 +}
 +
++func (r *receiver) Capacity() int  { return cap(r.buffer) }
++func (r *receiver) Prefetch() bool { return r.prefetch }
++
 +// Call in proton goroutine
- func newReceiver(l link) *receiver {
-       r := &receiver{link: l}
++func newReceiver(ls linkSettings) *receiver {
++      r := &receiver{link: link{linkSettings: ls}}
++      r.endpoint.init(r.link.eLink.String())
 +      if r.capacity < 1 {
 +              r.capacity = 1
 +      }
-       if r.prefetch {
-               r.policy = &prefetchPolicy{}
-       } else {
-               r.policy = &noPrefetchPolicy{}
-       }
 +      r.buffer = make(chan ReceivedMessage, r.capacity)
 +      r.handler().addLink(r.eLink, r)
-       r.link.open()
++      r.link.eLink.Open()
++      if r.prefetch {
++              r.flow(r.maxFlow())
++      }
 +      return r
 +}
 +
- // call in proton goroutine.
- func (r *receiver) credit() (buffered, credit, max int) {
-       return len(r.buffer), r.eLink.Credit(), cap(r.buffer) - len(r.buffer)
++// Call in proton goroutine. Max additional credit we can request.
++func (r *receiver) maxFlow() int { return cap(r.buffer) - len(r.buffer) - 
r.eLink.Credit() }
++
++func (r *receiver) flow(credit int) {
++      if credit > 0 {
++              r.eLink.Flow(credit)
++      }
 +}
 +
- func (r *receiver) Capacity() int  { return cap(r.buffer) }
- func (r *receiver) Prefetch() bool { return r.prefetch }
++// Inject flow check per-caller call when prefetch is off.
++// Called with inc=1 at start of call, inc = -1 at end
++func (r *receiver) caller(inc int) {
++      r.engine().Inject(func() {
++              r.callers += inc
++              need := r.callers - (len(r.buffer) + r.eLink.Credit())
++              max := r.maxFlow()
++              if need > max {
++                      need = max
++              }
++              r.flow(need)
++      })
++}
 +
++// Inject flow top-up if prefetch is enabled
++func (r *receiver) flowTopUp() {
++      if r.prefetch {
++              r.engine().Inject(func() { r.flow(r.maxFlow()) })
++      }
++}
++
++// Not called
 +func (r *receiver) Receive() (rm ReceivedMessage, err error) {
 +      return r.ReceiveTimeout(Forever)
 +}
 +
- func (r *receiver) ReceiveTimeout(timeout time.Duration) (rm ReceivedMessage, 
err error) {
++func (r *receiver) ReceiveTimeout(timeout time.Duration) (ReceivedMessage, 
error) {
 +      assert(r.buffer != nil, "Receiver is not open: %s", r)
-       r.policy.Pre(r)
-       defer func() { r.policy.Post(r, err) }()
++      select { // Check for immediate availability
++      case rm := <-r.buffer:
++              r.flowTopUp()
++              return rm, nil
++      default:
++      }
++      if !r.prefetch { // Per-caller flow control
++              r.caller(+1)
++              defer r.caller(-1)
++      }
 +      rmi, err := timedReceive(r.buffer, timeout)
 +      switch err {
-       case Timeout:
-               return ReceivedMessage{}, Timeout
++      case nil:
++              r.flowTopUp()
++              return rmi.(ReceivedMessage), err
 +      case Closed:
 +              return ReceivedMessage{}, r.Error()
 +      default:
-               return rmi.(ReceivedMessage), nil
++              return ReceivedMessage{}, err
 +      }
 +}
 +
 +// Called in proton goroutine on MMessage event.
 +func (r *receiver) message(delivery proton.Delivery) {
 +      if r.eLink.State().RemoteClosed() {
 +              localClose(r.eLink, r.eLink.RemoteCondition().Error())
 +              return
 +      }
 +      if delivery.HasMessage() {
 +              m, err := delivery.Message()
 +              if err != nil {
 +                      localClose(r.eLink, err)
 +                      return
 +              }
 +              assert(m != nil)
 +              r.eLink.Advance()
 +              if r.eLink.Credit() < 0 {
 +                      localClose(r.eLink, fmt.Errorf("received message in 
excess of credit limit"))
 +              } else {
 +                      // We never issue more credit than cap(buffer) so this 
will not block.
 +                      r.buffer <- ReceivedMessage{m, delivery, r}
 +              }
 +      }
 +}
 +
- func (r *receiver) closed(err error) {
-       r.link.closed(err)
++func (r *receiver) closed(err error) error {
 +      if r.buffer != nil {
 +              close(r.buffer)
 +      }
++      return r.link.closed(err)
 +}
 +
 +// ReceivedMessage contains an amqp.Message and allows the message to be 
acknowledged.
 +type ReceivedMessage struct {
 +      // Message is the received message.
 +      Message amqp.Message
 +
 +      eDelivery proton.Delivery
 +      receiver  Receiver
 +}
 +
 +// Acknowledge a ReceivedMessage with the given delivery status.
 +func (rm *ReceivedMessage) acknowledge(status uint64) error {
 +      return rm.receiver.(*receiver).engine().Inject(func() {
 +              // Deliveries are valid as long as the connection is, unless 
settled.
 +              rm.eDelivery.SettleAs(uint64(status))
 +      })
 +}
 +
 +// Accept tells the sender that we take responsibility for processing the 
message.
 +func (rm *ReceivedMessage) Accept() error { return 
rm.acknowledge(proton.Accepted) }
 +
 +// Reject tells the sender we consider the message invalid and unusable.
 +func (rm *ReceivedMessage) Reject() error { return 
rm.acknowledge(proton.Rejected) }
 +
 +// Release tells the sender we will not process the message but some other
 +// receiver might.
 +func (rm *ReceivedMessage) Release() error { return 
rm.acknowledge(proton.Released) }
 +
 +// IncomingReceiver is sent on the Connection.Incoming() channel when there is
 +// an incoming request to open a receiver link.
 +type IncomingReceiver struct {
 +      incomingLink
 +}
 +
 +// SetCapacity sets the capacity of the incoming receiver, call before 
Accept()
 +func (in *IncomingReceiver) SetCapacity(capacity int) { in.capacity = 
capacity }
 +
 +// SetPrefetch sets the pre-fetch mode of the incoming receiver, call before 
Accept()
 +func (in *IncomingReceiver) SetPrefetch(prefetch bool) { in.prefetch = 
prefetch }
 +
 +// Accept accepts an incoming receiver endpoint
 +func (in *IncomingReceiver) Accept() Endpoint {
-       return in.accept(func() Endpoint { return newReceiver(in.link) })
++      return in.accept(func() Endpoint { return newReceiver(in.linkSettings) 
})
 +}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5e6024bb/electron/sender.go
----------------------------------------------------------------------
diff --cc electron/sender.go
index 834eb75,0000000..2f0e965
mode 100644,000000..100644
--- a/electron/sender.go
+++ b/electron/sender.go
@@@ -1,273 -1,0 +1,274 @@@
 +/*
 +Licensed to the Apache Software Foundation (ASF) under one
 +or more contributor license agreements.  See the NOTICE file
 +distributed with this work for additional information
 +regarding copyright ownership.  The ASF licenses this file
 +to you under the Apache License, Version 2.0 (the
 +"License"); you may not use this file except in compliance
 +with the License.  You may obtain a copy of the License at
 +
 +  http://www.apache.org/licenses/LICENSE-2.0
 +
 +Unless required by applicable law or agreed to in writing,
 +software distributed under the License is distributed on an
 +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 +KIND, either express or implied.  See the License for the
 +specific language governing permissions and limitations
 +under the License.
 +*/
 +
 +package electron
 +
 +// #include <proton/disposition.h>
 +import "C"
 +
 +import (
 +      "fmt"
 +      "qpid.apache.org/amqp"
 +      "qpid.apache.org/proton"
 +      "time"
 +)
 +
 +// Sender is a Link that sends messages.
 +//
 +// The result of sending a message is provided by an Outcome value.
 +//
 +// A sender can buffer messages up to the credit limit provided by the remote 
receiver.
 +// Send* methods will block if the buffer is full until there is space.
 +// Send*Timeout methods will give up after the timeout and set Timeout as 
Outcome.Error.
 +//
 +type Sender interface {
-       Link
++      Endpoint
++      LinkSettings
 +
 +      // SendSync sends a message and blocks until the message is 
acknowledged by the remote receiver.
 +      // Returns an Outcome, which may contain an error if the message could 
not be sent.
 +      SendSync(m amqp.Message) Outcome
 +
 +      // SendWaitable puts a message in the send buffer and returns a channel 
that
 +      // you can use to wait for the Outcome of just that message. The 
channel is
 +      // buffered so you can receive from it whenever you want without 
blocking anything.
 +      SendWaitable(m amqp.Message) <-chan Outcome
 +
 +      // SendForget buffers a message for sending and returns, with no 
notification of the outcome.
 +      SendForget(m amqp.Message)
 +
 +      // SendAsync puts a message in the send buffer and returns immediately. 
 An
 +      // Outcome with Value = value will be sent to the ack channel when the 
remote
 +      // receiver has acknowledged the message or if there is an error.
 +      //
 +      // You can use the same ack channel for many calls to SendAsync(), 
possibly on
 +      // many Senders. The channel will receive the outcomes in the order they
 +      // become available. The channel should be buffered and/or served by 
dedicated
 +      // goroutines to avoid blocking the connection.
 +      //
 +      // If ack == nil no Outcome is sent.
 +      SendAsync(m amqp.Message, ack chan<- Outcome, value interface{})
 +
 +      SendAsyncTimeout(m amqp.Message, ack chan<- Outcome, value interface{}, 
timeout time.Duration)
 +
 +      SendWaitableTimeout(m amqp.Message, timeout time.Duration) <-chan 
Outcome
 +
 +      SendForgetTimeout(m amqp.Message, timeout time.Duration)
 +
 +      SendSyncTimeout(m amqp.Message, timeout time.Duration) Outcome
 +}
 +
 +// Outcome provides information about the outcome of sending a message.
 +type Outcome struct {
 +      // Status of the message: was it sent, how was it acknowledged.
 +      Status SentStatus
 +      // Error is a local error if Status is Unsent or Unacknowledged, a 
remote error otherwise.
 +      Error error
 +      // Value provided by the application in SendAsync()
 +      Value interface{}
 +}
 +
++func (o Outcome) send(ack chan<- Outcome) {
++      if ack != nil {
++              ack <- o
++      }
++}
++
 +// SentStatus indicates the status of a sent message.
 +type SentStatus int
 +
 +const (
 +      // Message was never sent
 +      Unsent SentStatus = iota
 +      // Message was sent but never acknowledged. It may or may not have been 
received.
 +      Unacknowledged
 +      // Message was accepted by the receiver (or was sent pre-settled, 
accept is assumed)
 +      Accepted
 +      // Message was rejected as invalid by the receiver
 +      Rejected
 +      // Message was not processed by the receiver but may be valid for a 
different receiver
 +      Released
 +      // Receiver responded with an unrecognized status.
 +      Unknown
 +)
 +
 +// String human readable name for SentStatus.
 +func (s SentStatus) String() string {
 +      switch s {
 +      case Unsent:
 +              return "unsent"
 +      case Unacknowledged:
 +              return "unacknowledged"
 +      case Accepted:
 +              return "accepted"
 +      case Rejected:
 +              return "rejected"
 +      case Released:
 +              return "released"
 +      case Unknown:
 +              return "unknown"
 +      default:
 +              return fmt.Sprintf("invalid(%d)", s)
 +      }
 +}
 +
 +// Convert proton delivery state code to SentStatus value
 +func sentStatus(d uint64) SentStatus {
 +      switch d {
 +      case proton.Accepted:
 +              return Accepted
 +      case proton.Rejected:
 +              return Rejected
 +      case proton.Released, proton.Modified:
 +              return Released
 +      default:
 +              return Unknown
 +      }
 +}
 +
 +// Sender implementation, held by handler.
 +type sender struct {
 +      link
 +      credit chan struct{} // Signal available credit.
 +}
 +
 +func (s *sender) SendAsyncTimeout(m amqp.Message, ack chan<- Outcome, v 
interface{}, t time.Duration) {
 +      // wait for credit
 +      if _, err := timedReceive(s.credit, t); err != nil {
-               if err == Closed && s.Error != nil {
++              if err == Closed && s.Error() != nil {
 +                      err = s.Error()
 +              }
-               ack <- Outcome{Unsent, err, v}
++              Outcome{Unsent, err, v}.send(ack)
 +              return
 +      }
 +      // Send a message in handler goroutine
 +      err := s.engine().Inject(func() {
 +              if s.Error() != nil {
-                       if ack != nil {
-                               ack <- Outcome{Unsent, s.Error(), v}
-                       }
++                      Outcome{Unsent, s.Error(), v}.send(ack)
 +                      return
 +              }
-               if delivery, err := s.eLink.Send(m); err == nil {
-                       if ack != nil { // We must report an outcome
-                               if s.SndSettle() == SndSettled {
-                                       delivery.Settle() // Pre-settle if 
required
-                                       ack <- Outcome{Accepted, nil, v}
-                               } else {
-                                       s.handler().sentMessages[delivery] = 
sentMessage{ack, v}
-                               }
-                       } else { // ack == nil, can't report outcome
-                               if s.SndSettle() != SndUnsettled { // 
Pre-settle unless we are forced not to.
-                                       delivery.Settle()
-                               }
-                       }
-               } else { // err != nil
-                       if ack != nil {
-                               ack <- Outcome{Unsent, err, v}
++
++              delivery, err2 := s.eLink.Send(m)
++              switch {
++              case err2 != nil:
++                      Outcome{Unsent, err2, v}.send(ack)
++              case ack == nil || s.SndSettle() == SndSettled: // Pre-settled
++                      if s.SndSettle() != SndUnsettled { // Not forced to 
send unsettled by link policy
++                              delivery.Settle()
 +                      }
++                      Outcome{Accepted, nil, v}.send(ack) // Assume accepted
++              default:
++                      s.handler().sentMessages[delivery] = sentMessage{ack, 
v} // Register with handler
 +              }
 +              if s.eLink.Credit() > 0 { // Signal there is still credit
 +                      s.sendable()
 +              }
 +      })
-       if err != nil && ack != nil {
-               ack <- Outcome{Unsent, err, v}
++      if err != nil {
++              Outcome{Unsent, err, v}.send(ack)
 +      }
 +}
 +
 +// Set credit flag if not already set. Non-blocking, any goroutine
 +func (s *sender) sendable() {
 +      select { // Non-blocking
 +      case s.credit <- struct{}{}:
 +      default:
 +      }
 +}
 +
 +func (s *sender) SendWaitableTimeout(m amqp.Message, t time.Duration) <-chan 
Outcome {
 +      out := make(chan Outcome, 1)
 +      s.SendAsyncTimeout(m, out, nil, t)
 +      return out
 +}
 +
 +func (s *sender) SendForgetTimeout(m amqp.Message, t time.Duration) {
 +      s.SendAsyncTimeout(m, nil, nil, t)
 +}
 +
 +func (s *sender) SendSyncTimeout(m amqp.Message, t time.Duration) Outcome {
 +      deadline := time.Now().Add(t)
 +      ack := s.SendWaitableTimeout(m, t)
 +      t = deadline.Sub(time.Now()) // Adjust for time already spent.
 +      if t < 0 {
 +              t = 0
 +      }
 +      if out, err := timedReceive(ack, t); err == nil {
 +              return out.(Outcome)
 +      } else {
 +              if err == Closed && s.Error() != nil {
 +                      err = s.Error()
 +              }
 +              return Outcome{Unacknowledged, err, nil}
 +      }
 +}
 +
 +func (s *sender) SendAsync(m amqp.Message, ack chan<- Outcome, v interface{}) 
{
 +      s.SendAsyncTimeout(m, ack, v, Forever)
 +}
 +
 +func (s *sender) SendWaitable(m amqp.Message) <-chan Outcome {
 +      return s.SendWaitableTimeout(m, Forever)
 +}
 +
 +func (s *sender) SendForget(m amqp.Message) {
 +      s.SendForgetTimeout(m, Forever)
 +}
 +
 +func (s *sender) SendSync(m amqp.Message) Outcome {
 +      return <-s.SendWaitable(m)
 +}
 +
 +// handler goroutine
- func (s *sender) closed(err error) {
-       s.link.closed(err)
++func (s *sender) closed(err error) error {
 +      close(s.credit)
++      return s.link.closed(err)
 +}
 +
- func newSender(l link) *sender {
-       s := &sender{link: l, credit: make(chan struct{}, 1)}
++func newSender(ls linkSettings) *sender {
++      s := &sender{link: link{linkSettings: ls}, credit: make(chan struct{}, 
1)}
++      s.endpoint.init(s.link.eLink.String())
 +      s.handler().addLink(s.eLink, s)
-       s.link.open()
++      s.link.eLink.Open()
 +      return s
 +}
 +
 +// sentMessage records a sent message on the handler.
 +type sentMessage struct {
 +      ack   chan<- Outcome
 +      value interface{}
 +}
 +
 +// IncomingSender is sent on the Connection.Incoming() channel when there is
 +// an incoming request to open a sender link.
 +type IncomingSender struct {
 +      incomingLink
 +}
 +
 +// Accept accepts an incoming sender endpoint
 +func (in *IncomingSender) Accept() Endpoint {
-       return in.accept(func() Endpoint { return newSender(in.link) })
++      return in.accept(func() Endpoint { return newSender(in.linkSettings) })
 +}
 +
 +// Call in injected functions to check if the sender is valid.
 +func (s *sender) valid() bool {
 +      s2, ok := s.handler().links[s.eLink].(*sender)
 +      return ok && s2 == s
 +}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5e6024bb/electron/session.go
----------------------------------------------------------------------
diff --cc electron/session.go
index 18d8bc8,0000000..1bbc52c
mode 100644,000000..100644
--- a/electron/session.go
+++ b/electron/session.go
@@@ -1,128 -1,0 +1,128 @@@
 +/*
 +Licensed to the Apache Software Foundation (ASF) under one
 +or more contributor license agreements.  See the NOTICE file
 +distributed with this work for additional information
 +regarding copyright ownership.  The ASF licenses this file
 +to you under the Apache License, Version 2.0 (the
 +"License"); you may not use this file except in compliance
 +with the License.  You may obtain a copy of the License at
 +
 +  http://www.apache.org/licenses/LICENSE-2.0
 +
 +Unless required by applicable law or agreed to in writing,
 +software distributed under the License is distributed on an
 +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 +KIND, either express or implied.  See the License for the
 +specific language governing permissions and limitations
 +under the License.
 +*/
 +
 +package electron
 +
 +import (
 +      "qpid.apache.org/proton"
 +)
 +
 +// Session is an AMQP session, it contains Senders and Receivers.
 +type Session interface {
 +      Endpoint
 +
 +      // Sender opens a new sender.
 +      Sender(...LinkOption) (Sender, error)
 +
 +      // Receiver opens a new Receiver.
 +      Receiver(...LinkOption) (Receiver, error)
 +}
 +
 +type session struct {
 +      endpoint
 +      eSession   proton.Session
 +      connection *connection
 +      capacity   uint
 +}
 +
 +// SessionOption can be passed when creating a Session
 +type SessionOption func(*session)
 +
 +// IncomingCapacity returns a SessionOption that sets the size (in bytes) of
 +// the session's incoming data buffer.
 +func IncomingCapacity(cap uint) SessionOption { return func(s *session) { 
s.capacity = cap } }
 +
 +// in proton goroutine
 +func newSession(c *connection, es proton.Session, setting ...SessionOption) 
*session {
 +      s := &session{
 +              connection: c,
 +              eSession:   es,
-               endpoint:   makeEndpoint(es.String()),
 +      }
++      s.endpoint.init(es.String())
 +      for _, set := range setting {
 +              set(s)
 +      }
 +      c.handler.sessions[s.eSession] = s
 +      s.eSession.SetIncomingCapacity(s.capacity)
 +      s.eSession.Open()
 +      return s
 +}
 +
 +func (s *session) Connection() Connection     { return s.connection }
 +func (s *session) eEndpoint() proton.Endpoint { return s.eSession }
 +func (s *session) engine() *proton.Engine     { return s.connection.engine }
 +
 +func (s *session) Close(err error) {
 +      s.engine().Inject(func() {
 +              if s.Error() == nil {
 +                      localClose(s.eSession, err)
 +              }
 +      })
 +}
 +
 +func (s *session) Sender(setting ...LinkOption) (snd Sender, err error) {
 +      err = s.engine().InjectWait(func() error {
 +              if s.Error() != nil {
 +                      return s.Error()
 +              }
-               l, err := localLink(s, true, setting...)
++              l, err := makeLocalLink(s, true, setting...)
 +              if err == nil {
 +                      snd = newSender(l)
 +              }
 +              return err
 +      })
 +      return
 +}
 +
 +func (s *session) Receiver(setting ...LinkOption) (rcv Receiver, err error) {
 +      err = s.engine().InjectWait(func() error {
 +              if s.Error() != nil {
 +                      return s.Error()
 +              }
-               l, err := localLink(s, false, setting...)
++              l, err := makeLocalLink(s, false, setting...)
 +              if err == nil {
 +                      rcv = newReceiver(l)
 +              }
 +              return err
 +      })
 +      return
 +}
 +
 +// IncomingSession is sent on the Connection.Incoming() channel when there is an
 +// incoming request to open a session.
 +type IncomingSession struct {
 +      incoming
 +      h        *handler
 +      pSession proton.Session
 +      capacity uint
 +}
 +
 +func newIncomingSession(h *handler, ps proton.Session) *IncomingSession {
 +      return &IncomingSession{incoming: makeIncoming(ps), h: h, pSession: ps}
 +}
 +
 +// SetCapacity sets the session buffer capacity of an incoming session in 
bytes.
 +func (in *IncomingSession) SetCapacity(bytes uint) { in.capacity = bytes }
 +
 +// Accept an incoming session endpoint.
 +func (in *IncomingSession) Accept() Endpoint {
 +      return in.accept(func() Endpoint {
 +              return newSession(in.h.connection, in.pSession, 
IncomingCapacity(in.capacity))
 +      })
 +}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/5e6024bb/proton/engine.go
----------------------------------------------------------------------
diff --cc proton/engine.go
index 13d44b8,0000000..eecda7a
mode 100644,000000..100644
--- a/proton/engine.go
+++ b/proton/engine.go
@@@ -1,397 -1,0 +1,409 @@@
 +/*
 +Licensed to the Apache Software Foundation (ASF) under one
 +or more contributor license agreements.  See the NOTICE file
 +distributed with this work for additional information
 +regarding copyright ownership.  The ASF licenses this file
 +to you under the Apache License, Version 2.0 (the
 +"License"); you may not use this file except in compliance
 +with the License.  You may obtain a copy of the License at
 +
 +  http://www.apache.org/licenses/LICENSE-2.0
 +
 +Unless required by applicable law or agreed to in writing,
 +software distributed under the License is distributed on an
 +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 +KIND, either express or implied.  See the License for the
 +specific language governing permissions and limitations
 +under the License.
 +*/
 +
 +package proton
 +
 +// #include <proton/connection.h>
 +// #include <proton/event.h>
 +// #include <proton/error.h>
 +// #include <proton/handlers.h>
 +// #include <proton/session.h>
 +// #include <proton/transport.h>
 +// #include <memory.h>
 +// #include <stdlib.h>
 +//
 +// PN_HANDLE(REMOTE_ADDR)
 +import "C"
 +
 +import (
 +      "fmt"
-       "io"
 +      "net"
 +      "sync"
++      "time"
 +      "unsafe"
 +)
 +
 +// Injecter allows functions to be "injected" into the event-processing loop, 
to
 +// be called in the same goroutine as event handlers.
 +type Injecter interface {
 +      // Inject a function into the engine goroutine.
 +      //
 +      // f() will be called in the same goroutine as event handlers, so it 
can safely
 +      // use values belonging to event handlers without synchronization. f() 
should
 +      // not block, no further events or injected functions can be processed 
until
 +      // f() returns.
 +      //
 +      // Returns a non-nil error if the function could not be injected and 
will
 +      // never be called. Otherwise the function will eventually be called.
 +      //
 +      // Note that proton values (Link, Session, Connection etc.) that 
existed when
 +      // Inject(f) was called may have become invalid by the time f() is 
executed.
 +      // Handlers should keep track of Closed events to ensure proton values
 +      // are not used after they become invalid. One technique is to have a map from
 +      // proton values to application values. Check that the map has the 
correct
 +      // proton/application value pair at the start of the injected function 
and
 +      // delete the value from the map when handling a Closed event.
 +      Inject(f func()) error
 +
 +      // InjectWait is like Inject but does not return till f() has completed.
 +      // If f() cannot be injected it returns the error from Inject(), 
otherwise
 +      // it returns the error from f()
 +      InjectWait(f func() error) error
 +}
 +
 +// bufferChan manages a pair of ping-pong buffers to pass bytes through a 
channel.
 +type bufferChan struct {
 +      buffers    chan []byte
 +      buf1, buf2 []byte
 +}
 +
 +func newBufferChan(size int) *bufferChan {
 +      return &bufferChan{make(chan []byte), make([]byte, size), make([]byte, 
size)}
 +}
 +
 +func (b *bufferChan) buffer() []byte {
 +      b.buf1, b.buf2 = b.buf2, b.buf1 // Alternate buffers.
 +      return b.buf1[:cap(b.buf1)]
 +}
 +
 +// Engine reads from a net.Conn, decodes AMQP events and calls the appropriate
 +// Handler functions sequentially in a single goroutine. Actions taken by
 +// Handler functions (such as sending messages) are encoded and written to the
 +// net.Conn. You can create multiple Engines to handle multiple connections
 +// concurrently.
 +//
 +// You implement the EventHandler and/or MessagingHandler interfaces and 
provide
 +// those values to NewEngine(). Their HandleEvent method will be called in the
 +// event-handling goroutine.
 +//
 +// Handlers can pass values from an event (Connections, Links, Deliveries 
etc.) to
 +// other goroutines, store them, or use them as map indexes. Effectively they 
are
 +// just pointers.  Other goroutines cannot call their methods directly but they
 +// can create a function closure to call such methods and pass it to Engine.Inject()
 +// to have it evaluated in the engine goroutine.
 +//
 +// You are responsible for ensuring you don't use an event value after it is
 +// invalid. The handler methods will tell you when a value is no longer 
valid. For
 +// example after a LinkClosed event, that link is no longer valid. If you do
 +// Link.Close() yourself (in a handler or injected function) the link remains 
valid
 +// until the corresponding LinkClosed event is received by the handler.
 +//
 +// Engine.Close() will take care of cleaning up any remaining values when you 
are
 +// done with the Engine. All values associated with an engine become invalid when you
 +// call Engine.Close()
 +//
 +// The qpid.apache.org/proton/concurrent package will do all this for you, so 
it
 +// may be a better choice for some applications.
 +//
 +type Engine struct {
 +      // Error is set on exit from Run() if there was an error.
 +      err    ErrorHolder
 +      inject chan func()
 +
 +      conn       net.Conn
 +      connection Connection
 +      transport  Transport
 +      collector  *C.pn_collector_t
 +      read       *bufferChan    // Read buffers channel.
 +      write      *bufferChan    // Write buffers channel.
 +      handlers   []EventHandler // Handlers for proton events.
 +      running    chan struct{}  // This channel will be closed when the 
goroutines are done.
 +      closeOnce  sync.Once
 +}
 +
 +const bufferSize = 4096
 +
 +// NewEngine initializes an engine with a connection and handlers. To start it running:
 +//    eng, err := NewEngine(...)
 +//    go eng.Run()
 +// The goroutine will exit when the engine is closed or disconnected.
 +// You can check for errors with Engine.Error().
 +//
 +func NewEngine(conn net.Conn, handlers ...EventHandler) (*Engine, error) {
 +      // Save the connection ID for Connection.String()
 +      eng := &Engine{
 +              inject:     make(chan func()),
 +              conn:       conn,
 +              transport:  Transport{C.pn_transport()},
 +              connection: Connection{C.pn_connection()},
 +              collector:  C.pn_collector(),
 +              handlers:   handlers,
 +              read:       newBufferChan(bufferSize),
 +              write:      newBufferChan(bufferSize),
 +              running:    make(chan struct{}),
 +      }
 +      if eng.transport.IsNil() || eng.connection.IsNil() || eng.collector == 
nil {
 +              return nil, fmt.Errorf("failed to allocate engine")
 +      }
 +
 +      // TODO aconway 2015-06-25: connection settings for user, password, 
container etc.
 +      // before transport.Bind() Set up connection before Engine, allow 
Engine or Reactor
 +      // to run connection.
 +
 +      // Unique container-id by default.
 +      eng.connection.SetContainer(UUID4().String())
 +      pnErr := eng.transport.Bind(eng.connection)
 +      if pnErr != 0 {
 +              return nil, fmt.Errorf("cannot setup engine: %s", 
PnErrorCode(pnErr))
 +      }
 +      C.pn_connection_collect(eng.connection.pn, eng.collector)
 +      eng.connection.Open()
 +      return eng, nil
 +}
 +
 +func (eng *Engine) String() string {
 +      return fmt.Sprintf("%s-%s", eng.conn.LocalAddr(), eng.conn.RemoteAddr())
 +}
 +
 +func (eng *Engine) Id() string {
-       return fmt.Sprintf("%eng", &eng)
++      return fmt.Sprintf("%p", eng)
 +}
 +
 +func (eng *Engine) Error() error {
 +      return eng.err.Get()
 +}
 +
 +// Inject a function into the Engine's event loop.
 +//
 +// f() will be called in the same event-processing goroutine that calls 
Handler
 +// methods. f() can safely call methods on values that belong to this engine
 +// (Sessions, Links etc)
 +//
 +// The injected function has no parameters or return values. It is normally a
 +// closure and can use channels to communicate with the injecting goroutine if
 +// necessary.
 +//
 +// Returns a non-nil error if the engine is closed before the function could 
be
 +// injected.
 +func (eng *Engine) Inject(f func()) error {
 +      select {
 +      case eng.inject <- f:
 +              return nil
 +      case <-eng.running:
 +              return eng.Error()
 +      }
 +}
 +
 +// InjectWait is like Inject but does not return till f() has completed or the
 +// engine is closed, and returns an error value from f()
 +func (eng *Engine) InjectWait(f func() error) error {
 +      done := make(chan error)
 +      defer close(done)
 +      err := eng.Inject(func() { done <- f() })
 +      if err != nil {
 +              return err
 +      }
 +      select {
 +      case <-eng.running:
 +              return eng.Error()
 +      case err := <-done:
 +              return err
 +      }
 +}
 +
 +// Server puts the Engine in server mode, meaning it will auto-detect security settings on
 +// the incoming connection such as use of SASL and SSL.
 +// Must be called before Run()
 +//
 +func (eng *Engine) Server() { eng.transport.SetServer() }
 +
- // Close the engine's connection, returns when the engine has exited.
++func (eng *Engine) disconnect() {
++      eng.transport.CloseHead()
++      eng.transport.CloseTail()
++      eng.conn.Close()
++      eng.dispatch()
++}
++
++// Close the engine's connection.
++// If err != nil pass it to the remote end as the close condition.
++// Returns when the remote end closes or disconnects.
 +func (eng *Engine) Close(err error) {
-       eng.err.Set(err)
-       eng.Inject(func() {
-               CloseError(eng.connection, err)
-       })
++      eng.Inject(func() { CloseError(eng.connection, err) })
 +      <-eng.running
 +}
 +
- // Disconnect the engine's connection without and AMQP close, returns when 
the engine has exited.
++// CloseTimeout like Close but disconnect if the remote end doesn't close 
within timeout.
++func (eng *Engine) CloseTimeout(err error, timeout time.Duration) {
++      eng.Inject(func() { CloseError(eng.connection, err) })
++      select {
++      case <-eng.running:
++      case <-time.After(timeout):
++              eng.Disconnect(err)
++      }
++}
++
++// Disconnect the engine's connection immediately without an AMQP close.
++// Process any termination events before returning.
 +func (eng *Engine) Disconnect(err error) {
-       eng.err.Set(err)
-       eng.conn.Close()
++      eng.Inject(func() { eng.transport.Condition().SetError(err); 
eng.disconnect() })
 +      <-eng.running
 +}
 +
 +// Run the engine. Engine.Run() will exit when the engine is closed or
 +// disconnected.  You can check for errors after exit with Engine.Error().
 +//
 +func (eng *Engine) Run() error {
 +      wait := sync.WaitGroup{}
 +      wait.Add(2) // Read and write goroutines
 +
 +      readErr := make(chan error, 1) // Don't block
 +      go func() {                    // Read goroutine
 +              defer wait.Done()
 +              for {
 +                      rbuf := eng.read.buffer()
 +                      n, err := eng.conn.Read(rbuf)
 +                      if n > 0 {
 +                              eng.read.buffers <- rbuf[:n]
 +                      }
 +                      if err != nil {
 +                              readErr <- err
 +                              close(readErr)
 +                              close(eng.read.buffers)
 +                              return
 +                      }
 +              }
 +      }()
 +
 +      writeErr := make(chan error, 1) // Don't block
 +      go func() {                     // Write goroutine
 +              defer wait.Done()
 +              for {
 +                      wbuf, ok := <-eng.write.buffers
 +                      if !ok {
 +                              return
 +                      }
 +                      _, err := eng.conn.Write(wbuf)
 +                      if err != nil {
 +                              writeErr <- err
 +                              close(writeErr)
 +                              return
 +                      }
 +              }
 +      }()
 +
 +      wbuf := eng.write.buffer()[:0]
 +
-       for eng.err.Get() == nil {
++      for !eng.transport.Closed() {
 +              if len(wbuf) == 0 {
 +                      eng.pop(&wbuf)
 +              }
 +              // Don't set wchan unless there is something to write.
 +              var wchan chan []byte
 +              if len(wbuf) > 0 {
 +                      wchan = eng.write.buffers
 +              }
 +
 +              select {
 +              case buf, ok := <-eng.read.buffers: // Read a buffer
 +                      if ok {
 +                              eng.push(buf)
 +                      }
 +              case wchan <- wbuf: // Write a buffer
 +                      wbuf = eng.write.buffer()[:0]
 +              case f, ok := <-eng.inject: // Function injected from another 
goroutine
 +                      if ok {
 +                              f()
 +                      }
 +              case err := <-readErr:
-                       eng.netError(err)
++                      eng.transport.Condition().SetError(err)
++                      eng.transport.CloseTail()
 +              case err := <-writeErr:
-                       eng.netError(err)
++                      eng.transport.Condition().SetError(err)
++                      eng.transport.CloseHead()
++              }
++              eng.dispatch()
++              if eng.connection.State().RemoteClosed() && 
eng.connection.State().LocalClosed() {
++                      eng.disconnect()
 +              }
-               eng.process()
 +      }
++      eng.err.Set(EndpointError(eng.connection))
++      eng.err.Set(eng.transport.Condition().Error())
 +      close(eng.write.buffers)
 +      eng.conn.Close() // Make sure connection is closed
 +      wait.Wait()
 +      close(eng.running) // Signal goroutines have exited and Error is set.
 +
 +      if !eng.connection.IsNil() {
 +              eng.connection.Free()
 +      }
 +      if !eng.transport.IsNil() {
 +              eng.transport.Free()
 +      }
 +      if eng.collector != nil {
 +              C.pn_collector_free(eng.collector)
 +      }
 +      for _, h := range eng.handlers {
 +              switch h := h.(type) {
 +              case cHandler:
 +                      C.pn_handler_free(h.pn)
 +              }
 +      }
 +      return eng.err.Get()
 +}
 +
- func (eng *Engine) netError(err error) {
-       eng.err.Set(err)
-       eng.transport.CloseHead()
-       eng.transport.CloseTail()
- }
- 
 +func minInt(a, b int) int {
 +      if a < b {
 +              return a
 +      } else {
 +              return b
 +      }
 +}
 +
 +func (eng *Engine) pop(buf *[]byte) {
 +      pending := int(eng.transport.Pending())
 +      switch {
 +      case pending == int(C.PN_EOS):
 +              *buf = (*buf)[:]
 +              return
 +      case pending < 0:
 +              panic(fmt.Errorf("%s", PnErrorCode(pending)))
 +      }
 +      size := minInt(pending, cap(*buf))
 +      *buf = (*buf)[:size]
 +      if size == 0 {
 +              return
 +      }
 +      C.memcpy(unsafe.Pointer(&(*buf)[0]), eng.transport.Head(), 
C.size_t(size))
 +      assert(size > 0)
 +      eng.transport.Pop(uint(size))
 +}
 +
 +func (eng *Engine) push(buf []byte) {
 +      buf2 := buf
 +      for len(buf2) > 0 {
 +              n := eng.transport.Push(buf2)
 +              if n <= 0 {
 +                      panic(fmt.Errorf("error in transport: %s", 
PnErrorCode(n)))
 +              }
 +              buf2 = buf2[n:]
 +      }
 +}
 +
- func (eng *Engine) handle(e Event) {
-       for _, h := range eng.handlers {
-               h.HandleEvent(e)
-       }
-       if e.Type() == ETransportClosed {
-               eng.err.Set(io.EOF)
-       }
- }
++func (eng *Engine) peek() *C.pn_event_t { return 
C.pn_collector_peek(eng.collector) }
 +
- func (eng *Engine) process() {
-       for ce := C.pn_collector_peek(eng.collector); ce != nil; ce = 
C.pn_collector_peek(eng.collector) {
-               eng.handle(makeEvent(ce, eng))
++func (eng *Engine) dispatch() {
++      for ce := eng.peek(); ce != nil; ce = eng.peek() {
++              for _, h := range eng.handlers {
++                      h.HandleEvent(makeEvent(ce, eng))
++              }
 +              C.pn_collector_pop(eng.collector)
 +      }
 +}
 +
 +func (eng *Engine) Connection() Connection { return eng.connection }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to