PROTON-827: go binding: enable use of 'go get', reorganize packages names and layout.
Based on better understanding of Go workspaces and go get. - bindings/go directory holds source, NOT the workspace. - added go-import <meta> tags and stubs on qpid site to enable `go get qpid.apache.org/proton/go/...` - added go symlink in root of proton repo for shorter package names - renamed package imports qpid.apache.org/proton/go/... - updated README.md files - renamed branch "go" to "go1", go get will pick up this branch. Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/6ea3649d Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/6ea3649d Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/6ea3649d Branch: refs/heads/proton-go Commit: 6ea3649dfa1f2c3ce220ea46c9ff6eae4e88fa35 Parents: 333a2b2 Author: Alan Conway <[email protected]> Authored: Wed May 13 13:26:43 2015 -0400 Committer: Alan Conway <[email protected]> Committed: Mon Sep 28 14:08:22 2015 -0400 ---------------------------------------------------------------------- examples/go/README.md | 92 +++ examples/go/event/broker.go | 255 ++++++++ examples/go/example_test.go | 272 +++++++++ examples/go/receive.go | 16 +- examples/go/send.go | 10 +- go | 1 + proton-c/bindings/go/README.md | 138 +++++ proton-c/bindings/go/amqp/doc.go | 40 ++ proton-c/bindings/go/amqp/interop | 1 + proton-c/bindings/go/amqp/interop_test.go | 308 ++++++++++ proton-c/bindings/go/amqp/marshal.go | 238 ++++++++ proton-c/bindings/go/amqp/message.go | 342 +++++++++++ proton-c/bindings/go/amqp/message_test.go | 90 +++ proton-c/bindings/go/amqp/types.go | 193 ++++++ proton-c/bindings/go/amqp/uid.go | 40 ++ proton-c/bindings/go/amqp/unmarshal.go | 552 +++++++++++++++++ proton-c/bindings/go/amqp/url.go | 96 +++ proton-c/bindings/go/amqp/url_test.go | 51 ++ proton-c/bindings/go/event/doc.go | 38 ++ proton-c/bindings/go/event/handlers.go | 411 +++++++++++++ proton-c/bindings/go/event/message.go | 75 +++ proton-c/bindings/go/event/pump.go | 
357 +++++++++++ proton-c/bindings/go/event/wrappers.go | 253 ++++++++ proton-c/bindings/go/event/wrappers_gen.go | 732 +++++++++++++++++++++++ proton-c/bindings/go/internal/error.go | 125 ++++ proton-c/bindings/go/messaging/doc.go | 28 + proton-c/bindings/go/messaging/handler.go | 70 +++ proton-c/bindings/go/messaging/messaging.go | 250 ++++++++ 28 files changed, 5061 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6ea3649d/examples/go/README.md ---------------------------------------------------------------------- diff --git a/examples/go/README.md b/examples/go/README.md new file mode 100644 index 0000000..8acef36 --- /dev/null +++ b/examples/go/README.md @@ -0,0 +1,92 @@ +# Go examples for proton + +There are 3 go packages for proton: + +- qpid.apache.org/proton/go/amqp: converts AMQP messages and data types to and from Go data types. +- qpid.apache.org/proton/go/messaging: easy-to-use, concurrent API for messaging clients and servers. +- qpid.apache.org/proton/go/event: full low-level access to the proton engine. + +Most applications should use the `messaging` package. The `event` package is for +applications that need low-level access to the proton engine. + +## messaging examples + +- [receive.go](receive.go) receive from many connections concurrently. +- [send.go](send.go) send to many connections concurrently. + +## event examples + +- [broker.go](event/broker.go) simple mini-broker, queues are created automatically. + +## Installing the proton Go packages + +You need to install proton in a standard place such as `/usr` or `/usr/local` so go +can find the proton C headers and libraries to build the C parts of the packages. 
+ +You should create a go workspace and set GOPATH as described in https://golang.org/doc/code.html + +To get the proton packages into your workspace you can clone the proton repository like this: + + git clone https://git.apache.org/qpid-proton.git $GOPATH/src/qpid.apache.org/proton + +If you prefer to keep your proton clone elsewhere you can create a symlink to it in your workspace. + +You can also use `go get` as follows: + + go get qpid.apache.org/proton/go/messaging + +Once installed you can use godoc to look at documentation on the command line or start a +godoc web server like this: + + godoc -http=:6060 + +And look at the docs in your browser. + +Right now the layout of the documentation is a bit messed up, showing the long +path for imports, i.e. + + qpid.apache.org/proton/proton-c/bindings/go/amqp + +In your code you should use: + + qpid.apache.org/proton/go/amqp + + +## Running the examples + +You can run the examples directly from source like this: + + go run <program>.go + +This is a little slow (a couple of seconds) as it compiles the program and runs it in one step. +You can compile the program first and then run the executable to avoid the delay: + + go build <program>.go + ./<program> + +All the examples take a `-h` flag to show usage information, and the comments in +the example source have more details. + +First start the broker: + + go run event/broker.go + +Send messages concurrently to queues "foo" and "bar", 10 messages to each queue: + + go run go/send.go -count 10 localhost:/foo localhost:/bar + +Receive messages concurrently from "foo" and "bar". Note -count 20 for 10 messages each on 2 queues: + + go run go/receive.go -count 20 localhost:/foo localhost:/bar + +The broker and clients use the amqp port on the local host by default, to use a +different address use the `-addr host:port` flag. 
+ +You can mix it up by running the Go clients with the python broker: + + python ../python/broker.py + +Or use the Go broker and the python clients: + + python ../python/simple_send.py + python ../python/simple_recv.py`. http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6ea3649d/examples/go/event/broker.go ---------------------------------------------------------------------- diff --git a/examples/go/event/broker.go b/examples/go/event/broker.go new file mode 100644 index 0000000..0cb4bfa --- /dev/null +++ b/examples/go/event/broker.go @@ -0,0 +1,255 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +// +// This is a simple AMQP broker implemented using the event-handler interface. +// +// It maintains a set of named in-memory queues of messages. Clients can send +// messages to queues or subscribe to receive messages from them. 
+// +// + +package main + +import ( + "container/list" + "flag" + "fmt" + "io" + "io/ioutil" + "log" + "net" + "os" + "path" + "qpid.apache.org/proton/go/amqp" + "qpid.apache.org/proton/go/event" + "sync" +) + +// Command-line flags +var addr = flag.String("addr", ":amqp", "Listening address") +var verbose = flag.Int("verbose", 1, "Output level, 0 means none, higher means more") +var full = flag.Bool("full", false, "Print full message not just body.") + +func main() { + flag.Usage = func() { + fmt.Fprintf(os.Stderr, ` +Usage: %s +A simple broker-like demo. Queues are created automatically for sender or receiver addrsses. +`, os.Args[0]) + flag.PrintDefaults() + } + flag.Parse() + b := newBroker() + err := b.listen(*addr) + fatalIf(err) +} + +// queue is a structure representing a queue. +type queue struct { + name string // Name of queue + messages *list.List // List of event.Message + consumers map[event.Link]bool // Set of consumer links +} + +type logLink event.Link // Wrapper to print links in format for logging + +func (ll logLink) String() string { + l := event.Link(ll) + return fmt.Sprintf("%s[%p]", l.Name(), l.Session().Connection().Pump()) +} + +func (q *queue) subscribe(link event.Link) { + debug.Printf("link %s subscribed to queue %s", logLink(link), q.name) + q.consumers[link] = true +} + +func (q *queue) unsubscribe(link event.Link) { + debug.Printf("link %s unsubscribed from queue %s", logLink(link), q.name) + delete(q.consumers, link) +} + +func (q *queue) empty() bool { + return len(q.consumers) == 0 && q.messages.Len() == 0 +} + +func (q *queue) push(context *event.Pump, message amqp.Message) { + q.messages.PushBack(message) + q.pop(context) +} + +func (q *queue) popTo(context *event.Pump, link event.Link) bool { + if q.messages.Len() != 0 && link.Credit() > 0 { + message := q.messages.Remove(q.messages.Front()).(amqp.Message) + debug.Printf("link %s <- queue %s: %s", logLink(link), q.name, formatMessage{message}) + // The first return parameter is 
an event.Delivery. + // The Deliver can be used to track message status, e.g. so we can re-delver on failure. + // This demo broker doesn't do that. + linkPump := link.Session().Connection().Pump() + if context == linkPump { + if context == nil { + log.Fatal("pop in nil context") + } + link.Send(message) // link is in the current pump, safe to call Send() direct + } else { + linkPump.Inject <- func() { // Inject to link's pump + link.Send(message) // FIXME aconway 2015-05-04: error handlig + } + } + return true + } + return false +} + +func (q *queue) pop(context *event.Pump) (popped bool) { + for c, _ := range q.consumers { + popped = popped || q.popTo(context, c) + } + return +} + +// broker implements event.MessagingHandler and reacts to events by moving messages on or off queues. +type broker struct { + queues map[string]*queue + lock sync.Mutex // FIXME aconway 2015-05-04: un-golike, better broker coming... +} + +func newBroker() *broker { + return &broker{queues: make(map[string]*queue)} +} + +func (b *broker) getQueue(name string) *queue { + q := b.queues[name] + if q == nil { + debug.Printf("Create queue %s", name) + q = &queue{name, list.New(), make(map[event.Link]bool)} + b.queues[name] = q + } + return q +} + +func (b *broker) unsubscribe(l event.Link) { + if l.IsSender() { + q := b.queues[l.RemoteSource().Address()] + if q != nil { + q.unsubscribe(l) + if q.empty() { + debug.Printf("Delete queue %s", q.name) + delete(b.queues, q.name) + } + } + } +} + +func (b *broker) HandleMessagingEvent(t event.MessagingEventType, e event.Event) error { + // FIXME aconway 2015-05-04: locking is un-golike, better example coming soon. + // Needed because the same handler is used for multiple connections concurrently + // and the queue data structures are not thread safe. 
+ b.lock.Lock() + defer b.lock.Unlock() + + switch t { + + case event.MLinkOpening: + if e.Link().IsSender() { + q := b.getQueue(e.Link().RemoteSource().Address()) + q.subscribe(e.Link()) + } + + case event.MLinkDisconnected, event.MLinkClosing: + b.unsubscribe(e.Link()) + + case event.MSendable: + q := b.getQueue(e.Link().RemoteSource().Address()) + q.popTo(e.Connection().Pump(), e.Link()) + + case event.MMessage: + m, err := event.DecodeMessage(e) + fatalIf(err) + qname := e.Link().RemoteTarget().Address() + debug.Printf("link %s -> queue %s: %s", logLink(e.Link()), qname, formatMessage{m}) + b.getQueue(qname).push(e.Connection().Pump(), m) + } + return nil +} + +func (b *broker) listen(addr string) (err error) { + // Use the standard Go "net" package to listen for connections. + listener, err := net.Listen("tcp", addr) + if err != nil { + return err + } + info.Printf("Listening on %s", listener.Addr()) + defer listener.Close() + for { + conn, err := listener.Accept() + if err != nil { + info.Printf("Accept error: %s", err) + continue + } + pump, err := event.NewPump(conn, event.NewMessagingDelegator(b)) + fatalIf(err) + info.Printf("Accepted %s[%p]", pump, pump) + pump.Server() + go func() { + pump.Run() + if pump.Error == nil { + info.Printf("Closed %s", pump) + } else { + info.Printf("Closed %s: %s", pump, pump.Error) + } + }() + } +} + +// Logging +func logger(prefix string, level int, w io.Writer) *log.Logger { + if *verbose >= level { + return log.New(w, prefix, 0) + } + return log.New(ioutil.Discard, "", 0) +} + +var info, debug *log.Logger + +func init() { + flag.Parse() + name := path.Base(os.Args[0]) + log.SetFlags(0) + log.SetPrefix(fmt.Sprintf("%s: ", name)) // Log errors on stderr. + info = logger(fmt.Sprintf("%s: ", name), 1, os.Stdout) // Log info on stdout. + debug = logger(fmt.Sprintf("%s debug: ", name), 2, os.Stderr) // Log debug on stderr. +} + +// Simple error handling for demo. 
+func fatalIf(err error) { + if err != nil { + log.Fatal(err) + } +} + +type formatMessage struct{ m amqp.Message } + +func (fm formatMessage) String() string { + if *full { + return fmt.Sprintf("%#v", fm.m) + } else { + return fmt.Sprintf("%#v", fm.m.Body()) + } +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6ea3649d/examples/go/example_test.go ---------------------------------------------------------------------- diff --git a/examples/go/example_test.go b/examples/go/example_test.go new file mode 100644 index 0000000..e059c28 --- /dev/null +++ b/examples/go/example_test.go @@ -0,0 +1,272 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +// Tests to verify that example code behaves as expected. 
+// Run in this directory with `go test example_test.go` +// +package main + +import ( + "bufio" + "bytes" + "fmt" + "io" + "io/ioutil" + "math/rand" + "net" + "os" + "os/exec" + "path" + "path/filepath" + "reflect" + "testing" + "time" +) + +func panicIf(err error) { + if err != nil { + panic(err) + } +} + +// A demo broker process +type broker struct { + cmd *exec.Cmd + addr string + runerr chan error + err error +} + +// Try to connect to the broker to verify it is ready, give up after a timeout +func (b *broker) check() error { + dialer := net.Dialer{Deadline: time.Now().Add(time.Second * 10)} + for { + c, err := dialer.Dial("tcp", b.addr) + if err == nil { // Success + c.Close() + return nil + } + select { + case runerr := <-b.runerr: // Broker exited. + return runerr + default: + } + if neterr, ok := err.(net.Error); ok && neterr.Timeout() { // Running but timed out + b.stop() + return fmt.Errorf("timed out waiting for broker") + } + time.Sleep(time.Second / 10) + } +} + +// Start the demo broker, wait till it is listening on *addr. No-op if already started. +func (b *broker) start() error { + build("event/broker.go") + if b.cmd == nil { // Not already started + // FIXME aconway 2015-04-30: better way to pick/configure a broker port. + b.addr = fmt.Sprintf("127.0.0.1:%d", rand.Intn(10000)+10000) + b.cmd = exec.Command(exepath("broker"), "-addr", b.addr, "-verbose", "0") + b.runerr = make(chan error) + // Change the -verbose setting above to see broker output on stdout/stderr. + b.cmd.Stderr, b.cmd.Stdout = os.Stderr, os.Stdout + go func() { + b.runerr <- b.cmd.Run() + }() + b.err = b.check() + } + return b.err +} + +func (b *broker) stop() { + if b != nil && b.cmd != nil { + b.cmd.Process.Kill() + b.cmd.Wait() + } +} + +func checkEqual(want interface{}, got interface{}) error { + if reflect.DeepEqual(want, got) { + return nil + } + return fmt.Errorf("%#v != %#v", want, got) +} + +// runCommand returns an exec.Cmd to run an example. 
+func exampleCommand(prog string, arg ...string) *exec.Cmd { + build(prog + ".go") + cmd := exec.Command(exepath(prog), arg...) + cmd.Stderr = os.Stderr + return cmd +} + +// Run an example Go program, return the combined output as a string. +func runExample(prog string, arg ...string) (string, error) { + cmd := exampleCommand(prog, arg...) + out, err := cmd.Output() + return string(out), err +} + +func prefix(prefix string, err error) error { + if err != nil { + return fmt.Errorf("%s: %s", prefix, err) + } + return nil +} + +func runExampleWant(want string, prog string, args ...string) error { + out, err := runExample(prog, args...) + if err != nil { + return fmt.Errorf("%s failed: %s: %s", prog, err, out) + } + return prefix(prog, checkEqual(want, out)) +} + +func exampleArgs(args ...string) []string { + return append(args, testBroker.addr+"/foo", testBroker.addr+"/bar", testBroker.addr+"/baz") +} + +// Send then receive +func TestExampleSendReceive(t *testing.T) { + if testing.Short() { + t.Skip("Skip demo tests in short mode") + } + testBroker.start() + err := runExampleWant( + "send: Received all 15 acknowledgements\n", + "send", + exampleArgs("-count", "5", "-verbose", "1")...) + if err != nil { + t.Fatal(err) + } + err = runExampleWant( + "receive: Listening\nreceive: Received 15 messages\n", + "receive", + exampleArgs("-verbose", "1", "-count", "15")...) + if err != nil { + t.Fatal(err) + } +} + +var ready error + +func init() { ready = fmt.Errorf("Ready") } + +// Run receive in a goroutine. +// Send ready on errchan when it is listening. +// Send final error when it is done. +// Returns the Cmd, caller must Wait() +func goReceiveWant(errchan chan<- error, want string, arg ...string) *exec.Cmd { + cmd := exampleCommand("receive", arg...) 
+ go func() { + pipe, err := cmd.StdoutPipe() + if err != nil { + errchan <- err + return + } + out := bufio.NewReader(pipe) + cmd.Start() + line, err := out.ReadString('\n') + if err != nil && err != io.EOF { + errchan <- err + return + } + listening := "receive: Listening\n" + if line != listening { + errchan <- checkEqual(listening, line) + return + } + errchan <- ready + buf := bytes.Buffer{} + io.Copy(&buf, out) // Collect the rest of the output + errchan <- checkEqual(want, buf.String()) + close(errchan) + }() + return cmd +} + +// Start receiver first, wait till it is running, then send. +func TestExampleReceiveSend(t *testing.T) { + if testing.Short() { + t.Skip("Skip demo tests in short mode") + } + testBroker.start() + recvErr := make(chan error) + recvCmd := goReceiveWant(recvErr, + "receive: Received 15 messages\n", + exampleArgs("-count", "15", "-verbose", "1")...) + defer func() { + recvCmd.Process.Kill() + recvCmd.Wait() + }() + if err := <-recvErr; err != ready { // Wait for receiver ready + t.Fatal(err) + } + err := runExampleWant( + "send: Received all 15 acknowledgements\n", + "send", + exampleArgs("-count", "5", "-verbose", "1")...) 
+ if err != nil { + t.Fatal(err) + } + if err := <-recvErr; err != nil { + t.Fatal(err) + } +} + +func exepath(relative string) string { + if binDir == "" { + panic("bindir not set, cannot run example binaries") + } + return path.Join(binDir, relative) +} + +var testBroker *broker +var binDir, exampleDir string +var built map[string]bool + +func init() { + built = make(map[string]bool) +} + +func build(prog string) { + if !built[prog] { + build := exec.Command("go", "build", path.Join(exampleDir, prog)) + build.Dir = binDir + out, err := build.CombinedOutput() + if err != nil { + panic(fmt.Errorf("%v: %s", err, out)) + } + built[prog] = true + } +} + +func TestMain(m *testing.M) { + rand.Seed(time.Now().UTC().UnixNano()) + var err error + exampleDir, err = filepath.Abs(".") + panicIf(err) + binDir, err = ioutil.TempDir("", "example_test.go") + panicIf(err) + defer os.Remove(binDir) // Clean up binaries + testBroker = &broker{} // Broker is started on-demand by tests. + testBroker.stop() + status := m.Run() + testBroker.stop() + os.Exit(status) +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6ea3649d/examples/go/receive.go ---------------------------------------------------------------------- diff --git a/examples/go/receive.go b/examples/go/receive.go index 2545eab..e31862b 100644 --- a/examples/go/receive.go +++ b/examples/go/receive.go @@ -29,8 +29,8 @@ import ( "net" "os" "path" - "qpid.apache.org/proton" - "qpid.apache.org/proton/messaging" + "qpid.apache.org/proton/go/amqp" + "qpid.apache.org/proton/go/messaging" "sync" "time" ) @@ -68,8 +68,8 @@ Receive messages from all the listed URLs concurrently and print them. // Create a goroutine for each URL that receives messages and sends them to // the messages channel. main() receives and prints them. - messages := make(chan proton.Message) // Channel for messages from goroutines to main() - stop := make(chan struct{}) // Closing this channel means the program is stopping. 
+ messages := make(chan amqp.Message) // Channel for messages from goroutines to main() + stop := make(chan struct{}) // Closing this channel means the program is stopping. var wait sync.WaitGroup // Used by main() to wait for all goroutines to end. @@ -88,8 +88,8 @@ Receive messages from all the listed URLs concurrently and print them. for i, urlStr := range urls { debug.Printf("Connecting to %s", urlStr) go func(urlStr string) { - defer wait.Done() // Notify main() that this goroutine is done. - url, err := proton.ParseURL(urlStr) // Like net/url.Parse() but with AMQP defaults. + defer wait.Done() // Notify main() that this goroutine is done. + url, err := amqp.ParseURL(urlStr) // Like net/url.Parse() but with AMQP defaults. fatalIf(err) // Open a standard Go net.Conn and and AMQP connection using it. @@ -104,7 +104,7 @@ Receive messages from all the listed URLs concurrently and print them. fatalIf(err) for { - var m proton.Message + var m amqp.Message select { // Receive a message or stop. case m = <-r.Receive: case <-stop: // The program is stopping. @@ -163,7 +163,7 @@ func fatalIf(err error) { } } -type formatMessage struct{ m proton.Message } +type formatMessage struct{ m amqp.Message } func (fm formatMessage) String() string { if *full { http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6ea3649d/examples/go/send.go ---------------------------------------------------------------------- diff --git a/examples/go/send.go b/examples/go/send.go index c4db7cd..4aaeb43 100644 --- a/examples/go/send.go +++ b/examples/go/send.go @@ -29,8 +29,8 @@ import ( "net" "os" "path" - "qpid.apache.org/proton" - "qpid.apache.org/proton/messaging" + "qpid.apache.org/proton/go/amqp" + "qpid.apache.org/proton/go/messaging" "sync" ) @@ -85,8 +85,8 @@ To each URL, send the string "path-n" where n is the message number. 
for i, urlStr := range urls { debug.Printf("Connecting to %v", urlStr) go func(urlStr string) { - defer wait.Done() // Notify main() that this goroutine is done. - url, err := proton.ParseURL(urlStr) // Like net/url.Parse() but with AMQP defaults. + defer wait.Done() // Notify main() that this goroutine is done. + url, err := amqp.ParseURL(urlStr) // Like net/url.Parse() but with AMQP defaults. fatalIf(err) // Open a standard Go net.Conn and and AMQP connection using it. @@ -101,7 +101,7 @@ To each URL, send the string "path-n" where n is the message number. fatalIf(err) for i := int64(0); i < *count; i++ { - m := proton.NewMessage() + m := amqp.NewMessage() body := fmt.Sprintf("%v-%v", url.Path, i) m.SetBody(body) ack, err := s.Send(m) http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6ea3649d/go ---------------------------------------------------------------------- diff --git a/go b/go new file mode 120000 index 0000000..a4b737d --- /dev/null +++ b/go @@ -0,0 +1 @@ +proton-c/bindings/go \ No newline at end of file http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6ea3649d/proton-c/bindings/go/README.md ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/README.md b/proton-c/bindings/go/README.md new file mode 100644 index 0000000..0449d68 --- /dev/null +++ b/proton-c/bindings/go/README.md @@ -0,0 +1,138 @@ +# *EXPERIMENTAL* Go binding for proton + +This is the beginning of a [Go](http://golang.org) binding for proton. + +This work is in early *experimental* stages, *everything* may change in future. +Comments and contributions are strongly encouraged, this experiment is public so +early feedback can guide development. + +- Email <[email protected]> +- Create issues <https://issues.apache.org/jira/browse/PROTON>, attach patches to an issue. 
+ +There are working [examples](../../../examples/go/README.md) and the examples README file +explains how to install the packages in your go workspace and read the documentation. + +## Goals + +The API should + +- be idiomatic, unsurprising, and easy to use for Go developers. +- support client and server development. +- make simple tasks simple. +- provide deep access to AMQP protocol when that is required. + +There are two types of developer we want to support + +1. Go developers using AMQP as a message transport: + - Straightforward conversions between Go built-in types and AMQP types. + - Easy message exchange via Go channels to support use in goroutines. + +2. AMQP-aware developers using Go as an implementation language: + - Go types to exactly represent all AMQP types and encoding details. + - Full access to detailed AMQP concepts: sessions, links, deliveries etc. + +## Status + +There are 3 go packages for proton: + +- qpid.apache.org/proton/go/amqp: converts AMQP messages and data types to and from Go data types. +- qpid.apache.org/proton/go/messaging: easy-to-use, concurrent API for messaging clients and servers. +- qpid.apache.org/proton/go/event: full low-level access to the proton engine. + +Most applications should use the `messaging` package. The `event` package is for +applications that need low-level access to the proton engine. + +The `event` package is fairly complete, with the exception of the proton +reactor. It's unclear if the reactor is important for go. + +The `messaging` package can run the examples but probably not much else. There +is work to do on error handling and the API may change. + +There are working [examples](../../../examples/go/README.md) of a broker using `event` and +a sender and receiver using `messaging`. + +## The event driven API + +See the package documentation for details. 
+ +## The Go API + +The goal: A procedural API that allows any user goroutine to send and receive +AMQP messages and other information (acknowledgments, flow control instructions +etc.) using channels. There will be no user-visible locks and no need to run +user code in special goroutines, e.g. as handlers in a proton event loop. + +See the package documentation for emerging details. + +Currently using a channel to receive messages, a function to send them (channels +internally) and a channel as a "future" for acknowledgements to senders. This +may change. + +## Why a separate API for Go? + +Go is a concurrent language and encourages applications to be divided into +concurrent *goroutines*. It provides traditional locking but it encourages the +use of *channels* to communicate between goroutines without explicit locks: + + "Share memory by communicating, don't communicate by sharing memory" + +The idea is that a given value is only operated on by one goroutine at a time, +but values can easily be passed from one goroutine to another. This removes much +of the need for locking. + +Go literature distinguishes between: + +- *concurrency*: "keeping track of things that could be done in parallel" +- *parallelism*: "actually doing things in parallel" + +The application expresses concurrency by starting goroutines for potentially +concurrent tasks. The Go run-times schedule the activity of goroutines onto a +small number (possibly one) of actual parallel executions. + +Even with *no* parallelism, concurrency lets the Go run-times *order* work with +respect to events like file descriptors being readable/writable, channels having +data, timers firing etc. Go automatically takes care of switching out goroutines +that block or sleep so it is normal to write code in terms of blocking calls. + +Event-driven APIs (like poll, epoll, select or the proton event API) also +channel unpredictably ordered events to actions in one or a small pool of +execution threads. 
However this requires a different style of programming: +"event-driven" or "reactive" programming. Go developers call it "inside-out" +programming. In an event-driven architecture blocking is a big problem as it +consumes a scarce thread of execution, so actions that take time to complete +have to be re-structured in terms of future event delivery. + +The promise of Go is that you can express your application in concurrent, +procedural terms with simple blocking calls and the Go run-times will turn it +inside-out for you. Write as many goroutines as you want, and let Go interleave +and schedule them efficiently. + +For example: the Go equivalent of listening for connections is a goroutine with +a simple endless loop that calls a blocking Listen() function and starts a +goroutine for each new connection. Each connection has its own goroutine that +deals with just that connection till it closes. + +The benefit is that the variables and logic live closer together. Once you're in +a goroutine, you have everything you need in local variables, and they are +preserved across blocking calls. There's no need to store details in context +objects that you have to look up when handling a later event to figure out how +to continue where you left off. + +So a Go-like proton API does not force the user's code to run in an event-loop +goroutine. Instead user goroutines communicate with the event loop(s) via +channels. There's no need to funnel connections into one event loop, in fact it +makes no sense. Connections can be processed concurrently so they should be +processed in separate goroutines and left to Go to schedule. User goroutines can +have simple loops that block on channels till messages are available, the user can +start as many or as few such goroutines as they wish to implement concurrency as +simple or as complex as they wish. For example blocking request-response +vs. asynchronous flows of messages and acknowledgments. + +## New to Go? 
+ +If you are new to Go then these are a good place to start: + +- [A Tour of Go](http://tour.golang.org) +- [Effective Go](http://golang.org/doc/effective_go.html) + +Then look at the tools and library docs at <http://golang.org> as you need them. http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6ea3649d/proton-c/bindings/go/amqp/doc.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/amqp/doc.go b/proton-c/bindings/go/amqp/doc.go new file mode 100644 index 0000000..7c00aa0 --- /dev/null +++ b/proton-c/bindings/go/amqp/doc.go @@ -0,0 +1,40 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +/* +Package amqp encodes and decodes AMQP messages and data as Go types. + +It follows the standard 'encoding' libraries pattern. The mapping between AMQP +and Go types is described in the documentation of the Marshal and Unmarshal +functions. + +The sub-packages 'event' and 'messaging' provide two alternative ways to write +AMQP clients and servers. 'messaging' is easier for general purpose use. 'event' +gives complete low-level control of the underlying proton C engine. 
+ +AMQP is an open standard for inter-operable message exchange, see <http://www.amqp.org/> +*/ +package amqp + +// #cgo LDFLAGS: -lqpid-proton +import "C" + +// This file is just for the package comment. + +// FIXME aconway 2015-04-28: need to re-organize the package, it's not very intuitive. http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6ea3649d/proton-c/bindings/go/amqp/interop ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/amqp/interop b/proton-c/bindings/go/amqp/interop new file mode 120000 index 0000000..8f50d0e --- /dev/null +++ b/proton-c/bindings/go/amqp/interop @@ -0,0 +1 @@ +../../../../tests/interop \ No newline at end of file http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6ea3649d/proton-c/bindings/go/amqp/interop_test.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/amqp/interop_test.go b/proton-c/bindings/go/amqp/interop_test.go new file mode 100644 index 0000000..11049f7 --- /dev/null +++ b/proton-c/bindings/go/amqp/interop_test.go @@ -0,0 +1,308 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +// Test that conversion of Go type to/from AMQP is compatible with other +// bindings. 
+// +package amqp + +import ( + "bytes" + "fmt" + "io" + "io/ioutil" + "os" + "reflect" + "strings" + "testing" +) + +func assertEqual(want interface{}, got interface{}) { + if !reflect.DeepEqual(want, got) { + panic(fmt.Errorf("%#v != %#v", want, got)) + } +} + +func assertNil(err interface{}) { + if err != nil { + panic(err) + } +} + +func getReader(name string) (r io.Reader) { + r, err := os.Open("interop/" + name + ".amqp") + if err != nil { + panic(fmt.Errorf("Can't open %#v: %v", name, err)) + } + return +} + +func remaining(d *Decoder) string { + remainder, _ := ioutil.ReadAll(io.MultiReader(d.Buffered(), d.reader)) + return string(remainder) +} + +// assertDecode: want is the expected value, gotPtr is a pointer to a +// instance of the same type for Decode. +func assertDecode(d *Decoder, want interface{}, gotPtr interface{}) { + + assertNil(d.Decode(gotPtr)) + + got := reflect.ValueOf(gotPtr).Elem().Interface() + assertEqual(want, got) + + // Try round trip encoding + bytes, err := Marshal(want, nil) + assertNil(err) + n, err := Unmarshal(bytes, gotPtr) + assertNil(err) + assertEqual(n, len(bytes)) + got = reflect.ValueOf(gotPtr).Elem().Interface() + assertEqual(want, got) +} + +func TestUnmarshal(t *testing.T) { + bytes, err := ioutil.ReadAll(getReader("strings")) + if err != nil { + t.Error(err) + } + for _, want := range []string{"abc\000defg", "abcdefg", "abcdefg", "", "", ""} { + var got string + n, err := Unmarshal(bytes, &got) + if err != nil { + t.Error(err) + } + if want != got { + t.Errorf("%#v != %#v", want, got) + } + bytes = bytes[n:] + } +} + +func TestPrimitivesExact(t *testing.T) { + d := NewDecoder(getReader("primitives")) + // Decoding into exact types + var b bool + assertDecode(d, true, &b) + assertDecode(d, false, &b) + var u8 uint8 + assertDecode(d, uint8(42), &u8) + var u16 uint16 + assertDecode(d, uint16(42), &u16) + var i16 int16 + assertDecode(d, int16(-42), &i16) + var u32 uint32 + assertDecode(d, uint32(12345), &u32) + var i32 
int32 + assertDecode(d, int32(-12345), &i32) + var u64 uint64 + assertDecode(d, uint64(12345), &u64) + var i64 int64 + assertDecode(d, int64(-12345), &i64) + var f32 float32 + assertDecode(d, float32(0.125), &f32) + var f64 float64 + assertDecode(d, float64(0.125), &f64) +} + +func TestPrimitivesCompatible(t *testing.T) { + d := NewDecoder(getReader("primitives")) + // Decoding into compatible types + var b bool + var i int + var u uint + var f float64 + assertDecode(d, true, &b) + assertDecode(d, false, &b) + assertDecode(d, uint(42), &u) + assertDecode(d, uint(42), &u) + assertDecode(d, -42, &i) + assertDecode(d, uint(12345), &u) + assertDecode(d, -12345, &i) + assertDecode(d, uint(12345), &u) + assertDecode(d, -12345, &i) + assertDecode(d, 0.125, &f) + assertDecode(d, 0.125, &f) +} + +// assertDecodeValue: want is the expected value, decode into a reflect.Value +func assertDecodeInterface(d *Decoder, want interface{}) { + + var got, got2 interface{} + assertNil(d.Decode(&got)) + + assertEqual(want, got) + + // Try round trip encoding + bytes, err := Marshal(got, nil) + assertNil(err) + n, err := Unmarshal(bytes, &got2) + assertNil(err) + assertEqual(n, len(bytes)) + assertEqual(want, got2) +} + +func TestPrimitivesInterface(t *testing.T) { + d := NewDecoder(getReader("primitives")) + assertDecodeInterface(d, true) + assertDecodeInterface(d, false) + assertDecodeInterface(d, uint8(42)) + assertDecodeInterface(d, uint16(42)) + assertDecodeInterface(d, int16(-42)) + assertDecodeInterface(d, uint32(12345)) + assertDecodeInterface(d, int32(-12345)) + assertDecodeInterface(d, uint64(12345)) + assertDecodeInterface(d, int64(-12345)) + assertDecodeInterface(d, float32(0.125)) + assertDecodeInterface(d, float64(0.125)) +} + +func TestStrings(t *testing.T) { + d := NewDecoder(getReader("strings")) + // Test decoding as plain Go strings + for _, want := range []string{"abc\000defg", "abcdefg", "abcdefg", "", "", ""} { + var got string + assertDecode(d, want, &got) + } + 
remains := remaining(d) + if remains != "" { + t.Errorf("leftover: %s", remains) + } + + // Test decoding as specific string types + d = NewDecoder(getReader("strings")) + var bytes []byte + var str, sym string + assertDecode(d, []byte("abc\000defg"), &bytes) + assertDecode(d, "abcdefg", &str) + assertDecode(d, "abcdefg", &sym) + assertDecode(d, make([]byte, 0), &bytes) + assertDecode(d, "", &str) + assertDecode(d, "", &sym) + remains = remaining(d) + if remains != "" { + t.Fatalf("leftover: %s", remains) + } + + // Test some error handling + d = NewDecoder(getReader("strings")) + var s string + err := d.Decode(s) + if err == nil { + t.Fatal("Expected error") + } + if !strings.Contains(err.Error(), "not a pointer") { + t.Error(err) + } + var i int + err = d.Decode(&i) + if !strings.Contains(err.Error(), "cannot unmarshal") { + t.Error(err) + } + _, err = Unmarshal([]byte{}, nil) + if !strings.Contains(err.Error(), "not enough data") { + t.Error(err) + } + _, err = Unmarshal([]byte("foobar"), nil) + if !strings.Contains(err.Error(), "invalid-argument") { + t.Error(err) + } +} + +func TestEncodeDecode(t *testing.T) { + type data struct { + s string + i int + u8 uint8 + b bool + f float32 + v interface{} + } + + in := data{"foo", 42, 9, true, 1.234, "thing"} + + buf := bytes.Buffer{} + e := NewEncoder(&buf) + assertNil(e.Encode(in.s)) + assertNil(e.Encode(in.i)) + assertNil(e.Encode(in.u8)) + assertNil(e.Encode(in.b)) + assertNil(e.Encode(in.f)) + assertNil(e.Encode(in.v)) + + var out data + d := NewDecoder(&buf) + assertNil(d.Decode(&out.s)) + assertNil(d.Decode(&out.i)) + assertNil(d.Decode(&out.u8)) + assertNil(d.Decode(&out.b)) + assertNil(d.Decode(&out.f)) + assertNil(d.Decode(&out.v)) + + assertEqual(in, out) +} + +func TestMap(t *testing.T) { + d := NewDecoder(getReader("maps")) + + // Generic map + var m Map + assertDecode(d, Map{"one": int32(1), "two": int32(2), "three": int32(3)}, &m) + + // Interface as map + var i interface{} + assertDecode(d, 
Map{int32(1): "one", int32(2): "two", int32(3): "three"}, &i) + + d = NewDecoder(getReader("maps")) + // Specific typed map + var m2 map[string]int + assertDecode(d, map[string]int{"one": 1, "two": 2, "three": 3}, &m2) + + // Round trip a nested map + m = Map{int64(1): "one", "two": int32(2), true: Map{uint8(1): true, uint8(2): false}} + bytes, err := Marshal(m, nil) + assertNil(err) + _, err = Unmarshal(bytes, &i) + assertNil(err) + assertEqual(m, i) +} + +func TestList(t *testing.T) { + d := NewDecoder(getReader("lists")) + var l List + assertDecode(d, List{int32(32), "foo", true}, &l) + assertDecode(d, List{}, &l) +} + +func FIXMETestMessage(t *testing.T) { + // FIXME aconway 2015-04-09: integrate Message encoding under marshal/unmarshal API. + bytes, err := ioutil.ReadAll(getReader("message")) + assertNil(err) + m, err := DecodeMessage(bytes) + assertNil(err) + fmt.Printf("%+v\n", m) + assertEqual(m.Body(), "hello") + + bytes2 := make([]byte, len(bytes)) + bytes2, err = m.Encode(bytes2) + assertNil(err) + assertEqual(bytes, bytes2) +} + +// FIXME aconway 2015-03-13: finish the full interop test http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6ea3649d/proton-c/bindings/go/amqp/marshal.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/amqp/marshal.go b/proton-c/bindings/go/amqp/marshal.go new file mode 100644 index 0000000..e5c2945 --- /dev/null +++ b/proton-c/bindings/go/amqp/marshal.go @@ -0,0 +1,238 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package amqp + +// #include <proton/codec.h> +import "C" + +import ( + "io" + "qpid.apache.org/proton/go/internal" + "reflect" + "unsafe" +) + +func dataError(prefix string, data *C.pn_data_t) error { + err := internal.PnError(unsafe.Pointer(C.pn_data_error(data))) + if err != nil { + err = internal.Errorf("%s: %s", prefix, err.(internal.Error)) + } + return err +} + +/* +Marshal encodes a Go value as AMQP data in buffer. +If buffer is nil, or is not large enough, a new buffer is created. + +Returns the buffer used for encoding with len() adjusted to the actual size of data. + +Go types are encoded as follows + + +-------------------------------------+--------------------------------------------+ + |Go type |AMQP type | + +-------------------------------------+--------------------------------------------+ + |bool |bool | + +-------------------------------------+--------------------------------------------+ + |int8, int16, int32, int64 (int) |byte, short, int, long (int or long) | + +-------------------------------------+--------------------------------------------+ + |uint8, uint16, uint32, uint64 (uint) |ubyte, ushort, uint, ulong (uint or ulong) | + +-------------------------------------+--------------------------------------------+ + |float32, float64 |float, double. 
| + +-------------------------------------+--------------------------------------------+ + |string |string | + +-------------------------------------+--------------------------------------------+ + |[]byte, Binary |binary | + +-------------------------------------+--------------------------------------------+ + |Symbol |symbol | + +-------------------------------------+--------------------------------------------+ + |interface{} |the contained type | + +-------------------------------------+--------------------------------------------+ + |nil |null | + +-------------------------------------+--------------------------------------------+ + |map[K]T |map with K and T converted as above | + +-------------------------------------+--------------------------------------------+ + |Map |map, may have mixed types for keys, values | + +-------------------------------------+--------------------------------------------+ + |[]T |list with T converted as above | + +-------------------------------------+--------------------------------------------+ + |List |list, may have mixed types values | + +-------------------------------------+--------------------------------------------+ + +TODO Go types: array, slice, struct + +Go types that cannot be marshaled: complex64/128, uintptr, function, interface, channel +*/ +func Marshal(v interface{}, buffer []byte) (outbuf []byte, err error) { + defer internal.DoRecover(&err) + data := C.pn_data(0) + defer C.pn_data_free(data) + put(data, v) + encode := func(buf []byte) ([]byte, error) { + n := int(C.pn_data_encode(data, cPtr(buf), cLen(buf))) + switch { + case n == int(C.PN_OVERFLOW): + return buf, overflow + case n < 0: + return buf, dataError("marshal error", data) + default: + return buf[:n], nil + } + } + return encodeGrow(buffer, encode) +} + +const minEncode = 256 + +// overflow is returned when an encoding function can't fit data in the buffer. 
+var overflow = internal.Errorf("buffer too small") + +// encodeFn encodes into buffer[0:len(buffer)]. +// Returns buffer with length adjusted for data encoded. +// If buffer too small, returns overflow as error. +type encodeFn func(buffer []byte) ([]byte, error) + +// encodeGrow calls encode() into buffer, if it returns overflow grows the buffer. +// Returns the final buffer. +func encodeGrow(buffer []byte, encode encodeFn) ([]byte, error) { + if buffer == nil || len(buffer) == 0 { + buffer = make([]byte, minEncode) + } + var err error + for buffer, err = encode(buffer); err == overflow; buffer, err = encode(buffer) { + buffer = make([]byte, 2*len(buffer)) + } + return buffer, err +} + +func put(data *C.pn_data_t, v interface{}) { + switch v := v.(type) { + case nil: + C.pn_data_put_null(data) + case bool: + C.pn_data_put_bool(data, C.bool(v)) + case int8: + C.pn_data_put_byte(data, C.int8_t(v)) + case int16: + C.pn_data_put_short(data, C.int16_t(v)) + case int32: + C.pn_data_put_int(data, C.int32_t(v)) + case int64: + C.pn_data_put_long(data, C.int64_t(v)) + case int: + if unsafe.Sizeof(0) == 8 { + C.pn_data_put_long(data, C.int64_t(v)) + } else { + C.pn_data_put_int(data, C.int32_t(v)) + } + case uint8: + C.pn_data_put_ubyte(data, C.uint8_t(v)) + case uint16: + C.pn_data_put_ushort(data, C.uint16_t(v)) + case uint32: + C.pn_data_put_uint(data, C.uint32_t(v)) + case uint64: + C.pn_data_put_ulong(data, C.uint64_t(v)) + case uint: + if unsafe.Sizeof(0) == 8 { + C.pn_data_put_ulong(data, C.uint64_t(v)) + } else { + C.pn_data_put_uint(data, C.uint32_t(v)) + } + case float32: + C.pn_data_put_float(data, C.float(v)) + case float64: + C.pn_data_put_double(data, C.double(v)) + case string: + C.pn_data_put_string(data, pnBytes([]byte(v))) + case []byte: + C.pn_data_put_binary(data, pnBytes(v)) + case Binary: + C.pn_data_put_binary(data, pnBytes([]byte(v))) + case Symbol: + C.pn_data_put_symbol(data, pnBytes([]byte(v))) + case Map: // Special map type + 
C.pn_data_put_map(data) + C.pn_data_enter(data) + for key, val := range v { + put(data, key) + put(data, val) + } + C.pn_data_exit(data) + default: + switch reflect.TypeOf(v).Kind() { + case reflect.Map: + putMap(data, v) + case reflect.Slice: + putList(data, v) + default: + panic(internal.Errorf("cannot marshal %s to AMQP", reflect.TypeOf(v))) + } + } + err := dataError("marshal", data) + if err != nil { + panic(err) + } + return +} + +func putMap(data *C.pn_data_t, v interface{}) { + mapValue := reflect.ValueOf(v) + C.pn_data_put_map(data) + C.pn_data_enter(data) + for _, key := range mapValue.MapKeys() { + put(data, key.Interface()) + put(data, mapValue.MapIndex(key).Interface()) + } + C.pn_data_exit(data) +} + +func putList(data *C.pn_data_t, v interface{}) { + listValue := reflect.ValueOf(v) + C.pn_data_put_list(data) + C.pn_data_enter(data) + for i := 0; i < listValue.Len(); i++ { + put(data, listValue.Index(i).Interface()) + } + C.pn_data_exit(data) +} + +// Encoder encodes AMQP values to an io.Writer +type Encoder struct { + writer io.Writer + buffer []byte +} + +// New encoder returns a new encoder that writes to w. +func NewEncoder(w io.Writer) *Encoder { + return &Encoder{w, make([]byte, minEncode)} +} + +func (e *Encoder) Encode(v interface{}) (err error) { + e.buffer, err = Marshal(v, e.buffer) + if err == nil { + e.writer.Write(e.buffer) + } + return err +} + +func replace(data *C.pn_data_t, v interface{}) { + C.pn_data_clear(data) + put(data, v) +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6ea3649d/proton-c/bindings/go/amqp/message.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/amqp/message.go b/proton-c/bindings/go/amqp/message.go new file mode 100644 index 0000000..87093f5 --- /dev/null +++ b/proton-c/bindings/go/amqp/message.go @@ -0,0 +1,342 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. 
See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package amqp + +// #include <proton/types.h> +// #include <proton/message.h> +// #include <proton/codec.h> +import "C" + +import ( + "qpid.apache.org/proton/go/internal" + "time" + "unsafe" +) + +// FIXME aconway 2015-04-28: Do we need the interface or can we just export the struct? + +// Message is the interface to an AMQP message. +// Instances of this interface contain a pointer to the underlying struct. +type Message interface { + /** + * Inferred indicates how the message content + * is encoded into AMQP sections. If inferred is true then binary and + * list values in the body of the message will be encoded as AMQP DATA + * and AMQP SEQUENCE sections, respectively. If inferred is false, + * then all values in the body of the message will be encoded as AMQP + * VALUE sections regardless of their type. + */ + Inferred() bool + SetInferred(bool) + + /** + * Durable indicates that any parties taking responsibility + * for the message must durably store the content. + */ + Durable() bool + SetDurable(bool) + + /** + * Priority impacts ordering guarantees. Within a + * given ordered context, higher priority messages may jump ahead of + * lower priority messages. 
+ */ + Priority() uint8 + SetPriority(uint8) + + /** + * TTL or Time To Live: a message may be dropped after this duration + */ + TTL() time.Duration + SetTTL(time.Duration) + + /** + * FirstAcquirer indicates + * that the recipient of the message is the first recipient to acquire + * the message, i.e. there have been no failed delivery attempts to + * other acquirers. Note that this does not mean the message has not + * been delivered to, but not acquired, by other recipients. + */ + FirstAcquirer() bool + SetFirstAcquirer(bool) + + /** + * DeliveryCount tracks how many attempts have been made to + * deliver a message. + */ + DeliveryCount() uint32 + SetDeliveryCount(uint32) + + /** + * MessageId provides a unique identifier for a message. + * It can be a string, an unsigned long, a uuid or a + * binary value. + */ + MessageId() interface{} + SetMessageId(interface{}) + + UserId() string + SetUserId(string) + + Address() string + SetAddress(string) + + Subject() string + SetSubject(string) + + ReplyTo() string + SetReplyTo(string) + + /** + * CorrelationId is set on correlated request and response messages. It can be a string, an unsigned long, a uuid or a + * binary value. + */ + CorrelationId() interface{} + SetCorrelationId(interface{}) + + ContentType() string + SetContentType(string) + + ContentEncoding() string + SetContentEncoding(string) + + // ExpiryTime indicates an absolute time when the message may be dropped. + // A Zero time (i.e. t.IsZero() == true) indicates a message never expires. + ExpiryTime() time.Time + SetExpiryTime(time.Time) + + CreationTime() time.Time + SetCreationTime(time.Time) + + GroupId() string + SetGroupId(string) + + GroupSequence() int32 + SetGroupSequence(int32) + + ReplyToGroupId() string + SetReplyToGroupId(string) + + /** + * Instructions can be used to access or modify AMQP delivery instructions. + */ + Instructions() *map[string]interface{} + + /** + * Annotations can be used to access or modify AMQP annotations. 
+ */ + Annotations() *map[string]interface{} + + /** + * Properties can be used to access or modify the application properties of a message. + */ + Properties() *map[string]interface{} + + /** + * Body of the message can be any AMQP encodable type. + */ + Body() interface{} + SetBody(interface{}) + + // Encode encodes the message as AMQP data. If buffer is non-nil and is large enough + // the message is encoded into it, otherwise a new buffer is created. + // Returns the buffer containing the message. + Encode(buffer []byte) ([]byte, error) +} + +// NewMessage creates a new message instance. The returned interface contains a pointer. +func NewMessage() Message { + pn := C.pn_message() // Pick up default setting from C message. + defer C.pn_message_free(pn) + return goMessage(pn) +} + +// Message implementation copies all message data into Go space so it can be proprely +// memory managed. +// +type message struct { + inferred, durable, firstAcquirer bool + priority uint8 + ttl time.Duration + deliveryCount uint32 + messageId interface{} + userId, address, subject, replyTo string + contentType, contentEncoding string + groupId, replyToGroupId string + creationTime, expiryTime time.Time + groupSequence int32 + correlationId interface{} + instructions, annotations, properties map[string]interface{} + body interface{} +} + +func (m *message) Inferred() bool { return m.inferred } +func (m *message) SetInferred(b bool) { m.inferred = b } +func (m *message) Durable() bool { return m.durable } +func (m *message) SetDurable(b bool) { m.durable = b } +func (m *message) Priority() uint8 { return m.priority } +func (m *message) SetPriority(b uint8) { m.priority = b } +func (m *message) TTL() time.Duration { return m.ttl } +func (m *message) SetTTL(d time.Duration) { m.ttl = d } +func (m *message) FirstAcquirer() bool { return m.firstAcquirer } +func (m *message) SetFirstAcquirer(b bool) { m.firstAcquirer = b } +func (m *message) DeliveryCount() uint32 { return m.deliveryCount 
} +func (m *message) SetDeliveryCount(c uint32) { m.deliveryCount = c } +func (m *message) MessageId() interface{} { return m.messageId } +func (m *message) SetMessageId(id interface{}) { m.messageId = id } +func (m *message) UserId() string { return m.userId } +func (m *message) SetUserId(s string) { m.userId = s } +func (m *message) Address() string { return m.address } +func (m *message) SetAddress(s string) { m.address = s } +func (m *message) Subject() string { return m.subject } +func (m *message) SetSubject(s string) { m.subject = s } +func (m *message) ReplyTo() string { return m.replyTo } +func (m *message) SetReplyTo(s string) { m.replyTo = s } +func (m *message) CorrelationId() interface{} { return m.correlationId } +func (m *message) SetCorrelationId(c interface{}) { m.correlationId = c } +func (m *message) ContentType() string { return m.contentType } +func (m *message) SetContentType(s string) { m.contentType = s } +func (m *message) ContentEncoding() string { return m.contentEncoding } +func (m *message) SetContentEncoding(s string) { m.contentEncoding = s } +func (m *message) ExpiryTime() time.Time { return m.expiryTime } +func (m *message) SetExpiryTime(t time.Time) { m.expiryTime = t } +func (m *message) CreationTime() time.Time { return m.creationTime } +func (m *message) SetCreationTime(t time.Time) { m.creationTime = t } +func (m *message) GroupId() string { return m.groupId } +func (m *message) SetGroupId(s string) { m.groupId = s } +func (m *message) GroupSequence() int32 { return m.groupSequence } +func (m *message) SetGroupSequence(s int32) { m.groupSequence = s } +func (m *message) ReplyToGroupId() string { return m.replyToGroupId } +func (m *message) SetReplyToGroupId(s string) { m.replyToGroupId = s } +func (m *message) Instructions() *map[string]interface{} { return &m.instructions } +func (m *message) Annotations() *map[string]interface{} { return &m.annotations } +func (m *message) Properties() *map[string]interface{} { return 
&m.properties } +func (m *message) Body() interface{} { return m.body } +func (m *message) SetBody(b interface{}) { m.body = b } + +// rewindGet rewinds and then gets the value from a data object. +func rewindGet(data *C.pn_data_t, v interface{}) { + if data != nil && C.pn_data_size(data) > 0 { + C.pn_data_rewind(data) + C.pn_data_next(data) + get(data, v) + } +} + +// goMessage populates a Go message from a pn_message_t +func goMessage(pn *C.pn_message_t) *message { + m := &message{ + inferred: bool(C.pn_message_is_inferred(pn)), + durable: bool(C.pn_message_is_durable(pn)), + priority: uint8(C.pn_message_get_priority(pn)), + ttl: time.Duration(C.pn_message_get_ttl(pn)) * time.Millisecond, + firstAcquirer: bool(C.pn_message_is_first_acquirer(pn)), + deliveryCount: uint32(C.pn_message_get_delivery_count(pn)), + userId: goString(C.pn_message_get_user_id(pn)), + address: C.GoString(C.pn_message_get_address(pn)), + subject: C.GoString(C.pn_message_get_subject(pn)), + replyTo: C.GoString(C.pn_message_get_reply_to(pn)), + contentType: C.GoString(C.pn_message_get_content_type(pn)), + contentEncoding: C.GoString(C.pn_message_get_content_encoding(pn)), + expiryTime: time.Unix(0, int64(time.Millisecond*time.Duration(C.pn_message_get_expiry_time(pn)))), + creationTime: time.Unix(0, int64(time.Millisecond)*int64(C.pn_message_get_creation_time(pn))), + groupId: C.GoString(C.pn_message_get_group_id(pn)), + groupSequence: int32(C.pn_message_get_group_sequence(pn)), + replyToGroupId: C.GoString(C.pn_message_get_reply_to_group_id(pn)), + messageId: nil, + correlationId: nil, + instructions: make(map[string]interface{}), + annotations: make(map[string]interface{}), + properties: make(map[string]interface{}), + } + rewindGet(C.pn_message_id(pn), &m.messageId) + rewindGet(C.pn_message_correlation_id(pn), &m.correlationId) + rewindGet(C.pn_message_instructions(pn), &m.instructions) + rewindGet(C.pn_message_annotations(pn), &m.annotations) + rewindGet(C.pn_message_properties(pn), 
&m.properties) + rewindGet(C.pn_message_body(pn), &m.body) + return m +} + +// pnMessage populates a pn_message_t from a Go message. +func (m *message) pnMessage() *C.pn_message_t { + pn := C.pn_message() + C.pn_message_set_inferred(pn, C.bool(m.Inferred())) + C.pn_message_set_durable(pn, C.bool(m.Durable())) + C.pn_message_set_priority(pn, C.uint8_t(m.priority)) + C.pn_message_set_ttl(pn, C.pn_millis_t(m.TTL()/time.Millisecond)) + C.pn_message_set_first_acquirer(pn, C.bool(m.FirstAcquirer())) + C.pn_message_set_delivery_count(pn, C.uint32_t(m.deliveryCount)) + replace(C.pn_message_id(pn), m.MessageId()) + C.pn_message_set_user_id(pn, pnBytes([]byte(m.UserId()))) + C.pn_message_set_address(pn, C.CString(m.Address())) + C.pn_message_set_subject(pn, C.CString(m.Subject())) + C.pn_message_set_reply_to(pn, C.CString(m.ReplyTo())) + replace(C.pn_message_correlation_id(pn), m.CorrelationId()) + C.pn_message_set_content_type(pn, C.CString(m.ContentType())) + C.pn_message_set_content_encoding(pn, C.CString(m.ContentEncoding())) + C.pn_message_set_expiry_time(pn, pnTime(m.ExpiryTime())) + C.pn_message_set_creation_time(pn, pnTime(m.CreationTime())) + C.pn_message_set_group_id(pn, C.CString(m.GroupId())) + C.pn_message_set_group_sequence(pn, C.pn_sequence_t(m.GroupSequence())) + C.pn_message_set_reply_to_group_id(pn, C.CString(m.ReplyToGroupId())) + replace(C.pn_message_instructions(pn), *m.Instructions()) + replace(C.pn_message_annotations(pn), *m.Annotations()) + replace(C.pn_message_properties(pn), *m.Properties()) + replace(C.pn_message_body(pn), m.Body()) + return pn +} + +// FIXME aconway 2015-04-08: Move message encode/decode under Marshal/Unmarshal interfaces. 
+ +// DecodeMessage decodes bytes as a message +func DecodeMessage(data []byte) (Message, error) { + pnMsg := C.pn_message() + defer C.pn_message_free(pnMsg) + if len(data) == 0 { + return nil, internal.Errorf("empty buffer for decode") + } + if C.pn_message_decode(pnMsg, cPtr(data), cLen(data)) < 0 { + return nil, internal.Errorf("decoding message: %s", + internal.PnError(unsafe.Pointer(C.pn_message_error(pnMsg)))) + } + return goMessage(pnMsg), nil +} + +// Encode the message into buffer. +// If buffer is nil or len(buffer) is not sufficient to encode the message a larger +// buffer will be returned. +func (m *message) Encode(buffer []byte) ([]byte, error) { + pn := m.pnMessage() + defer C.pn_message_free(pn) + encode := func(buf []byte) ([]byte, error) { + len := cLen(buf) + result := C.pn_message_encode(pn, cPtr(buf), &len) + switch { + case result == C.PN_OVERFLOW: + return buf, overflow + case result < 0: + return buf, internal.Errorf("cannot encode message: %s", internal.PnErrorCode(result)) + default: + return buf[:len], nil + } + } + return encodeGrow(buffer, encode) +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6ea3649d/proton-c/bindings/go/amqp/message_test.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/amqp/message_test.go b/proton-c/bindings/go/amqp/message_test.go new file mode 100644 index 0000000..46e26de --- /dev/null +++ b/proton-c/bindings/go/amqp/message_test.go @@ -0,0 +1,90 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package amqp + +import ( + "reflect" + "testing" + "time" +) + +func roundTrip(t *testing.T, m Message) { + buffer, err := m.Encode(nil) + if err != nil { + t.Fatalf("Encode failed: %v", err) + } + m2, err := DecodeMessage(buffer) + if err != nil { + t.Fatalf("Decode failed: %v", err) + } + if !reflect.DeepEqual(m, m2) { + t.Errorf("Message mismatch got\n%#v\nwant\n%#v", m, m2) + } +} + +func TestDefaultMessageRoundTrip(t *testing.T) { + m := NewMessage() + // Check defaults + assertEqual(m.Inferred(), false) + assertEqual(m.Durable(), false) + assertEqual(m.Priority(), uint8(4)) + assertEqual(m.TTL(), time.Duration(0)) + assertEqual(m.UserId(), "") + assertEqual(m.Address(), "") + assertEqual(m.Subject(), "") + assertEqual(m.ReplyTo(), "") + assertEqual(m.ContentType(), "") + assertEqual(m.ContentEncoding(), "") + assertEqual(m.GroupId(), "") + assertEqual(m.GroupSequence(), int32(0)) + assertEqual(m.ReplyToGroupId(), "") + assertEqual(m.MessageId(), nil) + assertEqual(m.CorrelationId(), nil) + assertEqual(*m.Instructions(), map[string]interface{}{}) + assertEqual(*m.Annotations(), map[string]interface{}{}) + assertEqual(*m.Properties(), map[string]interface{}{}) + assertEqual(m.Body(), nil) + + roundTrip(t, m) +} + +func TestMessageRoundTrip(t *testing.T) { + m := NewMessage() + m.SetInferred(false) + m.SetDurable(true) + m.SetPriority(42) + m.SetTTL(0) + m.SetUserId("user") + m.SetAddress("address") + m.SetSubject("subject") + m.SetReplyTo("replyto") + m.SetContentType("content") + m.SetContentEncoding("encoding") + m.SetGroupId("group") + m.SetGroupSequence(42) 
+ m.SetReplyToGroupId("replytogroup") + m.SetMessageId("id") + m.SetCorrelationId("correlation") + *m.Instructions() = map[string]interface{}{"instructions": "foo"} + *m.Annotations() = map[string]interface{}{"annotations": "foo"} + *m.Properties() = map[string]interface{}{"int": int32(32), "bool": true, "string": "foo"} + m.SetBody("hello") + roundTrip(t, m) +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6ea3649d/proton-c/bindings/go/amqp/types.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/amqp/types.go b/proton-c/bindings/go/amqp/types.go new file mode 100644 index 0000000..8713520 --- /dev/null +++ b/proton-c/bindings/go/amqp/types.go @@ -0,0 +1,193 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
*/

package amqp

// #include <proton/codec.h>
// const pn_type_t PN_DATA_TYPE_ERROR = (pn_type_t) -1;
import "C"

import (
	"bytes"
	"fmt"
	"reflect"
	"time"
	"unsafe"
)

// pnTypeString returns a short lower-case name for a proton data type code.
// The PN_DATA_TYPE_ERROR sentinel declared in the cgo preamble of this file
// (the value -1 cast to pn_type_t) is reported as "no-data"; any code not
// listed below is reported as "unknown-type(N)" with N being its numeric value.
func pnTypeString(pt C.pn_type_t) string {
	switch pt {
	case C.PN_NULL:
		return "null"
	case C.PN_BOOL:
		return "bool"
	case C.PN_UBYTE:
		return "ubyte"
	case C.PN_BYTE:
		return "byte"
	case C.PN_USHORT:
		return "ushort"
	case C.PN_SHORT:
		return "short"
	case C.PN_CHAR:
		return "char"
	case C.PN_UINT:
		return "uint"
	case C.PN_INT:
		return "int"
	case C.PN_ULONG:
		return "ulong"
	case C.PN_LONG:
		return "long"
	case C.PN_TIMESTAMP:
		return "timestamp"
	case C.PN_FLOAT:
		return "float"
	case C.PN_DOUBLE:
		return "double"
	case C.PN_DECIMAL32:
		return "decimal32"
	case C.PN_DECIMAL64:
		return "decimal64"
	case C.PN_DECIMAL128:
		return "decimal128"
	case C.PN_UUID:
		return "uuid"
	case C.PN_BINARY:
		return "binary"
	case C.PN_STRING:
		return "string"
	case C.PN_SYMBOL:
		return "symbol"
	case C.PN_DESCRIBED:
		return "described"
	case C.PN_ARRAY:
		return "array"
	case C.PN_LIST:
		return "list"
	case C.PN_MAP:
		return "map"
	case C.PN_DATA_TYPE_ERROR:
		return "no-data"
	default:
		return fmt.Sprintf("unknown-type(%d)", pt)
	}
}

// Go types: reflect.Type values computed once at package initialization for
// []byte and reflect.Value.
var (
	bytesType = reflect.TypeOf([]byte{})
	valueType = reflect.TypeOf(reflect.Value{})
)

// FIXME aconway 2015-04-08: can't handle AMQP maps with key types that are not valid Go map keys.

// Map is a generic map that can have mixed key and value types and so can represent any AMQP map.
type Map map[interface{}]interface{}

// List is a generic list that can hold mixed values and can represent any AMQP list.
type List []interface{}

// Symbol is a string that is encoded as an AMQP symbol.
type Symbol string

// Binary is a string that is encoded as an AMQP binary.
+// It is a string rather than a byte[] because byte[] is not hashable and can't be used as +// a map key, AMQP frequently uses binary types as map keys. It can convert to and from []byte +type Binary string + +// GoString for Map prints values with their types, useful for debugging. +func (m Map) GoString() string { + out := &bytes.Buffer{} + fmt.Fprintf(out, "%T{", m) + i := len(m) + for k, v := range m { + fmt.Fprintf(out, "%T(%#v): %T(%#v)", k, k, v, v) + i-- + if i > 0 { + fmt.Fprint(out, ", ") + } + } + fmt.Fprint(out, "}") + return out.String() +} + +// GoString for List prints values with their types, useful for debugging. +func (l List) GoString() string { + out := &bytes.Buffer{} + fmt.Fprintf(out, "%T{", l) + for i := 0; i < len(l); i++ { + fmt.Fprintf(out, "%T(%#v)", l[i], l[i]) + if i == len(l)-1 { + fmt.Fprint(out, ", ") + } + } + fmt.Fprint(out, "}") + return out.String() +} + +// pnTime converts Go time.Time to Proton millisecond Unix time. +func pnTime(t time.Time) C.pn_timestamp_t { + secs := t.Unix() + // Note: sub-second accuracy is not guaraunteed if the Unix time in + // nanoseconds cannot be represented by an int64 (sometime around year 2260) + msecs := (t.UnixNano() % int64(time.Second)) / int64(time.Millisecond) + return C.pn_timestamp_t(secs*1000 + msecs) +} + +// goTime converts a pn_timestamp_t to a Go time.Time. 
+func goTime(t C.pn_timestamp_t) time.Time { + secs := int64(t) / 1000 + nsecs := (int64(t) % 1000) * int64(time.Millisecond) + return time.Unix(secs, nsecs) +} + +func goBytes(cBytes C.pn_bytes_t) (bytes []byte) { + if cBytes.start != nil { + bytes = C.GoBytes(unsafe.Pointer(cBytes.start), C.int(cBytes.size)) + } + return +} + +func goString(cBytes C.pn_bytes_t) (str string) { + if cBytes.start != nil { + str = C.GoStringN(cBytes.start, C.int(cBytes.size)) + } + return +} + +func pnBytes(b []byte) C.pn_bytes_t { + if len(b) == 0 { + return C.pn_bytes_t{0, nil} + } else { + return C.pn_bytes_t{C.size_t(len(b)), (*C.char)(unsafe.Pointer(&b[0]))} + } +} + +func cPtr(b []byte) *C.char { + if len(b) == 0 { + return nil + } + return (*C.char)(unsafe.Pointer(&b[0])) +} + +func cLen(b []byte) C.size_t { + return C.size_t(len(b)) +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6ea3649d/proton-c/bindings/go/amqp/uid.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/amqp/uid.go b/proton-c/bindings/go/amqp/uid.go new file mode 100644 index 0000000..944bf6f --- /dev/null +++ b/proton-c/bindings/go/amqp/uid.go @@ -0,0 +1,40 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
*/

// Generating unique IDs for various things.

package amqp

import (
	"strconv"
	"sync/atomic"
)

// UidCounter is a simple atomic counter used to generate unique 64 bit IDs.
// The zero value is ready to use; the first value handed out is 1.
type UidCounter struct{ count uint64 }

// NextInt atomically increments the counter and returns the new uint64 value.
func (uc *UidCounter) NextInt() uint64 {
	return atomic.AddUint64(&uc.count, 1)
}

// Next gets the next integer value from NextInt encoded as a base 32 string
// (digits 0-9 and lower-case letters), safe for NUL terminated C strings.
func (uc *UidCounter) Next() string {
	return strconv.FormatUint(uc.NextInt(), 32)
}

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
