PROTON-827: Re-create go workspace, cmake support for testing.

Re-created the go workspace structure in the repository so it can be set as a 
GOPATH element.
ctest runs package tests and examples.
`make install` installs go code to share/gocode
config.sh sets env. vars for CGO compilation.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/c9257f47
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/c9257f47
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/c9257f47

Branch: refs/heads/go1
Commit: c9257f470bda6ce30e0c985cc35a166141dfe0c4
Parents: 5ea911e
Author: Alan Conway <[email protected]>
Authored: Wed May 20 15:30:38 2015 -0400
Committer: Alan Conway <[email protected]>
Committed: Fri May 22 15:38:27 2015 -0400

----------------------------------------------------------------------
 CMakeLists.txt                                  |   1 +
 config.sh.in                                    |  23 +-
 examples/CMakeLists.txt                         |   3 +-
 examples/go/CMakeLists.txt                      |  29 +
 examples/go/README.md                           |  72 +-
 examples/go/event/broker.go                     | 255 -------
 examples/go/event_broker.go                     | 255 +++++++
 examples/go/example_test.go                     |  14 +-
 go                                              |   1 +
 go/README.md                                    | 138 ----
 go/amqp/doc.go                                  |  40 -
 go/amqp/interop                                 |   1 -
 go/amqp/interop_test.go                         | 308 --------
 go/amqp/marshal.go                              | 238 ------
 go/amqp/message.go                              | 342 ---------
 go/amqp/message_test.go                         |  90 ---
 go/amqp/types.go                                | 193 -----
 go/amqp/uid.go                                  |  40 -
 go/amqp/unmarshal.go                            | 552 --------------
 go/amqp/url.go                                  |  96 ---
 go/amqp/url_test.go                             |  51 --
 go/event/doc.go                                 |  38 -
 go/event/genwrap.go                             | 427 -----------
 go/event/handlers.go                            | 411 -----------
 go/event/message.go                             |  75 --
 go/event/pump.go                                | 357 ---------
 go/event/wrappers.go                            | 253 -------
 go/event/wrappers_gen.go                        | 732 -------------------
 go/internal/error.go                            | 125 ----
 go/messaging/doc.go                             |  28 -
 go/messaging/handler.go                         |  70 --
 go/messaging/messaging.go                       | 250 -------
 proton-c/CMakeLists.txt                         |   3 -
 proton-c/bindings/CMakeLists.txt                |   8 +-
 proton-c/bindings/go                            |   1 -
 proton-c/bindings/go/CMakeLists.txt             |  51 ++
 proton-c/bindings/go/README.md                  | 137 ++++
 proton-c/bindings/go/genwrap.go                 | 427 +++++++++++
 .../src/qpid.apache.org/proton/go/amqp/doc.go   |  40 +
 .../src/qpid.apache.org/proton/go/amqp/interop  |   1 +
 .../proton/go/amqp/interop_test.go              | 308 ++++++++
 .../qpid.apache.org/proton/go/amqp/marshal.go   | 238 ++++++
 .../qpid.apache.org/proton/go/amqp/message.go   | 342 +++++++++
 .../proton/go/amqp/message_test.go              |  90 +++
 .../src/qpid.apache.org/proton/go/amqp/types.go | 193 +++++
 .../src/qpid.apache.org/proton/go/amqp/uid.go   |  40 +
 .../qpid.apache.org/proton/go/amqp/unmarshal.go | 552 ++++++++++++++
 .../src/qpid.apache.org/proton/go/amqp/url.go   |  96 +++
 .../qpid.apache.org/proton/go/amqp/url_test.go  |  51 ++
 .../src/qpid.apache.org/proton/go/event/doc.go  |  38 +
 .../qpid.apache.org/proton/go/event/handlers.go | 411 +++++++++++
 .../qpid.apache.org/proton/go/event/message.go  |  75 ++
 .../src/qpid.apache.org/proton/go/event/pump.go | 357 +++++++++
 .../qpid.apache.org/proton/go/event/wrappers.go | 253 +++++++
 .../proton/go/event/wrappers_gen.go             | 732 +++++++++++++++++++
 .../qpid.apache.org/proton/go/internal/error.go | 125 ++++
 .../qpid.apache.org/proton/go/messaging/doc.go  |  28 +
 .../proton/go/messaging/handler.go              |  70 ++
 .../proton/go/messaging/messaging.go            | 250 +++++++
 59 files changed, 5268 insertions(+), 5157 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c9257f47/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 2df2dfb..8790c57 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -141,6 +141,7 @@ if (BUILD_JAVA)
 endif()
 
 add_subdirectory(proton-c)
