PROTON-827: Concurrent Go API for proton.

Provide a simple concurrent-safe, blocking API for proton.
See the qpid.apache.org/proton/concurrent package documentation for details.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/bd3fb337
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/bd3fb337
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/bd3fb337

Branch: refs/heads/proton-go
Commit: bd3fb337c475c4df07cf359665cee63ea5a0d2a7
Parents: bc72b8b
Author: Alan Conway <[email protected]>
Authored: Tue Sep 8 13:33:57 2015 -0400
Committer: Alan Conway <[email protected]>
Committed: Mon Sep 28 14:08:23 2015 -0400

----------------------------------------------------------------------
 .gitignore                                      |    4 +
 examples/go/CMakeLists.txt                      |   20 +-
 examples/go/README.md                           |   15 +-
 examples/go/broker.go                           |  161 +
 examples/go/event_broker.go                     |  240 --
 examples/go/example_test.go                     |  149 +-
 examples/go/receive.go                          |   93 +-
 examples/go/send.go                             |  105 +-
 examples/go/util/queue.go                       |  106 +
 examples/go/util/util.go                        |   69 +
 proton-c/bindings/go/CMakeLists.txt             |   45 +-
 proton-c/bindings/go/README.md                  |   97 +-
 proton-c/bindings/go/genwrap.go                 |   28 +-
 .../qpid.apache.org/proton/go/amqp.a            | 1726 ---------
 .../qpid.apache.org/proton/go/event.a           | 3318 -----------------
 .../qpid.apache.org/proton/go/internal.a        |  Bin 102074 -> 0 bytes
 .../qpid.apache.org/proton/go/messaging.a       | 1100 ------
 .../qpid.apache.org/proton/go/reactor.a         | 3319 ------------------
 .../go/src/qpid.apache.org/proton/amqp/doc.go   |   34 +
 .../go/src/qpid.apache.org/proton/amqp/error.go |   66 +
 .../go/src/qpid.apache.org/proton/amqp/interop  |    1 +
 .../qpid.apache.org/proton/amqp/interop_test.go |  381 ++
 .../src/qpid.apache.org/proton/amqp/marshal.go  |  250 ++
 .../src/qpid.apache.org/proton/amqp/message.go  |  347 ++
 .../qpid.apache.org/proton/amqp/message_test.go |  166 +
 .../go/src/qpid.apache.org/proton/amqp/types.go |  198 ++
 .../qpid.apache.org/proton/amqp/unmarshal.go    |  556 +++
 .../go/src/qpid.apache.org/proton/amqp/url.go   |   96 +
 .../src/qpid.apache.org/proton/amqp/url_test.go |   51 +
 .../proton/concurrent/connection.go             |  167 +
 .../proton/concurrent/container.go              |   71 +
 .../qpid.apache.org/proton/concurrent/doc.go    |   38 +
 .../proton/concurrent/endpoint.go               |   86 +
 .../proton/concurrent/handler.go                |  137 +
 .../qpid.apache.org/proton/concurrent/link.go   |  232 ++
 .../proton/concurrent/messaging_test.go         |  205 ++
 .../proton/concurrent/receiver.go               |  242 ++
 .../qpid.apache.org/proton/concurrent/sender.go |  190 +
 .../proton/concurrent/session.go                |  115 +
 .../qpid.apache.org/proton/concurrent/time.go   |   71 +
 .../go/src/qpid.apache.org/proton/doc.go        |   46 +
 .../go/src/qpid.apache.org/proton/engine.go     |  402 +++
 .../src/qpid.apache.org/proton/go/amqp/doc.go   |   40 -
 .../src/qpid.apache.org/proton/go/amqp/interop  |    1 -
 .../proton/go/amqp/interop_test.go              |  308 --
 .../qpid.apache.org/proton/go/amqp/marshal.go   |  238 --
 .../qpid.apache.org/proton/go/amqp/message.go   |  342 --
 .../proton/go/amqp/message_test.go              |   90 -
 .../src/qpid.apache.org/proton/go/amqp/types.go |  193 -
 .../src/qpid.apache.org/proton/go/amqp/uid.go   |   40 -
 .../qpid.apache.org/proton/go/amqp/unmarshal.go |  552 ---
 .../src/qpid.apache.org/proton/go/amqp/url.go   |   96 -
 .../qpid.apache.org/proton/go/amqp/url_test.go  |   51 -
 .../src/qpid.apache.org/proton/go/event/doc.go  |   38 -
 .../qpid.apache.org/proton/go/event/handlers.go |  411 ---
 .../qpid.apache.org/proton/go/event/message.go  |   75 -
 .../src/qpid.apache.org/proton/go/event/pump.go |  360 --
 .../qpid.apache.org/proton/go/event/wrappers.go |  253 --
 .../proton/go/event/wrappers_gen.go             |  732 ----
 .../qpid.apache.org/proton/go/internal/error.go |  126 -
 .../qpid.apache.org/proton/go/messaging/doc.go  |   28 -
 .../proton/go/messaging/handler.go              |   70 -
 .../proton/go/messaging/messaging.go            |  261 --
 .../go/src/qpid.apache.org/proton/handlers.go   |  399 +++
 .../qpid.apache.org/proton/internal/error.go    |  121 +
 .../proton/internal/flexchannel.go              |   82 +
 .../proton/internal/flexchannel_test.go         |   89 +
 .../qpid.apache.org/proton/internal/safemap.go  |   57 +
 .../src/qpid.apache.org/proton/internal/uuid.go |   70 +
 .../go/src/qpid.apache.org/proton/message.go    |   79 +
 .../go/src/qpid.apache.org/proton/wrappers.go   |  335 ++
 .../src/qpid.apache.org/proton/wrappers_gen.go  |  874 +++++
 proton-c/include/proton/link.h                  |    2 +-
 73 files changed, 6884 insertions(+), 14272 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bd3fb337/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 4075b3f..ad4eebb 100644
--- a/.gitignore
+++ b/.gitignore
@@ -36,3 +36,7 @@ proton-c/bindings/go/bin
 
 # Testresults from the jenkins build script
 testresults
+
+# Go binding build output
+/proton-c/bindings/go/pkg
+/proton-c/bindings/go/bin

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bd3fb337/examples/go/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/examples/go/CMakeLists.txt b/examples/go/CMakeLists.txt
index 464ed7c..873180d 100644
--- a/examples/go/CMakeLists.txt
+++ b/examples/go/CMakeLists.txt
@@ -17,13 +17,21 @@
 # under the License.
 #
 
-# FIXME aconway 2015-05-20:
-# - use proton build for Go includes & libs.
-# - pre-build go libraries? Respect user GOPATH?
-
 if(BUILD_GO)
+  set(examples receive send broker)
+
+  foreach(example ${examples})
+    add_custom_target(go-example-${example} ALL
+      COMMAND ${GO_BUILD} ${GO_EXAMPLE_FLAGS} -o 
${CMAKE_CURRENT_BINARY_DIR}/${example} ${CMAKE_CURRENT_SOURCE_DIR}/${example}.go
+      DEPENDS go-packages qpid-proton)
+  endforeach()
+  add_custom_target(go-example-test ALL
+      COMMAND ${GO_TEST} -c ${GO_EXAMPLE_FLAGS} -o 
${CMAKE_CURRENT_BINARY_DIR}/example_test 
${CMAKE_CURRENT_SOURCE_DIR}/example_test.go
+      DEPENDS go-packages qpid-proton)
+
   add_test(
     NAME go_example_test
-    COMMAND ${GO_TEST} example_test.go -rpath ${CMAKE_BINARY_DIR}/proton-c
-    WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR})
+    COMMAND ${CMAKE_CURRENT_BINARY_DIR}/example_test -broker broker)
+
+  list(APPEND ADDITIONAL_MAKE_CLEAN_FILES ${examples})
 endif()

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bd3fb337/examples/go/README.md
----------------------------------------------------------------------
diff --git a/examples/go/README.md b/examples/go/README.md
index 52bd8e4..33c3d88 100644
--- a/examples/go/README.md
+++ b/examples/go/README.md
@@ -2,18 +2,17 @@
 
 There are 3 go packages for proton:
 
