PROTON-827: Concurrent Go API for proton. Provide a simple concurrent-safe, blocking API for proton. See the qpid.apache.org/proton/concurrent package documentation for details.
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/bd3fb337 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/bd3fb337 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/bd3fb337 Branch: refs/heads/proton-go Commit: bd3fb337c475c4df07cf359665cee63ea5a0d2a7 Parents: bc72b8b Author: Alan Conway <[email protected]> Authored: Tue Sep 8 13:33:57 2015 -0400 Committer: Alan Conway <[email protected]> Committed: Mon Sep 28 14:08:23 2015 -0400 ---------------------------------------------------------------------- .gitignore | 4 + examples/go/CMakeLists.txt | 20 +- examples/go/README.md | 15 +- examples/go/broker.go | 161 + examples/go/event_broker.go | 240 -- examples/go/example_test.go | 149 +- examples/go/receive.go | 93 +- examples/go/send.go | 105 +- examples/go/util/queue.go | 106 + examples/go/util/util.go | 69 + proton-c/bindings/go/CMakeLists.txt | 45 +- proton-c/bindings/go/README.md | 97 +- proton-c/bindings/go/genwrap.go | 28 +- .../qpid.apache.org/proton/go/amqp.a | 1726 --------- .../qpid.apache.org/proton/go/event.a | 3318 ----------------- .../qpid.apache.org/proton/go/internal.a | Bin 102074 -> 0 bytes .../qpid.apache.org/proton/go/messaging.a | 1100 ------ .../qpid.apache.org/proton/go/reactor.a | 3319 ------------------ .../go/src/qpid.apache.org/proton/amqp/doc.go | 34 + .../go/src/qpid.apache.org/proton/amqp/error.go | 66 + .../go/src/qpid.apache.org/proton/amqp/interop | 1 + .../qpid.apache.org/proton/amqp/interop_test.go | 381 ++ .../src/qpid.apache.org/proton/amqp/marshal.go | 250 ++ .../src/qpid.apache.org/proton/amqp/message.go | 347 ++ .../qpid.apache.org/proton/amqp/message_test.go | 166 + .../go/src/qpid.apache.org/proton/amqp/types.go | 198 ++ .../qpid.apache.org/proton/amqp/unmarshal.go | 556 +++ .../go/src/qpid.apache.org/proton/amqp/url.go | 96 + .../src/qpid.apache.org/proton/amqp/url_test.go | 51 + .../proton/concurrent/connection.go | 167 + .../proton/concurrent/container.go | 71 + .../qpid.apache.org/proton/concurrent/doc.go | 38 + .../proton/concurrent/endpoint.go | 86 + .../proton/concurrent/handler.go | 137 + .../qpid.apache.org/proton/concurrent/link.go | 232 ++ .../proton/concurrent/messaging_test.go | 205 ++ .../proton/concurrent/receiver.go | 242 ++ .../qpid.apache.org/proton/concurrent/sender.go | 190 + .../proton/concurrent/session.go | 115 + .../qpid.apache.org/proton/concurrent/time.go | 71 + .../go/src/qpid.apache.org/proton/doc.go | 46 + .../go/src/qpid.apache.org/proton/engine.go | 402 +++ .../src/qpid.apache.org/proton/go/amqp/doc.go | 40 - .../src/qpid.apache.org/proton/go/amqp/interop | 1 - .../proton/go/amqp/interop_test.go | 308 -- .../qpid.apache.org/proton/go/amqp/marshal.go | 238 -- .../qpid.apache.org/proton/go/amqp/message.go | 342 -- .../proton/go/amqp/message_test.go | 90 - .../src/qpid.apache.org/proton/go/amqp/types.go | 193 - .../src/qpid.apache.org/proton/go/amqp/uid.go | 40 - .../qpid.apache.org/proton/go/amqp/unmarshal.go | 552 --- .../src/qpid.apache.org/proton/go/amqp/url.go | 96 - .../qpid.apache.org/proton/go/amqp/url_test.go | 51 - .../src/qpid.apache.org/proton/go/event/doc.go | 38 - .../qpid.apache.org/proton/go/event/handlers.go | 411 --- .../qpid.apache.org/proton/go/event/message.go | 75 - .../src/qpid.apache.org/proton/go/event/pump.go | 360 -- .../qpid.apache.org/proton/go/event/wrappers.go | 253 -- .../proton/go/event/wrappers_gen.go | 732 ---- .../qpid.apache.org/proton/go/internal/error.go | 126 - .../qpid.apache.org/proton/go/messaging/doc.go | 28 - .../proton/go/messaging/handler.go | 70 - .../proton/go/messaging/messaging.go | 261 -- .../go/src/qpid.apache.org/proton/handlers.go | 399 +++ .../qpid.apache.org/proton/internal/error.go | 121 + .../proton/internal/flexchannel.go | 82 + .../proton/internal/flexchannel_test.go | 89 + .../qpid.apache.org/proton/internal/safemap.go | 57 + .../src/qpid.apache.org/proton/internal/uuid.go | 70 + .../go/src/qpid.apache.org/proton/message.go | 79 + .../go/src/qpid.apache.org/proton/wrappers.go | 335 ++ .../src/qpid.apache.org/proton/wrappers_gen.go | 874 +++++ proton-c/include/proton/link.h | 2 +- 73 files changed, 6884 insertions(+), 14272 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bd3fb337/.gitignore ---------------------------------------------------------------------- diff --git a/.gitignore b/.gitignore index 4075b3f..ad4eebb 100644 --- a/.gitignore +++ b/.gitignore @@ -36,3 +36,7 @@ proton-c/bindings/go/bin # Testresults from the jenkins build script testresults + +# Go binding build output +/proton-c/bindings/go/pkg +/proton-c/bindings/go/bin http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bd3fb337/examples/go/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/examples/go/CMakeLists.txt b/examples/go/CMakeLists.txt index 464ed7c..873180d 100644 --- a/examples/go/CMakeLists.txt +++ b/examples/go/CMakeLists.txt @@ -17,13 +17,21 @@ # under the License. # -# FIXME aconway 2015-05-20: -# - use proton build for Go includes & libs. -# - pre-build go libraries? Respect user GOPATH? - if(BUILD_GO) + set(examples receive send broker) + + foreach(example ${examples}) + add_custom_target(go-example-${example} ALL + COMMAND ${GO_BUILD} ${GO_EXAMPLE_FLAGS} -o ${CMAKE_CURRENT_BINARY_DIR}/${example} ${CMAKE_CURRENT_SOURCE_DIR}/${example}.go + DEPENDS go-packages qpid-proton) + endforeach() + add_custom_target(go-example-test ALL + COMMAND ${GO_TEST} -c ${GO_EXAMPLE_FLAGS} -o ${CMAKE_CURRENT_BINARY_DIR}/example_test ${CMAKE_CURRENT_SOURCE_DIR}/example_test.go + DEPENDS go-packages qpid-proton) + add_test( NAME go_example_test - COMMAND ${GO_TEST} example_test.go -rpath ${CMAKE_BINARY_DIR}/proton-c - WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}) + COMMAND ${CMAKE_CURRENT_BINARY_DIR}/example_test -broker broker) + + list(APPEND ADDITIONAL_MAKE_CLEAN_FILES ${examples}) endif() http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bd3fb337/examples/go/README.md ---------------------------------------------------------------------- diff --git a/examples/go/README.md b/examples/go/README.md index 52bd8e4..33c3d88 100644 --- a/examples/go/README.md +++ b/examples/go/README.md @@ -2,18 +2,17 @@ There are 3 go packages for proton: -- qpid.apache.org/proton/go/amqp: converts AMQP messages and data types to and from Go data types. -- qpid.apache.org/proton/go/messaging: easy-to-use, concurrent API for messaging clients and servers. -- qpid.apache.org/proton/go/event: full low-level access to the proton engine. +- qpid.apache.org/proton/amqp: converts AMQP messages and data types to and from Go data types. +- qpid.apache.org/proton/concurrent: easy-to-use, concurrent API for concurrent clients and servers. +- qpid.apache.org/proton: Low-level access to the proton engine. -Most applications should use the `messaging` package. The `event` package is for +Most applications should use the `proton/concurrent` package. The `proton` package is for applications that need low-level access to the proton engine. ## Example programs -- [receive.go](receive.go) receive from many connections concurrently using messaging package. -- [send.go](send.go) send to many connections concurrently using messaging package. -- [event_broker.go](event_broker.go) simple mini-broker using event package. +- [receive.go](receive.go) receive from many connections concurrently. +- [send.go](send.go) send to many connections concurrently. ## Using the Go packages @@ -54,7 +53,7 @@ the example source have more details. First start the broker: - go run event_broker.go + go run reactor_broker.go Send messages concurrently to queues "foo" and "bar", 10 messages to each queue: http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bd3fb337/examples/go/broker.go ---------------------------------------------------------------------- diff --git a/examples/go/broker.go b/examples/go/broker.go new file mode 100644 index 0000000..3f85e9e --- /dev/null +++ b/examples/go/broker.go @@ -0,0 +1,161 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +// +// This is a simple AMQP broker implemented using the concurrent interface. +// +// It maintains a set of named in-memory queues of messages. Clients can send +// messages to queues or subscribe to receive messages from them. +// +// + +package main + +import ( + "./util" + "flag" + "fmt" + "log" + "net" + "os" + "qpid.apache.org/proton/concurrent" +) + +// Usage and command-line flags +func usage() { + fmt.Fprintf(os.Stderr, ` +Usage: %s +A simple broker-like demo. Queues are created automatically for sender or receiver addrsses. +`, os.Args[0]) + flag.PrintDefaults() +} + +var addr = flag.String("addr", ":amqp", "Listening address") + +func main() { + flag.Usage = usage + flag.Parse() + b := newBroker() + err := b.listen(*addr) + util.ExitIf(err) +} + +type broker struct { + container concurrent.Container + queues util.QueueMap +} + +func newBroker() *broker { + return &broker{ + container: concurrent.NewContainer(""), + queues: util.MakeQueueMap(), + } +} + +// Listen for incoming connections +func (b *broker) listen(addr string) (err error) { + listener, err := net.Listen("tcp", addr) + if err != nil { + return err + } + log.Printf("Listening on %s\n", listener.Addr()) + defer listener.Close() + for { + conn, err := listener.Accept() + if err != nil { + return err + } + c, err := b.container.NewConnection(conn) + if err != nil { + return err + } + // Make this a server connection. Must be done before Open() + c.Server() // Server-side protocol negotiation. + c.Listen() // Enable remotely-opened endpoints. + if err := c.Open(); err != nil { + return err + } + util.Debugf("accept %s\n", c) + // Accept remotely-opened endpoints on the connection + go b.accept(c) + } +} + +// accept remotely-opened endpoints (Session, Sender and Receiver) +func (b *broker) accept(c concurrent.Connection) { + for ep, err := c.Accept(); err == nil; ep, err = c.Accept() { + switch ep := ep.(type) { + case concurrent.Session: + util.Debugf("accept session %s\n", ep) + ep.Open() + case concurrent.Sender: + util.Debugf("accept sender %s\n", ep) + ep.Open() + go b.sender(ep) + case concurrent.Receiver: + util.Debugf("accept receiver %s\n", ep) + ep.SetCapacity(100, true) // Pre-fetch 100 messages + ep.Open() + go b.receiver(ep) + } + } +} + +// sender pops from a the queue in the sender's Source address and send messages. +func (b *broker) sender(sender concurrent.Sender) { + qname := sender.Settings().Source + if qname == "" { + log.Printf("invalid consumer, no source address: %s", sender) + return + } + q := b.queues.Get(qname) + for { + m := <-q.Pop + if m == nil { + break + } + if sm, err := sender.Send(m); err == nil { + sm.Forget() // FIXME aconway 2015-09-24: Ignore acknowledgements + util.Debugf("send %s: %s\n", sender, util.FormatMessage(m)) + } else { + util.Debugf("send error %s: %s\n", sender, err) + q.Putback <- m + break + } + } +} + +func (b *broker) receiver(receiver concurrent.Receiver) { + qname := receiver.Settings().Target + if qname == "" { + log.Printf("invalid producer, no target address: %s", receiver) + return + } + q := b.queues.Get(qname) + for { + if rm, err := receiver.Receive(); err == nil { + util.Debugf("recv %s: %s\n", receiver, util.FormatMessage(rm.Message)) + q.Push <- rm.Message + rm.Accept() + } else { + util.Debugf("recv error %s: %s\n", receiver, err) + break + } + } +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bd3fb337/examples/go/event_broker.go ---------------------------------------------------------------------- diff --git a/examples/go/event_broker.go b/examples/go/event_broker.go deleted file mode 100644 index 2578bd5..0000000 --- a/examples/go/event_broker.go +++ /dev/null @@ -1,240 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ - -// -// This is a simple AMQP broker implemented using the event-handler interface. -// -// It maintains a set of named in-memory queues of messages. Clients can send -// messages to queues or subscribe to receive messages from them. -// -// - -package main - -import ( - "container/list" - "flag" - "fmt" - "net" - "os" - "qpid.apache.org/proton/go/amqp" - "qpid.apache.org/proton/go/event" - "sync" -) - -// Usage and command-line flags -func usage() { - fmt.Fprintf(os.Stderr, ` -Usage: %s -A simple broker-like demo. Queues are created automatically for sender or receiver addrsses. -`, os.Args[0]) - flag.PrintDefaults() -} - -var debug = flag.Bool("debug", false, "Print detailed debug output") -var addr = flag.String("addr", ":amqp", "Listening address") -var full = flag.Bool("full", false, "Print full message not just body.") - -func main() { - flag.Usage = usage - flag.Parse() - b := newBroker() - err := b.listen(*addr) - exitIf(err) -} - -// broker implements event.MessagingHandler and reacts to events by moving messages on or off queues. -type broker struct { - queues map[string]*queue - lock sync.Mutex // FIXME aconway 2015-05-04: un-golike, better broker coming... -} - -func newBroker() *broker { - return &broker{queues: make(map[string]*queue)} -} - -func (b *broker) getQueue(name string) *queue { - q := b.queues[name] - if q == nil { - debugf("Create queue %s\n", name) - q = &queue{name, list.New(), make(map[event.Link]bool)} - b.queues[name] = q - } - return q -} - -func (b *broker) unsubscribe(l event.Link) { - if l.IsSender() { - q := b.queues[l.RemoteSource().Address()] - if q != nil { - q.unsubscribe(l) - if q.empty() { - debugf("Delete queue %s\n", q.name) - delete(b.queues, q.name) - } - } - } -} - -func (b *broker) HandleMessagingEvent(t event.MessagingEventType, e event.Event) error { - // FIXME aconway 2015-05-04: locking is un-golike, better example coming soon. - // Needed because the same handler is used for multiple connections concurrently - // and the queue data structures are not thread safe. - b.lock.Lock() - defer b.lock.Unlock() - - switch t { - - case event.MLinkOpening: - if e.Link().IsSender() { - q := b.getQueue(e.Link().RemoteSource().Address()) - q.subscribe(e.Link()) - } - - case event.MLinkDisconnected, event.MLinkClosing: - b.unsubscribe(e.Link()) - - case event.MSendable: - q := b.getQueue(e.Link().RemoteSource().Address()) - q.popTo(e.Connection().Pump(), e.Link()) - - case event.MMessage: - m, err := event.DecodeMessage(e) - exitIf(err) - qname := e.Link().RemoteTarget().Address() - debugf("link %s -> queue %s: %s\n", logLink(e.Link()), qname, formatMessage(m)) - b.getQueue(qname).push(e.Connection().Pump(), m) - } - return nil -} - -func (b *broker) listen(addr string) (err error) { - // Use the standard Go "net" package to listen for connections. - listener, err := net.Listen("tcp", addr) - if err != nil { - return err - } - fmt.Printf("Listening on %s\n", listener.Addr()) - defer listener.Close() - for { - conn, err := listener.Accept() - if err != nil { - fmt.Fprintf(os.Stderr, "Accept error: %s\n", err) - continue - } - pump, err := event.NewPump(conn, event.NewMessagingDelegator(b)) - exitIf(err) - debugf("Accepted %s[%p]\n", pump, pump) - pump.Server() - go func() { - pump.Run() - if pump.Error == nil { - debugf("Closed %s\n", pump) - } else { - debugf("Closed %s: %s\n", pump, pump.Error) - } - }() - } -} - -// queue is a structure representing a queue. -type queue struct { - name string // Name of queue - messages *list.List // List of event.Message - consumers map[event.Link]bool // Set of consumer links -} - -type logLink event.Link // Wrapper to print links in useful format for logging - -func (ll logLink) String() string { - l := event.Link(ll) - return fmt.Sprintf("%s[%p]", l.Name(), l.Session().Connection().Pump()) -} - -func (q *queue) subscribe(link event.Link) { - debugf("link %s subscribed to queue %s\n", logLink(link), q.name) - q.consumers[link] = true -} - -func (q *queue) unsubscribe(link event.Link) { - debugf("link %s unsubscribed from queue %s\n", logLink(link), q.name) - delete(q.consumers, link) -} - -func (q *queue) empty() bool { - return len(q.consumers) == 0 && q.messages.Len() == 0 -} - -func (q *queue) push(context *event.Pump, message amqp.Message) { - q.messages.PushBack(message) - q.pop(context) -} - -func (q *queue) popTo(context *event.Pump, link event.Link) bool { - if q.messages.Len() != 0 && link.Credit() > 0 { - message := q.messages.Remove(q.messages.Front()).(amqp.Message) - debugf("link %s <- queue %s: %s\n", logLink(link), q.name, formatMessage(message)) - // The first return parameter is an event.Delivery. - // The Deliver can be used to track message status, e.g. so we can re-delver on failure. - // This demo broker doesn't do that. - linkPump := link.Session().Connection().Pump() - if context == linkPump { - if context == nil { - exitIf(fmt.Errorf("pop in nil context")) - } - link.Send(message) // link is in the current pump, safe to call Send() direct - } else { - linkPump.Inject <- func() { // Inject to link's pump - link.Send(message) // FIXME aconway 2015-05-04: error handlig - } - } - return true - } - return false -} - -func (q *queue) pop(context *event.Pump) (popped bool) { - for c, _ := range q.consumers { - popped = popped || q.popTo(context, c) - } - return -} - -// Simple debug logging -func debugf(format string, data ...interface{}) { - if *debug { - fmt.Fprintf(os.Stderr, format, data...) - } -} - -// Simple error handling for demo. -func exitIf(err error) { - if err != nil { - fmt.Fprintln(os.Stderr, err) - os.Exit(1) - } -} - -func formatMessage(m amqp.Message) string { - if *full { - return fmt.Sprintf("%#v", m) - } else { - return fmt.Sprintf("%#v", m.Body()) - } -} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bd3fb337/examples/go/example_test.go ---------------------------------------------------------------------- diff --git a/examples/go/example_test.go b/examples/go/example_test.go index 8879c38..2afd95c 100644 --- a/examples/go/example_test.go +++ b/examples/go/example_test.go @@ -28,21 +28,20 @@ import ( "flag" "fmt" "io" - "io/ioutil" "math/rand" "net" "os" "os/exec" - "path" "path/filepath" "reflect" + "strings" "testing" "time" ) -func panicIf(err error) { +func fatalIf(t *testing.T, err error) { if err != nil { - panic(err) + t.Fatalf("%s", err) } } @@ -77,16 +76,18 @@ func (b *broker) check() error { } // Start the demo broker, wait till it is listening on *addr. No-op if already started. -func (b *broker) start() error { +func (b *broker) start(t *testing.T) error { if b.cmd == nil { // Not already started - // FIXME aconway 2015-04-30: better way to pick/configure a broker port. b.addr = fmt.Sprintf("127.0.0.1:%d", rand.Intn(10000)+10000) - b.cmd = exampleCommand("event_broker", "-addr", b.addr) + b.cmd = exampleCommand(t, *brokerName, "-addr", b.addr) b.runerr = make(chan error) b.cmd.Stderr, b.cmd.Stdout = os.Stderr, os.Stdout - go func() { - b.runerr <- b.cmd.Run() - }() + b.err = b.cmd.Start() + if b.err == nil { + go func() { b.runerr <- b.cmd.Wait() }() + } else { + b.runerr <- b.err + } b.err = b.check() } return b.err @@ -95,7 +96,7 @@ func (b *broker) start() error { func (b *broker) stop() { if b != nil && b.cmd != nil { b.cmd.Process.Kill() - b.cmd.Wait() + <-b.runerr } } @@ -106,22 +107,49 @@ func checkEqual(want interface{}, got interface{}) error { return fmt.Errorf("%#v != %#v", want, got) } -// runCommand returns an exec.Cmd to run an example. -func exampleCommand(prog string, arg ...string) *exec.Cmd { - build(prog + ".go") +// 'go build' uses the installed copy of the proton Go libraries, which may be out of date. +func checkStaleLibs(t *testing.T) { + var stale []string + pp := "qpid.apache.org/proton" + for _, p := range []string{pp, pp + "/amqp", pp + "/concurrent"} { + out, err := exec.Command("go", "list", "-f", "{{.Stale}}", p).CombinedOutput() + if err != nil { + t.Fatalf("failed to execute 'go list': %v\n%v", err, string(out)) + } + if string(out) != "false\n" { + stale = append(stale, pp) + } + } + if len(stale) > 0 { + t.Fatalf("Stale libraries, run 'go install %s'", strings.Trim(fmt.Sprint(stale), "[]")) + } +} + +// exampleCommand returns an exec.Cmd to run an example. +func exampleCommand(t *testing.T, prog string, arg ...string) (cmd *exec.Cmd) { + checkStaleLibs(t) args := []string{} if *debug { args = append(args, "-debug=true") } args = append(args, arg...) - cmd := exec.Command(exepath(prog), args...) + prog, err := filepath.Abs(prog) + fatalIf(t, err) + if _, err := os.Stat(prog); err == nil { + cmd = exec.Command(prog, args...) + } else if _, err := os.Stat(prog + ".go"); err == nil { + args = append([]string{"run", prog + ".go"}, args...) + cmd = exec.Command("go", args...) + } else { + t.Fatalf("Cannot find binary or source for %s", prog) + } cmd.Stderr = os.Stderr return cmd } // Run an example Go program, return the combined output as a string. -func runExample(prog string, arg ...string) (string, error) { - cmd := exampleCommand(prog, arg...) +func runExample(t *testing.T, prog string, arg ...string) (string, error) { + cmd := exampleCommand(t, prog, arg...) out, err := cmd.Output() return string(out), err } @@ -133,8 +161,8 @@ func prefix(prefix string, err error) error { return nil } -func runExampleWant(want string, prog string, args ...string) error { - out, err := runExample(prog, args...) +func runExampleWant(t *testing.T, want string, prog string, args ...string) error { + out, err := runExample(t, prog, args...) if err != nil { return fmt.Errorf("%s failed: %s: %s", prog, err, out) } @@ -142,7 +170,10 @@ func runExampleWant(want string, prog string, args ...string) error { } func exampleArgs(args ...string) []string { - return append(args, testBroker.addr+"/foo", testBroker.addr+"/bar", testBroker.addr+"/baz") + for i := 0; i < *connections; i++ { + args = append(args, fmt.Sprintf("%s/%s%d", testBroker.addr, "q", i)) + } + return args } // Send then receive @@ -150,18 +181,18 @@ func TestExampleSendReceive(t *testing.T) { if testing.Short() { t.Skip("Skip demo tests in short mode") } - testBroker.start() - err := runExampleWant( - "Received all 15 acknowledgements\n", + testBroker.start(t) + err := runExampleWant(t, + fmt.Sprintf("Received all %d acknowledgements\n", expected), "send", - exampleArgs("-count", "5")...) + exampleArgs("-count", fmt.Sprintf("%d", *count))...) if err != nil { t.Fatal(err) } - err = runExampleWant( - "Listening on 3 connections\nReceived 15 messages\n", + err = runExampleWant(t, + fmt.Sprintf("Listening on %v connections\nReceived %v messages\n", *connections, *count**connections), "receive", - exampleArgs("-count", "15")...) + exampleArgs("-count", fmt.Sprintf("%d", *count**connections))...) if err != nil { t.Fatal(err) } @@ -175,8 +206,8 @@ func init() { ready = fmt.Errorf("Ready") } // Send ready on errchan when it is listening. // Send final error when it is done. // Returns the Cmd, caller must Wait() -func goReceiveWant(errchan chan<- error, want string, arg ...string) *exec.Cmd { - cmd := exampleCommand("receive", arg...) +func goReceiveWant(t *testing.T, errchan chan<- error, want string, arg ...string) *exec.Cmd { + cmd := exampleCommand(t, "receive", arg...) go func() { pipe, err := cmd.StdoutPipe() if err != nil { @@ -209,11 +240,12 @@ func TestExampleReceiveSend(t *testing.T) { if testing.Short() { t.Skip("Skip demo tests in short mode") } - testBroker.start() + testBroker.start(t) recvErr := make(chan error) - recvCmd := goReceiveWant(recvErr, - "Received 15 messages\n", - exampleArgs("-count", "15")...) + recvCmd := goReceiveWant( + t, recvErr, + fmt.Sprintf("Received %d messages\n", expected), + exampleArgs(fmt.Sprintf("-count=%d", expected))...) defer func() { recvCmd.Process.Kill() recvCmd.Wait() @@ -221,10 +253,10 @@ func TestExampleReceiveSend(t *testing.T) { if err := <-recvErr; err != ready { // Wait for receiver ready t.Fatal(err) } - err := runExampleWant( - "Received all 15 acknowledgements\n", + err := runExampleWant(t, + fmt.Sprintf("Received all %d acknowledgements\n", expected), "send", - exampleArgs("-count", "5")...) + exampleArgs("-count", fmt.Sprintf("%d", *count))...) if err != nil { t.Fatal(err) } @@ -233,51 +265,18 @@ func TestExampleReceiveSend(t *testing.T) { } } -func exepath(relative string) string { - if binDir == "" { - panic("bindir not set, cannot run example binaries") - } - return path.Join(binDir, relative) -} - var testBroker *broker -var binDir, exampleDir string -var built map[string]bool - -func init() { - built = make(map[string]bool) -} -func build(prog string) { - if !built[prog] { - args := []string{"build"} - if *rpath != "" { - args = append(args, "-ldflags", "-r "+*rpath) - } - args = append(args, path.Join(exampleDir, prog)) - build := exec.Command("go", args...) - build.Dir = binDir - out, err := build.CombinedOutput() - if err != nil { - panic(fmt.Errorf("%v: %s", err, out)) - } - built[prog] = true - } -} - -var rpath = flag.String("rpath", "", "Runtime path for test executables") var debug = flag.Bool("debug", false, "Debugging output from examples") +var brokerName = flag.String("broker", "broker", "Name of broker executable to run") +var count = flag.Int("count", 3, "Count of messages to send in tests") +var connections = flag.Int("connections", 3, "Number of connections to make in tests") +var expected int func TestMain(m *testing.M) { + expected = (*count) * (*connections) rand.Seed(time.Now().UTC().UnixNano()) - var err error - exampleDir, err = filepath.Abs(".") - panicIf(err) - binDir, err = ioutil.TempDir("", "example_test.go") - panicIf(err) - defer os.Remove(binDir) // Clean up binaries - testBroker = &broker{} // Broker is started on-demand by tests. - testBroker.stop() + testBroker = &broker{} // Broker is started on-demand by tests. status := m.Run() testBroker.stop() os.Exit(status) http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bd3fb337/examples/go/receive.go ---------------------------------------------------------------------- diff --git a/examples/go/receive.go b/examples/go/receive.go index b1eb309..a45ffe3 100644 --- a/examples/go/receive.go +++ b/examples/go/receive.go @@ -20,12 +20,14 @@ under the License. package main import ( + "./util" "flag" "fmt" + "log" "net" "os" - "qpid.apache.org/proton/go/amqp" - "qpid.apache.org/proton/go/messaging" + "qpid.apache.org/proton/amqp" + "qpid.apache.org/proton/concurrent" "sync" ) @@ -37,9 +39,7 @@ Receive messages from all the listed URLs concurrently and print them. flag.PrintDefaults() } -var debug = flag.Bool("debug", false, "Print detailed debug output") var count = flag.Uint64("count", 1, "Stop after receiving this many messages.") -var full = flag.Bool("full", false, "Print full message not just body.") func main() { flag.Usage = usage @@ -47,7 +47,7 @@ func main() { urls := flag.Args() // Non-flag arguments are URLs to receive from if len(urls) == 0 { - fmt.Fprintln(os.Stderr, "No URL provided") + log.Println("No URL provided") usage() os.Exit(1) } @@ -57,39 +57,46 @@ func main() { var wait sync.WaitGroup // Used by main() to wait for all goroutines to end. wait.Add(len(urls)) // Wait for one goroutine per URL. - connections := make([]*messaging.Connection, len(urls)) // Store connctions to close on exit + container := concurrent.NewContainer("") + connections := make(chan concurrent.Connection, len(urls)) // Connections to close on exit // Start a goroutine to for each URL to receive messages and send them to the messages channel. // main() receives and prints them. - for i, urlStr := range urls { - debugf("debug: Connecting to %s\n", urlStr) + for _, urlStr := range urls { + util.Debugf("Connecting to %s\n", urlStr) go func(urlStr string) { // Start the goroutine defer wait.Done() // Notify main() when this goroutine is done. url, err := amqp.ParseURL(urlStr) // Like net/url.Parse() but with AMQP defaults. - exitIf(err) + util.ExitIf(err) - // Open a standard Go net.Conn and and AMQP connection using it. + // Open a new connection conn, err := net.Dial("tcp", url.Host) // Note net.URL.Host is actually "host:port" - exitIf(err) - pc, err := messaging.Connect(conn) // This is our AMQP connection. - exitIf(err) - connections[i] = pc // Save connection so it will be closed when main() ends - - // Create a receiver using the path of the URL as the AMQP address - r, err := pc.Receiver(url.Path) - exitIf(err) - - // Loop receiving messages + util.ExitIf(err) + c, err := container.NewConnection(conn) + util.ExitIf(err) + util.ExitIf(c.Open()) + connections <- c // Save connection so we can Close() when main() ends + + // Create and open a session + ss, err := c.NewSession() + util.ExitIf(err) + err = ss.Open() + util.ExitIf(err) + + // Create a Receiver using the path of the URL as the source address + r, err := ss.Receiver(url.Path) + util.ExitIf(err) + + // Loop receiving messages and sending them to the main() goroutine for { - var m amqp.Message - select { // Receive a message or stop. - case m = <-r.Receive: - case <-stop: // The program is stopping. + rm, err := r.Receive() + if err == concurrent.Closed { return } + util.ExitIf(err) select { // Send m to main() or stop - case messages <- m: // Send m to main() + case messages <- rm.Message: // Send to main() case <-stop: // The program is stopping. return } @@ -102,37 +109,17 @@ func main() { // print each message until the count is exceeded. for i := uint64(0); i < *count; i++ { - debugf("%s\n", formatMessage(<-messages)) + m := <-messages + util.Debugf("%s\n", util.FormatMessage(m)) } fmt.Printf("Received %d messages\n", *count) + + // Close all connections, this will interrupt goroutines blocked in Receiver.Receive() + for i := 0; i < len(urls); i++ { + c := <-connections + c.Disconnect(nil) // FIXME aconway 2015-09-25: Close + } close(stop) // Signal all goroutines to stop. wait.Wait() // Wait for all goroutines to finish. close(messages) - for _, c := range connections { // Close all connections - if c != nil { - c.Close() - } - } -} - -// Simple debug logging -func debugf(format string, data ...interface{}) { - if *debug { - fmt.Fprintf(os.Stderr, format, data...) - } -} - -// Simple error handling for demo. -func exitIf(err error) { - if err != nil { - fmt.Fprintln(os.Stderr, err) - } -} - -func formatMessage(m amqp.Message) string { - if *full { - return fmt.Sprintf("%#v", m) - } else { - return fmt.Sprintf("%#v", m.Body()) - } } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bd3fb337/examples/go/send.go ---------------------------------------------------------------------- diff --git a/examples/go/send.go b/examples/go/send.go index 98acefa..7fa5416 100644 --- a/examples/go/send.go +++ b/examples/go/send.go @@ -20,12 +20,14 @@ under the License. package main import ( + "./util" "flag" "fmt" + "log" "net" "os" - "qpid.apache.org/proton/go/amqp" - "qpid.apache.org/proton/go/messaging" + "qpid.apache.org/proton/amqp" + "qpid.apache.org/proton/concurrent" "sync" ) @@ -37,13 +39,11 @@ Send messages to each URL concurrently with body "<url-path>-<n>" where n is the flag.PrintDefaults() } -var debug = flag.Bool("debug", false, "Print detailed debug output") var count = flag.Int64("count", 1, "Send this may messages per address.") -// Ack associates an info string with an acknowledgement -type Ack struct { - ack messaging.Acknowledgement - info string +type sent struct { + name string + sentMessage concurrent.SentMessage } func main() { @@ -52,63 +52,68 @@ func main() { urls := flag.Args() // Non-flag arguments are URLs to receive from if len(urls) == 0 { - fmt.Fprintln(os.Stderr, "No URL provided") + log.Println("No URL provided") flag.Usage() os.Exit(1) } - acks := make(chan Ack) // Channel to receive all the acknowledgements - var wait sync.WaitGroup // Used by main() to wait for all goroutines to end. - wait.Add(len(urls)) // Wait for one goroutine per URL. + sentChan := make(chan sent) // Channel to receive all the delivery receipts. + var wait sync.WaitGroup // Used by main() to wait for all goroutines to end. + wait.Add(len(urls)) // Wait for one goroutine per URL. - connections := make([]*messaging.Connection, len(urls)) // Store connctions to close on exit + container := concurrent.NewContainer("") + var connections []concurrent.Connection // Store connctions to close on exit - // Start a goroutine for each URL to send messages, receive the acknowledgements and - // send them to the acks channel. - for i, urlStr := range urls { - debugf("Connecting to %v\n", urlStr) + // Start a goroutine for each URL to send messages. + for _, urlStr := range urls { + util.Debugf("Connecting to %v\n", urlStr) go func(urlStr string) { defer wait.Done() // Notify main() that this goroutine is done. url, err := amqp.ParseURL(urlStr) // Like net/url.Parse() but with AMQP defaults. - exitIf(err) + util.ExitIf(err) - // Open a standard Go net.Conn and and AMQP connection using it. + // Open a new connection conn, err := net.Dial("tcp", url.Host) // Note net.URL.Host is actually "host:port" - exitIf(err) - pc, err := messaging.Connect(conn) // This is our AMQP connection. - exitIf(err) - connections[i] = pc // Save connection so it will be closed when main() ends - - // Create a sender using the path of the URL as the AMQP address - s, err := pc.Sender(url.Path) - exitIf(err) - - // Loop sending messages, receiving acknowledgements and sending them to the acks channel. + util.ExitIf(err) + c, err := container.NewConnection(conn) + util.ExitIf(err) + err = c.Open() + util.ExitIf(err) + connections = append(connections, c) // Save connection so it will be closed when main() ends + + // Create and open a session + ss, err := c.NewSession() + util.ExitIf(err) + err = ss.Open() + util.ExitIf(err) + + // Create a Sender using the path of the URL as the AMQP address + s, err := ss.Sender(url.Path) + util.ExitIf(err) + + // Loop sending messages. for i := int64(0); i < *count; i++ { m := amqp.NewMessage() body := fmt.Sprintf("%v-%v", url.Path, i) - m.SetBody(body) - // Note Send is *asynchronous*, ack is a channel that will receive the acknowledgement. - ack, err := s.Send(m) - exitIf(err) - acks <- Ack{ack, body} // Send the acknowledgement to main() + m.Marshal(body) + sentMessage, err := s.Send(m) + util.ExitIf(err) + sentChan <- sent{body, sentMessage} } }(urlStr) } // Wait for all the acknowledgements expect := int(*count) * len(urls) - debugf("Started senders, expect %v acknowledgements\n", expect) + util.Debugf("Started senders, expect %v acknowledgements\n", expect) for i := 0; i < expect; i++ { - ack, ok := <-acks - if !ok { - exitIf(fmt.Errorf("acks channel closed after only %d acks\n", i)) - } - d := <-ack.ack - debugf("acknowledgement[%v] %v\n", i, ack.info) - if d != messaging.Accepted { - fmt.Printf("Unexpected disposition %v\n", d) + d := <-sentChan + disposition, err := d.sentMessage.Disposition() + if err != nil { + util.Debugf("acknowledgement[%v] %v error: %v\n", i, d.name, err) + } else { + util.Debugf("acknowledgement[%v] %v (%v)\n", i, d.name, disposition) } } fmt.Printf("Received all %v acknowledgements\n", expect) @@ -116,21 +121,7 @@ func main() { wait.Wait() // Wait for all goroutines to finish. for _, c := range connections { // Close all connections if c != nil { - c.Close() + c.Close(nil) } } } - -// Simple debug logging -func debugf(format string, data ...interface{}) { - if *debug { - fmt.Fprintf(os.Stderr, format, data...) - } -} - -// Simple error handling for demo. -func exitIf(err error) { - if err != nil { - fmt.Fprintln(os.Stderr, err) - } -} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bd3fb337/examples/go/util/queue.go ---------------------------------------------------------------------- diff --git a/examples/go/util/queue.go b/examples/go/util/queue.go new file mode 100644 index 0000000..075c4d2 --- /dev/null +++ b/examples/go/util/queue.go @@ -0,0 +1,106 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package util + +import ( + "container/list" + "qpid.apache.org/proton/amqp" + "sync" +) + +// Queue is a concurrent-safe queue of amqp.Message. +type Queue struct { + name string + messages list.List // List of amqp.Message + // Send to Push to push a message onto back of queue + Push chan amqp.Message + // Receive from Pop to pop a message from the front of the queue. + Pop chan amqp.Message + // Send to Putback to put an unsent message back on the front of the queue. + Putback chan amqp.Message +} + +func NewQueue(name string) *Queue { + q := &Queue{ + name: name, + Push: make(chan amqp.Message), + Pop: make(chan amqp.Message), + Putback: make(chan amqp.Message), + } + go q.run() + return q +} + +// Close the queue. Any remaining messages on Pop can still be received. +func (q *Queue) Close() { close(q.Push); close(q.Putback) } + +// Run runs the queue, returns when q.Close() is called. +func (q *Queue) run() { + defer close(q.Pop) + for { + var pop chan amqp.Message + var front amqp.Message + if el := q.messages.Front(); el != nil { + front = el.Value.(amqp.Message) + pop = q.Pop // Only select for pop if there is something to pop. + } + select { + case m, ok := <-q.Push: + if !ok { + return + } + Debugf("%s push: %s\n", q.name, FormatMessage(m)) + q.messages.PushBack(m) + case m, ok := <-q.Putback: + Debugf("%s put-back: %s\n", q.name, FormatMessage(m)) + if !ok { + return + } + q.messages.PushFront(m) + case pop <- front: + Debugf("%s pop: %s\n", q.name, FormatMessage(front)) + q.messages.Remove(q.messages.Front()) + } + } +} + +// QueueMap is a concurrent-safe map of queues that creates new queues +// on demand. +type QueueMap struct { + lock sync.Mutex + m map[string]*Queue +} + +func MakeQueueMap() QueueMap { return QueueMap{m: make(map[string]*Queue)} } + +func (qm *QueueMap) Get(name string) *Queue { + if name == "" { + panic("Attempt to get queue with no name") + } + qm.lock.Lock() + defer qm.lock.Unlock() + q := qm.m[name] + if q == nil { + q = NewQueue(name) + qm.m[name] = q + Debugf("queue %s create", name) + } + return q +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bd3fb337/examples/go/util/util.go ---------------------------------------------------------------------- diff --git a/examples/go/util/util.go b/examples/go/util/util.go new file mode 100644 index 0000000..72c6646 --- /dev/null +++ b/examples/go/util/util.go @@ -0,0 +1,69 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +// util contains utility types and functions to simplify parts of the example +// code that are not related to the use of proton. +package util + +import ( + "flag" + "fmt" + "log" + "os" + "path" + "qpid.apache.org/proton/amqp" +) + +// Debug flag "-debug" enables debug output with Debugf +var Debug = flag.Bool("debug", false, "Print detailed debug output") + +// Full flag "-full" enables full message output by FormatMessage +var Full = flag.Bool("full", false, "Print full message not just body.") + +// Debugf logs debug messages if "-debug" flag is set. +func Debugf(format string, data ...interface{}) { + if *Debug { + log.Printf(format, data...) + } +} + +// Simple error handling for demo. +func ExitIf(err error) { + if err != nil { + log.Println(err) + os.Exit(1) + } +} + +// FormatMessage formats a message as a string, just the body by default or +// the full message (with properties etc.) if "-full" flag is set. +func FormatMessage(m amqp.Message) string { + if *Full { + return fmt.Sprintf("%#v", m) + } else { + return fmt.Sprintf("%#v", m.Body()) + } +} + +// For example programs, use the program name as the log prefix. +func init() { + log.SetFlags(0) + _, prog := path.Split(os.Args[0]) + log.SetPrefix(prog + ": ") +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bd3fb337/proton-c/bindings/go/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/CMakeLists.txt b/proton-c/bindings/go/CMakeLists.txt index ea6238f..0631eae 100644 --- a/proton-c/bindings/go/CMakeLists.txt +++ b/proton-c/bindings/go/CMakeLists.txt @@ -17,32 +17,49 @@ # under the License. # -# FIXME aconway 2015-05-20: install targets for go source. - set(GO_BUILD_FLAGS "" CACHE STRING "Flags for 'go build'") -set(GO_TEST_FLAGS "-v" CACHE STRING "Flags for 'go test'") +set(GO_TEST_FLAGS "-v -race" CACHE STRING "Flags for 'go test'") separate_arguments(GO_BUILD_FLAGS) separate_arguments(GO_TEST_FLAGS) -list(APPEND GO_BUILD_FLAGS "-ldflags=-r ${CMAKE_BINARY_DIR}/proton-c") if (BUILD_GO) - # Build in the source tree, go tools aren't friendly otherwise. - # All build output goes in git-ignored pkg or bin subdirectories. - set(qgo "qpid.apache.org/proton/go") - set(packages ${qgo}/amqp ${qgo}/event ${qgo}/messaging) - # Following are CACHE INTERNAL so examples/CMakeLists.txt can see them. set(GO_ENV ${env_py} -- "GOPATH=${CMAKE_CURRENT_SOURCE_DIR}" "CGO_CFLAGS=-I${CMAKE_SOURCE_DIR}/proton-c/include" "CGO_LDFLAGS=-L${CMAKE_BINARY_DIR}/proton-c" - ${GO_EXE} CACHE INTERNAL "Run go with environment set" - ) - set(GO_BUILD ${GO_ENV} build ${GO_BUILD_FLAGS} CACHE INTERNAL "Run go build") - set(GO_TEST ${GO_ENV} test ${GO_BUILD_FLAGS} ${GO_TEST_FLAGS} CACHE INTERNAL "Run go test") + ${GO_EXE} CACHE INTERNAL "Run go with environment set") + + # Set rpath so test and example executables will use the proton library from this build. + execute_process(COMMAND ${GO_EXE} version OUTPUT_VARIABLE go_out) + if (go_out MATCHES "gccgo") + set(GO_RPATH_FLAGS -gccgoflags "-Wl,-rpath=${CMAKE_BINARY_DIR}/proton-c") + else() + set(GO_RPATH_FLAGS -ldflags "-r ${CMAKE_BINARY_DIR}/proton-c") + endif() + + set(GO_BUILD ${GO_ENV} build ${GO_BUILD_FLAGS} ${GO_RPATH_FLAGS} CACHE INTERNAL "Run go build") + set(GO_INSTALL ${GO_ENV} install ${GO_BUILD_FLAGS} CACHE INTERNAL "Run go install") + set(GO_TEST ${GO_ENV} test ${GO_BUILD_FLAGS} ${GO_RPATH_FLAGS} ${GO_TEST_FLAGS} CACHE INTERNAL "Run go test") + + # Install packages in the source tree, go tools aren't friendly otherwise. + # All build output goes in git-ignored pkg or bin subdirectories. + set(qgo "qpid.apache.org/proton") + set(packages ${qgo} ${qgo}/amqp ${qgo}/concurrent ${qgo}/internal) + add_custom_target(go-packages ALL + COMMAND ${GO_INSTALL} ${packages} + WORKING_DIRECTORY ${CMAKE_BINARY_DIR} + DEPENDS qpid-proton) + + add_test( + NAME go_test_packages + COMMAND ${GO_TEST} ${packages} + WORKING_DIRECTORY "${CMAKE_BINARY_DIR}") - add_test(NAME go_package_test COMMAND ${GO_TEST} ${packages}) + list(APPEND ADDITIONAL_MAKE_CLEAN_FILES + ${CMAKE_CURRENT_SOURCE_DIR}/pkg + ${CMAKE_CURRENT_SOURCE_DIR}/bin) set (GO_INSTALL_DIR ${SHARE_INSTALL_DIR}/gocode/src CACHE PATH "Installation directory for Go code") mark_as_advanced (GO_INSTALL_DIR) http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bd3fb337/proton-c/bindings/go/README.md ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/README.md b/proton-c/bindings/go/README.md index 0f16b8e..98a432a 100644 --- a/proton-c/bindings/go/README.md +++ b/proton-c/bindings/go/README.md @@ -8,7 +8,7 @@ Feedback is strongly encouraged: - Email <[email protected]> - Create issues <https://issues.apache.org/jira/browse/PROTON>, attach patches to an issue. -The package documentation is available at: <http://godoc.org/qpid.apache.org/proton/go> +The package documentation is available at: <http://godoc.org/qpid.apache.org/proton> See the [examples](../../../examples/go/README.md) for working examples and practical instructions on how to get started. @@ -38,40 +38,32 @@ There are two types of developer we want to support There are 3 go packages for proton: -- qpid.apache.org/proton/go/amqp: converts AMQP messages and data types to and from Go data types. -- qpid.apache.org/proton/go/messaging: easy-to-use, concurrent API for messaging clients and servers. -- qpid.apache.org/proton/go/event: full low-level access to the proton engine. +- qpid.apache.org/proton/amqp: converts AMQP messages and data types to and from Go data types. +- qpid.apache.org/proton/concurrent: easy-to-use, concurrent API for clients and servers. +- qpid.apache.org/proton: full low-level access to the proton engine. -Most applications should use the `messaging` package. The `event` package is for -applications that need low-level access to the proton engine. +The `amqp` package provides conversions between AMQP and Go data types that are +used by the other two packages. -The `event` package is fairly complete, with the exception of the proton -reactor. It's unclear if the reactor is important for go. +The `concurrent` package provides a simple procedural API that can be used with +goroutines to construct concurrent AMQP clients and servers. -The `messaging` package can run the examples but probably not much else. There -is work to do on error handling and the API may change. +The `proton` package is a concurrency-unsafe, event-driven API. It is a very +thin wrapper providing almost direct access to the underlying proton C API. -There are working [examples](../../../examples/go/README.md) of a broker using `event` and -a sender and receiver using `messaging`. +The `concurrent` package will probably be more familiar and convenient to Go +programmers for most use cases. The `proton` package may be more familiar if +you have used proton in other languages. -## The event driven API +Note the `concurrent` package itself is implemented in terms of the `proton` +package. It takes care of running concurrency-unsafe `proton` code in dedicated +goroutines and setting up channels to move data between user and proton +goroutines safely. It hides all this complexity behind a simple procedural +interface rather than presenting an event-driven interface. -See the package documentation for details. +See the [examples](../../../examples/go/README.md) for a better illustration of the APIs. -## The Go API - -The goal: A procedural API that allows any user goroutine to send and receive -AMQP messages and other information (acknowledgments, flow control instructions -etc.) using channels. There will be no user-visible locks and no need to run -user code in special goroutines, e.g. as handlers in a proton event loop. - -See the package documentation for emerging details. - -Currently using a channel to receive messages, a function to send them (channels -internally) and a channel as a "future" for acknowledgements to senders. This -may change. - -## Why a separate API for Go? +### Why two APIs? Go is a concurrent language and encourages applications to be divided into concurrent *goroutines*. It provides traditional locking but it encourages the @@ -80,8 +72,7 @@ use *channels* to communicate between goroutines without explicit locks: "Share memory by communicating, don't communicate by sharing memory" The idea is that a given value is only operated on by one goroutine at a time, -but values can easily be passed from one goroutine to another. This removes much -of the need for locking. +but values can easily be passed from one goroutine to another. Go literature distinguishes between: @@ -97,8 +88,8 @@ respect to events like file descriptors being readable/writable, channels having data, timers firing etc. Go automatically takes care of switching out goroutines that block or sleep so it is normal to write code in terms of blocking calls. -Event-driven API (like poll, epoll, select or the proton event API) also -channel unpredictably ordered events to actions in one or a small pool of +Event-driven programming (such as poll, epoll, select or the `proton` package) +also channels unpredictably ordered events to actions in one or a small pool of execution threads. However this requires a different style of programming: "event-driven" or "reactive" programming. Go developers call it "inside-out" programming. In an event-driven architecture blocking is a big problem as it @@ -121,15 +112,39 @@ preserved across blocking calls. There's no need to store details in context objects that you have to look up when handling a later event to figure out how to continue where you left off. -So a Go-like proton API does not force the users code to run in an event-loop -goroutine. Instead user goroutines communicate with the event loop(s) via -channels. There's no need to funnel connections into one event loop, in fact it -makes no sense. Connections can be processed concurrently so they should be -processed in separate goroutines and left to Go to schedule. User goroutines can -have simple loops that block channels till messages are available, the user can -start as many or as few such goroutines as they wish to implement concurrency as -simple or as complex as they wish. For example blocking request-response -vs. asynchronous flows of messages and acknowledgments. +The `proton` API is important because it is close to the original proton-C +reactive API and gives you direct access to the underlying library. However it +is un-Go-like in it's event-driven nature, and it requires care as methods on +values associated with the same underlying proton engine are not +concurrent-safe. + +The `concurrent` API hides the event-driven details behind a simple blocking API +that can be safely called from arbitrary goroutines. Under the covers data is +passed through channels to dedicated goroutines running separate `proton` event +loops for each connection. + +### Design of the concurrent API + +The details are still being worked out (see the code) but some basic principles have been +established. + +Code from the `proton` package runs _only_ in a dedicated goroutine (per +connection). This makes it safe to use proton C data structures associated with +that connection. + +Code in the `concurrent` package can run in any goroutine, and holds `proton` +package values with proton object pointers. To use those values, it "injects" a +function into the proton goroutine via a special channel. Injected functions +can use temporary channels to allow the calling code to wait for results. Such +waiting is only for the local event-loop, not across network calls. + +The API exposes blocking calls returning normal error values, no exposed +channels or callbacks. The user can write simple blocking code or start their +own goroutine loops and channels as appropriate. Details of our internal channel +use and error handling are hidden, which simplifies the API and gives us more +implementation flexibility. + +TODO: lifecycle rules for proton objects. ## New to Go? http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bd3fb337/proton-c/bindings/go/genwrap.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/genwrap.go b/proton-c/bindings/go/genwrap.go index 27e5966..dd2780c 100644 --- a/proton-c/bindings/go/genwrap.go +++ b/proton-c/bindings/go/genwrap.go @@ -36,7 +36,7 @@ import ( ) var includeProton = "../../include/proton" -var outpath = "src/qpid.apache.org/proton/go/event/wrappers_gen.go" +var outpath = "src/qpid.apache.org/proton/wrappers_gen.go" func main() { flag.Parse() @@ -44,15 +44,15 @@ func main() { panicIf(err) defer out.Close() - apis := []string{"session", "link", "delivery", "disposition", "condition", "terminus", "connection"} + apis := []string{"session", "link", "delivery", "disposition", "condition", "terminus", "connection", "transport"} fmt.Fprintln(out, copyright) fmt.Fprint(out, ` -package event +package proton import ( "time" "unsafe" - "qpid.apache.org/proton/go/internal" + "qpid.apache.org/proton/internal" ) // #include <proton/types.h> @@ -122,10 +122,11 @@ func findEnums(header string) (enums []enumType) { } func genEnum(out io.Writer, name string, values []string) { - doTemplate(out, []interface{}{name, values}, `{{$enumName := index . 0}}{{$values := index . 1}} + doTemplate(out, []interface{}{name, values}, ` +{{$enumName := index . 0}}{{$values := index . 1}} type {{mixedCase $enumName}} C.pn_{{$enumName}}_t const ({{range $values}} - {{mixedCase .}} {{mixedCase $enumName}} = C.{{.}} {{end}} + {{mixedCaseTrim . "PN_"}} {{mixedCase $enumName}} = C.{{.}} {{end}} ) func (e {{mixedCase $enumName}}) String() string { @@ -177,7 +178,8 @@ under the License. */ // -// NOTE: This file was generated by genwrap.go, do not edit it by hand. +// NOTE: DO NOT EDIT. This file was generated by genwrap.go from the proton header files. +// Update the generator and re-run if you need to modify this code. // ` @@ -199,7 +201,7 @@ var ( enumDefRe = regexp.MustCompile("typedef enum {([^}]*)} pn_([a-z_]+)_t;") enumValRe = regexp.MustCompile("PN_[A-Z_]+") skipEventRe = regexp.MustCompile("EVENT_NONE|REACTOR|SELECTABLE|TIMER") - skipFnRe = regexp.MustCompile("attach|context|class|collect|^recv$|^send$|transport") + skipFnRe = regexp.MustCompile("attach|context|class|collect|link_recv|link_send|transport_.*logf$|transport_.*trace|transport_head|transport_push") ) // Generate event wrappers. @@ -311,6 +313,13 @@ func mapType(ctype string) (g genType) { case "C.pn_seconds_t": g.Gotype = "time.Duration" g.ToGo = func(v string) string { return fmt.Sprintf("(time.Duration(%s) * time.Second)", v) } + case "C.pn_millis_t": + g.Gotype = "time.Duration" + g.ToGo = func(v string) string { return fmt.Sprintf("(time.Duration(%s) * time.Millisecond)", v) } + case "C.pn_timestamp_t": + g.Gotype = "time.Time" + g.ToC = func(v string) string { return fmt.Sprintf("pnTime(%s)", v) } + g.ToGo = func(v string) string { return fmt.Sprintf("goTime(%s)", v) } case "C.pn_error_t *": g.Gotype = "error" g.ToGo = func(v string) string { return fmt.Sprintf("internal.PnError(unsafe.Pointer(%s))", v) } @@ -396,7 +405,7 @@ func cAssigns(args []genArg) string { // Return the go name of the function or "" to skip the function. func goFnName(api, fname string) string { // Skip class, context and attachment functions. - if skipFnRe.FindStringSubmatch(fname) != nil { + if skipFnRe.FindStringSubmatch(api+"_"+fname) != nil { return "" } switch api + "." + fname { @@ -410,6 +419,7 @@ func goFnName(api, fname string) string { func apiWrapFns(api, header string, out io.Writer) { fmt.Fprintf(out, "type %s struct{pn *C.pn_%s_t}\n", mixedCase(api), api) fmt.Fprintf(out, "func (%c %s) IsNil() bool { return %c.pn == nil }\n", api[0], mixedCase(api), api[0]) + fmt.Fprintf(out, "func (%c %s) CPtr() unsafe.Pointer { return unsafe.Pointer(%c.pn) }\n", api[0], mixedCase(api), api[0]) fn := regexp.MustCompile(fmt.Sprintf(`PN_EXTERN ([a-z0-9_ ]+ *\*?) *pn_%s_([a-z_]+)\(pn_%s_t *\*[a-z_]+ *,? *([^)]*)\)`, api, api)) for _, m := range fn.FindAllStringSubmatch(header, -1) { rtype, fname, argstr := mapType(m[1]), m[2], m[3] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
