http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bd3fb337/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/link.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/link.go 
b/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/link.go
new file mode 100644
index 0000000..cf4b8aa
--- /dev/null
+++ b/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/link.go
@@ -0,0 +1,232 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package concurrent
+
+import (
+       "qpid.apache.org/proton"
+       "qpid.apache.org/proton/internal"
+)
+
+// LinkSettings holds the settings common to sender and receiver links.
+type LinkSettings struct {
+       // Source address that messages come from.
+       Source string
+       // Target address that messages are going to.
+       Target string
+
+       // Unique (per container) name of the link.
+       // Leave blank to have the container generate a unique name automatically.
+       Name string
+
+       // SndSettleMode defines when the sending end of the link settles message delivery.
+       // Can set via AtMostOnce or AtLeastOnce.
+       SndSettleMode SndSettleMode
+
+       // RcvSettleMode defines when the receiving end of the link settles message delivery.
+       RcvSettleMode RcvSettleMode
+}
+
+// AtMostOnce selects "fire and forget" delivery: messages are sent without
+// waiting for any acknowledgment, so a message can be lost if the network
+// fails. It sets SndSettleMode=SndSettled and RcvSettleMode=RcvFirst.
+func (s *LinkSettings) AtMostOnce() {
+       s.SndSettleMode, s.RcvSettleMode = SndSettled, RcvFirst
+}
+
+// AtLeastOnce asks for an acknowledgment of every message; an acknowledgment
+// means the message was definitely received. After a failure the
+// unacknowledged messages can be re-sent, in which case a message may be
+// received twice.
+// It sets SndSettleMode=SndUnsettled and RcvSettleMode=RcvFirst.
+func (s *LinkSettings) AtLeastOnce() {
+       s.SndSettleMode, s.RcvSettleMode = SndUnsettled, RcvFirst
+}
+
+// Link is the common interface for Sender and Receiver links.
+type Link interface {
+       Endpoint
+
+       // Settings for this link.
+       Settings() LinkSettings
+
+       // IsSender reports whether this is the sending end of the link.
+       IsSender() bool
+       // IsReceiver reports whether this is the receiving end of the link.
+       IsReceiver() bool
+
+       // IsOpen reports whether the link has been opened.
+       IsOpen() bool
+
+       // Credit indicates how many messages the receiving end of the link can accept.
+       //
+       // A Receiver adds credit automatically when it can accept more messages.
+       //
+       // On a Sender credit can be negative, meaning that messages in excess of the
+       // receiver's credit limit have been buffered locally till credit is available.
+       Credit() (int, error)
+
+       // Called in event loop on closed event.
+       closed(err error)
+}
+
+// SndSettleMode defines when the sending end of the link settles message delivery.
+// Can set via AtMostOnce or AtLeastOnce.
+type SndSettleMode proton.SndSettleMode
+
+const (
+       // Messages are sent unsettled
+       SndUnsettled = SndSettleMode(proton.SndUnsettled)
+       // Messages are sent already settled
+       SndSettled = SndSettleMode(proton.SndSettled)
+       // Sender can send either unsettled or settled messages.
+       SndMixed = SndSettleMode(proton.SndMixed)
+       // SendMixed is the original spelling of SndMixed, kept so existing
+       // callers continue to compile.
+       //
+       // Deprecated: use SndMixed, which follows the Snd* naming of the
+       // other send settle modes.
+       SendMixed = SndMixed
+)
+
+// RcvSettleMode defines when the receiving end of the link settles message delivery.
+type RcvSettleMode proton.RcvSettleMode
+
+const (
+       // Receiver settles first.
+       RcvFirst = RcvSettleMode(proton.RcvFirst)
+       // Receiver waits for sender to settle before settling.
+       RcvSecond = RcvSettleMode(proton.RcvSecond)
+)
+
+// Implement Link interface, for embedding in sender and receiver.
+//
+// Link creation: there are two ways to create a link.
+//
+// Session.NewSender() and Session.NewReceiver() create a "local" link which has
+// the session and isSender fields set. The user can set properties like Name,
+// Target and Source. On Open() the eLink is created and the properties are set
+// on the eLink.
+//
+// An "incoming" link is created by the connection, with session, isSender, name,
+// source, target all set from the incoming eLink; these properties cannot be
+// changed by the user. There may be other properties (e.g. Receiver.SetCapacity())
+// that can be set by the user before Open().
+//
+type link struct {
+       endpoint
+
+       settings LinkSettings
+       session  *session
+       eLink    proton.Link
+       isOpen   bool
+       isSender bool
+}
+
+// Simple accessors for the link's stored state. Connection is obtained from
+// the owning session.
+func (l *link) Settings() LinkSettings { return l.settings }
+func (l *link) IsSender() bool         { return l.isSender }
+func (l *link) IsReceiver() bool       { return !l.isSender }
+func (l *link) IsOpen() bool           { return l.isOpen }
+func (l *link) Session() Session       { return l.session }
+func (l *link) Connection() Connection { return l.session.Connection() }
+
+// engine and handler return the engine and handler of the connection that
+// owns this link's session.
+func (l *link) engine() *proton.Engine { return l.session.connection.engine }
+func (l *link) handler() *handler      { return l.session.connection.handler }
+
+// makeLocalLink creates a local link associated with session sn.
+// Call in proton goroutine.
+//
+// If settings.Name is empty a name is generated by the container. The
+// underlying proton link is created as a sender or receiver according to
+// isSender and the settings are applied to it. If the proton link cannot be
+// created the error is recorded on the returned link and also returned.
+func makeLocalLink(sn *session, isSender bool, settings LinkSettings) (link, error) {
+       var l link
+       l.session = sn
+       l.settings = settings
+       l.isSender = isSender
+       if l.settings.Name == "" {
+               l.settings.Name = l.session.connection.container.nextLinkName()
+       }
+       if l.IsSender() {
+               l.eLink = l.session.eSession.Sender(l.settings.Name)
+       } else {
+               l.eLink = l.session.eSession.Receiver(l.settings.Name)
+       }
+       if l.eLink.IsNil() {
+               // Identify the link by name: its string form is only filled in by
+               // setSettings(), which has not run yet, and eLink is nil here.
+               return l, l.setError(internal.Errorf("cannot create link %q", l.settings.Name))
+       }
+       l.setSettings()
+       return l, nil
+}
+
+// setSettings copies the stored LinkSettings onto the local end of the proton
+// link and caches the proton link's string form in l.str.
+func (l *link) setSettings() {
+       cfg := &l.settings
+       l.eLink.Source().SetAddress(cfg.Source)
+       l.eLink.Target().SetAddress(cfg.Target)
+       sndMode := proton.SndSettleMode(cfg.SndSettleMode)
+       l.eLink.SetSndSettleMode(sndMode)
+       rcvMode := proton.RcvSettleMode(cfg.RcvSettleMode)
+       l.eLink.SetRcvSettleMode(rcvMode)
+       l.str = l.eLink.String()
+}
+
+// initIncoming sets link settings from an incoming proton.Link.
+// Call in proton goroutine
+func makeIncomingLink(sn *session, eLink proton.Link) link {
+       l := link{
+               session:  sn,
+               isSender: eLink.IsSender(),
+               eLink:    eLink,
+               settings: LinkSettings{
+                       Source:        eLink.RemoteSource().Address(),
+                       Target:        eLink.RemoteTarget().Address(),
+                       Name:          eLink.Name(),
+                       SndSettleMode: 
SndSettleMode(eLink.RemoteSndSettleMode()),
+                       RcvSettleMode: 
RcvSettleMode(eLink.RemoteRcvSettleMode()),
+               },
+       }
+       l.setSettings()
+       return l
+}
+
+// setPanicIfOpen panics if the link is already open; it guards setters that
+// may only be used before Open().
+func (l *link) setPanicIfOpen() {
+       if !l.IsOpen() {
+               return
+       }
+       panic(internal.Errorf("link is already open %s", l))
+}
+
+// open opens the proton link (local or incoming) and marks the link open.
+// If an error is already recorded on the link it is returned and nothing is
+// opened. Call in proton goroutine
+func (l *link) open() error {
+       if err := l.Error(); err != nil {
+               return err
+       }
+       l.eLink.Open()
+       l.isOpen = true
+       return nil
+}
+
+// closed records err on the link and, if the remote end is still active,
+// closes the local end, passing on any recorded error as the link condition.
+// Called in proton goroutine
+func (l *link) closed(err error) {
+       l.setError(err)
+       if remoteActive := l.eLink.State().RemoteActive(); remoteActive {
+               if recorded := l.Error(); recorded != nil {
+                       l.eLink.Condition().SetError(recorded)
+               }
+               l.eLink.Close()
+       }
+       // If no error set, mark as closed.
+       l.setError(Closed)
+}
+
+// Credit returns the proton link's current credit, read inside the engine via
+// InjectWait. The error is whatever InjectWait returns.
+func (l *link) Credit() (credit int, err error) {
+       readCredit := func() error {
+               credit = l.eLink.Credit()
+               return nil
+       }
+       err = l.engine().InjectWait(readCredit)
+       return credit, err
+}
+
+// Close injects a localClose of the proton link, with err, into the engine.
+func (l *link) Close(err error) {
+       closeLink := func() {
+               localClose(l.eLink, err)
+       }
+       l.engine().Inject(closeLink)
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bd3fb337/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/messaging_test.go
----------------------------------------------------------------------
diff --git 
a/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/messaging_test.go 
b/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/messaging_test.go
new file mode 100644
index 0000000..aa806d7
--- /dev/null
+++ 
b/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/messaging_test.go
@@ -0,0 +1,205 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package concurrent
+
+import (
+       "fmt"
+       "net"
+       "qpid.apache.org/proton/amqp"
+       "testing"
+       "time"
+)
+
+// panicIf panics with err when err is non-nil and does nothing otherwise.
+func panicIf(err error) {
+       if err == nil {
+               return
+       }
+       panic(err)
+}
+
+// Start a server, return listening addr and channel for incoming Connection.
+//
+// A goroutine accepts exactly one TCP connection, wraps it in a server-side
+// Connection from cont, opens it and delivers it on the returned channel.
+// Any failure along the way panics.
+func newServer(cont Container) (net.Addr, <-chan Connection) {
+       listener, err := net.Listen("tcp", "")
+       panicIf(err)
+       addr := listener.Addr()
+       ch := make(chan Connection)
+       go func() {
+               // Only one connection is ever accepted, so release the listener
+               // once this goroutine is done with it.
+               defer listener.Close()
+               conn, err := listener.Accept()
+               panicIf(err) // Previously overwritten unchecked by NewConnection's err.
+               c, err := cont.NewConnection(conn)
+               panicIf(err)
+               c.Server()
+               c.Listen()
+               panicIf(c.Open())
+               ch <- c
+       }()
+       return addr, ch
+}
+
+// Open a client connection to addr and a session on it, return the session.
+// Any failure panics.
+func newClient(cont Container, addr net.Addr) Session {
+       conn, err := net.Dial(addr.Network(), addr.String())
+       panicIf(err)
+       c, err := cont.NewConnection(conn)
+       panicIf(err)
+       panicIf(c.Open()) // Check the result, as the server side does.
+       sn, err := c.NewSession()
+       panicIf(err)
+       panicIf(sn.Open())
+       return sn
+}
+
+// newClientServer starts a server and a client, each in its own container,
+// and returns the client session together with the server end of the same
+// connection.
+func newClientServer() (client Session, server Connection) {
+       addr, incoming := newServer(NewContainer(""))
+       client = newClient(NewContainer(""), addr)
+       server = <-incoming
+       return client, server
+}
+
+// Close client and server: closes the client session's connection, then the
+// server connection, both with a nil error.
+func closeClientServer(client Session, server Connection) {
+       client.Connection().Close(nil)
+       server.Close(nil)
+}
+
+// Send a message one way with a client sender and server receiver, verify ack.
+//
+// Opens nLinks senders on the client, accepts the session and the matching
+// receivers on the server, then for each link sends nMessages messages and
+// checks the body, the absence of a disposition before the server
+// acknowledges, and the Rejected disposition afterwards.
+func TestClientSendServerReceive(t *testing.T) {
+       client, server := newClientServer()
+       defer closeClientServer(client, server)
+
+       timeout := time.Second * 2
+       nLinks := 3
+       nMessages := 3
+
+       s := make([]Sender, nLinks)
+       for i := 0; i < nLinks; i++ {
+               var err error
+               s[i], err = client.Sender(fmt.Sprintf("foo%d", i))
+               if err != nil {
+                       t.Fatal(err)
+               }
+       }
+
+       // Server accept session and receivers
+       ep, err := server.Accept()
+       if err != nil {
+               t.Fatal(err)
+       }
+       if err := ep.Open(); err != nil { // Accept incoming session
+               t.Fatal(err)
+       }
+       r := make([]Receiver, nLinks)
+       for i := 0; i < nLinks; i++ { // Accept incoming receivers
+               ep, err = server.Accept()
+               if err != nil {
+                       t.Fatal(err)
+               }
+               rcv, ok := ep.(Receiver)
+               if !ok {
+                       t.Fatalf("want Receiver, got %T", ep)
+               }
+               r[i] = rcv
+               if err = r[i].Open(); err != nil {
+                       t.Fatal(err)
+               }
+       }
+
+       for i := 0; i < nLinks; i++ {
+               for j := 0; j < nMessages; j++ {
+                       // Client send
+                       m := amqp.NewMessageWith(fmt.Sprintf("foobar%v-%v", i, j))
+                       sm, err := s[i].Send(m)
+                       if err != nil {
+                               t.Fatal(err)
+                       }
+
+                       // Server receive
+                       rm, err := r[i].Receive()
+                       if err != nil {
+                               t.Fatal(err)
+                       }
+                       if want, got := interface{}(fmt.Sprintf("foobar%v-%v", i, j)), rm.Message.Body(); want != got {
+                               t.Errorf("%#v != %#v", want, got)
+                       }
+
+                       // Should not be acknowledged on client yet
+                       if d, err := sm.DispositionTimeout(0); err != Timeout || NoDisposition != d {
+                               t.Errorf("want [no-disposition/timeout] got [%v/%v]", d, err)
+                       }
+                       // Server ack
+                       if err := rm.Acknowledge(Rejected); err != nil {
+                               t.Error(err)
+                       }
+                       // Client get ack.
+                       if d, err := sm.DispositionTimeout(timeout); err != nil || Rejected != d {
+                               t.Errorf("want [rejected/nil] got [%v/%v]", d, err)
+                       }
+               }
+       }
+}
+
+// TestClientReceiver has the server send nMessages int32 messages on every
+// incoming sender link while the client receives and accepts them on one
+// receiver, checking that the bodies arrive in order.
+func TestClientReceiver(t *testing.T) {
+       client, server := newClientServer()
+       nMessages := 3
+
+       done := make(chan struct{})
+       go func() { // Server sends
+               defer close(done)
+               for {
+                       ep, err := server.Accept()
+                       if err == Closed {
+                               return
+                       }
+                       if err != nil {
+                               t.Error(err)
+                               return
+                       }
+                       // Report a failed Open instead of silently dropping it.
+                       if err := ep.Open(); err != nil {
+                               t.Error(err)
+                               continue
+                       }
+                       s, ok := ep.(Sender)
+                       if !ok {
+                               continue
+                       }
+                       go func() {
+                               for i := int32(0); i < int32(nMessages); i++ {
+                                       sm, err := s.Send(amqp.NewMessageWith(i))
+                                       if err != nil {
+                                               t.Error(err)
+                                               return
+                                       }
+                                       sm.Disposition() // Sync send.
+                               }
+                               s.Close(nil)
+                       }()
+               }
+       }()
+
+       r, err := client.Receiver("foo")
+       if err != nil {
+               t.Fatal(err)
+       }
+       for i := int32(0); i < int32(nMessages); i++ {
+               rm, err := r.Receive()
+               if err != nil {
+                       if err != Closed {
+                               t.Error(err)
+                       }
+                       break
+               }
+               if err := rm.Accept(); err != nil {
+                       t.Error(err)
+               }
+               if b, ok := rm.Message.Body().(int32); !ok || b != i {
+                       t.Errorf("want %v, true got %v, %v", i, b, ok)
+               }
+       }
+       server.Close(nil)
+       <-done
+       client.Connection().Close(nil)
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bd3fb337/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/receiver.go
----------------------------------------------------------------------
diff --git 
a/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/receiver.go 
b/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/receiver.go
new file mode 100644
index 0000000..ad033a6
--- /dev/null
+++ b/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/receiver.go
@@ -0,0 +1,242 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package concurrent
+
+import (
+       "qpid.apache.org/proton"
+       "qpid.apache.org/proton/amqp"
+       "qpid.apache.org/proton/internal"
+       "time"
+)
+
+// ReceiverSettings extends LinkSettings with the receiver's buffering options.
+type ReceiverSettings struct {
+       LinkSettings
+
+       // Capacity is the number of messages that the receiver can buffer locally.
+       // If unset (zero) it will be set to 1.
+       Capacity int
+
+       // Prefetch==true means the Receiver will automatically issue credit to the
+       // remote sender to keep its buffer as full as possible, i.e. it will
+       // "pre-fetch" messages independently of the application calling
+       // Receive(). This gives good throughput for applications that handle a
+       // continuous stream of messages. Larger capacity may improve throughput, the
+       // optimal value depends on the characteristics of your application.
+       //
+       // Prefetch==false means the Receiver will only issue credit when you
+       // call Receive(), and will only issue enough credit to satisfy the calls
+       // actually made. This gives lower throughput but will not fetch any messages
+       // in advance. It is good for synchronous applications that need to evaluate
+       // each message before deciding whether to receive another. The
+       // request-response pattern is a typical example.  If you make concurrent
+       // calls to Receive with pre-fetch disabled, you can improve performance by
+       // setting the capacity close to the expected number of concurrent calls.
+       //
+       Prefetch bool
+}
+
+// Receiver is a Link that receives messages.
+//
+type Receiver interface {
+       Link
+
+       // SetCapacity sets the Capacity and Prefetch (see ReceiverSettings). It
+       // may be called before Open() on an accepted receiver; it cannot be changed
+       // once the receiver is Open().
+       SetCapacity(capacity int, prefetch bool)
+
+       // Receive blocks until a message is available or until the Receiver is closed
+       // and has no more buffered messages.
+       Receive() (ReceivedMessage, error)
+
+       // ReceiveTimeout is like Receive but gives up after timeout, see Timeout.
+       ReceiveTimeout(timeout time.Duration) (ReceivedMessage, error)
+}
+
+// Flow control policy for a receiver.
+type policy interface {
+       // Called at the start of Receive() to adjust credit before fetching a message.
+       Pre(*receiver)
+       // Called after Receive() has received a message from Buffer() before it returns.
+       // Non-nil error means no message was received because of an error.
+       Post(*receiver, error)
+}
+
+// prefetchPolicy keeps the receiver's buffer as full as possible by topping
+// credit up to the buffer capacity before and after every successful receive.
+type prefetchPolicy struct{}
+
+// Flow injects a credit top-up into the engine. Only the shortfall between
+// the buffer capacity and what is already buffered or outstanding as credit
+// is issued: link credit is cumulative, so issuing the full capacity on every
+// call would let the sender deliver more than the buffer can hold and block
+// handleDelivery.
+func (p prefetchPolicy) Flow(r *receiver) {
+       r.engine().Inject(func() {
+               buffered, credit, max := r.credit()
+               if add := max - (buffered + credit); add > 0 {
+                       r.eLink.Flow(add)
+               }
+       })
+}
+
+// Pre tops up credit before a receive.
+func (p prefetchPolicy) Pre(r *receiver) { p.Flow(r) }
+
+// Post tops up credit again after a successful receive (err == nil).
+func (p prefetchPolicy) Post(r *receiver, err error) {
+       if err == nil {
+               p.Flow(r)
+       }
+}
+
+// noPrefetchPolicy issues only enough credit to satisfy the Receive calls
+// currently in progress. waiting counts those calls; it is only touched from
+// functions injected into the engine, so concurrent Receive calls do not race
+// on it.
+type noPrefetchPolicy struct{ waiting int }
+
+// adjust injects a function that adds delta to the waiting count and, when
+// flow is true, issues the credit still needed to cover the waiting calls.
+// The credit target is capped at the buffer capacity so that buffered
+// messages plus outstanding credit never exceed it.
+func (p *noPrefetchPolicy) adjust(r *receiver, delta int, flow bool) {
+       r.engine().Inject(func() {
+               p.waiting += delta
+               if !flow {
+                       return
+               }
+               buffered, credit, max := r.credit()
+               want := p.waiting
+               if want > max {
+                       want = max // Don't overflow
+               }
+               if add := want - (buffered + credit); add > 0 {
+                       r.eLink.Flow(add)
+               }
+       })
+}
+
+// Flow issues any credit needed for the current number of waiting calls.
+// Not called in proton goroutine.
+func (p *noPrefetchPolicy) Flow(r *receiver) { p.adjust(r, 0, true) }
+
+// Pre registers one more waiting Receive call and issues credit for it.
+// Pointer receivers are required: with value receivers the count was updated
+// on a copy and lost.
+func (p *noPrefetchPolicy) Pre(r *receiver) { p.adjust(r, 1, true) }
+
+// Post removes one waiting Receive call; credit is re-evaluated only if the
+// call succeeded (err == nil).
+func (p *noPrefetchPolicy) Post(r *receiver, err error) { p.adjust(r, -1, err == nil) }
+
+// Receiver implementation
+type receiver struct {
+       link
+       // Set by SetCapacity()
+       capacity int
+       prefetch bool
+
+       // Set in Open()
+       buffer chan ReceivedMessage
+       policy policy
+}
+
+// newReceiver wraps link l in a receiver; capacity, buffer and policy are left unset.
+func newReceiver(l link) Receiver { return &receiver{link: l} }
+
+// SetCapacity stores the buffer capacity and prefetch flag for use by Open().
+// A capacity below 1 is raised to 1. Panics if the receiver is already open.
+func (r *receiver) SetCapacity(capacity int, prefetch bool) {
+       r.setPanicIfOpen()
+       n := capacity
+       if n < 1 {
+               n = 1
+       }
+       r.capacity, r.prefetch = n, prefetch
+}
+
+// Open accepts and opens the receiver. A capacity that was never set defaults
+// to 1 without prefetch. The flow-control policy is chosen from the prefetch
+// flag, then the link is opened inside the engine where the message buffer is
+// created and the receiver is registered with the handler. The resulting
+// error, if any, is recorded on the receiver and returned.
+func (r *receiver) Open() error {
+       if r.capacity == 0 {
+               r.SetCapacity(1, false)
+       }
+       switch {
+       case r.prefetch:
+               r.policy = &prefetchPolicy{}
+       default:
+               r.policy = &noPrefetchPolicy{}
+       }
+       openInEngine := func() error {
+               if err := r.open(); err != nil {
+                       return err
+               }
+               r.buffer = make(chan ReceivedMessage, r.capacity)
+               r.handler().addLink(r.eLink, r)
+               return nil
+       }
+       return r.setError(r.engine().InjectWait(openInEngine))
+}
+
+// credit returns the number of messages currently in the buffer, the proton
+// link's current credit, and the capacity of the buffer.
+// call in proton goroutine
+func (r *receiver) credit() (buffered, credit, capacity int) {
+       return len(r.buffer), r.eLink.Credit(), cap(r.buffer)
+}
+
+// Capacity returns the capacity of the message buffer (0 while buffer is nil).
+func (r *receiver) Capacity() int { return cap(r.buffer) }
+
+// Receive is ReceiveTimeout with the Forever timeout.
+func (r *receiver) Receive() (rm ReceivedMessage, err error) {
+       return r.ReceiveTimeout(Forever)
+}
+
+// ReceiveTimeout waits up to timeout for a message from the buffer. It
+// returns Timeout if none arrives in time and the receiver's error if the
+// buffer has been closed. The flow-control policy is notified before the
+// wait and, with the final error, after it. The receiver must be open.
+func (r *receiver) ReceiveTimeout(timeout time.Duration) (rm ReceivedMessage, err error) {
+       internal.Assert(r.buffer != nil, "Receiver is not open: %s", r)
+       r.policy.Pre(r)
+       // The deferred call reads the named result err after it has been set.
+       defer func() { r.policy.Post(r, err) }()
+       value, ok, timedout := timedReceive(r.buffer, timeout)
+       if timedout {
+               return ReceivedMessage{}, Timeout
+       }
+       if !ok {
+               return ReceivedMessage{}, r.Error()
+       }
+       return value.(ReceivedMessage), nil
+}
+
+// Called in proton goroutine
+func (r *receiver) handleDelivery(delivery proton.Delivery) {
+       // FIXME aconway 2015-09-24: how can this happen if we are remote 
closed?
+       if r.eLink.State().RemoteClosed() {
+               localClose(r.eLink, r.eLink.RemoteCondition().Error())
+               return
+       }
+       if delivery.HasMessage() {
+               m, err := delivery.Message()
+               if err != nil {
+                       localClose(r.eLink, err)
+                       return
+               }
+               internal.Assert(m != nil)
+               r.eLink.Advance()
+               if r.eLink.Credit() < 0 {
+                       localClose(r.eLink, internal.Errorf("received message 
in excess of credit limit"))
+               } else {
+                       // We never issue more credit than cap(buffer) so this 
will not block.
+                       r.buffer <- ReceivedMessage{m, delivery, r}
+               }
+       }
+}
+
+// closed records the close error and closes the message buffer so that
+// blocked and future Receive calls return once buffered messages are drained.
+func (r *receiver) closed(err error) {
+       r.closeError(err)
+       // buffer is only created in Open(); closing a nil channel would panic
+       // if the receiver is closed before it was ever opened.
+       if r.buffer != nil {
+               close(r.buffer)
+       }
+}
+
+// ReceivedMessage contains an amqp.Message and allows the message to be acknowledged.
+type ReceivedMessage struct {
+       // Message is the received message.
+       Message amqp.Message
+
+       // eDelivery is the proton delivery the message arrived on; receiver is
+       // the Receiver it was delivered to.
+       eDelivery proton.Delivery
+       receiver  Receiver
+}
+
+// Acknowledge a ReceivedMessage with the given disposition code.
+func (rm *ReceivedMessage) Acknowledge(disposition Disposition) error {
+       return rm.receiver.(*receiver).engine().InjectWait(func() error {
+               // Settle doesn't return an error but if the receiver is broken 
the settlement won't happen.
+               rm.eDelivery.SettleAs(uint64(disposition))
+               return rm.receiver.Error()
+       })
+}
+
+// Accept is short for Acknowledge(Accepted)
+func (rm *ReceivedMessage) Accept() error { return rm.Acknowledge(Accepted) }
+
+// Reject is short for Acknowledge(Rejected)
+func (rm *ReceivedMessage) Reject() error { return rm.Acknowledge(Rejected) }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bd3fb337/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/sender.go
----------------------------------------------------------------------
diff --git 
a/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/sender.go 
b/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/sender.go
new file mode 100644
index 0000000..7a65a24
--- /dev/null
+++ b/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/sender.go
@@ -0,0 +1,190 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package concurrent
+
+// #include <proton/disposition.h>
+import "C"
+
+import (
+       "qpid.apache.org/proton"
+       "qpid.apache.org/proton/amqp"
+       "qpid.apache.org/proton/internal"
+       "time"
+)
+
+// SenderSettings holds the settings for a Sender; currently just the common LinkSettings.
+type SenderSettings struct {
+       LinkSettings
+}
+
+// Sender is a Link that sends messages.
+type Sender interface {
+       Link
+
+       // Send a message asynchronously, return a SentMessage to identify it.
+       //
+       // Returns a nil SentMessage if the link sends messages pre-settled
+       // (see LinkSettings.AtMostOnce), since no acknowledgement will be received.
+       //
+       // See Credit() for note on buffering.
+       //
+       // Use SentMessage.Disposition() to wait for acknowledgement.
+       Send(m amqp.Message) (sm SentMessage, err error)
+}
+
+// sender implements Sender on top of the common link implementation.
+type sender struct{ link }
+
+// newSender wraps link l in a sender.
+func newSender(l link) Sender { return &sender{l} }
+
+// Open the Sender, must be called before calling Send(). The link is opened
+// inside the engine and, on success, registered with the handler. The
+// resulting error, if any, is recorded on the sender and returned.
+func (s *sender) Open() error {
+       openInEngine := func() error {
+               if err := s.open(); err != nil {
+                       return err
+               }
+               s.handler().addLink(s.eLink, s)
+               return nil
+       }
+       return s.setError(s.engine().InjectWait(openInEngine))
+}
+
+// Disposition indicates the outcome of a settled message delivery.
+type Disposition uint64
+
+const (
+       // No disposition available, not yet acknowledged or an error occurred
+       NoDisposition Disposition = 0
+       // NOTE(review): Accepted, Rejected and Released are declared without an
+       // explicit Disposition type, so they take their type from the proton
+       // constants - confirm this is intended.
+
+       // Message was accepted by the receiver
+       Accepted = proton.Accepted
+       // Message was rejected as invalid by the receiver
+       Rejected = proton.Rejected
+       // Message was not processed by the receiver but may be processed by some other receiver.
+       Released = proton.Released
+)
+
+// String returns a human readable name for a Disposition, or "unknown" for a
+// value that is not one of the named dispositions.
+func (d Disposition) String() string {
+       name := "unknown"
+       switch d {
+       case NoDisposition:
+               name = "no-disposition"
+       case Accepted:
+               name = "accepted"
+       case Rejected:
+               name = "rejected"
+       case Released:
+               name = "released"
+       }
+       return name
+}
+
+// Send sends m on the link from inside the engine. When the link's send
+// settle mode is SndSettled the delivery is settled immediately and a nil
+// SentMessage is returned; otherwise the delivery is tracked in the handler's
+// sentMessages map and a SentMessage for it is returned. The sender must be
+// open; an error already recorded on the sender is returned without sending.
+func (s *sender) Send(m amqp.Message) (SentMessage, error) {
+       internal.Assert(s.IsOpen(), "sender is not open: %s", s)
+       if err := s.Error(); err != nil {
+               return nil, err
+       }
+       var sm SentMessage
+       send := func() error {
+               eDelivery, err := s.eLink.Send(m)
+               if err != nil {
+                       return err
+               }
+               if s.eLink.SndSettleMode() == proton.SndSettled {
+                       eDelivery.Settle()
+                       return nil
+               }
+               tracked := newSentMessage(s.session.connection, eDelivery)
+               sm = tracked
+               s.session.connection.handler.sentMessages[eDelivery] = tracked
+               return nil
+       }
+       err := s.engine().InjectWait(send)
+       return sm, err
+}
+
+// closed passes the close error to closeError.
+func (s *sender) closed(err error) {
+       s.closeError(err)
+}
+
+// SentMessage represents a previously sent message. It allows you to wait for acknowledgement.
+type SentMessage interface {
+       // Disposition blocks till the message is acknowledged and returns the
+       // disposition state.  NoDisposition means the Connection or the SentMessage
+       // was closed before the message was acknowledged.
+       Disposition() (Disposition, error)
+
+       // DispositionTimeout is like Disposition but gives up after timeout, see Timeout.
+       DispositionTimeout(time.Duration) (Disposition, error)
+
+       // Forget interrupts any call to Disposition on this SentMessage and tells the
+       // peer we are no longer interested in the disposition of this message.
+       Forget()
+
+       // Error returns the error that closed the disposition, or nil if there was no error.
+       // If the disposition closed because the connection closed, it will return Closed.
+       Error() error
+}
+
+// sentMessage implements SentMessage for one delivery.
+type sentMessage struct {
+       connection  *connection
+       eDelivery   proton.Delivery
+       done        chan struct{} // closed by finish() to release waiters
+       disposition Disposition   // set by settled()
+       err         error         // set by settled()
+}
+
+// newSentMessage creates the tracking record for delivery d on connection c,
+// starting with NoDisposition, no error and an open done channel.
+func newSentMessage(c *connection, d proton.Delivery) *sentMessage {
+       return &sentMessage{
+               connection:  c,
+               eDelivery:   d,
+               done:        make(chan struct{}),
+               disposition: NoDisposition,
+               err:         nil,
+       }
+}
+
+// Disposition blocks until done is closed, then returns the stored
+// disposition and error.
+func (sm *sentMessage) Disposition() (Disposition, error) {
+       <-sm.done
+       return sm.disposition, sm.err
+}
+
+// DispositionTimeout waits up to timeout for done to be closed. It returns
+// the stored disposition with Timeout if the wait timed out, otherwise the
+// stored disposition and error.
+func (sm *sentMessage) DispositionTimeout(timeout time.Duration) (Disposition, error) {
+       _, _, timedout := timedReceive(sm.done, timeout)
+       if timedout {
+               return sm.disposition, Timeout
+       }
+       return sm.disposition, sm.err
+}
+
+// Forget injects a function into the engine that settles the delivery and
+// removes it from the handler's sentMessages map, then calls finish() from
+// the calling goroutine to release anything waiting on the disposition.
+func (sm *sentMessage) Forget() {
+       sm.connection.engine.Inject(func() {
+               sm.eDelivery.Settle()
+               delete(sm.connection.handler.sentMessages, sm.eDelivery)
+       })
+       sm.finish()
+}
+
+// settled stores the outcome of the delivery and releases waiters. The
+// disposition is taken from the remote state only if the delivery is settled;
+// err is stored unconditionally.
+func (sm *sentMessage) settled(err error) {
+       if isSettled := sm.eDelivery.Settled(); isSettled {
+               remoteType := sm.eDelivery.Remote().Type()
+               sm.disposition = Disposition(remoteType)
+       }
+       sm.err = err
+       sm.finish()
+}
+
+// finish closes done unless it is already closed, releasing any waiters.
+// NOTE(review): the check and the close are separate steps, and Forget calls
+// finish from the caller's goroutine - confirm it cannot run concurrently
+// with another finish call.
+func (sm *sentMessage) finish() {
+       select {
+       case <-sm.done: // No-op if already closed
+       default:
+               close(sm.done)
+       }
+}
+
+// Error returns the stored error, nil if none has been set.
+func (sm *sentMessage) Error() error { return sm.err }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bd3fb337/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/session.go
----------------------------------------------------------------------
diff --git 
a/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/session.go 
b/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/session.go
new file mode 100644
index 0000000..ba09690
--- /dev/null
+++ b/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/session.go
@@ -0,0 +1,115 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package concurrent
+
+import (
+       "qpid.apache.org/proton"
+       "qpid.apache.org/proton/internal"
+)
+
+// Session is an AMQP session, it contains Senders and Receivers.
+//
+type Session interface {
+       Endpoint
+
+       // Connection returns the connection this session belongs to.
+       Connection() Connection
+
+       // Sender opens a new sender. v can be a string, which is used as the
+       // Target address, or a SenderSettings struct containing more detailed
+       // settings.
+       Sender(v interface{}) (Sender, error)
+
+       // Receiver opens a new Receiver. v can be a string, which is used as the
+       // Source address, or a ReceiverSettings struct containing more detailed
+       // settings.
+       Receiver(v interface{}) (Receiver, error)
+}
+
+type session struct {
+       endpoint                  // embedded; newSession sets its str from es.String()
+       eSession   proton.Session // underlying proton session, returned by eEndpoint()
+       connection *connection    // owning connection; supplies the engine returned by engine()
+}
+
+// newSession wraps the proton session es for connection c, using es.String()
+// as the endpoint's string form.
+// Runs in the proton goroutine.
+func newSession(c *connection, es proton.Session) *session {
+       s := &session{connection: c, eSession: es}
+       s.endpoint = endpoint{str: es.String()}
+       return s
+}
+
+// Connection returns the connection that owns this session.
+func (s *session) Connection() Connection { return s.connection }
+
+// eEndpoint returns the underlying proton session as a proton.Endpoint.
+func (s *session) eEndpoint() proton.Endpoint { return s.eSession }
+
+// engine returns the engine of the owning connection.
+func (s *session) engine() *proton.Engine { return s.connection.engine }
+
+// Open injects an open of the underlying proton session into the engine. It
+// returns the error from Inject, which is non-nil if the engine stopped before
+// the open could be injected.
+func (s *session) Open() error { return s.engine().Inject(s.eSession.Open) }
+// Close injects a local close of the underlying proton session, passing err
+// to localClose, into the engine goroutine.
+func (s *session) Close(err error) {
+       closeSession := func() { localClose(s.eSession, err) }
+       s.engine().Inject(closeSession)
+}
+
+// Sender opens a sending link. v is either a string, used as the Target
+// address, or a SenderSettings value; any other type is reported through
+// internal.Assert.
+//
+// The link is created inside the engine goroutine and then opened, so the
+// caller does not need to call Open() itself. If the link cannot be created
+// the returned Sender is nil.
+func (s *session) Sender(v interface{}) (snd Sender, err error) {
+       var settings LinkSettings
+       switch v := v.(type) {
+       case string:
+               settings.Target = v
+       case SenderSettings:
+               settings = v.LinkSettings
+       default:
+               internal.Assert(false, "NewSender() want string or SenderSettings, got %T", v)
+       }
+       err = s.engine().InjectWait(func() error {
+               l, err := makeLocalLink(s, true, settings)
+               if err != nil {
+                       return err // Don't wrap a link that could not be created.
+               }
+               snd = newSender(l)
+               return nil
+       })
+       if err == nil {
+               err = snd.Open()
+       }
+       return
+}
+
+// Receiver opens a receiving link. v is either a string, used as the Source
+// address, or a ReceiverSettings value; any other type is reported through
+// internal.Assert.
+//
+// The link is created inside the engine goroutine, its capacity and prefetch
+// are set from the settings, and it is then opened. If the link cannot be
+// created the returned Receiver is nil.
+func (s *session) Receiver(v interface{}) (rcv Receiver, err error) {
+       var settings ReceiverSettings
+       switch v := v.(type) {
+       case string:
+               settings.Source = v
+       case ReceiverSettings:
+               settings = v
+       default:
+               internal.Assert(false, "NewReceiver() want string or ReceiverSettings, got %T", v)
+       }
+       err = s.engine().InjectWait(func() error {
+               l, err := makeLocalLink(s, false, settings.LinkSettings)
+               if err != nil {
+                       return err // Don't wrap a link that could not be created.
+               }
+               rcv = newReceiver(l)
+               return nil
+       })
+       if err != nil {
+               // rcv may never have been assigned (e.g. the engine stopped before
+               // the function could be injected), so it must not be used here.
+               return nil, err
+       }
+       rcv.SetCapacity(settings.Capacity, settings.Prefetch)
+       err = rcv.Open()
+       return rcv, err
+}
+
+// closed is called from the handler when the session is closed; it forwards err to closeError.
+func (s *session) closed(err error) {
+       s.closeError(err)
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bd3fb337/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/time.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/time.go 
b/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/time.go
new file mode 100644
index 0000000..e9093d3
--- /dev/null
+++ b/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/time.go
@@ -0,0 +1,71 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package concurrent
+
+import (
+       "qpid.apache.org/proton/internal"
+       "reflect"
+       "time"
+)
+
+// Timeout is the error returned if an operation does not complete on time.
+//
+// Methods named *Timeout in this package take a time.Duration timeout
+// parameter.
+//
+// If timeout > 0 and there is no result available before the timeout, they
+// return a zero or nil value and Timeout as an error.
+//
+// If timeout == 0 they will return a result if one is immediately available or
+// nil/zero and Timeout as an error if not.
+//
+// If timeout == Forever the function will return only when there is a result
+// or some non-timeout error occurs.
+//
+var Timeout = internal.Errorf("timeout")
+
+// Forever can be used as a timeout parameter to indicate wait forever; timedReceive blocks indefinitely for any negative timeout.
+const Forever time.Duration = -1
+
+// timedReceive receives on channel (which can be a chan of any type), waiting
+// up to timeout.
+//
+// timeout==0 means do a non-blocking receive attempt. timeout < 0 means block
+// forever. Other values mean block up to the timeout.
+//
+// If the receive completes it returns the received value, the receive's "ok"
+// flag (false when the channel is closed) and timedout == false. Otherwise it
+// returns nil, false, true.
+//
+func timedReceive(channel interface{}, timeout time.Duration) (value interface{}, ok bool, timedout bool) {
+       cases := []reflect.SelectCase{
+               {Dir: reflect.SelectRecv, Chan: reflect.ValueOf(channel)},
+       }
+       switch {
+       case timeout == 0: // Non-blocking receive
+               cases = append(cases, reflect.SelectCase{Dir: reflect.SelectDefault})
+       case timeout < 0: // Block forever, nothing to add
+       default: // Block up to timeout
+               // Use an explicit timer rather than time.After so it is stopped as
+               // soon as we return instead of lingering until it fires.
+               timer := time.NewTimer(timeout)
+               defer timer.Stop()
+               cases = append(cases,
+                       reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(timer.C)})
+       }
+       chosen, recv, recvOk := reflect.Select(cases)
+       if chosen != 0 {
+               // Either the default case or the timer fired.
+               return nil, false, true
+       }
+       return recv.Interface(), recvOk, false
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bd3fb337/proton-c/bindings/go/src/qpid.apache.org/proton/doc.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/doc.go 
b/proton-c/bindings/go/src/qpid.apache.org/proton/doc.go
new file mode 100644
index 0000000..b175cf6
--- /dev/null
+++ b/proton-c/bindings/go/src/qpid.apache.org/proton/doc.go
@@ -0,0 +1,46 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+/*
+Package proton provides a Go binding for the Qpid proton AMQP library.
+AMQP is an open standard for inter-operable message exchange, see <http://www.amqp.org/>
+
+Proton is an event-driven, concurrent-unsafe AMQP protocol library that allows
+you to send and receive messages using the standard AMQP messaging protocol.
+
+For most tasks, consider using package `qpid.apache.org/proton/concurrent`.  It
+provides a concurrent-safe API that is easier and more natural to use in Go.
+
+The raw proton API is event-driven and not concurrent-safe. You implement a
+MessagingHandler event handler to react to AMQP protocol events. You must
+ensure that all events are handled in a single goroutine or that you serialize
+all uses of the proton objects associated with a single connection using a
+lock. You can use channels to communicate between application goroutines and
+the event-handling goroutine, see type Event for more detail.
+
+Package `qpid.apache.org/proton/concurrent` does all this for you and presents
+a simple concurrent-safe interface.
+
+*/
+package proton
+
+// #cgo LDFLAGS: -lqpid-proton
+import "C"
+
+// This file is just for the package comment.

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bd3fb337/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go 
b/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go
new file mode 100644
index 0000000..3096280
--- /dev/null
+++ b/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go
@@ -0,0 +1,402 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package proton
+
+// #include <proton/connection.h>
+// #include <proton/event.h>
+// #include <proton/handlers.h>
+// #include <proton/session.h>
+// #include <proton/transport.h>
+// #include <memory.h>
+// #include <stdlib.h>
+//
+// PN_HANDLE(REMOTE_ADDR)
+import "C"
+
+import (
+       "fmt"
+       "io"
+       "net"
+       "qpid.apache.org/proton/internal"
+       "sync"
+       "unsafe"
+)
+
+// Injecter allows functions to be "injected" into an event-processing loop.
+type Injecter interface {
+       // Inject a function into an event-loop concurrency context.
+       //
+       // f() will be called in the same concurrency context as event handlers, so
+       // it can safely use values that can be used in that context. If f blocks
+       // it will block the event loop so be careful calling blocking functions in
+       // f.
+       //
+       // Returns a non-nil error if the function could not be injected.
+       Inject(f func()) error
+
+       // InjectWait is like Inject but does not return till f() has completed.
+       // If f() cannot be injected it returns the error from Inject(), otherwise
+       // it returns the error from f()
+       InjectWait(f func() error) error
+}
+
+// bufferChan manages a pair of ping-pong buffers to pass bytes through a
+// channel.
+type bufferChan struct {
+       buffers    chan []byte // Channel the buffers are passed through.
+       buf1, buf2 []byte      // The two buffers; buffer() swaps them on each call.
+}
+
+// newBufferChan returns a bufferChan with an unbuffered channel and two
+// buffers of size bytes each.
+func newBufferChan(size int) *bufferChan {
+       return &bufferChan{
+               buffers: make(chan []byte),
+               buf1:    make([]byte, size),
+               buf2:    make([]byte, size),
+       }
+}
+
+// buffer swaps the two buffers and returns the one that is now first,
+// resliced to its full capacity, so consecutive calls alternate between the
+// two.
+func (b *bufferChan) buffer() []byte {
+       next := b.buf2
+       b.buf2 = b.buf1
+       b.buf1 = next
+       return next[:cap(next)]
+}
+
+// Engine reads from a net.Conn, decodes AMQP events and calls the appropriate
+// Handler functions in a single event-loop goroutine. Actions taken by Handler
+// functions (such as sending messages) are encoded and written to the
+// net.Conn. Create an engine with NewEngine()
+//
+// The proton protocol engine is single threaded (per connection). The Engine
+// runs proton in the goroutine that calls Engine.Run() and creates goroutines
+// to feed data to/from a net.Conn. You can create multiple Engines to handle
+// multiple connections concurrently.
+//
+// Methods on proton values defined in this package (Sessions, Links etc.) can
+// only be called in the goroutine that executes the corresponding
+// Engine.Run(). You implement the EventHandler or MessagingHandler interfaces
+// and provide those values to NewEngine(). Their HandleEvent method will be
+// called in the Engine goroutine, in typical event-driven style.
+//
+// Handlers can pass values from an event (Connections, Links, Deliveries etc.)
+// to other goroutines, store them, or use them as map indexes. Effectively
+// they are just pointers. Other goroutines cannot call their methods directly
+// but they can create a function closure to call such methods and pass it to
+// Engine.Inject() to have it evaluated in the engine goroutine.
+//
+// You are responsible for ensuring you don't use an event value after it is
+// invalid. The handler methods will tell you when a value is no longer valid.
+// For example after a LinkClosed event, that link is no longer valid. If you
+// do Link.Close() yourself (in a handler or injected function) the link
+// remains valid until the corresponding LinkClosed event is received by the
+// handler.
+//
+// Engine.Close() will take care of cleaning up any remaining values when you
+// are done with the Engine. All values associated with an engine become
+// invalid when you call Engine.Close()
+//
+// The qpid.apache.org/proton/concurrent package will do all this for you, so
+// it may be a better choice for some applications.
+//
+type Engine struct {
+       // err holds the engine's error; read it with Error().
+       err    internal.FirstError
+       inject chan func() // Functions passed to Inject(), run by the Run() loop.
+
+       conn       net.Conn
+       connection Connection
+       transport  Transport
+       collector  *C.pn_collector_t
+       read       *bufferChan    // Read buffers channel.
+       write      *bufferChan    // Write buffers channel.
+       handlers   []EventHandler // Handlers for proton events.
+       running    chan struct{}  // Closed by Run() when the goroutines are done.
+       closeOnce  sync.Once
+}
+
+const bufferSize = 4096 // size NewEngine passes to newBufferChan for the read and write buffers
+
+// Map of Connection to *Engine. NOTE(review): NewEngine and Run below use connectionContexts instead — confirm this map is still used.
+var engines = internal.MakeSafeMap()
+
+// NewEngine initializes an engine with a connection and handlers. To start it
+// running:
+//    p := NewEngine(...)
+//    go p.Run()
+// The goroutine will exit when the engine is closed or disconnected.
+// You can check for errors with Engine.Error().
+//
+// If the engine cannot be set up, NewEngine frees whatever proton objects it
+// had already allocated and returns a nil *Engine with an error.
+//
+func NewEngine(conn net.Conn, handlers ...EventHandler) (*Engine, error) {
+       p := &Engine{
+               inject:     make(chan func()),
+               conn:       conn,
+               transport:  Transport{C.pn_transport()},
+               connection: Connection{C.pn_connection()},
+               collector:  C.pn_collector(),
+               handlers:   handlers,
+               read:       newBufferChan(bufferSize),
+               write:      newBufferChan(bufferSize),
+               running:    make(chan struct{}),
+       }
+       // free releases the proton objects on the failure paths below, with the
+       // same checks and in the same order that Run() uses on exit.
+       free := func() {
+               if !p.connection.IsNil() {
+                       p.connection.Free()
+               }
+               if !p.transport.IsNil() {
+                       p.transport.Free()
+               }
+               if p.collector != nil {
+                       C.pn_collector_free(p.collector)
+               }
+       }
+       if p.transport.IsNil() || p.connection.IsNil() || p.collector == nil {
+               free()
+               return nil, internal.Errorf("failed to allocate engine")
+       }
+
+       // TODO aconway 2015-06-25: connection settings for user, password,
+       // container etc. before transport.Bind() Set up connection before Engine,
+       // allow Engine or Reactor to run connection.
+
+       // Unique container-id by default.
+       p.connection.SetContainer(internal.UUID4().String())
+       pnErr := p.transport.Bind(p.connection)
+       if pnErr != 0 {
+               free()
+               return nil, internal.Errorf("cannot setup engine: %s", internal.PnErrorCode(pnErr))
+       }
+       C.pn_connection_collect(p.connection.pn, p.collector)
+       p.connection.Open()
+       // Save the engine and its connection ID for Connection.String()
+       connectionContexts.Put(p.connection, connectionContext{p, p.String()})
+       return p, nil
+}
+
+// String identifies the engine by the local and remote addresses of its
+// net.Conn, formatted as "local-remote".
+func (p *Engine) String() string {
+       local, remote := p.conn.LocalAddr(), p.conn.RemoteAddr()
+       return fmt.Sprintf("%s-%s", local, remote)
+}
+
+// Id returns an identifier for the engine: the address of the Engine value
+// formatted with %p. It is the same on every call for a given engine.
+func (p *Engine) Id() string {
+       // Format p itself, not &p: &p is the address of this call's receiver
+       // variable, which says nothing about the identity of the engine.
+       return fmt.Sprintf("%p", p)
+}
+
+// Error returns the error held in the engine's FirstError, or nil if none has
+// been set.
+func (p *Engine) Error() error {
+       err := p.err.Get()
+       return err
+}
+
+// Inject a function into the Engine's event loop.
+//
+// f() will be called in the same event-processing goroutine that calls Handler
+// methods. f() can safely call methods on values that belong to this engine
+// (Sessions, Links etc)
+//
+// The injected function has no parameters or return values. It is normally a
+// closure and can use channels to communicate with the injecting goroutine if
+// necessary. Inject blocks until the event loop accepts f or the engine stops.
+//
+// Returns nil once f has been handed to the event loop, or p.Error() if the
+// engine stopped running before the function could be injected.
+func (p *Engine) Inject(f func()) error {
+       select {
+       case p.inject <- f: // the Run() loop received f and calls it
+               return nil
+       case <-p.running: // running is closed at the end of Run()
+               return p.Error()
+       }
+}
+
+// InjectWait injects f like Inject, then waits for it. It returns the error
+// from Inject if f could not be injected; otherwise it returns f's own error,
+// or p.Error() if the engine stops running first.
+func (p *Engine) InjectWait(f func() error) error {
+       result := make(chan error)
+       defer close(result)
+       if injectErr := p.Inject(func() { result <- f() }); injectErr != nil {
+               return injectErr
+       }
+       select {
+       case fErr := <-result:
+               return fErr
+       case <-p.running:
+               return p.Error()
+       }
+}
+
+// Server puts the Engine in server mode, meaning it will auto-detect security
+// settings on the incoming connection such as use of SASL and SSL.
+// Must be called before Run()
+//
+func (p *Engine) Server() { p.transport.SetServer() }
+
+// Close the engine's connection, returns when the engine has exited.
+// A non-nil err is set on the connection's condition before it is closed.
+func (p *Engine) Close(err error) {
+       closeConnection := func() {
+               if err != nil {
+                       p.connection.Condition().SetError(err)
+               }
+               p.connection.Close()
+       }
+       p.Inject(closeConnection)
+       <-p.running
+}
+
+// Disconnect the engine's connection without an AMQP close, returns when the
+// engine has exited. A non-nil err is recorded as the engine's error before
+// the net.Conn is closed.
+func (p *Engine) Disconnect(err error) {
+       if err != nil {
+               p.err.Set(err)
+       }
+       p.conn.Close()
+       <-p.running
+}
+
+// Run the engine. Normally called in a goroutine as: go engine.Run()
+// Engine.Run() will exit when the engine is closed or disconnected.
+// You can check for errors after exit with Engine.Error().
+//
+// Run starts one goroutine that reads from the net.Conn and one that writes to
+// it, and runs the event loop itself. Before returning it waits for both
+// goroutines, frees the proton objects and closes the running channel.
+//
+func (p *Engine) Run() {
+       // Signal errors from the read/write goroutines. Don't block if we don't
+       // read all the errors, we only care about the first.
+       ioErrs := make(chan error, 2)
+       // Closed once the event loop has exited, so the read goroutine can give up
+       // on handing over a buffer that nobody will receive any more.
+       stop := make(chan struct{})
+       wait := sync.WaitGroup{}
+       wait.Add(2)
+
+       go func() { // Read goroutine
+               defer wait.Done()
+               for {
+                       rbuf := p.read.buffer()
+                       n, err := p.conn.Read(rbuf)
+                       if n > 0 {
+                               select {
+                               case p.read.buffers <- rbuf[:n]:
+                               case <-stop: // Event loop is gone, drop the data.
+                                       return
+                               }
+                       } else if err != nil {
+                               close(p.read.buffers)
+                               ioErrs <- err
+                               return
+                       }
+               }
+       }()
+
+       go func() { // Write goroutine
+               defer wait.Done()
+               for {
+                       wbuf, ok := <-p.write.buffers
+                       if !ok {
+                               return
+                       }
+                       _, err := p.conn.Write(wbuf)
+                       if err != nil {
+                               ioErrs <- err
+                               return
+                       }
+               }
+       }()
+
+       wbuf := p.write.buffer()[:0]
+loop:
+       for {
+               if len(wbuf) == 0 {
+                       p.pop(&wbuf)
+               }
+               // Don't set wchan unless there is something to write.
+               var wchan chan []byte
+               if len(wbuf) > 0 {
+                       wchan = p.write.buffers
+               }
+
+               select {
+               case buf := <-p.read.buffers: // Read a buffer
+                       p.push(buf)
+               case wchan <- wbuf: // Write a buffer
+                       wbuf = p.write.buffer()[:0]
+               case f := <-p.inject: // Function injected from another goroutine
+                       f()
+               case err := <-ioErrs: // Network read or write error
+                       p.conn.Close() // Make sure both sides are closed
+                       p.err.Set(err)
+                       p.transport.CloseHead()
+                       p.transport.CloseTail()
+               }
+               p.process()
+               if p.err.Get() != nil {
+                       break loop
+               }
+       }
+       close(stop) // Release the read goroutine if it is blocked handing over data.
+       close(p.write.buffers)
+       p.conn.Close()
+       wait.Wait()
+       connectionContexts.Delete(p.connection)
+       if !p.connection.IsNil() {
+               p.connection.Free()
+       }
+       if !p.transport.IsNil() {
+               p.transport.Free()
+       }
+       if p.collector != nil {
+               C.pn_collector_free(p.collector)
+       }
+       for _, h := range p.handlers {
+               switch h := h.(type) {
+               case cHandler:
+                       C.pn_handler_free(h.pn)
+               }
+       }
+       close(p.running) // Signal goroutines have exited and Error is set.
+}
+
+// minInt returns the smaller of a and b.
+func minInt(a, b int) int {
+       if b <= a {
+               return b
+       }
+       return a
+}
+
+func (p *Engine) pop(buf *[]byte) { // fills *buf with up to cap(*buf) bytes of pending transport output
+       pending := int(p.transport.Pending())
+       switch {
+       case pending == int(C.PN_EOS): // end-of-stream code: leave *buf as it is
+               *buf = (*buf)[:]
+               return
+       case pending < 0: // any other negative value is reported as a proton error code
+               panic(internal.Errorf("%s", internal.PnErrorCode(pending)))
+       }
+       size := minInt(pending, cap(*buf)) // never copy more than the buffer can hold
+       *buf = (*buf)[:size]
+       if size == 0 { // nothing pending: *buf is now empty
+               return
+       }
+       C.memcpy(unsafe.Pointer(&(*buf)[0]), p.transport.Head(), C.size_t(size)) // copy size bytes from the transport's head
+       internal.Assert(size > 0)
+       p.transport.Pop(uint(size)) // pop the copied bytes off the transport
+}
+
+// push feeds all of buf to the transport, calling Push repeatedly until every
+// byte has been accepted. It panics if Push returns zero or a negative value,
+// which is reported as a proton error code.
+func (p *Engine) push(buf []byte) {
+       for len(buf) > 0 {
+               n := p.transport.Push(buf)
+               if n <= 0 {
+                       panic(internal.Errorf("error in transport: %s", internal.PnErrorCode(n)))
+               }
+               buf = buf[n:] // Drop the bytes the transport accepted.
+       }
+}
+
+// handle passes e to every handler in order. For an ETransportClosed event it
+// then records an error for the engine (offering the remote condition, then
+// the transport condition, then io.EOF if no error is set yet) and returns
+// false. For any other event it returns true.
+func (p *Engine) handle(e Event) (more bool) {
+       for i := range p.handlers {
+               p.handlers[i].HandleEvent(e)
+       }
+       if e.Type() != ETransportClosed {
+               return true
+       }
+       p.err.Set(e.Connection().RemoteCondition().Error())
+       p.err.Set(e.Connection().Transport().Condition().Error())
+       if p.err.Get() == nil {
+               p.err.Set(io.EOF)
+       }
+       return false
+}
+
+// process hands every event queued in the collector to handle(), popping each
+// event once it has been handled. It returns false, leaving the current event
+// in the collector, as soon as handle() returns false; otherwise it returns
+// true once the collector has no more events.
+func (p *Engine) process() (more bool) {
+       for {
+               ce := C.pn_collector_peek(p.collector)
+               if ce == nil {
+                       return true
+               }
+               if !p.handle(makeEvent(ce)) {
+                       return false
+               }
+               C.pn_collector_pop(p.collector)
+       }
+}
+
+// Connection returns the proton Connection created for this engine.
+func (p *Engine) Connection() Connection {
+       return p.connection
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bd3fb337/proton-c/bindings/go/src/qpid.apache.org/proton/go/amqp/doc.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/go/amqp/doc.go 
b/proton-c/bindings/go/src/qpid.apache.org/proton/go/amqp/doc.go
deleted file mode 100644
index 7c00aa0..0000000
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/go/amqp/doc.go
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-/*
-Package amqp encodes and decodes AMQP messages and data as Go types.
-
-It follows the standard 'encoding' libraries pattern. The mapping between AMQP
-and Go types is described in the documentation of the Marshal and Unmarshal
-functions.
-
-The sub-packages 'event' and 'messaging' provide two alternative ways to write
-AMQP clients and servers. 'messaging' is easier for general purpose use. 
'event'
-gives complete low-level control of the underlying proton C engine.
-
-AMQP is an open standard for inter-operable message exchange, see 
<http://www.amqp.org/>
-*/
-package amqp
-
-// #cgo LDFLAGS: -lqpid-proton
-import "C"
-
-// This file is just for the package comment.
-
-// FIXME aconway 2015-04-28: need to re-organize the package, it's not very 
intuitive.

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bd3fb337/proton-c/bindings/go/src/qpid.apache.org/proton/go/amqp/interop
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/go/amqp/interop 
b/proton-c/bindings/go/src/qpid.apache.org/proton/go/amqp/interop
deleted file mode 120000
index cc3641d..0000000
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/go/amqp/interop
+++ /dev/null
@@ -1 +0,0 @@
-../../../../../../../../tests/interop
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bd3fb337/proton-c/bindings/go/src/qpid.apache.org/proton/go/amqp/interop_test.go
----------------------------------------------------------------------
diff --git 
a/proton-c/bindings/go/src/qpid.apache.org/proton/go/amqp/interop_test.go 
b/proton-c/bindings/go/src/qpid.apache.org/proton/go/amqp/interop_test.go
deleted file mode 100644
index 11049f7..0000000
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/go/amqp/interop_test.go
+++ /dev/null
@@ -1,308 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-// Test that conversion of Go type to/from AMQP is compatible with other
-// bindings.
-//
-package amqp
-
-import (
-       "bytes"
-       "fmt"
-       "io"
-       "io/ioutil"
-       "os"
-       "reflect"
-       "strings"
-       "testing"
-)
-
-func assertEqual(want interface{}, got interface{}) {
-       if !reflect.DeepEqual(want, got) {
-               panic(fmt.Errorf("%#v != %#v", want, got))
-       }
-}
-
-func assertNil(err interface{}) {
-       if err != nil {
-               panic(err)
-       }
-}
-
-func getReader(name string) (r io.Reader) {
-       r, err := os.Open("interop/" + name + ".amqp")
-       if err != nil {
-               panic(fmt.Errorf("Can't open %#v: %v", name, err))
-       }
-       return
-}
-
-func remaining(d *Decoder) string {
-       remainder, _ := ioutil.ReadAll(io.MultiReader(d.Buffered(), d.reader))
-       return string(remainder)
-}
-
-// assertDecode: want is the expected value, gotPtr is a pointer to a
-// instance of the same type for Decode.
-func assertDecode(d *Decoder, want interface{}, gotPtr interface{}) {
-
-       assertNil(d.Decode(gotPtr))
-
-       got := reflect.ValueOf(gotPtr).Elem().Interface()
-       assertEqual(want, got)
-
-       // Try round trip encoding
-       bytes, err := Marshal(want, nil)
-       assertNil(err)
-       n, err := Unmarshal(bytes, gotPtr)
-       assertNil(err)
-       assertEqual(n, len(bytes))
-       got = reflect.ValueOf(gotPtr).Elem().Interface()
-       assertEqual(want, got)
-}
-
-func TestUnmarshal(t *testing.T) {
-       bytes, err := ioutil.ReadAll(getReader("strings"))
-       if err != nil {
-               t.Error(err)
-       }
-       for _, want := range []string{"abc\000defg", "abcdefg", "abcdefg", "", 
"", ""} {
-               var got string
-               n, err := Unmarshal(bytes, &got)
-               if err != nil {
-                       t.Error(err)
-               }
-               if want != got {
-                       t.Errorf("%#v != %#v", want, got)
-               }
-               bytes = bytes[n:]
-       }
-}
-
-func TestPrimitivesExact(t *testing.T) {
-       d := NewDecoder(getReader("primitives"))
-       // Decoding into exact types
-       var b bool
-       assertDecode(d, true, &b)
-       assertDecode(d, false, &b)
-       var u8 uint8
-       assertDecode(d, uint8(42), &u8)
-       var u16 uint16
-       assertDecode(d, uint16(42), &u16)
-       var i16 int16
-       assertDecode(d, int16(-42), &i16)
-       var u32 uint32
-       assertDecode(d, uint32(12345), &u32)
-       var i32 int32
-       assertDecode(d, int32(-12345), &i32)
-       var u64 uint64
-       assertDecode(d, uint64(12345), &u64)
-       var i64 int64
-       assertDecode(d, int64(-12345), &i64)
-       var f32 float32
-       assertDecode(d, float32(0.125), &f32)
-       var f64 float64
-       assertDecode(d, float64(0.125), &f64)
-}
-
-func TestPrimitivesCompatible(t *testing.T) {
-       d := NewDecoder(getReader("primitives"))
-       // Decoding into compatible types
-       var b bool
-       var i int
-       var u uint
-       var f float64
-       assertDecode(d, true, &b)
-       assertDecode(d, false, &b)
-       assertDecode(d, uint(42), &u)
-       assertDecode(d, uint(42), &u)
-       assertDecode(d, -42, &i)
-       assertDecode(d, uint(12345), &u)
-       assertDecode(d, -12345, &i)
-       assertDecode(d, uint(12345), &u)
-       assertDecode(d, -12345, &i)
-       assertDecode(d, 0.125, &f)
-       assertDecode(d, 0.125, &f)
-}
-
-// assertDecodeValue: want is the expected value, decode into a reflect.Value
-func assertDecodeInterface(d *Decoder, want interface{}) {
-
-       var got, got2 interface{}
-       assertNil(d.Decode(&got))
-
-       assertEqual(want, got)
-
-       // Try round trip encoding
-       bytes, err := Marshal(got, nil)
-       assertNil(err)
-       n, err := Unmarshal(bytes, &got2)
-       assertNil(err)
-       assertEqual(n, len(bytes))
-       assertEqual(want, got2)
-}
-
-func TestPrimitivesInterface(t *testing.T) {
-       d := NewDecoder(getReader("primitives"))
-       assertDecodeInterface(d, true)
-       assertDecodeInterface(d, false)
-       assertDecodeInterface(d, uint8(42))
-       assertDecodeInterface(d, uint16(42))
-       assertDecodeInterface(d, int16(-42))
-       assertDecodeInterface(d, uint32(12345))
-       assertDecodeInterface(d, int32(-12345))
-       assertDecodeInterface(d, uint64(12345))
-       assertDecodeInterface(d, int64(-12345))
-       assertDecodeInterface(d, float32(0.125))
-       assertDecodeInterface(d, float64(0.125))
-}
-
-func TestStrings(t *testing.T) {
-       d := NewDecoder(getReader("strings"))
-       // Test decoding as plain Go strings
-       for _, want := range []string{"abc\000defg", "abcdefg", "abcdefg", "", 
"", ""} {
-               var got string
-               assertDecode(d, want, &got)
-       }
-       remains := remaining(d)
-       if remains != "" {
-               t.Errorf("leftover: %s", remains)
-       }
-
-       // Test decoding as specific string types
-       d = NewDecoder(getReader("strings"))
-       var bytes []byte
-       var str, sym string
-       assertDecode(d, []byte("abc\000defg"), &bytes)
-       assertDecode(d, "abcdefg", &str)
-       assertDecode(d, "abcdefg", &sym)
-       assertDecode(d, make([]byte, 0), &bytes)
-       assertDecode(d, "", &str)
-       assertDecode(d, "", &sym)
-       remains = remaining(d)
-       if remains != "" {
-               t.Fatalf("leftover: %s", remains)
-       }
-
-       // Test some error handling
-       d = NewDecoder(getReader("strings"))
-       var s string
-       err := d.Decode(s)
-       if err == nil {
-               t.Fatal("Expected error")
-       }
-       if !strings.Contains(err.Error(), "not a pointer") {
-               t.Error(err)
-       }
-       var i int
-       err = d.Decode(&i)
-       if !strings.Contains(err.Error(), "cannot unmarshal") {
-               t.Error(err)
-       }
-       _, err = Unmarshal([]byte{}, nil)
-       if !strings.Contains(err.Error(), "not enough data") {
-               t.Error(err)
-       }
-       _, err = Unmarshal([]byte("foobar"), nil)
-       if !strings.Contains(err.Error(), "invalid-argument") {
-               t.Error(err)
-       }
-}
-
-func TestEncodeDecode(t *testing.T) {
-       type data struct {
-               s  string
-               i  int
-               u8 uint8
-               b  bool
-               f  float32
-               v  interface{}
-       }
-
-       in := data{"foo", 42, 9, true, 1.234, "thing"}
-
-       buf := bytes.Buffer{}
-       e := NewEncoder(&buf)
-       assertNil(e.Encode(in.s))
-       assertNil(e.Encode(in.i))
-       assertNil(e.Encode(in.u8))
-       assertNil(e.Encode(in.b))
-       assertNil(e.Encode(in.f))
-       assertNil(e.Encode(in.v))
-
-       var out data
-       d := NewDecoder(&buf)
-       assertNil(d.Decode(&out.s))
-       assertNil(d.Decode(&out.i))
-       assertNil(d.Decode(&out.u8))
-       assertNil(d.Decode(&out.b))
-       assertNil(d.Decode(&out.f))
-       assertNil(d.Decode(&out.v))
-
-       assertEqual(in, out)
-}
-
-func TestMap(t *testing.T) {
-       d := NewDecoder(getReader("maps"))
-
-       // Generic map
-       var m Map
-       assertDecode(d, Map{"one": int32(1), "two": int32(2), "three": 
int32(3)}, &m)
-
-       // Interface as map
-       var i interface{}
-       assertDecode(d, Map{int32(1): "one", int32(2): "two", int32(3): 
"three"}, &i)
-
-       d = NewDecoder(getReader("maps"))
-       // Specific typed map
-       var m2 map[string]int
-       assertDecode(d, map[string]int{"one": 1, "two": 2, "three": 3}, &m2)
-
-       // Round trip a nested map
-       m = Map{int64(1): "one", "two": int32(2), true: Map{uint8(1): true, 
uint8(2): false}}
-       bytes, err := Marshal(m, nil)
-       assertNil(err)
-       _, err = Unmarshal(bytes, &i)
-       assertNil(err)
-       assertEqual(m, i)
-}
-
-func TestList(t *testing.T) {
-       d := NewDecoder(getReader("lists"))
-       var l List
-       assertDecode(d, List{int32(32), "foo", true}, &l)
-       assertDecode(d, List{}, &l)
-}
-
-func FIXMETestMessage(t *testing.T) {
-       // FIXME aconway 2015-04-09: integrate Message encoding under 
marshal/unmarshal API.
-       bytes, err := ioutil.ReadAll(getReader("message"))
-       assertNil(err)
-       m, err := DecodeMessage(bytes)
-       assertNil(err)
-       fmt.Printf("%+v\n", m)
-       assertEqual(m.Body(), "hello")
-
-       bytes2 := make([]byte, len(bytes))
-       bytes2, err = m.Encode(bytes2)
-       assertNil(err)
-       assertEqual(bytes, bytes2)
-}
-
-// FIXME aconway 2015-03-13: finish the full interop test

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bd3fb337/proton-c/bindings/go/src/qpid.apache.org/proton/go/amqp/marshal.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/go/amqp/marshal.go 
b/proton-c/bindings/go/src/qpid.apache.org/proton/go/amqp/marshal.go
deleted file mode 100644
index e5c2945..0000000
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/go/amqp/marshal.go
+++ /dev/null
@@ -1,238 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-package amqp
-
-// #include <proton/codec.h>
-import "C"
-
-import (
-       "io"
-       "qpid.apache.org/proton/go/internal"
-       "reflect"
-       "unsafe"
-)
-
-func dataError(prefix string, data *C.pn_data_t) error {
-       err := internal.PnError(unsafe.Pointer(C.pn_data_error(data)))
-       if err != nil {
-               err = internal.Errorf("%s: %s", prefix, err.(internal.Error))
-       }
-       return err
-}
-
-/*
-Marshal encodes a Go value as AMQP data in buffer.
-If buffer is nil, or is not large enough, a new buffer  is created.
-
-Returns the buffer used for encoding with len() adjusted to the actual size of 
data.
-
-Go types are encoded as follows
-
- 
+-------------------------------------+--------------------------------------------+
- |Go type                              |AMQP type                              
     |
- 
+-------------------------------------+--------------------------------------------+
- |bool                                 |bool                                   
     |
- 
+-------------------------------------+--------------------------------------------+
- |int8, int16, int32, int64 (int)      |byte, short, int, long (int or long)   
     |
- 
+-------------------------------------+--------------------------------------------+
- |uint8, uint16, uint32, uint64 (uint) |ubyte, ushort, uint, ulong (uint or 
ulong)  |
- 
+-------------------------------------+--------------------------------------------+
- |float32, float64                     |float, double.                         
     |
- 
+-------------------------------------+--------------------------------------------+
- |string                               |string                                 
     |
- 
+-------------------------------------+--------------------------------------------+
- |[]byte, Binary                       |binary                                 
     |
- 
+-------------------------------------+--------------------------------------------+
- |Symbol                               |symbol                                 
     |
- 
+-------------------------------------+--------------------------------------------+
- |interface{}                          |the contained type                     
     |
- 
+-------------------------------------+--------------------------------------------+
- |nil                                  |null                                   
     |
- 
+-------------------------------------+--------------------------------------------+
- |map[K]T                              |map with K and T converted as above    
     |
- 
+-------------------------------------+--------------------------------------------+
- |Map                                  |map, may have mixed types for keys, 
values  |
- 
+-------------------------------------+--------------------------------------------+
- |[]T                                  |list with T converted as above         
     |
- 
+-------------------------------------+--------------------------------------------+
- |List                                 |list, may have mixed types  values     
     |
- 
+-------------------------------------+--------------------------------------------+
-
-TODO Go types: array, slice, struct
-
-Go types that cannot be marshaled: complex64/128, uintptr, function, 
interface, channel
-*/
-func Marshal(v interface{}, buffer []byte) (outbuf []byte, err error) {
-       defer internal.DoRecover(&err)
-       data := C.pn_data(0)
-       defer C.pn_data_free(data)
-       put(data, v)
-       encode := func(buf []byte) ([]byte, error) {
-               n := int(C.pn_data_encode(data, cPtr(buf), cLen(buf)))
-               switch {
-               case n == int(C.PN_OVERFLOW):
-                       return buf, overflow
-               case n < 0:
-                       return buf, dataError("marshal error", data)
-               default:
-                       return buf[:n], nil
-               }
-       }
-       return encodeGrow(buffer, encode)
-}
-
-const minEncode = 256
-
-// overflow is returned when an encoding function can't fit data in the buffer.
-var overflow = internal.Errorf("buffer too small")
-
-// encodeFn encodes into buffer[0:len(buffer)].
-// Returns buffer with length adjusted for data encoded.
-// If buffer too small, returns overflow as error.
-type encodeFn func(buffer []byte) ([]byte, error)
-
-// encodeGrow calls encode() into buffer, if it returns overflow grows the 
buffer.
-// Returns the final buffer.
-func encodeGrow(buffer []byte, encode encodeFn) ([]byte, error) {
-       if buffer == nil || len(buffer) == 0 {
-               buffer = make([]byte, minEncode)
-       }
-       var err error
-       for buffer, err = encode(buffer); err == overflow; buffer, err = 
encode(buffer) {
-               buffer = make([]byte, 2*len(buffer))
-       }
-       return buffer, err
-}
-
-func put(data *C.pn_data_t, v interface{}) {
-       switch v := v.(type) {
-       case nil:
-               C.pn_data_put_null(data)
-       case bool:
-               C.pn_data_put_bool(data, C.bool(v))
-       case int8:
-               C.pn_data_put_byte(data, C.int8_t(v))
-       case int16:
-               C.pn_data_put_short(data, C.int16_t(v))
-       case int32:
-               C.pn_data_put_int(data, C.int32_t(v))
-       case int64:
-               C.pn_data_put_long(data, C.int64_t(v))
-       case int:
-               if unsafe.Sizeof(0) == 8 {
-                       C.pn_data_put_long(data, C.int64_t(v))
-               } else {
-                       C.pn_data_put_int(data, C.int32_t(v))
-               }
-       case uint8:
-               C.pn_data_put_ubyte(data, C.uint8_t(v))
-       case uint16:
-               C.pn_data_put_ushort(data, C.uint16_t(v))
-       case uint32:
-               C.pn_data_put_uint(data, C.uint32_t(v))
-       case uint64:
-               C.pn_data_put_ulong(data, C.uint64_t(v))
-       case uint:
-               if unsafe.Sizeof(0) == 8 {
-                       C.pn_data_put_ulong(data, C.uint64_t(v))
-               } else {
-                       C.pn_data_put_uint(data, C.uint32_t(v))
-               }
-       case float32:
-               C.pn_data_put_float(data, C.float(v))
-       case float64:
-               C.pn_data_put_double(data, C.double(v))
-       case string:
-               C.pn_data_put_string(data, pnBytes([]byte(v)))
-       case []byte:
-               C.pn_data_put_binary(data, pnBytes(v))
-       case Binary:
-               C.pn_data_put_binary(data, pnBytes([]byte(v)))
-       case Symbol:
-               C.pn_data_put_symbol(data, pnBytes([]byte(v)))
-       case Map: // Special map type
-               C.pn_data_put_map(data)
-               C.pn_data_enter(data)
-               for key, val := range v {
-                       put(data, key)
-                       put(data, val)
-               }
-               C.pn_data_exit(data)
-       default:
-               switch reflect.TypeOf(v).Kind() {
-               case reflect.Map:
-                       putMap(data, v)
-               case reflect.Slice:
-                       putList(data, v)
-               default:
-                       panic(internal.Errorf("cannot marshal %s to AMQP", 
reflect.TypeOf(v)))
-               }
-       }
-       err := dataError("marshal", data)
-       if err != nil {
-               panic(err)
-       }
-       return
-}
-
-func putMap(data *C.pn_data_t, v interface{}) {
-       mapValue := reflect.ValueOf(v)
-       C.pn_data_put_map(data)
-       C.pn_data_enter(data)
-       for _, key := range mapValue.MapKeys() {
-               put(data, key.Interface())
-               put(data, mapValue.MapIndex(key).Interface())
-       }
-       C.pn_data_exit(data)
-}
-
-func putList(data *C.pn_data_t, v interface{}) {
-       listValue := reflect.ValueOf(v)
-       C.pn_data_put_list(data)
-       C.pn_data_enter(data)
-       for i := 0; i < listValue.Len(); i++ {
-               put(data, listValue.Index(i).Interface())
-       }
-       C.pn_data_exit(data)
-}
-
-// Encoder encodes AMQP values to an io.Writer
-type Encoder struct {
-       writer io.Writer
-       buffer []byte
-}
-
-// New encoder returns a new encoder that writes to w.
-func NewEncoder(w io.Writer) *Encoder {
-       return &Encoder{w, make([]byte, minEncode)}
-}
-
-func (e *Encoder) Encode(v interface{}) (err error) {
-       e.buffer, err = Marshal(v, e.buffer)
-       if err == nil {
-               e.writer.Write(e.buffer)
-       }
-       return err
-}
-
-func replace(data *C.pn_data_t, v interface{}) {
-       C.pn_data_clear(data)
-       put(data, v)
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bd3fb337/proton-c/bindings/go/src/qpid.apache.org/proton/go/amqp/message.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/go/amqp/message.go 
b/proton-c/bindings/go/src/qpid.apache.org/proton/go/amqp/message.go
deleted file mode 100644
index 87093f5..0000000
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/go/amqp/message.go
+++ /dev/null
@@ -1,342 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-package amqp
-
-// #include <proton/types.h>
-// #include <proton/message.h>
-// #include <proton/codec.h>
-import "C"
-
-import (
-       "qpid.apache.org/proton/go/internal"
-       "time"
-       "unsafe"
-)
-
-// FIXME aconway 2015-04-28: Do we need the interface or can we just export 
the struct?
-
-// Message is the interface to an AMQP message.
-// Instances of this interface contain a pointer to the underlying struct.
-type Message interface {
-       /**
-        * Inferred indicates how the message content
-        * is encoded into AMQP sections. If inferred is true then binary and
-        * list values in the body of the message will be encoded as AMQP DATA
-        * and AMQP SEQUENCE sections, respectively. If inferred is false,
-        * then all values in the body of the message will be encoded as AMQP
-        * VALUE sections regardless of their type.
-        */
-       Inferred() bool
-       SetInferred(bool)
-
-       /**
-        * Durable indicates that any parties taking responsibility
-        * for the message must durably store the content.
-        */
-       Durable() bool
-       SetDurable(bool)
-
-       /**
-        * Priority impacts ordering guarantees. Within a
-        * given ordered context, higher priority messages may jump ahead of
-        * lower priority messages.
-        */
-       Priority() uint8
-       SetPriority(uint8)
-
-       /**
-        * TTL or Time To Live, a message it may be dropped after this duration
-        */
-       TTL() time.Duration
-       SetTTL(time.Duration)
-
-       /**
-        * FirstAcquirer indicates
-        * that the recipient of the message is the first recipient to acquire
-        * the message, i.e. there have been no failed delivery attempts to
-        * other acquirers. Note that this does not mean the message has not
-        * been delivered to, but not acquired, by other recipients.
-        */
-       FirstAcquirer() bool
-       SetFirstAcquirer(bool)
-
-       /**
-        * DeliveryCount tracks how many attempts have been made to
-        * delivery a message.
-        */
-       DeliveryCount() uint32
-       SetDeliveryCount(uint32)
-
-       /**
-        * MessageId provides a unique identifier for a message.
-        * it can be an a string, an unsigned long, a uuid or a
-        * binary value.
-        */
-       MessageId() interface{}
-       SetMessageId(interface{})
-
-       UserId() string
-       SetUserId(string)
-
-       Address() string
-       SetAddress(string)
-
-       Subject() string
-       SetSubject(string)
-
-       ReplyTo() string
-       SetReplyTo(string)
-
-       /**
-        * CorrelationId is set on correlated request and response messages. It 
can be an a string, an unsigned long, a uuid or a
-        * binary value.
-        */
-       CorrelationId() interface{}
-       SetCorrelationId(interface{})
-
-       ContentType() string
-       SetContentType(string)
-
-       ContentEncoding() string
-       SetContentEncoding(string)
-
-       // ExpiryTime indicates an absoulte time when the message may be 
dropped.
-       // A Zero time (i.e. t.isZero() == true) indicates a message never 
expires.
-       ExpiryTime() time.Time
-       SetExpiryTime(time.Time)
-
-       CreationTime() time.Time
-       SetCreationTime(time.Time)
-
-       GroupId() string
-       SetGroupId(string)
-
-       GroupSequence() int32
-       SetGroupSequence(int32)
-
-       ReplyToGroupId() string
-       SetReplyToGroupId(string)
-
-       /**
-        * Instructions can be used to access or modify AMQP delivery 
instructions.
-        */
-       Instructions() *map[string]interface{}
-
-       /**
-        * Annotations  can be used to access or modify AMQP annotations.
-        */
-       Annotations() *map[string]interface{}
-
-       /**
-        * Properties  can be used to access or modify the application 
properties of a message.
-        */
-       Properties() *map[string]interface{}
-
-       /**
-        * Body of the message can be any AMQP encodable type.
-        */
-       Body() interface{}
-       SetBody(interface{})
-
-       // Encode encodes the message as AMQP data. If buffer is non-nil and is 
large enough
-       // the message is encoded into it, otherwise a new buffer is created.
-       // Returns the buffer containing the message.
-       Encode(buffer []byte) ([]byte, error)
-}
-
-// NewMessage creates a new message instance. The returned interface contains 
a pointer.
-func NewMessage() Message {
-       pn := C.pn_message() // Pick up default setting from C message.
-       defer C.pn_message_free(pn)
-       return goMessage(pn)
-}
-
-// Message implementation copies all message data into Go space so it can be 
proprely
-// memory managed.
-//
-type message struct {
-       inferred, durable, firstAcquirer      bool
-       priority                              uint8
-       ttl                                   time.Duration
-       deliveryCount                         uint32
-       messageId                             interface{}
-       userId, address, subject, replyTo     string
-       contentType, contentEncoding          string
-       groupId, replyToGroupId               string
-       creationTime, expiryTime              time.Time
-       groupSequence                         int32
-       correlationId                         interface{}
-       instructions, annotations, properties map[string]interface{}
-       body                                  interface{}
-}
-
-func (m *message) Inferred() bool                        { return m.inferred }
-func (m *message) SetInferred(b bool)                    { m.inferred = b }
-func (m *message) Durable() bool                         { return m.durable }
-func (m *message) SetDurable(b bool)                     { m.durable = b }
-func (m *message) Priority() uint8                       { return m.priority }
-func (m *message) SetPriority(b uint8)                   { m.priority = b }
-func (m *message) TTL() time.Duration                    { return m.ttl }
-func (m *message) SetTTL(d time.Duration)                { m.ttl = d }
-func (m *message) FirstAcquirer() bool                   { return 
m.firstAcquirer }
-func (m *message) SetFirstAcquirer(b bool)               { m.firstAcquirer = b 
}
-func (m *message) DeliveryCount() uint32                 { return 
m.deliveryCount }
-func (m *message) SetDeliveryCount(c uint32)             { m.deliveryCount = c 
}
-func (m *message) MessageId() interface{}                { return m.messageId }
-func (m *message) SetMessageId(id interface{})           { m.messageId = id }
-func (m *message) UserId() string                        { return m.userId }
-func (m *message) SetUserId(s string)                    { m.userId = s }
-func (m *message) Address() string                       { return m.address }
-func (m *message) SetAddress(s string)                   { m.address = s }
-func (m *message) Subject() string                       { return m.subject }
-func (m *message) SetSubject(s string)                   { m.subject = s }
-func (m *message) ReplyTo() string                       { return m.replyTo }
-func (m *message) SetReplyTo(s string)                   { m.replyTo = s }
-func (m *message) CorrelationId() interface{}            { return 
m.correlationId }
-func (m *message) SetCorrelationId(c interface{})        { m.correlationId = c 
}
-func (m *message) ContentType() string                   { return 
m.contentType }
-func (m *message) SetContentType(s string)               { m.contentType = s }
-func (m *message) ContentEncoding() string               { return 
m.contentEncoding }
-func (m *message) SetContentEncoding(s string)           { m.contentEncoding = 
s }
-func (m *message) ExpiryTime() time.Time                 { return m.expiryTime 
}
-func (m *message) SetExpiryTime(t time.Time)             { m.expiryTime = t }
-func (m *message) CreationTime() time.Time               { return 
m.creationTime }
-func (m *message) SetCreationTime(t time.Time)           { m.creationTime = t }
-func (m *message) GroupId() string                       { return m.groupId }
-func (m *message) SetGroupId(s string)                   { m.groupId = s }
-func (m *message) GroupSequence() int32                  { return 
m.groupSequence }
-func (m *message) SetGroupSequence(s int32)              { m.groupSequence = s 
}
-func (m *message) ReplyToGroupId() string                { return 
m.replyToGroupId }
-func (m *message) SetReplyToGroupId(s string)            { m.replyToGroupId = 
s }
-func (m *message) Instructions() *map[string]interface{} { return 
&m.instructions }
-func (m *message) Annotations() *map[string]interface{}  { return 
&m.annotations }
-func (m *message) Properties() *map[string]interface{}   { return 
&m.properties }
-func (m *message) Body() interface{}                     { return m.body }
-func (m *message) SetBody(b interface{})                 { m.body = b }
-
-// rewindGet rewinds and then gets the value from a data object.
-func rewindGet(data *C.pn_data_t, v interface{}) {
-       if data != nil && C.pn_data_size(data) > 0 {
-               C.pn_data_rewind(data)
-               C.pn_data_next(data)
-               get(data, v)
-       }
-}
-
-// goMessage populates a Go message from a pn_message_t
-func goMessage(pn *C.pn_message_t) *message {
-       m := &message{
-               inferred:        bool(C.pn_message_is_inferred(pn)),
-               durable:         bool(C.pn_message_is_durable(pn)),
-               priority:        uint8(C.pn_message_get_priority(pn)),
-               ttl:             time.Duration(C.pn_message_get_ttl(pn)) * 
time.Millisecond,
-               firstAcquirer:   bool(C.pn_message_is_first_acquirer(pn)),
-               deliveryCount:   uint32(C.pn_message_get_delivery_count(pn)),
-               userId:          goString(C.pn_message_get_user_id(pn)),
-               address:         C.GoString(C.pn_message_get_address(pn)),
-               subject:         C.GoString(C.pn_message_get_subject(pn)),
-               replyTo:         C.GoString(C.pn_message_get_reply_to(pn)),
-               contentType:     C.GoString(C.pn_message_get_content_type(pn)),
-               contentEncoding: 
C.GoString(C.pn_message_get_content_encoding(pn)),
-               expiryTime:      time.Unix(0, 
int64(time.Millisecond*time.Duration(C.pn_message_get_expiry_time(pn)))),
-               creationTime:    time.Unix(0, 
int64(time.Millisecond)*int64(C.pn_message_get_creation_time(pn))),
-               groupId:         C.GoString(C.pn_message_get_group_id(pn)),
-               groupSequence:   int32(C.pn_message_get_group_sequence(pn)),
-               replyToGroupId:  
C.GoString(C.pn_message_get_reply_to_group_id(pn)),
-               messageId:       nil,
-               correlationId:   nil,
-               instructions:    make(map[string]interface{}),
-               annotations:     make(map[string]interface{}),
-               properties:      make(map[string]interface{}),
-       }
-       rewindGet(C.pn_message_id(pn), &m.messageId)
-       rewindGet(C.pn_message_correlation_id(pn), &m.correlationId)
-       rewindGet(C.pn_message_instructions(pn), &m.instructions)
-       rewindGet(C.pn_message_annotations(pn), &m.annotations)
-       rewindGet(C.pn_message_properties(pn), &m.properties)
-       rewindGet(C.pn_message_body(pn), &m.body)
-       return m
-}
-
-// pnMessage populates a pn_message_t from a Go message.
-func (m *message) pnMessage() *C.pn_message_t {
-       pn := C.pn_message()
-       C.pn_message_set_inferred(pn, C.bool(m.Inferred()))
-       C.pn_message_set_durable(pn, C.bool(m.Durable()))
-       C.pn_message_set_priority(pn, C.uint8_t(m.priority))
-       C.pn_message_set_ttl(pn, C.pn_millis_t(m.TTL()/time.Millisecond))
-       C.pn_message_set_first_acquirer(pn, C.bool(m.FirstAcquirer()))
-       C.pn_message_set_delivery_count(pn, C.uint32_t(m.deliveryCount))
-       replace(C.pn_message_id(pn), m.MessageId())
-       C.pn_message_set_user_id(pn, pnBytes([]byte(m.UserId())))
-       C.pn_message_set_address(pn, C.CString(m.Address()))
-       C.pn_message_set_subject(pn, C.CString(m.Subject()))
-       C.pn_message_set_reply_to(pn, C.CString(m.ReplyTo()))
-       replace(C.pn_message_correlation_id(pn), m.CorrelationId())
-       C.pn_message_set_content_type(pn, C.CString(m.ContentType()))
-       C.pn_message_set_content_encoding(pn, C.CString(m.ContentEncoding()))
-       C.pn_message_set_expiry_time(pn, pnTime(m.ExpiryTime()))
-       C.pn_message_set_creation_time(pn, pnTime(m.CreationTime()))
-       C.pn_message_set_group_id(pn, C.CString(m.GroupId()))
-       C.pn_message_set_group_sequence(pn, C.pn_sequence_t(m.GroupSequence()))
-       C.pn_message_set_reply_to_group_id(pn, C.CString(m.ReplyToGroupId()))
-       replace(C.pn_message_instructions(pn), *m.Instructions())
-       replace(C.pn_message_annotations(pn), *m.Annotations())
-       replace(C.pn_message_properties(pn), *m.Properties())
-       replace(C.pn_message_body(pn), m.Body())
-       return pn
-}
-
-// FIXME aconway 2015-04-08: Move message encode/decode under 
Marshal/Unmarshal interfaces.
-
-// DecodeMessage decodes bytes as a message
-func DecodeMessage(data []byte) (Message, error) {
-       pnMsg := C.pn_message()
-       defer C.pn_message_free(pnMsg)
-       if len(data) == 0 {
-               return nil, internal.Errorf("empty buffer for decode")
-       }
-       if C.pn_message_decode(pnMsg, cPtr(data), cLen(data)) < 0 {
-               return nil, internal.Errorf("decoding message: %s",
-                       
internal.PnError(unsafe.Pointer(C.pn_message_error(pnMsg))))
-       }
-       return goMessage(pnMsg), nil
-}
-
-// Encode the message into bufffer.
-// If buffer is nil or len(buffer) is not sufficient to encode the message a 
larger
-// buffer will be returned.
-func (m *message) Encode(buffer []byte) ([]byte, error) {
-       pn := m.pnMessage()
-       defer C.pn_message_free(pn)
-       encode := func(buf []byte) ([]byte, error) {
-               len := cLen(buf)
-               result := C.pn_message_encode(pn, cPtr(buf), &len)
-               switch {
-               case result == C.PN_OVERFLOW:
-                       return buf, overflow
-               case result < 0:
-                       return buf, internal.Errorf("cannot encode message: 
%s", internal.PnErrorCode(result))
-               default:
-                       return buf[:len], nil
-               }
-       }
-       return encodeGrow(buffer, encode)
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bd3fb337/proton-c/bindings/go/src/qpid.apache.org/proton/go/amqp/message_test.go
----------------------------------------------------------------------
diff --git 
a/proton-c/bindings/go/src/qpid.apache.org/proton/go/amqp/message_test.go 
b/proton-c/bindings/go/src/qpid.apache.org/proton/go/amqp/message_test.go
deleted file mode 100644
index 46e26de..0000000
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/go/amqp/message_test.go
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-package amqp
-
-import (
-       "reflect"
-       "testing"
-       "time"
-)
-
-func roundTrip(t *testing.T, m Message) {
-       buffer, err := m.Encode(nil)
-       if err != nil {
-               t.Fatalf("Encode failed: %v", err)
-       }
-       m2, err := DecodeMessage(buffer)
-       if err != nil {
-               t.Fatalf("Decode failed: %v", err)
-       }
-       if !reflect.DeepEqual(m, m2) {
-               t.Errorf("Message mismatch got\n%#v\nwant\n%#v", m, m2)
-       }
-}
-
-func TestDefaultMessageRoundTrip(t *testing.T) {
-       m := NewMessage()
-       // Check defaults
-       assertEqual(m.Inferred(), false)
-       assertEqual(m.Durable(), false)
-       assertEqual(m.Priority(), uint8(4))
-       assertEqual(m.TTL(), time.Duration(0))
-       assertEqual(m.UserId(), "")
-       assertEqual(m.Address(), "")
-       assertEqual(m.Subject(), "")
-       assertEqual(m.ReplyTo(), "")
-       assertEqual(m.ContentType(), "")
-       assertEqual(m.ContentEncoding(), "")
-       assertEqual(m.GroupId(), "")
-       assertEqual(m.GroupSequence(), int32(0))
-       assertEqual(m.ReplyToGroupId(), "")
-       assertEqual(m.MessageId(), nil)
-       assertEqual(m.CorrelationId(), nil)
-       assertEqual(*m.Instructions(), map[string]interface{}{})
-       assertEqual(*m.Annotations(), map[string]interface{}{})
-       assertEqual(*m.Properties(), map[string]interface{}{})
-       assertEqual(m.Body(), nil)
-
-       roundTrip(t, m)
-}
-
-func TestMessageRoundTrip(t *testing.T) {
-       m := NewMessage()
-       m.SetInferred(false)
-       m.SetDurable(true)
-       m.SetPriority(42)
-       m.SetTTL(0)
-       m.SetUserId("user")
-       m.SetAddress("address")
-       m.SetSubject("subject")
-       m.SetReplyTo("replyto")
-       m.SetContentType("content")
-       m.SetContentEncoding("encoding")
-       m.SetGroupId("group")
-       m.SetGroupSequence(42)
-       m.SetReplyToGroupId("replytogroup")
-       m.SetMessageId("id")
-       m.SetCorrelationId("correlation")
-       *m.Instructions() = map[string]interface{}{"instructions": "foo"}
-       *m.Annotations() = map[string]interface{}{"annotations": "foo"}
-       *m.Properties() = map[string]interface{}{"int": int32(32), "bool": true, "string": "foo"}
-       m.SetBody("hello")
-       roundTrip(t, m)
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to