-- qpid.apache.org/proton/go/amqp: converts AMQP messages and data types to and 
from Go data types.
-- qpid.apache.org/proton/go/messaging: easy-to-use, concurrent API for 
messaging clients and servers.
-- qpid.apache.org/proton/go/event: full low-level access to the proton engine.
+- qpid.apache.org/proton/amqp: converts AMQP messages and data types to and 
from Go data types.
+- qpid.apache.org/proton/concurrent: easy-to-use, concurrent API for 
messaging clients and servers.
+- qpid.apache.org/proton: Low-level access to the proton engine.
 
-Most applications should use the `messaging` package. The `event` package is 
for
+Most applications should use the `proton/concurrent` package. The `proton` 
package is for
 applications that need low-level access to the proton engine.
 
 ## Example programs
 
-- [receive.go](receive.go) receive from many connections concurrently using 
messaging package.
-- [send.go](send.go) send to many connections concurrently using messaging 
package.
-- [event_broker.go](event_broker.go) simple mini-broker using event package.
+- [receive.go](receive.go) receive from many connections concurrently.
+- [send.go](send.go) send to many connections concurrently.
 
 ## Using the Go packages
 
@@ -54,7 +53,7 @@ the example source have more details.
 
 First start the broker:
 
-    go run event_broker.go
+    go run broker.go
 
 Send messages concurrently to queues "foo" and "bar", 10 messages to each 