+add_subdirectory(examples)
 
 install (FILES LICENSE README.md TODO
          DESTINATION ${PROTON_SHARE})

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c9257f47/config.sh.in
----------------------------------------------------------------------
diff --git a/config.sh.in b/config.sh.in
index 4b60b2f..3ad8ba8 100755
--- a/config.sh.in
+++ b/config.sh.in
@@ -18,6 +18,16 @@
 # under the License.
 #
 
+merge_paths() {
+    # Merge paths, remove duplicates (keep first instance)
+    path=$(echo $* | sed 's/:/ /'g) # Split with spaces.
+    newpath=""
+    for d in $path; do         # Remove duplicates
+       { echo $newpath | grep -q "\(:\|^\)$d\(:\|$\)"; } || 
newpath="$newpath:$d"
+    done
+    echo $newpath | sed 's/^://' # Remove leading :
+}
+
 PROTON_HOME=@CMAKE_SOURCE_DIR@
 PROTON_BUILD=@CMAKE_BINARY_DIR@
 
@@ -50,10 +60,17 @@ export 
RUBYLIB=$RUBY_BINDINGS:$PROTON_HOME/proton-c/bindings/ruby/lib:$PROTON_HO
 # Perl
 export 
PERL5LIB=$PERL5LIB:$PERL_BINDINGS:$PROTON_HOME/proton-c/bindings/perl/lib
 
+# Go
+export GOPATH="$(merge_paths $PROTON_HOME/proton-c/bindings/go $GOPATH)"
+# Help Go compiler find libraries and include files.
+export C_INCLUDE_PATH="$(merge_paths $PROTON_HOME/proton-c/include 
$PROTON_BUILD/proton-c/include $C_INCLUDE_PATH)"
+export LIBRARY_PATH="$(merge_paths $PROTON_BUILD/proton-c $LIBRARY_PATH)"
+export LD_LIBRARY_PATH="$(merge_paths $PROTON_BUILD/proton-c $LD_LIBRARY_PATH)"
+
+
+
 # test applications
-export PATH="$PATH:$PROTON_BUILD/tests/tools/apps/c"
-export PATH="$PATH:$PROTON_HOME/tests/tools/apps/python"
-export PATH="$PATH:$PROTON_HOME/tests/python"
+export PATH="$(merge_paths $PATH $PROTON_BUILD/tests/tools/apps/c 
$PROTON_HOME/tests/tools/apps/python $PROTON_HOME/tests/python)"
 
 # can the test harness use valgrind?
 if [[ -x "$(type -p valgrind)" ]] ; then

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c9257f47/examples/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt
index feac758..21878eb 100644
--- a/examples/CMakeLists.txt
+++ b/examples/CMakeLists.txt
@@ -18,6 +18,7 @@
 #
 
 set (Proton_DIR ${CMAKE_CURRENT_SOURCE_DIR})
-
 include_directories(${CMAKE_CURRENT_SOURCE_DIR}/include)
+
 add_subdirectory(c/messenger)