queue:
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bd3fb337/examples/go/broker.go
----------------------------------------------------------------------
diff --git a/examples/go/broker.go b/examples/go/broker.go
new file mode 100644
index 0000000..3f85e9e
--- /dev/null
+++ b/examples/go/broker.go
@@ -0,0 +1,161 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+//
+// This is a simple AMQP broker implemented using the concurrent interface.
+//
+// It maintains a set of named in-memory queues of messages. Clients can send
+// messages to queues or subscribe to receive messages from them.
+//
+//
+
+package main
+
+import (
+       "./util"
+       "flag"
+       "fmt"
+       "log"
+       "net"
+       "os"
+       "qpid.apache.org/proton/concurrent"
+)
+
+// Usage and command-line flags
+func usage() {
+       fmt.Fprintf(os.Stderr, `
+Usage: %s
+A simple broker-like demo. Queues are created automatically for sender or 
receiver addrsses.
+`, os.Args[0])
+       flag.PrintDefaults()
+}
+
+var addr = flag.String("addr", ":amqp", "Listening address")
+
+func main() {
+       flag.Usage = usage
+       flag.Parse()
+       b := newBroker()
+       err := b.listen(*addr)
+       util.ExitIf(err)
+}
+
+type broker struct {
+       container concurrent.Container
+       queues    util.QueueMap
+}
+
+func newBroker() *broker {
+       return &broker{
+               container: concurrent.NewContainer(""),
+               queues:    util.MakeQueueMap(),
+       }
+}
+
+// Listen for incoming connections
+func (b *broker) listen(addr string) (err error) {
+       listener, err := net.Listen("tcp", addr)
+       if err != nil {
+               return err
+       }
+       log.Printf("Listening on %s\n", listener.Addr())
+       defer listener.Close()
+       for {
+               conn, err := listener.Accept()
+               if err != nil {
+                       return err
+               }
+               c, err := b.container.NewConnection(conn)
+               if err != nil {
+                       return err
+               }
+               // Make this a server connection. Must be done before Open()
+               c.Server() // Server-side protocol negotiation.
+               c.Listen() // Enable remotely-opened endpoints.
+               if err := c.Open(); err != nil {
+                       return err
+               }
+               util.Debugf("accept %s\n", c)
+               // Accept remotely-opened endpoints on the connection
+               go b.accept(c)
+       }
+}
+
+// accept remotely-opened endpoints (Session, Sender and Receiver)
+func (b *broker) accept(c concurrent.Connection) {
+       for ep, err := c.Accept(); err == nil; ep, err = c.Accept() {
+               switch ep := ep.(type) {
+               case concurrent.Session:
+                       util.Debugf("accept session %s\n", ep)
+                       ep.Open()
+               case concurrent.Sender:
+                       util.Debugf("accept sender %s\n", ep)
+                       ep.Open()
+                       go b.sender(ep)
+               case concurrent.Receiver:
+                       util.Debugf("accept receiver %s\n", ep)
+                       ep.SetCapacity(100, true) // Pre-fetch 100 messages
+                       ep.Open()
+                       go b.receiver(ep)
+               }
+       }
+}
+
+// sender pops messages from the queue named by the sender's Source address and 
sends them.
+func (b *broker) sender(sender concurrent.Sender) {
+       qname := sender.Settings().Source
+       if qname == "" {
+               log.Printf("invalid consumer, no source address: %s", sender)
+               return
+       }
+       q := b.queues.Get(qname)
+       for {
+               m := <-q.Pop
+               if m == nil {
+                       break
+               }
+               if sm, err := sender.Send(m); err == nil {
+                       sm.Forget() // FIXME aconway 2015-09-24: Ignore 
acknowledgements
+                       util.Debugf("send %s: %s\n", sender, 
util.FormatMessage(m))
+               } else {
+                       util.Debugf("send error %s: %s\n", sender, err)
+                       q.Putback <- m
+                       break
+               }
+       }
+}
+
+func (b *broker) receiver(receiver concurrent.Receiver) {
+       qname := receiver.Settings().Target
+       if qname == "" {
+               log.Printf("invalid producer, no target address: %s", receiver)
+               return
+       }
+       q := b.queues.Get(qname)
+       for {
+               if rm, err := receiver.Receive(); err == nil {
+                       util.Debugf("recv %s: %s\n", receiver, 
util.FormatMessage(rm.Message))
+                       q.Push <- rm.Message
+                       rm.Accept()
+               } else {
+                       util.Debugf("recv error %s: %s\n", receiver, err)
+                       break
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bd3fb337/examples/go/event_broker.go
----------------------------------------------------------------------
diff --git a/examples/go/event_broker.go b/examples/go/event_broker.go
deleted file mode 100644
index 2578bd5..0000000
--- a/examples/go/event_broker.go
+++ /dev/null
@@ -1,240 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-//
-// This is a simple AMQP broker implemented using the event-handler interface.
-//
-// It maintains a set of named in-memory queues of messages. Clients can send
-// messages to queues or subscribe to receive messages from them.
-//
-//
-
-package main
-
-import (
-       "container/list"
-       "flag"
-       "fmt"
-       "net"
-       "os"
-       "qpid.apache.org/proton/go/amqp"
-       "qpid.apache.org/proton/go/event"
-       "sync"
-)
-
-// Usage and command-line flags
-func usage() {
-       fmt.Fprintf(os.Stderr, `
-Usage: %s
-A simple broker-like demo. Queues are created automatically for sender or 
receiver addrsses.
-`, os.Args[0])
-       flag.PrintDefaults()
-}
-
-var debug = flag.Bool("debug", false, "Print detailed debug output")
-var addr = flag.String("addr", ":amqp", "Listening address")
-var full = flag.Bool("full", false, "Print full message not just body.")
-
-func main() {
-       flag.Usage = usage
-       flag.Parse()
-       b := newBroker()
-       err := b.listen(*addr)
-       exitIf(err)
-}
-
-// broker implements event.MessagingHandler and reacts to events by moving 
messages on or off queues.
-type broker struct {
-       queues map[string]*queue
-       lock   sync.Mutex // FIXME aconway 2015-05-04: un-golike, better broker 
coming...
-}
-
-func newBroker() *broker {
-       return &broker{queues: make(map[string]*queue)}
-}
-
-func (b *broker) getQueue(name string) *queue {
-       q := b.queues[name]
-       if q == nil {
-               debugf("Create queue %s\n", name)
-               q = &queue{name, list.New(), make(map[event.Link]bool)}
-               b.queues[name] = q
-       }
-       return q
-}
-
-func (b *broker) unsubscribe(l event.Link) {
-       if l.IsSender() {
-               q := b.queues[l.RemoteSource().Address()]
-               if q != nil {
-                       q.unsubscribe(l)
-                       if q.empty() {
-                               debugf("Delete queue %s\n", q.name)
-                               delete(b.queues, q.name)
-                       }
-               }
-       }
-}
-
-func (b *broker) HandleMessagingEvent(t event.MessagingEventType, e 
event.Event) error {
-       // FIXME aconway 2015-05-04: locking is un-golike, better example 
coming soon.
-       // Needed because the same handler is used for multiple connections 
concurrently
-       // and the queue data structures are not thread safe.
-       b.lock.Lock()
-       defer b.lock.Unlock()
-
-       switch t {
-
-       case event.MLinkOpening:
-               if e.Link().IsSender() {
-                       q := b.getQueue(e.Link().RemoteSource().Address())
-                       q.subscribe(e.Link())
-               }
-
-       case event.MLinkDisconnected, event.MLinkClosing:
-               b.unsubscribe(e.Link())
-
-       case event.MSendable:
-               q := b.getQueue(e.Link().RemoteSource().Address())
-               q.popTo(e.Connection().Pump(), e.Link())
-
-       case event.MMessage:
-               m, err := event.DecodeMessage(e)
-               exitIf(err)
-               qname := e.Link().RemoteTarget().Address()
-               debugf("link %s -> queue %s: %s\n", logLink(e.Link()), qname, 
formatMessage(m))
-               b.getQueue(qname).push(e.Connection().Pump(), m)
-       }
-       return nil
-}
-
-func (b *broker) listen(addr string) (err error) {
-       // Use the standard Go "net" package to listen for connections.
-       listener, err := net.Listen("tcp", addr)
-       if err != nil {
-               return err
-       }
-       fmt.Printf("Listening on %s\n", listener.Addr())
-       defer listener.Close()
-       for {
-               conn, err := listener.Accept()
-               if err != nil {
-                       fmt.Fprintf(os.Stderr, "Accept error: %s\n", err)
-                       continue
-               }
-               pump, err := event.NewPump(conn, event.NewMessagingDelegator(b))
-               exitIf(err)
-               debugf("Accepted %s[%p]\n", pump, pump)
-               pump.Server()
-               go func() {
-                       pump.Run()
-                       if pump.Error == nil {
-                               debugf("Closed %s\n", pump)
-                       } else {
-                               debugf("Closed %s: %s\n", pump, pump.Error)
-                       }
-               }()
-       }
-}
-
-// queue is a structure representing a queue.
-type queue struct {
-       name      string              // Name of queue
-       messages  *list.List          // List of event.Message
-       consumers map[event.Link]bool // Set of consumer links
-}
-
-type logLink event.Link // Wrapper to print links in useful format for logging
-
-func (ll logLink) String() string {
-       l := event.Link(ll)
-       return fmt.Sprintf("%s[%p]", l.Name(), l.Session().Connection().Pump())
-}
-
-func (q *queue) subscribe(link event.Link) {
-       debugf("link %s subscribed to queue %s\n", logLink(link), q.name)
-       q.consumers[link] = true
-}
-
-func (q *queue) unsubscribe(link event.Link) {
-       debugf("link %s unsubscribed from queue %s\n", logLink(link), q.name)
-       delete(q.consumers, link)
-}
-
-func (q *queue) empty() bool {
-       return len(q.consumers) == 0 && q.messages.Len() == 0
-}
-
-func (q *queue) push(context *event.Pump, message amqp.Message) {
-       q.messages.PushBack(message)
-       q.pop(context)
-}
-
-func (q *queue) popTo(context *event.Pump, link event.Link) bool {
-       if q.messages.Len() != 0 && link.Credit() > 0 {
-               message := q.messages.Remove(q.messages.Front()).(amqp.Message)
-               debugf("link %s <- queue %s: %s\n", logLink(link), q.name, 
formatMessage(message))
-               // The first return parameter is an event.Delivery.
-               // The Deliver can be used to track message status, e.g. so we 
can re-delver on failure.
-               // This demo broker doesn't do that.
-               linkPump := link.Session().Connection().Pump()
-               if context == linkPump {
-                       if context == nil {
-                               exitIf(fmt.Errorf("pop in nil context"))
-                       }
-                       link.Send(message) // link is in the current pump, safe 
to call Send() direct
-               } else {
-                       linkPump.Inject <- func() { // Inject to link's pump
-                               link.Send(message) // FIXME aconway 2015-05-04: 
error handlig
-                       }
-               }
-               return true
-       }
-       return false
-}
-
-func (q *queue) pop(context *event.Pump) (popped bool) {
-       for c, _ := range q.consumers {
-               popped = popped || q.popTo(context, c)
-       }
-       return
-}
-
-// Simple debug logging
-func debugf(format string, data ...interface{}) {
-       if *debug {
-               fmt.Fprintf(os.Stderr, format, data...)
-       }
-}
-
-// Simple error handling for demo.
-func exitIf(err error) {
-       if err != nil {
-               fmt.Fprintln(os.Stderr, err)
-               os.Exit(1)
-       }
-}
-
-func formatMessage(m amqp.Message) string {
-       if *full {
-               return fmt.Sprintf("%#v", m)
-       } else {
-               return fmt.Sprintf("%#v", m.Body())
-       }
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bd3fb337/examples/go/example_test.go
----------------------------------------------------------------------
diff --git a/examples/go/example_test.go b/examples/go/example_test.go
index 8879c38..2afd95c 100644
--- a/examples/go/example_test.go
+++ b/examples/go/example_test.go
@@ -28,21 +28,20 @@ import (
        "flag"
        "fmt"
        "io"
-       "io/ioutil"
        "math/rand"
        "net"
        "os"
        "os/exec"
-       "path"
        "path/filepath"
        "reflect"
+       "strings"
        "testing"
        "time"
 )
 
-func panicIf(err error) {
+func fatalIf(t *testing.T, err error) {
        if err != nil {
-               panic(err)
+               t.Fatalf("%s", err)
        }
 }
 
@@ -77,16 +76,18 @@ func (b *broker) check() error {
 }
 
 // Start the demo broker, wait till it is listening on *addr. No-op if already 
started.
-func (b *broker) start() error {
+func (b *broker) start(t *testing.T) error {
        if b.cmd == nil { // Not already started
-               // FIXME aconway 2015-04-30: better way to pick/configure a 
broker port.
                b.addr = fmt.Sprintf("127.0.0.1:%d", rand.Intn(10000)+10000)
-               b.cmd = exampleCommand("event_broker", "-addr", b.addr)
+               b.cmd = exampleCommand(t, *brokerName, "-addr", b.addr)
                b.runerr = make(chan error)
                b.cmd.Stderr, b.cmd.Stdout = os.Stderr, os.Stdout
-               go func() {
-                       b.runerr <- b.cmd.Run()
-               }()
+               b.err = b.cmd.Start()
+               if b.err == nil {
+                       go func() { b.runerr <- b.cmd.Wait() }()
+               } else {
+                       b.runerr <- b.err
+               }
                b.err = b.check()
        }
        return b.err
@@ -95,7 +96,7 @@ func (b *broker) start() error {
 func (b *broker) stop() {
        if b != nil && b.cmd != nil {
                b.cmd.Process.Kill()
-               b.cmd.Wait()
+               <-b.runerr
        }
 }
 
@@ -106,22 +107,49 @@ func checkEqual(want interface{}, got interface{}) error {
        return fmt.Errorf("%#v != %#v", want, got)
 }
 
-// runCommand returns an exec.Cmd to run an example.
-func exampleCommand(prog string, arg ...string) *exec.Cmd {
-       build(prog + ".go")
+// 'go build' uses the installed copy of the proton Go libraries, which may be 
out of date.
+func checkStaleLibs(t *testing.T) {
+       var stale []string
+       pp := "qpid.apache.org/proton"
+       for _, p := range []string{pp, pp + "/amqp", pp + "/concurrent"} {
+               out, err := exec.Command("go", "list", "-f", "{{.Stale}}", 
p).CombinedOutput()
+               if err != nil {
+                       t.Fatalf("failed to execute 'go list': %v\n%v", err, 
string(out))
+               }
+               if string(out) != "false\n" {
+                       stale = append(stale, pp)
+               }
+       }
+       if len(stale) > 0 {
+               t.Fatalf("Stale libraries, run 'go install %s'", 
strings.Trim(fmt.Sprint(stale), "[]"))
+       }
+}
+
+// exampleCommand returns an exec.Cmd to run an example.
+func exampleCommand(t *testing.T, prog string, arg ...string) (cmd *exec.Cmd) {
+       checkStaleLibs(t)
        args := []string{}
        if *debug {
                args = append(args, "-debug=true")
        }
        args = append(args, arg...)
-       cmd := exec.Command(exepath(prog), args...)
+       prog, err := filepath.Abs(prog)
+       fatalIf(t, err)
+       if _, err := os.Stat(prog); err == nil {
+               cmd = exec.Command(prog, args...)
+       } else if _, err := os.Stat(prog + ".go"); err == nil {
+               args = append([]string{"run", prog + ".go"}, args...)
+               cmd = exec.Command("go", args...)
+       } else {
+               t.Fatalf("Cannot find binary or source for %s", prog)
+       }
        cmd.Stderr = os.Stderr
        return cmd
 }
 
 // Run an example Go program, return the combined output as a string.
-func runExample(prog string, arg ...string) (string, error) {
-       cmd := exampleCommand(prog, arg...)
+func runExample(t *testing.T, prog string, arg ...string) (string, error) {
+       cmd := exampleCommand(t, prog, arg...)
        out, err := cmd.Output()
        return string(out), err
 }
@@ -133,8 +161,8 @@ func prefix(prefix string, err error) error {
        return nil
 }
 
-func runExampleWant(want string, prog string, args ...string) error {
-       out, err := runExample(prog, args...)
+func runExampleWant(t *testing.T, want string, prog string, args ...string) 
error {
+       out, err := runExample(t, prog, args...)
        if err != nil {
                return fmt.Errorf("%s failed: %s: %s", prog, err, out)
        }
@@ -142,7 +170,10 @@ func runExampleWant(want string, prog string, args 
...string) error {
 }
 
 func exampleArgs(args ...string) []string {
-       return append(args, testBroker.addr+"/foo", testBroker.addr+"/bar", 
testBroker.addr+"/baz")
+       for i := 0; i < *connections; i++ {
+               args = append(args, fmt.Sprintf("%s/%s%d", testBroker.addr, 
"q", i))
+       }
+       return args
 }
 
 // Send then receive
@@ -150,18 +181,18 @@ func TestExampleSendReceive(t *testing.T) {
        if testing.Short() {
                t.Skip("Skip demo tests in short mode")
        }
-       testBroker.start()
-       err := runExampleWant(
-               "Received all 15 acknowledgements\n",
+       testBroker.start(t)
+       err := runExampleWant(t,
+               fmt.Sprintf("Received all %d acknowledgements\n", expected),
                "send",
-               exampleArgs("-count", "5")...)
+               exampleArgs("-count", fmt.Sprintf("%d", *count))...)
        if err != nil {
                t.Fatal(err)
        }
-       err = runExampleWant(
-               "Listening on 3 connections\nReceived 15 messages\n",
+       err = runExampleWant(t,
+               fmt.Sprintf("Listening on %v connections\nReceived %v 
messages\n", *connections, *count**connections),
                "receive",
-               exampleArgs("-count", "15")...)
+               exampleArgs("-count", fmt.Sprintf("%d", 
*count**connections))...)
        if err != nil {
                t.Fatal(err)
        }
@@ -175,8 +206,8 @@ func init() { ready = fmt.Errorf("Ready") }
 // Send ready on errchan when it is listening.
 // Send final error when it is done.
 // Returns the Cmd, caller must Wait()
-func goReceiveWant(errchan chan<- error, want string, arg ...string) *exec.Cmd 
{
-       cmd := exampleCommand("receive", arg...)
+func goReceiveWant(t *testing.T, errchan chan<- error, want string, arg 
...string) *exec.Cmd {
+       cmd := exampleCommand(t, "receive", arg...)
        go func() {
                pipe, err := cmd.StdoutPipe()
                if err != nil {
@@ -209,11 +240,12 @@ func TestExampleReceiveSend(t *testing.T) {
        if testing.Short() {
                t.Skip("Skip demo tests in short mode")
        }
-       testBroker.start()
+       testBroker.start(t)
        recvErr := make(chan error)
-       recvCmd := goReceiveWant(recvErr,
-               "Received 15 messages\n",
-               exampleArgs("-count", "15")...)
+       recvCmd := goReceiveWant(
+               t, recvErr,
+               fmt.Sprintf("Received %d messages\n", expected),
+               exampleArgs(fmt.Sprintf("-count=%d", expected))...)
        defer func() {
                recvCmd.Process.Kill()
                recvCmd.Wait()
@@ -221,10 +253,10 @@ func TestExampleReceiveSend(t *testing.T) {
        if err := <-recvErr; err != ready { // Wait for receiver ready
                t.Fatal(err)
        }
-       err := runExampleWant(
-               "Received all 15 acknowledgements\n",
+       err := runExampleWant(t,
+               fmt.Sprintf("Received all %d acknowledgements\n", expected),
                "send",
-               exampleArgs("-count", "5")...)
+               exampleArgs("-count", fmt.Sprintf("%d", *count))...)
        if err != nil {
                t.Fatal(err)
        }
@@ -233,51 +265,18 @@ func TestExampleReceiveSend(t *testing.T) {
        }
 }
 
-func exepath(relative string) string {
-       if binDir == "" {
-               panic("bindir not set, cannot run example binaries")
-       }
-       return path.Join(binDir, relative)
-}
-
 var testBroker *broker
-var binDir, exampleDir string
-var built map[string]bool
-
-func init() {
-       built = make(map[string]bool)
-}
 
-func build(prog string) {
-       if !built[prog] {
-               args := []string{"build"}
-               if *rpath != "" {
-                       args = append(args, "-ldflags", "-r "+*rpath)
-               }
-               args = append(args, path.Join(exampleDir, prog))
-               build := exec.Command("go", args...)
-               build.Dir = binDir
-               out, err := build.CombinedOutput()
-               if err != nil {
-                       panic(fmt.Errorf("%v: %s", err, out))
-               }
-               built[prog] = true
-       }
-}
-
-var rpath = flag.String("rpath", "", "Runtime path for test executables")
 var debug = flag.Bool("debug", false, "Debugging output from examples")
+var brokerName = flag.String("broker", "broker", "Name of broker executable to 
run")
+var count = flag.Int("count", 3, "Count of messages to send in tests")
+var connections = flag.Int("connections", 3, "Number of connections to make in 
tests")
+var expected int
 
 func TestMain(m *testing.M) {
+       expected = (*count) * (*connections)
        rand.Seed(time.Now().UTC().UnixNano())
-       var err error
-       exampleDir, err = filepath.Abs(".")
-       panicIf(err)
-       binDir, err = ioutil.TempDir("", "example_test.go")
-       panicIf(err)
-       defer os.Remove(binDir) // Clean up binaries
-       testBroker = &broker{}  // Broker is started on-demand by tests.
-       testBroker.stop()
+       testBroker = &broker{} // Broker is started on-demand by tests.
        status := m.Run()
        testBroker.stop()
        os.Exit(status)

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bd3fb337/examples/go/receive.go
----------------------------------------------------------------------
diff --git a/examples/go/receive.go b/examples/go/receive.go
index b1eb309..a45ffe3 100644
--- a/examples/go/receive.go
+++ b/examples/go/receive.go
@@ -20,12 +20,14 @@ under the License.
 package main
 
 import (
+       "./util"
        "flag"
        "fmt"
+       "log"
        "net"
        "os"
-       "qpid.apache.org/proton/go/amqp"
-       "qpid.apache.org/proton/go/messaging"
+       "qpid.apache.org/proton/amqp"
+       "qpid.apache.org/proton/concurrent"
        "sync"
 )
 
@@ -37,9 +39,7 @@ Receive messages from all the listed URLs concurrently and 
print them.
        flag.PrintDefaults()
 }
 
-var debug = flag.Bool("debug", false, "Print detailed debug output")
 var count = flag.Uint64("count", 1, "Stop after receiving this many messages.")
-var full = flag.Bool("full", false, "Print full message not just body.")
 
 func main() {
        flag.Usage = usage
@@ -47,7 +47,7 @@ func main() {
 
        urls := flag.Args() // Non-flag arguments are URLs to receive from
        if len(urls) == 0 {
-               fmt.Fprintln(os.Stderr, "No URL provided")
+               log.Println("No URL provided")
                usage()
                os.Exit(1)
        }
@@ -57,39 +57,46 @@ func main() {
        var wait sync.WaitGroup             // Used by main() to wait for all 
goroutines to end.
        wait.Add(len(urls))                 // Wait for one goroutine per URL.
 
-       connections := make([]*messaging.Connection, len(urls)) // Store 
connctions to close on exit
+       container := concurrent.NewContainer("")
+       connections := make(chan concurrent.Connection, len(urls)) // 
Connections to close on exit
 
        // Start a goroutine to for each URL to receive messages and send them 
to the messages channel.
        // main() receives and prints them.
-       for i, urlStr := range urls {
-               debugf("debug: Connecting to %s\n", urlStr)
+       for _, urlStr := range urls {
+               util.Debugf("Connecting to %s\n", urlStr)
                go func(urlStr string) { // Start the goroutine
 
                        defer wait.Done()                 // Notify main() when 
this goroutine is done.
                        url, err := amqp.ParseURL(urlStr) // Like 
net/url.Parse() but with AMQP defaults.
-                       exitIf(err)
+                       util.ExitIf(err)
 
-                       // Open a standard Go net.Conn and and AMQP connection 
using it.
+                       // Open a new connection
                        conn, err := net.Dial("tcp", url.Host) // Note 
net.URL.Host is actually "host:port"
-                       exitIf(err)
-                       pc, err := messaging.Connect(conn) // This is our AMQP 
connection.
-                       exitIf(err)
-                       connections[i] = pc // Save connection so it will be 
closed when main() ends
-
-                       // Create a receiver using the path of the URL as the 
AMQP address
-                       r, err := pc.Receiver(url.Path)
-                       exitIf(err)
-
-                       // Loop receiving messages
+                       util.ExitIf(err)
+                       c, err := container.NewConnection(conn)
+                       util.ExitIf(err)
+                       util.ExitIf(c.Open())
+                       connections <- c // Save connection so we can Close() 
when main() ends
+
+                       // Create and open a session
+                       ss, err := c.NewSession()
+                       util.ExitIf(err)
+                       err = ss.Open()
+                       util.ExitIf(err)
+
+                       // Create a Receiver using the path of the URL as the 
source address
+                       r, err := ss.Receiver(url.Path)
+                       util.ExitIf(err)
+
+                       // Loop receiving messages and sending them to the 
main() goroutine
                        for {
-                               var m amqp.Message
-                               select { // Receive a message or stop.
-                               case m = <-r.Receive:
-                               case <-stop: // The program is stopping.
+                               rm, err := r.Receive()
+                               if err == concurrent.Closed {
                                        return
                                }
+                               util.ExitIf(err)
                                select { // Send m to main() or stop
-                               case messages <- m: // Send m to main()
+                               case messages <- rm.Message: // Send to main()
                                case <-stop: // The program is stopping.
                                        return
                                }
@@ -102,37 +109,17 @@ func main() {
 
        // print each message until the count is exceeded.
        for i := uint64(0); i < *count; i++ {
-               debugf("%s\n", formatMessage(<-messages))
+               m := <-messages
+               util.Debugf("%s\n", util.FormatMessage(m))
        }
        fmt.Printf("Received %d messages\n", *count)
+
+       // Close all connections, this will interrupt goroutines blocked in 
Receiver.Receive()
+       for i := 0; i < len(urls); i++ {
+               c := <-connections
+               c.Disconnect(nil) // FIXME aconway 2015-09-25: Close
+       }
        close(stop) // Signal all goroutines to stop.
        wait.Wait() // Wait for all goroutines to finish.
        close(messages)
-       for _, c := range connections { // Close all connections
-               if c != nil {
-                       c.Close()
-               }
-       }
-}
-
-// Simple debug logging
-func debugf(format string, data ...interface{}) {
-       if *debug {
-               fmt.Fprintf(os.Stderr, format, data...)
-       }
-}
-
-// Simple error handling for demo.
-func exitIf(err error) {
-       if err != nil {
-               fmt.Fprintln(os.Stderr, err)
-       }
-}
-
-func formatMessage(m amqp.Message) string {
-       if *full {
-               return fmt.Sprintf("%#v", m)
-       } else {
-               return fmt.Sprintf("%#v", m.Body())
-       }
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bd3fb337/examples/go/send.go
----------------------------------------------------------------------
diff --git a/examples/go/send.go b/examples/go/send.go
index 98acefa..7fa5416 100644
--- a/examples/go/send.go
+++ b/examples/go/send.go
@@ -20,12 +20,14 @@ under the License.
 package main
 
 import (
+       "./util"
        "flag"
        "fmt"
+       "log"
        "net"
        "os"
-       "qpid.apache.org/proton/go/amqp"
-       "qpid.apache.org/proton/go/messaging"
+       "qpid.apache.org/proton/amqp"
+       "qpid.apache.org/proton/concurrent"
        "sync"
 )
 
@@ -37,13 +39,11 @@ Send messages to each URL concurrently with body 
"<url-path>-<n>" where n is the
        flag.PrintDefaults()
 }
 
-var debug = flag.Bool("debug", false, "Print detailed debug output")
 var count = flag.Int64("count", 1, "Send this may messages per address.")
 
-// Ack associates an info string with an acknowledgement
-type Ack struct {
-       ack  messaging.Acknowledgement
-       info string
+type sent struct {
+       name        string
+       sentMessage concurrent.SentMessage
 }
 
 func main() {
@@ -52,63 +52,68 @@ func main() {
 
        urls := flag.Args() // Non-flag arguments are URLs to receive from
        if len(urls) == 0 {
-               fmt.Fprintln(os.Stderr, "No URL provided")
+               log.Println("No URL provided")
                flag.Usage()
                os.Exit(1)
        }
 
-       acks := make(chan Ack)  // Channel to receive all the acknowledgements
-       var wait sync.WaitGroup // Used by main() to wait for all goroutines to 
end.
-       wait.Add(len(urls))     // Wait for one goroutine per URL.
+       sentChan := make(chan sent) // Channel to receive all the delivery 
receipts.
+       var wait sync.WaitGroup     // Used by main() to wait for all 
goroutines to end.
+       wait.Add(len(urls))         // Wait for one goroutine per URL.
 
-       connections := make([]*messaging.Connection, len(urls)) // Store 
connctions to close on exit
+       container := concurrent.NewContainer("")
+       var connections []concurrent.Connection // Store connctions to close on 
exit
 
-       // Start a goroutine for each URL to send messages, receive the 
acknowledgements and
-       // send them to the acks channel.
-       for i, urlStr := range urls {
-               debugf("Connecting to %v\n", urlStr)
+       // Start a goroutine for each URL to send messages.
+       for _, urlStr := range urls {
+               util.Debugf("Connecting to %v\n", urlStr)
                go func(urlStr string) {
 
                        defer wait.Done()                 // Notify main() that 
this goroutine is done.
                        url, err := amqp.ParseURL(urlStr) // Like 
net/url.Parse() but with AMQP defaults.
-                       exitIf(err)
+                       util.ExitIf(err)
 
-                       // Open a standard Go net.Conn and and AMQP connection 
using it.
+                       // Open a new connection
                        conn, err := net.Dial("tcp", url.Host) // Note 
net.URL.Host is actually "host:port"
-                       exitIf(err)
-                       pc, err := messaging.Connect(conn) // This is our AMQP 
connection.
-                       exitIf(err)
-                       connections[i] = pc // Save connection so it will be 
closed when main() ends
-
-                       // Create a sender using the path of the URL as the 
AMQP address
-                       s, err := pc.Sender(url.Path)
-                       exitIf(err)
-
-                       // Loop sending messages, receiving acknowledgements 
and sending them to the acks channel.
+                       util.ExitIf(err)
+                       c, err := container.NewConnection(conn)
+                       util.ExitIf(err)
+                       err = c.Open()
+                       util.ExitIf(err)
+                       connections = append(connections, c) // Save connection 
so it will be closed when main() ends
+
+                       // Create and open a session
+                       ss, err := c.NewSession()
+                       util.ExitIf(err)
+                       err = ss.Open()
+                       util.ExitIf(err)
+
+                       // Create a Sender using the path of the URL as the 
AMQP address
+                       s, err := ss.Sender(url.Path)
+                       util.ExitIf(err)
+
+                       // Loop sending messages.
                        for i := int64(0); i < *count; i++ {
                                m := amqp.NewMessage()
                                body := fmt.Sprintf("%v-%v", url.Path, i)
-                               m.SetBody(body)
-                               // Note Send is *asynchronous*, ack is a 
channel that will receive the acknowledgement.
-                               ack, err := s.Send(m)
-                               exitIf(err)
-                               acks <- Ack{ack, body} // Send the 
acknowledgement to main()
+                               m.Marshal(body)
+                               sentMessage, err := s.Send(m)
+                               util.ExitIf(err)
+                               sentChan <- sent{body, sentMessage}
                        }
                }(urlStr)
        }
 
        // Wait for all the acknowledgements
        expect := int(*count) * len(urls)
-       debugf("Started senders, expect %v acknowledgements\n", expect)
+       util.Debugf("Started senders, expect %v acknowledgements\n", expect)
        for i := 0; i < expect; i++ {
-               ack, ok := <-acks
-               if !ok {
-                       exitIf(fmt.Errorf("acks channel closed after only %d 
acks\n", i))
-               }
-               d := <-ack.ack
-               debugf("acknowledgement[%v] %v\n", i, ack.info)
-               if d != messaging.Accepted {
-                       fmt.Printf("Unexpected disposition %v\n", d)
+               d := <-sentChan
+               disposition, err := d.sentMessage.Disposition()
+               if err != nil {
+                       util.Debugf("acknowledgement[%v] %v error: %v\n", i, 
d.name, err)
+               } else {
+                       util.Debugf("acknowledgement[%v]  %v (%v)\n", i, 
d.name, disposition)
                }
        }
        fmt.Printf("Received all %v acknowledgements\n", expect)
@@ -116,21 +121,7 @@ func main() {
        wait.Wait()                     // Wait for all goroutines to finish.
        for _, c := range connections { // Close all connections
                if c != nil {
-                       c.Close()
+                       c.Close(nil)
                }
        }
 }
-
-// Simple debug logging
-func debugf(format string, data ...interface{}) {
-       if *debug {
-               fmt.Fprintf(os.Stderr, format, data...)
-       }
-}
-
-// Simple error handling for demo.
-func exitIf(err error) {
-       if err != nil {
-               fmt.Fprintln(os.Stderr, err)
-       }
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bd3fb337/examples/go/util/queue.go
----------------------------------------------------------------------
diff --git a/examples/go/util/queue.go b/examples/go/util/queue.go
new file mode 100644
index 0000000..075c4d2
--- /dev/null
+++ b/examples/go/util/queue.go
@@ -0,0 +1,106 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package util
+
+import (
+       "container/list"
+       "qpid.apache.org/proton/amqp"
+       "sync"
+)
+
+// Queue is a concurrent-safe queue of amqp.Message.
+//
+// The message list is touched only by the goroutine started in NewQueue;
+// other goroutines interact with the queue through the three channels below.
+type Queue struct {
+       name     string
+       messages list.List // List of amqp.Message
+       // Send to Push to push a message onto back of queue
+       Push chan amqp.Message
+       // Receive from Pop to pop a message from the front of the queue.
+       Pop chan amqp.Message
+       // Send to Putback to put an unsent message back on the front of the
+       // queue.
+       Putback chan amqp.Message
+}
+
+// NewQueue returns an empty queue called name. Its Push, Pop and Putback
+// channels are unbuffered, and a goroutine running q.run is started to
+// service them before the queue is returned.
+func NewQueue(name string) *Queue {
+       q := new(Queue)
+       q.name = name
+       q.Push = make(chan amqp.Message)
+       q.Pop = make(chan amqp.Message)
+       q.Putback = make(chan amqp.Message)
+       go q.run()
+       return q
+}
+
+// Close closes the Push and Putback channels. The goroutine started by
+// NewQueue returns when it observes either channel closed, and closes Pop
+// as it exits.
+func (q *Queue) Close() {
+       close(q.Push)
+       close(q.Putback)
+}
+
+// run services the queue until q.Push or q.Putback is closed (see Close),
+// then closes q.Pop so that receivers see the end of the queue.
+//
+// Each iteration offers the front message on q.Pop only when the list is
+// non-empty: sending on a nil channel blocks forever, so leaving pop nil
+// disables that select case while there is nothing to pop.
+func (q *Queue) run() {
+       defer close(q.Pop)
+       for {
+               var pop chan amqp.Message
+               var front amqp.Message
+               if el := q.messages.Front(); el != nil {
+                       front = el.Value.(amqp.Message)
+                       // Only select for pop if there is something to pop.
+                       pop = q.Pop
+               }
+               select {
+               case m, ok := <-q.Push:
+                       if !ok {
+                               return
+                       }
+                       Debugf("%s push: %s\n", q.name, FormatMessage(m))
+                       q.messages.PushBack(m)
+               case m, ok := <-q.Putback:
+                       // Check ok before touching m: a receive from a closed
+                       // channel yields the zero value, which must not be
+                       // passed to FormatMessage.
+                       if !ok {
+                               return
+                       }
+                       Debugf("%s put-back: %s\n", q.name, FormatMessage(m))
+                       q.messages.PushFront(m)
+               case pop <- front:
+                       Debugf("%s pop: %s\n", q.name, FormatMessage(front))
+                       q.messages.Remove(q.messages.Front())
+               }
+       }
+}
+
+// QueueMap is a concurrent-safe map of queues that creates new queues
+// on demand.
+type QueueMap struct {
+       lock sync.Mutex        // Held by Get while it reads or updates m.
+       m    map[string]*Queue // Queues indexed by name.
+}
+
+// MakeQueueMap returns a QueueMap with an initialized, empty map of queues.
+func MakeQueueMap() QueueMap {
+       return QueueMap{
+               m: make(map[string]*Queue),
+       }
+}
+
+// Get returns the queue stored under name, creating and storing a new one
+// with NewQueue if there is none yet. It panics if name is empty.
+func (qm *QueueMap) Get(name string) *Queue {
+       if name == "" {
+               panic("Attempt to get queue with no name")
+       }
+       qm.lock.Lock()
+       defer qm.lock.Unlock()
+       if existing := qm.m[name]; existing != nil {
+               return existing
+       }
+       created := NewQueue(name)
+       qm.m[name] = created
+       Debugf("queue %s create", name)
+       return created
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bd3fb337/examples/go/util/util.go
----------------------------------------------------------------------
diff --git a/examples/go/util/util.go b/examples/go/util/util.go
new file mode 100644
index 0000000..72c6646
--- /dev/null
+++ b/examples/go/util/util.go
@@ -0,0 +1,69 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+// Package util contains utility types and functions to simplify parts of the
+// example code that are not related to the use of proton.
+package util
+
+import (
+       "flag"
+       "fmt"
+       "log"
+       "os"
+       "path"
+       "qpid.apache.org/proton/amqp"
+)
+
+// Command line flags shared by the example programs.
+var (
+       // Debug is the "-debug" flag; when set, Debugf prints its output.
+       Debug = flag.Bool("debug", false, "Print detailed debug output")
+
+       // Full is the "-full" flag; when set, FormatMessage formats the
+       // whole message rather than only its body.
+       Full = flag.Bool("full", false, "Print full message not just body.")
+)
+
+// Debugf writes a formatted message with log.Printf, but only when the
+// "-debug" flag is set; otherwise it does nothing.
+func Debugf(format string, data ...interface{}) {
+       if !*Debug {
+               return
+       }
+       log.Printf(format, data...)
+}
+
+// ExitIf is simple error handling for the demos: if err is not nil it logs
+// err and exits the program with status 1, otherwise it returns.
+func ExitIf(err error) {
+       if err == nil {
+               return
+       }
+       log.Println(err)
+       os.Exit(1)
+}
+
+// FormatMessage renders m with the "%#v" verb: only the message body by
+// default, or the whole message value when the "-full" flag is set.
+func FormatMessage(m amqp.Message) string {
+       var shown interface{} = m
+       if !*Full {
+               shown = m.Body()
+       }
+       return fmt.Sprintf("%#v", shown)
+}
+
+// init configures the standard logger for the example programs: the prefix
+// is the final path element of os.Args[0] followed by ": ", and all log
+// flags are cleared.
+func init() {
+       _, name := path.Split(os.Args[0])
+       log.SetPrefix(name + ": ")
+       log.SetFlags(0)
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bd3fb337/proton-c/bindings/go/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/CMakeLists.txt 
b/proton-c/bindings/go/CMakeLists.txt
index ea6238f..0631eae 100644
--- a/proton-c/bindings/go/CMakeLists.txt
+++ b/proton-c/bindings/go/CMakeLists.txt
@@ -17,32 +17,49 @@
 # under the License.
 #
 
-# FIXME aconway 2015-05-20: install targets for go source.
-
 set(GO_BUILD_FLAGS "" CACHE STRING "Flags for 'go build'")
-set(GO_TEST_FLAGS "-v" CACHE STRING "Flags for 'go test'")
+set(GO_TEST_FLAGS "-v -race" CACHE STRING "Flags for 'go test'")
 
 separate_arguments(GO_BUILD_FLAGS)
 separate_arguments(GO_TEST_FLAGS)
-list(APPEND GO_BUILD_FLAGS "-ldflags=-r ${CMAKE_BINARY_DIR}/proton-c")
 
 if (BUILD_GO)
-  # Build in the source tree, go tools aren't friendly otherwise.
-  # All build output goes in git-ignored pkg or bin subdirectories.
-  set(qgo "qpid.apache.org/proton/go")
-  set(packages ${qgo}/amqp ${qgo}/event ${qgo}/messaging)
-
   # Following are CACHE INTERNAL so examples/CMakeLists.txt can see them.
   set(GO_ENV ${env_py} --
     "GOPATH=${CMAKE_CURRENT_SOURCE_DIR}"
     "CGO_CFLAGS=-I${CMAKE_SOURCE_DIR}/proton-c/include"
     "CGO_LDFLAGS=-L${CMAKE_BINARY_DIR}/proton-c"
-    ${GO_EXE} CACHE INTERNAL "Run go with environment set"
-    )
-  set(GO_BUILD ${GO_ENV} build ${GO_BUILD_FLAGS} CACHE INTERNAL "Run go build")
-  set(GO_TEST ${GO_ENV} test ${GO_BUILD_FLAGS} ${GO_TEST_FLAGS} CACHE INTERNAL 
"Run go test")
+    ${GO_EXE} CACHE INTERNAL "Run go with environment set")
+
+  # Set rpath so test and example executables will use the proton library from 
this build.
+  execute_process(COMMAND ${GO_EXE} version OUTPUT_VARIABLE go_out)
+  if (go_out MATCHES "gccgo")
+    set(GO_RPATH_FLAGS -gccgoflags "-Wl,-rpath=${CMAKE_BINARY_DIR}/proton-c")
+  else()
+    set(GO_RPATH_FLAGS -ldflags "-r ${CMAKE_BINARY_DIR}/proton-c")
+  endif()
+
+  set(GO_BUILD ${GO_ENV} build ${GO_BUILD_FLAGS} ${GO_RPATH_FLAGS} CACHE 
INTERNAL "Run go build")
+  set(GO_INSTALL ${GO_ENV} install ${GO_BUILD_FLAGS} CACHE INTERNAL "Run go 
install")
+  set(GO_TEST ${GO_ENV} test ${GO_BUILD_FLAGS} ${GO_RPATH_FLAGS} 
${GO_TEST_FLAGS} CACHE INTERNAL "Run go test")
+
+  # Install packages in the source tree, go tools aren't friendly otherwise.
+  # All build output goes in git-ignored pkg or bin subdirectories.
+  set(qgo "qpid.apache.org/proton")
+  set(packages ${qgo} ${qgo}/amqp ${qgo}/concurrent ${qgo}/internal)
+  add_custom_target(go-packages ALL
+    COMMAND ${GO_INSTALL} ${packages}
+    WORKING_DIRECTORY ${CMAKE_BINARY_DIR}
+    DEPENDS qpid-proton)
+
+  add_test(
+    NAME go_test_packages
+    COMMAND ${GO_TEST} ${packages}
+    WORKING_DIRECTORY "${CMAKE_BINARY_DIR}")
 
-  add_test(NAME go_package_test COMMAND ${GO_TEST} ${packages})
+  list(APPEND ADDITIONAL_MAKE_CLEAN_FILES
+    ${CMAKE_CURRENT_SOURCE_DIR}/pkg
+    ${CMAKE_CURRENT_SOURCE_DIR}/bin)
 
   set (GO_INSTALL_DIR ${SHARE_INSTALL_DIR}/gocode/src CACHE PATH "Installation 
directory for Go code")
   mark_as_advanced (GO_INSTALL_DIR)

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bd3fb337/proton-c/bindings/go/README.md
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/README.md b/proton-c/bindings/go/README.md
index 0f16b8e..98a432a 100644
--- a/proton-c/bindings/go/README.md
+++ b/proton-c/bindings/go/README.md
@@ -8,7 +8,7 @@ Feedback is strongly encouraged:
 - Email <[email protected]>
 - Create issues <https://issues.apache.org/jira/browse/PROTON>, attach patches 
to an issue.
 
-The package documentation is available at: 
<http://godoc.org/qpid.apache.org/proton/go>
+The package documentation is available at: 
<http://godoc.org/qpid.apache.org/proton>
 
 See the [examples](../../../examples/go/README.md) for working examples and
 practical instructions on how to get started.
@@ -38,40 +38,32 @@ There are two types of developer we want to support
 
 There are 3 go packages for proton:
 
-- qpid.apache.org/proton/go/amqp: converts AMQP messages and data types to and 
from Go data types.
-- qpid.apache.org/proton/go/messaging: easy-to-use, concurrent API for 
messaging clients and servers.
-- qpid.apache.org/proton/go/event: full low-level access to the proton engine.
+- qpid.apache.org/proton/amqp: converts AMQP messages and data types to and 
from Go data types.
+- qpid.apache.org/proton/concurrent: easy-to-use, concurrent API for clients 
and servers.
+- qpid.apache.org/proton: full low-level access to the proton engine.
 
-Most applications should use the `messaging` package. The `event` package is 
for
-applications that need low-level access to the proton engine.
+The `amqp` package provides conversions between AMQP and Go data types that are
+used by the other two packages.
 
-The `event` package is fairly complete, with the exception of the proton
-reactor. It's unclear if the reactor is important for go.
+The `concurrent` package provides a simple procedural API that can be used with
+goroutines to construct concurrent AMQP clients and servers.
 
-The `messaging` package can run the examples but probably not much else. There
-is work to do on error handling and the API may change.
+The `proton` package is a concurrency-unsafe, event-driven API. It is a very
+thin wrapper providing almost direct access to the underlying proton C API.
 
-There are working [examples](../../../examples/go/README.md) of a broker using 
`event` and
-a sender and receiver using `messaging`.
+The `concurrent` package will probably be more familiar and convenient to Go
+programmers for most use cases. The `proton` package may be more familiar if
+you have used proton in other languages.
 
-## The event driven API
+Note the `concurrent` package itself is implemented in terms of the `proton`
+package. It takes care of running concurrency-unsafe `proton` code in dedicated
+goroutines and setting up channels to move data between user and proton
+goroutines safely. It hides all this complexity behind a simple procedural
+interface rather than presenting an event-driven interface.
 
-See the package documentation for details.
+See the [examples](../../../examples/go/README.md) for a better illustration 
of the APIs.
 
-## The Go API
-
-The goal: A procedural API that allows any user goroutine to send and receive
-AMQP messages and other information (acknowledgments, flow control instructions
-etc.) using channels. There will be no user-visible locks and no need to run
-user code in special goroutines, e.g. as handlers in a proton event loop.
-
-See the package documentation for emerging details.
-
-Currently using a channel to receive messages, a function to send them 
(channels
-internally) and a channel as a "future" for acknowledgements to senders. This
-may change.
-
-## Why a separate API for Go?
+### Why two APIs?
 
 Go is a concurrent language and encourages applications to be divided into
 concurrent *goroutines*. It provides traditional locking but it encourages the
@@ -80,8 +72,7 @@ use *channels* to communicate between goroutines without 
explicit locks:
   "Share memory by communicating, don't communicate by sharing memory"
 
 The idea is that a given value is only operated on by one goroutine at a time,
-but values can easily be passed from one goroutine to another. This removes 
much
-of the need for locking.
+but values can easily be passed from one goroutine to another.
 
 Go literature distinguishes between:
 
@@ -97,8 +88,8 @@ respect to events like file descriptors being 
readable/writable, channels having
 data, timers firing etc. Go automatically takes care of switching out 
goroutines
 that block or sleep so it is normal to write code in terms of blocking calls.
 
-Event-driven API (like poll, epoll, select or the proton event API) also
-channel unpredictably ordered events to actions in one or a small pool of
+Event-driven programming (such as poll, epoll, select or the `proton` package)
+also channels unpredictably ordered events to actions in one or a small pool of
 execution threads. However this requires a different style of programming:
 "event-driven" or "reactive" programming. Go developers call it "inside-out"
 programming. In an event-driven architecture blocking is a big problem as it
@@ -121,15 +112,39 @@ preserved across blocking calls. There's no need to store 
details in context
 objects that you have to look up when handling a later event to figure out how
 to continue where you left off.
 
-So a Go-like proton API does not force the users code to run in an event-loop
-goroutine. Instead user goroutines communicate with the event loop(s) via
-channels.  There's no need to funnel connections into one event loop, in fact 
it
-makes no sense.  Connections can be processed concurrently so they should be
-processed in separate goroutines and left to Go to schedule. User goroutines 
can
-have simple loops that block channels till messages are available, the user can
-start as many or as few such goroutines as they wish to implement concurrency 
as
-simple or as complex as they wish. For example blocking request-response
-vs. asynchronous flows of messages and acknowledgments.
+The `proton` API is important because it is close to the original proton-C
+reactive API and gives you direct access to the underlying library. However it
+is un-Go-like in its event-driven nature, and it requires care as methods on
+values associated with the same underlying proton engine are not
+concurrent-safe.
+
+The `concurrent` API hides the event-driven details behind a simple blocking 
API
+that can be safely called from arbitrary goroutines. Under the covers data is
+passed through channels to dedicated goroutines running separate `proton` event
+loops for each connection.
+
+### Design of the concurrent API
+
+The details are still being worked out (see the code) but some basic 
principles have been
+established.
+
+Code from the `proton` package runs _only_ in a dedicated goroutine (per
+connection). This makes it safe to use proton C data structures associated with
+that connection.
+
+Code in the `concurrent` package can run in any goroutine, and holds `proton`
+package values with proton object pointers.  To use those values, it "injects" 
a
+function into the proton goroutine via a special channel. Injected functions
+can use temporary channels to allow the calling code to wait for results. Such
+waiting is only for the local event-loop, not across network calls.
+
+The API exposes blocking calls returning normal error values, no exposed
+channels or callbacks. The user can write simple blocking code or start their
+own goroutine loops and channels as appropriate. Details of our internal 
channel
+use and error handling are hidden, which simplifies the API and gives us more
+implementation flexibility.
+
+TODO: lifecycle rules for proton objects.
 
 ## New to Go?
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bd3fb337/proton-c/bindings/go/genwrap.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/genwrap.go b/proton-c/bindings/go/genwrap.go
index 27e5966..dd2780c 100644
--- a/proton-c/bindings/go/genwrap.go
+++ b/proton-c/bindings/go/genwrap.go
@@ -36,7 +36,7 @@ import (
 )
 
 var includeProton = "../../include/proton"
-var outpath = "src/qpid.apache.org/proton/go/event/wrappers_gen.go"
+var outpath = "src/qpid.apache.org/proton/wrappers_gen.go"
 
 func main() {
        flag.Parse()
@@ -44,15 +44,15 @@ func main() {
        panicIf(err)
        defer out.Close()
 
-       apis := []string{"session", "link", "delivery", "disposition", 
"condition", "terminus", "connection"}
+       apis := []string{"session", "link", "delivery", "disposition", 
"condition", "terminus", "connection", "transport"}
        fmt.Fprintln(out, copyright)
        fmt.Fprint(out, `
-package event
+package proton
 
 import (
        "time"
   "unsafe"
-  "qpid.apache.org/proton/go/internal"
+  "qpid.apache.org/proton/internal"
 )
 
 // #include <proton/types.h>
@@ -122,10 +122,11 @@ func findEnums(header string) (enums []enumType) {
 }
 
 func genEnum(out io.Writer, name string, values []string) {
-       doTemplate(out, []interface{}{name, values}, `{{$enumName := index . 
0}}{{$values := index . 1}}
+       doTemplate(out, []interface{}{name, values}, `
+{{$enumName := index . 0}}{{$values := index . 1}}
 type {{mixedCase $enumName}} C.pn_{{$enumName}}_t
 const ({{range $values}}
-       {{mixedCase .}} {{mixedCase $enumName}} = C.{{.}} {{end}}
+       {{mixedCaseTrim . "PN_"}} {{mixedCase $enumName}} = C.{{.}} {{end}}
 )
 
 func (e {{mixedCase $enumName}}) String() string {
@@ -177,7 +178,8 @@ under the License.
 */
 
 //
-// NOTE: This file was generated by genwrap.go, do not edit it by hand.
+// NOTE: DO NOT EDIT. This file was generated by genwrap.go from the proton 
header files.
+// Update the generator and re-run if you need to modify this code.
 //
 `
 
@@ -199,7 +201,7 @@ var (
        enumDefRe   = regexp.MustCompile("typedef enum {([^}]*)} 
pn_([a-z_]+)_t;")
        enumValRe   = regexp.MustCompile("PN_[A-Z_]+")
        skipEventRe = regexp.MustCompile("EVENT_NONE|REACTOR|SELECTABLE|TIMER")
-       skipFnRe    = 
regexp.MustCompile("attach|context|class|collect|^recv$|^send$|transport")
+       skipFnRe    = 
regexp.MustCompile("attach|context|class|collect|link_recv|link_send|transport_.*logf$|transport_.*trace|transport_head|transport_push")
 )
 
 // Generate event wrappers.
@@ -311,6 +313,13 @@ func mapType(ctype string) (g genType) {
        case "C.pn_seconds_t":
                g.Gotype = "time.Duration"
                g.ToGo = func(v string) string { return 
fmt.Sprintf("(time.Duration(%s) * time.Second)", v) }
+       case "C.pn_millis_t":
+               g.Gotype = "time.Duration"
+               g.ToGo = func(v string) string { return 
fmt.Sprintf("(time.Duration(%s) * time.Millisecond)", v) }
+       case "C.pn_timestamp_t":
+               g.Gotype = "time.Time"
+               g.ToC = func(v string) string { return 
fmt.Sprintf("pnTime(%s)", v) }
+               g.ToGo = func(v string) string { return 
fmt.Sprintf("goTime(%s)", v) }
        case "C.pn_error_t *":
                g.Gotype = "error"
                g.ToGo = func(v string) string { return 
fmt.Sprintf("internal.PnError(unsafe.Pointer(%s))", v) }
@@ -396,7 +405,7 @@ func cAssigns(args []genArg) string {
 // Return the go name of the function or "" to skip the function.
 func goFnName(api, fname string) string {
        // Skip class, context and attachment functions.
-       if skipFnRe.FindStringSubmatch(fname) != nil {
+       if skipFnRe.FindStringSubmatch(api+"_"+fname) != nil {
                return ""
        }
        switch api + "." + fname {
@@ -410,6 +419,7 @@ func goFnName(api, fname string) string {
 func apiWrapFns(api, header string, out io.Writer) {
        fmt.Fprintf(out, "type %s struct{pn *C.pn_%s_t}\n", mixedCase(api), api)
        fmt.Fprintf(out, "func (%c %s) IsNil() bool { return %c.pn == nil }\n", 
api[0], mixedCase(api), api[0])
+       fmt.Fprintf(out, "func (%c %s) CPtr() unsafe.Pointer { return 
unsafe.Pointer(%c.pn) }\n", api[0], mixedCase(api), api[0])
        fn := regexp.MustCompile(fmt.Sprintf(`PN_EXTERN ([a-z0-9_ ]+ *\*?) 
*pn_%s_([a-z_]+)\(pn_%s_t *\*[a-z_]+ *,? *([^)]*)\)`, api, api))
        for _, m := range fn.FindAllStringSubmatch(header, -1) {
                rtype, fname, argstr := mapType(m[1]), m[2], m[3]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to