+add_subdirectory(go)

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c9257f47/examples/go/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/examples/go/CMakeLists.txt b/examples/go/CMakeLists.txt
new file mode 100644
index 0000000..464ed7c
--- /dev/null
+++ b/examples/go/CMakeLists.txt
@@ -0,0 +1,29 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# FIXME aconway 2015-05-20:
+# - use proton build for Go includes & libs.
+# - pre-build go libraries? Respect user GOPATH?
+
+if(BUILD_GO)
+  add_test(
+    NAME go_example_test
+    COMMAND ${GO_TEST} example_test.go -rpath ${CMAKE_BINARY_DIR}/proton-c
+    WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR})
+endif()

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c9257f47/examples/go/README.md
----------------------------------------------------------------------
diff --git a/examples/go/README.md b/examples/go/README.md
index c81e8d3..9d0d738 100644
--- a/examples/go/README.md
+++ b/examples/go/README.md
@@ -9,48 +9,33 @@ There are 3 go packages for proton:
 Most applications should use the `messaging` package. The `event` package is 
for
 applications that need low-level access to the proton engine.
 
-## messaging examples
+## Example programs
 
-- [receive.go](receive.go) receive from many connections concurrently.
-- [send.go](send.go) send to many connections concurrently.
+- [receive.go](receive.go) receive from many connections concurrently using 
messaging package.
+- [send.go](send.go) send to many connections concurrently using messaging 
package.
+- [event_broker.go](event_broker.go) simple mini-broker using event package.
 
-## event examples
+## Using the Go packages
 
-- [broker.go](event/broker.go) simple mini-broker, queues are created 
automatically.
+Set your GOPATH environment variable to include 
`/<path-to-proton>/proton-c/bindings/go`
 
-## Installing the proton Go packages
+The proton Go packages include C code so the cgo compiler needs to be able to
+find the proton library and include files.  There are a couple of ways to do 
this:
 
-You need to install proton in a standard place such as `/usr` or `/usr/local` 
so go
-can find the proton C headers and libraries to build the C parts of the 
packages.
+1. Build proton in directory `$BUILD`. Source the script `$BUILD/config.sh` to 
set your environment.
 
-You should create a go workspace and set GOPATH as described in 
https://golang.org/doc/code.html
+2. Install proton to a standard prefix such as `/usr` or `/usr/local`. No need 
for further settings.
 
-To get the proton packages into your workspace you can clone the proton 
repository like this:
+3. Install proton to a non-standard prefix `$PREFIX`. Set the following:
 
-    git clone https://git.apache.org/qpid-proton.git 
$GOPATH/src/qpid.apache.org/proton
+        export LIBRARY_PATH=$PREFIX/lib:$LIBRARY_PATH
+        export C_INCLUDE_PATH=$PREFIX/include:$C_INCLUDE_PATH
+        export LD_LIBRARY_PATH=$PREFIX/lib:$LD_LIBRARY_PATH
 
-If you prefer to keep your proton clone elsewhere you can create a symlink to 
it in your workspace.
-
-You can also use `go get` as follows:
-
-    go get qpid.apache.org/proton/go/messaging
-
-Once installed you can use godoc to look at docmentation on the commane line 
or start a
-godoc web server like this:
-
-       godoc -http=:6060
-
-And look at the docs in your browser.
-
-Right now the layout of the documentation is a bit messed up, showing the long
-path for imports, i.e.
-
-    qpid.apache.org/proton/proton-c/bindings/go/amqp
-
-In your code you should use:
-
-    qpid.apache.org/proton/go/amqp
+Once you are set up, the go tools will work as normal. You can see 
documentation
+in your web browser at `localhost:6060` by running:
 
+    godoc -http=:6060
 
 ## Running the examples
 
@@ -69,15 +54,15 @@ the example source have more details.
 
 First start the broker:
 
-    go run event/broker.go
+    go run event_broker.go
 
 Send messages concurrently to queues "foo" and "bar", 10 messages to each 
queue:
 
-    go run go/send.go -count 10 localhost:/foo localhost:/bar
+    go run send.go -count 10 localhost:/foo localhost:/bar
 
 Receive messages concurrently from "foo" and "bar". Note -count 20 for 10 
messages each on 2 queues:
 
-    go run go/receive.go -count 20 localhost:/foo localhost:/bar
+    go run receive.go -count 20 localhost:/foo localhost:/bar
 
 The broker and clients use the amqp port on the local host by default, to use a
 different address use the `-addr host:port` flag.
@@ -91,3 +76,20 @@ Or use the Go broker and the python clients:
     python ../python/simple_send.py
     python ../python/simple_recv.py`.
 
+
+# Experimental `go get` support.
+
+BROKEN - DO NOT USE unless you are interested in helping us fix it :)
+
+We have `go get` meta tags set up at qpid.apache.org so you can do this:
+
+    go get qpid.apache.org/proton/go/messaging
+
+This pulls the entire proton repo into your workspace. There is a "go" symlink 
in the
+repo root so imports work correctly: `import 
qpid.apache.org/proton/go/messaging`
+
+However godoc (and I believe some other go tools) doesn't handle symlinks and 
is
+confused by the other files in the repo. For example `godoc -http` will not 
show
+the proper package names (because it ignores the symlink) but instead shows 
them
+with the full `proton/proton-c/bindings/go/src...` path. It also shows bogus
+empty packages corresponding to the examples/go directory.

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c9257f47/examples/go/event/broker.go
----------------------------------------------------------------------
diff --git a/examples/go/event/broker.go b/examples/go/event/broker.go
deleted file mode 100644
index 0cb4bfa..0000000
--- a/examples/go/event/broker.go
+++ /dev/null
@@ -1,255 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-//
-// This is a simple AMQP broker implemented using the event-handler interface.
-//
-// It maintains a set of named in-memory queues of messages. Clients can send
-// messages to queues or subscribe to receive messages from them.
-//
-//
-
-package main
-
-import (
-       "container/list"
-       "flag"
-       "fmt"
-       "io"
-       "io/ioutil"
-       "log"
-       "net"
-       "os"
-       "path"
-       "qpid.apache.org/proton/go/amqp"
-       "qpid.apache.org/proton/go/event"
-       "sync"
-)
-
-// Command-line flags
-var addr = flag.String("addr", ":amqp", "Listening address")
-var verbose = flag.Int("verbose", 1, "Output level, 0 means none, higher means 
more")
-var full = flag.Bool("full", false, "Print full message not just body.")
-
-func main() {
-       flag.Usage = func() {
-               fmt.Fprintf(os.Stderr, `
-Usage: %s
-A simple broker-like demo. Queues are created automatically for sender or 
receiver addrsses.
-`, os.Args[0])
-               flag.PrintDefaults()
-       }
-       flag.Parse()
-       b := newBroker()
-       err := b.listen(*addr)
-       fatalIf(err)
-}
-
-// queue is a structure representing a queue.
-type queue struct {
-       name      string              // Name of queue
-       messages  *list.List          // List of event.Message
-       consumers map[event.Link]bool // Set of consumer links
-}
-
-type logLink event.Link // Wrapper to print links in format for logging
-
-func (ll logLink) String() string {
-       l := event.Link(ll)
-       return fmt.Sprintf("%s[%p]", l.Name(), l.Session().Connection().Pump())
-}
-
-func (q *queue) subscribe(link event.Link) {
-       debug.Printf("link %s subscribed to queue %s", logLink(link), q.name)
-       q.consumers[link] = true
-}
-
-func (q *queue) unsubscribe(link event.Link) {
-       debug.Printf("link %s unsubscribed from queue %s", logLink(link), 
q.name)
-       delete(q.consumers, link)
-}
-
-func (q *queue) empty() bool {
-       return len(q.consumers) == 0 && q.messages.Len() == 0
-}
-
-func (q *queue) push(context *event.Pump, message amqp.Message) {
-       q.messages.PushBack(message)
-       q.pop(context)
-}
-
-func (q *queue) popTo(context *event.Pump, link event.Link) bool {
-       if q.messages.Len() != 0 && link.Credit() > 0 {
-               message := q.messages.Remove(q.messages.Front()).(amqp.Message)
-               debug.Printf("link %s <- queue %s: %s", logLink(link), q.name, 
formatMessage{message})
-               // The first return parameter is an event.Delivery.
-               // The Deliver can be used to track message status, e.g. so we 
can re-delver on failure.
-               // This demo broker doesn't do that.
-               linkPump := link.Session().Connection().Pump()
-               if context == linkPump {
-                       if context == nil {
-                               log.Fatal("pop in nil context")
-                       }
-                       link.Send(message) // link is in the current pump, safe 
to call Send() direct
-               } else {
-                       linkPump.Inject <- func() { // Inject to link's pump
-                               link.Send(message) // FIXME aconway 2015-05-04: 
error handlig
-                       }
-               }
-               return true
-       }
-       return false
-}
-
-func (q *queue) pop(context *event.Pump) (popped bool) {
-       for c, _ := range q.consumers {
-               popped = popped || q.popTo(context, c)
-       }
-       return
-}
-
-// broker implements event.MessagingHandler and reacts to events by moving 
messages on or off queues.
-type broker struct {
-       queues map[string]*queue
-       lock   sync.Mutex // FIXME aconway 2015-05-04: un-golike, better broker 
coming...
-}
-
-func newBroker() *broker {
-       return &broker{queues: make(map[string]*queue)}
-}
-
-func (b *broker) getQueue(name string) *queue {
-       q := b.queues[name]
-       if q == nil {
-               debug.Printf("Create queue %s", name)
-               q = &queue{name, list.New(), make(map[event.Link]bool)}
-               b.queues[name] = q
-       }
-       return q
-}
-
-func (b *broker) unsubscribe(l event.Link) {
-       if l.IsSender() {
-               q := b.queues[l.RemoteSource().Address()]
-               if q != nil {
-                       q.unsubscribe(l)
-                       if q.empty() {
-                               debug.Printf("Delete queue %s", q.name)
-                               delete(b.queues, q.name)
-                       }
-               }
-       }
-}
-
-func (b *broker) HandleMessagingEvent(t event.MessagingEventType, e 
event.Event) error {
-       // FIXME aconway 2015-05-04: locking is un-golike, better example 
coming soon.
-       // Needed because the same handler is used for multiple connections 
concurrently
-       // and the queue data structures are not thread safe.
-       b.lock.Lock()
-       defer b.lock.Unlock()
-
-       switch t {
-
-       case event.MLinkOpening:
-               if e.Link().IsSender() {
-                       q := b.getQueue(e.Link().RemoteSource().Address())
-                       q.subscribe(e.Link())
-               }
-
-       case event.MLinkDisconnected, event.MLinkClosing:
-               b.unsubscribe(e.Link())
-
-       case event.MSendable:
-               q := b.getQueue(e.Link().RemoteSource().Address())
-               q.popTo(e.Connection().Pump(), e.Link())
-
-       case event.MMessage:
-               m, err := event.DecodeMessage(e)
-               fatalIf(err)
-               qname := e.Link().RemoteTarget().Address()
-               debug.Printf("link %s -> queue %s: %s", logLink(e.Link()), 
qname, formatMessage{m})
-               b.getQueue(qname).push(e.Connection().Pump(), m)
-       }
-       return nil
-}
-
-func (b *broker) listen(addr string) (err error) {
-       // Use the standard Go "net" package to listen for connections.
-       listener, err := net.Listen("tcp", addr)
-       if err != nil {
-               return err
-       }
-       info.Printf("Listening on %s", listener.Addr())
-       defer listener.Close()
-       for {
-               conn, err := listener.Accept()
-               if err != nil {
-                       info.Printf("Accept error: %s", err)
-                       continue
-               }
-               pump, err := event.NewPump(conn, event.NewMessagingDelegator(b))
-               fatalIf(err)
-               info.Printf("Accepted %s[%p]", pump, pump)
-               pump.Server()
-               go func() {
-                       pump.Run()
-                       if pump.Error == nil {
-                               info.Printf("Closed %s", pump)
-                       } else {
-                               info.Printf("Closed %s: %s", pump, pump.Error)
-                       }
-               }()
-       }
-}
-
-// Logging
-func logger(prefix string, level int, w io.Writer) *log.Logger {
-       if *verbose >= level {
-               return log.New(w, prefix, 0)
-       }
-       return log.New(ioutil.Discard, "", 0)
-}
-
-var info, debug *log.Logger
-
-func init() {
-       flag.Parse()
-       name := path.Base(os.Args[0])
-       log.SetFlags(0)
-       log.SetPrefix(fmt.Sprintf("%s: ", name))                      // Log 
errors on stderr.
-       info = logger(fmt.Sprintf("%s: ", name), 1, os.Stdout)        // Log 
info on stdout.
-       debug = logger(fmt.Sprintf("%s debug: ", name), 2, os.Stderr) // Log 
debug on stderr.
-}
-
-// Simple error handling for demo.
-func fatalIf(err error) {
-       if err != nil {
-               log.Fatal(err)
-       }
-}
-
-type formatMessage struct{ m amqp.Message }
-
-func (fm formatMessage) String() string {
-       if *full {
-               return fmt.Sprintf("%#v", fm.m)
-       } else {
-               return fmt.Sprintf("%#v", fm.m.Body())
-       }
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c9257f47/examples/go/event_broker.go
----------------------------------------------------------------------
diff --git a/examples/go/event_broker.go b/examples/go/event_broker.go
new file mode 100644
index 0000000..0cb4bfa
--- /dev/null
+++ b/examples/go/event_broker.go
@@ -0,0 +1,255 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+//
+// This is a simple AMQP broker implemented using the event-handler interface.
+//
+// It maintains a set of named in-memory queues of messages. Clients can send
+// messages to queues or subscribe to receive messages from them.
+//
+//
+
+package main
+
+import (
+       "container/list"
+       "flag"
+       "fmt"
+       "io"
+       "io/ioutil"
+       "log"
+       "net"
+       "os"
+       "path"
+       "qpid.apache.org/proton/go/amqp"
+       "qpid.apache.org/proton/go/event"
+       "sync"
+)
+
+// Command-line flags
+var addr = flag.String("addr", ":amqp", "Listening address")
+var verbose = flag.Int("verbose", 1, "Output level, 0 means none, higher means 
more")
+var full = flag.Bool("full", false, "Print full message not just body.")
+
+func main() {
+       flag.Usage = func() {
+               fmt.Fprintf(os.Stderr, `
+Usage: %s
+A simple broker-like demo. Queues are created automatically for sender or 
receiver addrsses.
+`, os.Args[0])
+               flag.PrintDefaults()
+       }
+       flag.Parse()
+       b := newBroker()
+       err := b.listen(*addr)
+       fatalIf(err)
+}
+
+// queue is a structure representing a queue.
+type queue struct {
+       name      string              // Name of queue
+       messages  *list.List          // List of event.Message
+       consumers map[event.Link]bool // Set of consumer links
+}
+
+type logLink event.Link // Wrapper to print links in format for logging
+
+func (ll logLink) String() string {
+       l := event.Link(ll)
+       return fmt.Sprintf("%s[%p]", l.Name(), l.Session().Connection().Pump())
+}
+
+func (q *queue) subscribe(link event.Link) {
+       debug.Printf("link %s subscribed to queue %s", logLink(link), q.name)
+       q.consumers[link] = true
+}
+
+func (q *queue) unsubscribe(link event.Link) {
+       debug.Printf("link %s unsubscribed from queue %s", logLink(link), 
q.name)
+       delete(q.consumers, link)
+}
+
+func (q *queue) empty() bool {
+       return len(q.consumers) == 0 && q.messages.Len() == 0
+}
+
+func (q *queue) push(context *event.Pump, message amqp.Message) {
+       q.messages.PushBack(message)
+       q.pop(context)
+}
+
+func (q *queue) popTo(context *event.Pump, link event.Link) bool {
+       if q.messages.Len() != 0 && link.Credit() > 0 {
+               message := q.messages.Remove(q.messages.Front()).(amqp.Message)
+               debug.Printf("link %s <- queue %s: %s", logLink(link), q.name, 
formatMessage{message})
+               // The first return parameter is an event.Delivery.
+               // The Deliver can be used to track message status, e.g. so we 
can re-delver on failure.
+               // This demo broker doesn't do that.
+               linkPump := link.Session().Connection().Pump()
+               if context == linkPump {
+                       if context == nil {
+                               log.Fatal("pop in nil context")
+                       }
+                       link.Send(message) // link is in the current pump, safe 
to call Send() direct
+               } else {
+                       linkPump.Inject <- func() { // Inject to link's pump
+                               link.Send(message) // FIXME aconway 2015-05-04: 
error handlig
+                       }
+               }
+               return true
+       }
+       return false
+}
+
+func (q *queue) pop(context *event.Pump) (popped bool) {
+       for c, _ := range q.consumers {
+               popped = popped || q.popTo(context, c)
+       }
+       return
+}
+
+// broker implements event.MessagingHandler and reacts to events by moving 
messages on or off queues.
+type broker struct {
+       queues map[string]*queue
+       lock   sync.Mutex // FIXME aconway 2015-05-04: un-golike, better broker 
coming...
+}
+
+func newBroker() *broker {
+       return &broker{queues: make(map[string]*queue)}
+}
+
+func (b *broker) getQueue(name string) *queue {
+       q := b.queues[name]
+       if q == nil {
+               debug.Printf("Create queue %s", name)
+               q = &queue{name, list.New(), make(map[event.Link]bool)}
+               b.queues[name] = q
+       }
+       return q
+}
+
+func (b *broker) unsubscribe(l event.Link) {
+       if l.IsSender() {
+               q := b.queues[l.RemoteSource().Address()]
+               if q != nil {
+                       q.unsubscribe(l)
+                       if q.empty() {
+                               debug.Printf("Delete queue %s", q.name)
+                               delete(b.queues, q.name)
+                       }
+               }
+       }
+}
+
+func (b *broker) HandleMessagingEvent(t event.MessagingEventType, e 
event.Event) error {
+       // FIXME aconway 2015-05-04: locking is un-golike, better example 
coming soon.
+       // Needed because the same handler is used for multiple connections 
concurrently
+       // and the queue data structures are not thread safe.
+       b.lock.Lock()
+       defer b.lock.Unlock()
+
+       switch t {
+
+       case event.MLinkOpening:
+               if e.Link().IsSender() {
+                       q := b.getQueue(e.Link().RemoteSource().Address())
+                       q.subscribe(e.Link())
+               }
+
+       case event.MLinkDisconnected, event.MLinkClosing:
+               b.unsubscribe(e.Link())
+
+       case event.MSendable:
+               q := b.getQueue(e.Link().RemoteSource().Address())
+               q.popTo(e.Connection().Pump(), e.Link())
+
+       case event.MMessage:
+               m, err := event.DecodeMessage(e)
+               fatalIf(err)
+               qname := e.Link().RemoteTarget().Address()
+               debug.Printf("link %s -> queue %s: %s", logLink(e.Link()), 
qname, formatMessage{m})
+               b.getQueue(qname).push(e.Connection().Pump(), m)
+       }
+       return nil
+}
+
+func (b *broker) listen(addr string) (err error) {
+       // Use the standard Go "net" package to listen for connections.
+       listener, err := net.Listen("tcp", addr)
+       if err != nil {
+               return err
+       }
+       info.Printf("Listening on %s", listener.Addr())
+       defer listener.Close()
+       for {
+               conn, err := listener.Accept()
+               if err != nil {
+                       info.Printf("Accept error: %s", err)
+                       continue
+               }
+               pump, err := event.NewPump(conn, event.NewMessagingDelegator(b))
+               fatalIf(err)
+               info.Printf("Accepted %s[%p]", pump, pump)
+               pump.Server()
+               go func() {
+                       pump.Run()
+                       if pump.Error == nil {
+                               info.Printf("Closed %s", pump)
+                       } else {
+                               info.Printf("Closed %s: %s", pump, pump.Error)
+                       }
+               }()
+       }
+}
+
+// Logging
+func logger(prefix string, level int, w io.Writer) *log.Logger {
+       if *verbose >= level {
+               return log.New(w, prefix, 0)
+       }
+       return log.New(ioutil.Discard, "", 0)
+}
+
+var info, debug *log.Logger
+
+func init() {
+       flag.Parse()
+       name := path.Base(os.Args[0])
+       log.SetFlags(0)
+       log.SetPrefix(fmt.Sprintf("%s: ", name))                      // Log 
errors on stderr.
+       info = logger(fmt.Sprintf("%s: ", name), 1, os.Stdout)        // Log 
info on stdout.
+       debug = logger(fmt.Sprintf("%s debug: ", name), 2, os.Stderr) // Log 
debug on stderr.
+}
+
+// Simple error handling for demo.
+func fatalIf(err error) {
+       if err != nil {
+               log.Fatal(err)
+       }
+}
+
+type formatMessage struct{ m amqp.Message }
+
+func (fm formatMessage) String() string {
+       if *full {
+               return fmt.Sprintf("%#v", fm.m)
+       } else {
+               return fmt.Sprintf("%#v", fm.m.Body())
+       }
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c9257f47/examples/go/example_test.go
----------------------------------------------------------------------
diff --git a/examples/go/example_test.go b/examples/go/example_test.go
index e059c28..a4b4c2c 100644
--- a/examples/go/example_test.go
+++ b/examples/go/example_test.go
@@ -25,6 +25,7 @@ package main
 import (
        "bufio"
        "bytes"
+       "flag"
        "fmt"
        "io"
        "io/ioutil"
@@ -77,11 +78,11 @@ func (b *broker) check() error {
 
 // Start the demo broker, wait till it is listening on *addr. No-op if already 
started.
 func (b *broker) start() error {
-       build("event/broker.go")
+       build("event_broker.go")
        if b.cmd == nil { // Not already started
                // FIXME aconway 2015-04-30: better way to pick/configure a 
broker port.
                b.addr = fmt.Sprintf("127.0.0.1:%d", rand.Intn(10000)+10000)
-               b.cmd = exec.Command(exepath("broker"), "-addr", b.addr, 
"-verbose", "0")
+               b.cmd = exec.Command(exepath("event_broker"), "-addr", b.addr, 
"-verbose", "0")
                b.runerr = make(chan error)
                // Change the -verbose setting above to see broker output on 
stdout/stderr.
                b.cmd.Stderr, b.cmd.Stdout = os.Stderr, os.Stdout
@@ -246,7 +247,12 @@ func init() {
 
 func build(prog string) {
        if !built[prog] {
-               build := exec.Command("go", "build", path.Join(exampleDir, 
prog))
+               args := []string{"build"}
+               if *rpath != "" {
+                       args = append(args, "-ldflags", "-r "+*rpath)
+               }
+               args = append(args, path.Join(exampleDir, prog))
+               build := exec.Command("go", args...)
                build.Dir = binDir
                out, err := build.CombinedOutput()
                if err != nil {
@@ -256,6 +262,8 @@ func build(prog string) {
        }
 }
 
+var rpath = flag.String("rpath", "", "Runtime path for test executables")
+
 func TestMain(m *testing.M) {
        rand.Seed(time.Now().UTC().UnixNano())
        var err error


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to