PROTON-827: go binding: partial implementation of concurrent messaging API with examples.
Please see proton-c/bindings/go/README.md for details of this update. Implemented a good chunk of the concurrent Go messaging API with send.go and receive.go examples. The examples work with event/broker.go (which uses the event API) and with the python broker.py, simple_send.py and simple_recv.py examples. Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/0c11d11c Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/0c11d11c Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/0c11d11c Branch: refs/heads/master Commit: 0c11d11cdb5f1185219b907d1ad9ba22451512cc Parents: 8d1d20e Author: Alan Conway <[email protected]> Authored: Mon Apr 27 16:09:26 2015 -0400 Committer: Alan Conway <[email protected]> Committed: Tue May 5 19:23:21 2015 -0400 ---------------------------------------------------------------------- .gitignore | 3 + examples/go/event/broker.go | 188 +++--- examples/go/example.go | 54 -- examples/go/receive.go | 99 +++- examples/go/send.go | 115 +++- proton-c/bindings/go/README.md | 106 ++-- proton-c/bindings/go/src/Makefile | 17 +- proton-c/bindings/go/src/genwrap.go | 43 +- .../go/src/qpid.apache.org/proton/doc.go | 18 +- .../go/src/qpid.apache.org/proton/dummy.go | 82 --- .../go/src/qpid.apache.org/proton/error.go | 111 ---- .../go/src/qpid.apache.org/proton/event/doc.go | 12 +- .../src/qpid.apache.org/proton/event/error.go | 77 --- .../qpid.apache.org/proton/event/handlers.go | 145 +++-- .../src/qpid.apache.org/proton/event/message.go | 75 +++ .../go/src/qpid.apache.org/proton/event/pump.go | 347 +++++++---- .../qpid.apache.org/proton/event/wrappers.go | 115 +++- .../proton/event/wrappers_gen.go | 584 +++++++++++++------ .../qpid.apache.org/proton/internal/error.go | 125 ++++ .../src/qpid.apache.org/proton/interop_test.go | 8 +- .../go/src/qpid.apache.org/proton/marshal.go | 15 +- .../go/src/qpid.apache.org/proton/message.go | 66 +-- 
.../src/qpid.apache.org/proton/messaging/doc.go | 28 + .../proton/messaging/example_test.go | 268 +++++++++ .../qpid.apache.org/proton/messaging/handler.go | 70 +++ .../proton/messaging/messaging.go | 250 ++++++++ .../go/src/qpid.apache.org/proton/uid.go | 40 ++ .../go/src/qpid.apache.org/proton/unmarshal.go | 30 +- .../go/src/qpid.apache.org/proton/url.go | 5 +- proton-c/bindings/python/proton/handlers.py | 2 +- 30 files changed, 2158 insertions(+), 940 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0c11d11c/.gitignore ---------------------------------------------------------------------- diff --git a/.gitignore b/.gitignore index 2af3e68..2b509c6 100644 --- a/.gitignore +++ b/.gitignore @@ -28,3 +28,6 @@ eclipse-classes # The usual location for proton-c build files proton-c/build + +# Executables built by go binding tests +proton-c/bindings/go/bin http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0c11d11c/examples/go/event/broker.go ---------------------------------------------------------------------- diff --git a/examples/go/event/broker.go b/examples/go/event/broker.go index 086043f..8b227bc 100644 --- a/examples/go/event/broker.go +++ b/examples/go/event/broker.go @@ -36,15 +36,29 @@ import ( "log" "net" "os" + "path" "qpid.apache.org/proton" "qpid.apache.org/proton/event" + "sync" ) -// panicIf is simplistic error handling for example code, not recommended practice. -func panicIf(err error) { - if err != nil { - panic(err) +// Command-line flags +var addr = flag.String("addr", ":amqp", "Listening address") +var verbose = flag.Int("verbose", 1, "Output level, 0 means none, higher means more") +var full = flag.Bool("full", false, "Print full message not just body.") + +func main() { + flag.Usage = func() { + fmt.Fprintf(os.Stderr, ` +Usage: %s +A simple broker-like demo. Queues are created automatically for sender or receiver addrsses. 
+`, os.Args[0]) + flag.PrintDefaults() } + flag.Parse() + b := newBroker() + err := b.listen(*addr) + fatalIf(err) } // queue is a structure representing a queue. @@ -54,18 +68,20 @@ type queue struct { consumers map[event.Link]bool // Set of consumer links } -func newQueue(name string) *queue { - debug.Printf("Create queue %s\n", name) - return &queue{name, list.New(), make(map[event.Link]bool)} +type logLink event.Link // Wrapper to print links in format for logging + +func (ll logLink) String() string { + l := event.Link(ll) + return fmt.Sprintf("%s[%p]", l.Name(), l.Session().Connection().Pump()) } func (q *queue) subscribe(link event.Link) { - debug.Printf("Subscribe to %s\n", q.name) + debug.Printf("link %s subscribed to queue %s", logLink(link), q.name) q.consumers[link] = true } func (q *queue) unsubscribe(link event.Link) { - debug.Printf("Unsubscribe from %s\n", q.name) + debug.Printf("link %s unsubscribed from queue %s", logLink(link), q.name) delete(q.consumers, link) } @@ -73,30 +89,37 @@ func (q *queue) empty() bool { return len(q.consumers) == 0 && q.messages.Len() == 0 } -func (q *queue) publish(message proton.Message) { - debug.Printf("Push to %s: %#v\n", q.name, message) +func (q *queue) push(context *event.Pump, message proton.Message) { q.messages.PushBack(message) - q.dispatch() + q.pop(context) } -func (q *queue) dispatchTo(link event.Link) bool { +func (q *queue) popTo(context *event.Pump, link event.Link) bool { if q.messages.Len() != 0 && link.Credit() > 0 { - message := q.messages.Front().Value.(proton.Message) - debug.Printf("Pop from %s: %#v\n", q.name, message) + message := q.messages.Remove(q.messages.Front()).(proton.Message) + debug.Printf("link %s <- queue %s: %s", logLink(link), q.name, formatMessage{message}) // The first return parameter is an event.Delivery. // The Deliver can be used to track message status, e.g. so we can re-delver on failure. // This demo broker doesn't do that. 
- _, err := message.Send(link) - panicIf(err) - q.messages.Remove(q.messages.Front()) + linkPump := link.Session().Connection().Pump() + if context == linkPump { + if context == nil { + log.Fatal("pop in nil context") + } + link.Send(message) // link is in the current pump, safe to call Send() direct + } else { + linkPump.Inject <- func() { // Inject to link's pump + link.Send(message) // FIXME aconway 2015-05-04: error handlig + } + } return true } return false } -func (q *queue) dispatch() (dispatched bool) { +func (q *queue) pop(context *event.Pump) (popped bool) { for c, _ := range q.consumers { - dispatched = dispatched || q.dispatchTo(c) + popped = popped || q.popTo(context, c) } return } @@ -104,54 +127,79 @@ func (q *queue) dispatch() (dispatched bool) { // broker implements event.MessagingHandler and reacts to events by moving messages on or off queues. type broker struct { queues map[string]*queue - pumps map[*event.Pump]struct{} // Set of running event pumps (i.e. connections) + lock sync.Mutex // FIXME aconway 2015-05-04: un-golike, better broker coming... } func newBroker() *broker { - return &broker{queues: make(map[string]*queue), pumps: make(map[*event.Pump]struct{})} + return &broker{queues: make(map[string]*queue)} } func (b *broker) getQueue(name string) *queue { q := b.queues[name] if q == nil { - q = newQueue(name) + debug.Printf("Create queue %s", name) + q = &queue{name, list.New(), make(map[event.Link]bool)} b.queues[name] = q } return q } +func (b *broker) unsubscribe(l event.Link) { + if l.IsSender() { + q := b.queues[l.RemoteSource().Address()] + if q != nil { + q.unsubscribe(l) + if q.empty() { + debug.Printf("Delete queue %s", q.name) + delete(b.queues, q.name) + } + } + } +} + func (b *broker) Handle(t event.MessagingEventType, e event.Event) error { + // FIXME aconway 2015-05-04: locking is un-golike, better example coming soon. 
+ // Needed because Handle is called for multiple connections concurrently + // and the queue data structures are not thread safe. + b.lock.Lock() + defer b.lock.Unlock() + switch t { case event.MLinkOpening: if e.Link().IsSender() { - // FIXME aconway 2015-03-23: handle dynamic consumers - b.getQueue(e.Link().RemoteSource().Address()).subscribe(e.Link()) + q := b.getQueue(e.Link().RemoteSource().Address()) + q.subscribe(e.Link()) } case event.MLinkClosing: - if e.Link().IsSender() { - q := b.getQueue(e.Link().RemoteSource().Address()) - q.unsubscribe(e.Link()) - if q.empty() { - delete(b.queues, q.name) - } + b.unsubscribe(e.Link()) + + case event.MDisconnected: + fallthrough + case event.MConnectionClosing: + c := e.Connection() + for l := c.LinkHead(event.SRemoteActive); !l.IsNil(); l = l.Next(event.SRemoteActive) { + b.unsubscribe(l) } case event.MSendable: - b.getQueue(e.Link().RemoteSource().Address()).dispatchTo(e.Link()) + q := b.getQueue(e.Link().RemoteSource().Address()) + q.popTo(e.Connection().Pump(), e.Link()) case event.MMessage: - m, err := proton.EventMessage(e) - panicIf(err) - b.getQueue(e.Link().RemoteTarget().Address()).publish(m) + m, err := event.DecodeMessage(e) + fatalIf(err) + qname := e.Link().RemoteTarget().Address() + debug.Printf("link %s -> queue %s: %s", logLink(e.Link()), qname, formatMessage{m}) + b.getQueue(qname).push(e.Connection().Pump(), m) } return nil } func (b *broker) listen(addr string) (err error) { // Use the standard Go "net" package to listen for connections. 
- info.Printf("Listening on %s\n", addr) + info.Printf("Listening on %s", addr) listener, err := net.Listen("tcp", addr) if err != nil { return err @@ -160,46 +208,56 @@ func (b *broker) listen(addr string) (err error) { for { conn, err := listener.Accept() if err != nil { - info.Printf("Accept error: %s\n", err) + info.Printf("Accept error: %s", err) continue } - info.Printf("Accepted connection %s<-%s\n", conn.LocalAddr(), conn.RemoteAddr()) pump, err := event.NewPump(conn, event.NewMessagingDelegator(b)) - panicIf(err) + fatalIf(err) + info.Printf("Accepted %s[%p]", pump, pump) pump.Server() - b.pumps[pump] = struct{}{} - go pump.Run() + go func() { + pump.Run() + if pump.Error == nil { + info.Printf("Closed %s", pump) + } else { + info.Printf("Closed %s: %s", pump, pump.Error) + } + }() } } -var addr = flag.String("addr", ":amqp", "Listening address") -var quiet = flag.Bool("quiet", false, "Don't print informational messages") -var debugFlag = flag.Bool("debug", false, "Print debugging messages") -var info, debug *log.Logger - -func output(enable bool) io.Writer { - if enable { - return os.Stdout - } else { - return ioutil.Discard +// Logging +func logger(prefix string, level int, w io.Writer) *log.Logger { + if *verbose >= level { + return log.New(w, prefix, 0) } + return log.New(ioutil.Discard, "", 0) } -func main() { - flag.Usage = func() { - fmt.Fprintf(os.Stderr, ` -Usage: %s [queue ...] -A simple broker. Queues are created automatically for sender or receiver addrsses. -`, os.Args[0]) - flag.PrintDefaults() - } +var info, debug *log.Logger + +func init() { flag.Parse() - debug = log.New(output(*debugFlag), "debug: ", log.Ltime) - info = log.New(output(!*quiet), "info: ", log.Ltime) - b := newBroker() - err := b.listen(*addr) + name := path.Base(os.Args[0]) + log.SetFlags(0) + log.SetPrefix(fmt.Sprintf("%s: ", name)) // Log errors on stderr. + info = logger(fmt.Sprintf("%s: ", name), 1, os.Stdout) // Log info on stdout. 
+ debug = logger(fmt.Sprintf("%s debug: ", name), 2, os.Stderr) // Log debug on stderr. +} + +// Simple error handling for demo. +func fatalIf(err error) { if err != nil { - fmt.Println(err) - os.Exit(1) + log.Fatal(err) + } +} + +type formatMessage struct{ m proton.Message } + +func (fm formatMessage) String() string { + if *full { + return fmt.Sprintf("%#v", fm.m) + } else { + return fmt.Sprintf("%#v", fm.m.Body()) } } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0c11d11c/examples/go/example.go ---------------------------------------------------------------------- diff --git a/examples/go/example.go b/examples/go/example.go deleted file mode 100644 index 08ecfeb..0000000 --- a/examples/go/example.go +++ /dev/null @@ -1,54 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. 
-*/ - -package main - -import ( - "fmt" - "qpid.apache.org/proton" - "sync" -) - -func receive(c proton.Connection, addr string, wait *sync.WaitGroup) { - defer wait.Done() - r := c.Receiver(addr) - defer r.Close() - for m := range r.Receive { // r.Receive is a chan Message - fmt.Println("received: ", addr, m.Body(), m.Subject()) - if m.Subject() == "stop" { - return - } - } -} - -func main() { - var c1, c2 proton.Connection - c1.Open("amqp://foo:amqp") - defer c1.Close() - c2.Open("amqp://localhost:4567") - defer c2.Close() - - var wait sync.WaitGroup - wait.Add(2) - - go receive(c1, "foo", &wait) - go receive(c2, "bar", &wait) - - wait.Wait() -} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0c11d11c/examples/go/receive.go ---------------------------------------------------------------------- diff --git a/examples/go/receive.go b/examples/go/receive.go index 231e0ce..fc1c85a 100644 --- a/examples/go/receive.go +++ b/examples/go/receive.go @@ -22,25 +22,24 @@ package main import ( "flag" "fmt" + "io" + "io/ioutil" + "log" "math" "net" "os" + "path" "qpid.apache.org/proton" + "qpid.apache.org/proton/messaging" "sync" "time" ) -// Simplistic error handling for demo. Not recommended. -func panicIf(err error) { - if err != nil { - panic(err) - } -} - // Command-line flags +var verbose = flag.Int("verbose", 1, "Output level, 0 means none, higher means more") var count = flag.Int64("count", 0, "Stop after receiving this many messages. 0 means unlimited.") var timeout = flag.Int64("time", 0, "Stop after this many seconds. 0 means unlimited.") -var short = flag.Bool("short", false, "Short format of message: body only") +var full = flag.Bool("full", false, "Print full message not just body.") func main() { // Parse flags and arguments, print usage message on error. @@ -76,29 +75,44 @@ Receive messages from all the listed URLs concurrently and print them. wait.Add(len(urls)) // Wait for one goroutine per URL. 
- for _, urlStr := range urls { - // Start a goroutine to receive from urlStr + // Arrange to close all connections on exit + connections := make([]*messaging.Connection, len(urls)) + defer func() { + for _, c := range connections { + if c != nil { + c.Close() + } + } + }() + + for i, urlStr := range urls { + debug.Printf("Connecting to %s", urlStr) go func(urlStr string) { defer wait.Done() // Notify main() that this goroutine is done. url, err := proton.ParseURL(urlStr) // Like net/url.Parse() but with AMQP defaults. - panicIf(err) + fatalIf(err) // Open a standard Go net.Conn for the AMQP connection conn, err := net.Dial("tcp", url.Host) // Note net.URL.Host is actually "host:port" - panicIf(err) - defer conn.Close() // Close conn on goroutine exit. + fatalIf(err) - pc, err := proton.Connect(conn) // This is our AMQP connection. - panicIf(err) - // We could 'defer pc.Close()' but conn.close() will automatically close the proton connection. + pc, err := messaging.Connect(conn) // This is our AMQP connection. + fatalIf(err) + connections[i] = pc // For convenience a proton.Connection provides a DefaultSession() // pc.Receiver() is equivalent to pc.DefaultSession().Receiver() r, err := pc.Receiver(url.Path) - panicIf(err) + fatalIf(err) - for m := range r.Receive { // r.Receive is a channel to receive messages. - select { + for { + var m proton.Message + select { // Receive a message or stop. + case m = <-r.Receive: + case <-stop: // The program is stopping. + return + } + select { // Send m to main() or stop case messages <- m: // Send m to main() case <-stop: // The program is stopping. return @@ -106,24 +120,57 @@ Receive messages from all the listed URLs concurrently and print them. } }(urlStr) } + info.Printf("Listening") // time.After() returns a channel that will close when the timeout is up. timer := time.After(duration) // main() prints each message and checks for count or timeout being exceeded. 
- for i := *count; i > 0; i-- { + for i := int64(0); i < *count; i++ { select { case m := <-messages: - if *short { - fmt.Println(m.Body()) - } else { - fmt.Printf("%#v\n\n", m) - } + debug.Print(formatMessage{m}) case <-timer: // Timeout has expired i = 0 } } - + info.Printf("Received %d messages", *count) close(stop) // Signal all goroutines to stop. wait.Wait() // Wait for all goroutines to finish. } + +// Logging +func logger(prefix string, level int, w io.Writer) *log.Logger { + if *verbose >= level { + return log.New(w, prefix, 0) + } + return log.New(ioutil.Discard, "", 0) +} + +var info, debug *log.Logger + +func init() { + flag.Parse() + name := path.Base(os.Args[0]) + log.SetFlags(0) // Use default logger for errors. + log.SetPrefix(fmt.Sprintf("%s: ", name)) // Log errors on stderr. + info = logger(fmt.Sprintf("%s: ", name), 1, os.Stdout) // Log info on stdout. + debug = logger(fmt.Sprintf("%s debug: ", name), 2, os.Stderr) // Log debug on stderr. +} + +// Simple error handling for demo. +func fatalIf(err error) { + if err != nil { + log.Fatal(err) + } +} + +type formatMessage struct{ m proton.Message } + +func (fm formatMessage) String() string { + if *full { + return fmt.Sprintf("%#v", fm.m) + } else { + return fmt.Sprintf("%#v", fm.m.Body()) + } +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0c11d11c/examples/go/send.go ---------------------------------------------------------------------- diff --git a/examples/go/send.go b/examples/go/send.go index 4a7f947..3c18466 100644 --- a/examples/go/send.go +++ b/examples/go/send.go @@ -22,23 +22,28 @@ package main import ( "flag" "fmt" + "io" + "io/ioutil" + "log" "math" "net" "os" + "path" "qpid.apache.org/proton" + "qpid.apache.org/proton/messaging" "sync" ) -// Simplistic error handling for demo. Not recommended. 
-func panicIf(err error) { - if err != nil { - panic(err) - } -} - // Command-line flags +var verbose = flag.Int("verbose", 1, "Output level, 0 means none, higher means more") var count = flag.Int64("count", 0, "Send this may messages per address. 0 means unlimited.") +// Ack associates an info string with an acknowledgement +type Ack struct { + ack messaging.Acknowledgement + info string +} + func main() { // Parse flags and arguments, print usage message on error. flag.Usage = func() { @@ -53,45 +58,101 @@ To each URL, send the string "path-n" where n is the message number. urls := flag.Args() // Non-flag arguments are URLs to receive from if len(urls) == 0 { flag.Usage() - fmt.Fprintf(os.Stderr, "No URL provided") + fmt.Fprintf(os.Stderr, "No URL provided\n") os.Exit(1) } if *count == 0 { *count = math.MaxInt64 } + // Create a channel to receive all the acknowledgements + acks := make(chan Ack) + // Create a goroutine for each URL that sends messages. var wait sync.WaitGroup // Used by main() to wait for all goroutines to end. wait.Add(len(urls)) // Wait for one goroutine per URL. - for _, urlStr := range urls { - // Start a goroutine to receive from urlStr - go func(urlStr string) { - defer wait.Done() // Notify main() that this goroutine is done. - url, err := proton.ParseURL(urlStr) // Like net/url.Parse() but with AMQP defaults. - panicIf(err) + // Arrange to close all connections on exit + connections := make([]*messaging.Connection, len(urls)) + defer func() { + for _, c := range connections { + c.Close() + } + }() + + for i, urlStr := range urls { + url, err := proton.ParseURL(urlStr) // Like net/url.Parse() but with AMQP defaults. + fatalIf(err) + debug.Printf("Connecting to %v", url) - // Open a standard Go net.Conn for the AMQP connection - conn, err := net.Dial("tcp", url.Host) // Note net.URL.Host is actually "host:port" - panicIf(err) - defer conn.Close() // Close conn on goroutine exit. 
+ // Open a standard Go net.Conn for the AMQP connection + conn, err := net.Dial("tcp", url.Host) // Note net.URL.Host is actually "host:port" + fatalIf(err) - pc, err := proton.Connect(conn) // This is our AMQP connection. - panicIf(err) - // We could 'defer pc.Close()' but conn.close() will automatically close the proton connection. + pc, err := messaging.Connect(conn) // This is our AMQP connection using conn. + fatalIf(err) + connections[i] = pc - // For convenience a proton.Connection provides a DefaultSession() - // pc.Sender() is equivalent to pc.DefaultSession().Sender() + // Start a goroutine to send to urlStr + go func(urlStr string) { + defer wait.Done() // Notify main() that this goroutine is done. + + // FIXME aconway 2015-04-29: sessions, default sessions, senders... + // Create a sender using the path of the URL as the AMQP target address s, err := pc.Sender(url.Path) - panicIf(err) + fatalIf(err) for i := int64(0); i < *count; i++ { m := proton.NewMessage() - m.SetBody(fmt.Sprintf("%v-%v", url.Path, i)) - err := s.Send(m) - panicIf(err) + body := fmt.Sprintf("%v-%v", url.Path, i) + m.SetBody(body) + ack, err := s.Send(m) + fatalIf(err) + acks <- Ack{ack, body} } }(urlStr) } + + // Wait for all the acknowledgements + expect := int(*count) * len(urls) + debug.Printf("Started senders, expect %v acknowledgements", expect) + for i := 0; i < expect; i++ { + ack, ok := <-acks + if !ok { + info.Fatalf("acks channel closed after only %d acks\n", i) + } + d := <-ack.ack + debug.Printf("acknowledgement[%v] %v", i, ack.info) + if d != messaging.Accepted { + info.Printf("Unexpected disposition %v", d) + } + } + info.Printf("Received all %v acknowledgements", expect) wait.Wait() // Wait for all goroutines to finish. 
} + +// Logging +func logger(prefix string, level int, w io.Writer) *log.Logger { + if *verbose >= level { + return log.New(w, prefix, 0) + } + return log.New(ioutil.Discard, "", 0) +} + +var info, debug *log.Logger + +func init() { + flag.Parse() + name := path.Base(os.Args[0]) + log.SetFlags(0) // Use default logger for errors. + log.SetPrefix(fmt.Sprintf("%s: ", name)) // Log errors on stderr. + info = logger(fmt.Sprintf("%s: ", name), 1, os.Stdout) // Log info on stdout. + debug = logger(fmt.Sprintf("%s debug: ", name), 2, os.Stderr) // Log debug on stderr. +} + +// Simple error handling for demo. +func fatalIf(err error) { + if err != nil { + log.Fatal(err) + } +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0c11d11c/proton-c/bindings/go/README.md ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/README.md b/proton-c/bindings/go/README.md index fc04d79..597481c 100644 --- a/proton-c/bindings/go/README.md +++ b/proton-c/bindings/go/README.md @@ -30,55 +30,33 @@ There are two types of developer we want to support ## Status -There are two Go modules so far. See the documentation using +Package proton encodes and decodes AMQP messages and data as Go types. - godoc apache.org/proton - godoc apache.org/proton/event - -The proton module maps between AMQP and Go types and has a Go representation of -an AMQP message. It is the beginning of the "real" Go API. For examples of what -this API will look like see: - -- [receive.go](../../../examples/go/receive.go) uses channels and goroutines to receive concurrently. -- [send.go](../../../examples/go/send.go) less interesting but there for symmetry. +Sub-packages 'event' and 'messaging' provide two alternative ways to write +AMQP clients and servers. 'messaging' is easier for general purpose use. 'event' +gives complete low-level control of the underlying proton C engine. -The event module is a port of the proton C and python MessagingHandler APIs. 
It -provides low-level, goroutine-unsafe but (mostly) complete access to proton. It -is the foundation for building the Go API and may be useful for advanced AMQP -projects or cross-langauge proton development in future. +The event package is fairly complete, with the exception of the proton +reactor. It's unclear if the reactor is important for go. -The event API is functional but not completely complete. The Go API doesn't -exist yet, there is some dummy code so the examples will compile and run. - -## The event driven API +The messaging package is just starting. The examples work but anything else might not. -The event module contains +### Examples -- Go Proton events (AMQP events only, no reactor events yet) -- Go MessagingHandler events (equivalent to python MessagingHandler.) -- Pump to feed data between a Go net.Conn connection and a proton event loop. +messaging API: -The Pump uses 3 goroutines per connection, one to read, one to write and one to -run the proton event loop. Proton's thread-unsafe data is never used outside the -event loop goroutine. +- [receive.go](../../../examples/go/receive.go) receive from many connections concurrently +- [send.go](../../../examples/go/send.go) send to many connections concurrently -This API provides direct access to proton events, equivalent to C or python -event API. It does not yet support reactor events or allow multiple connections -to be handled in a single event loop goroutine, these are temporary limitations. +event API: +- [broker.go](../../../examples/go/event/broker.go) simple mini-broker -There is one example: examples/go/broker.go. It is a port of -examples/python/broker.py and can be used with the python `simple_send.py` and -`simple_recv.py` clients. +The examples work with each other and with the python `broker.py`, +`simple_send.py` and `simple_receive.py`. -The broker example works for simple tests but is concurrency-unsafe. 
It uses a -single `broker`, implementing MessagingHandler, with multiple pumps. The proton -event loops are safe in their separate goroutines but the `broker` state (queues -etc.) is not. We can fix this by funneling multiple connections into a single -event loop as mentioned above. +## The event driven API -However this API is not the end of the story. It will be the foundation to build -a more Go-like API that allows *any* goroutine to send or receive messages -without having to know anything about event loops or pumps. +See the package documentation for details. ## The Go API @@ -87,15 +65,49 @@ AMQP messages and other information (acknowledgments, flow control instructions etc.) using channels. There will be no user-visible locks and no need to run user code in special goroutines, e.g. as handlers in a proton event loop. -There is a (trivial, speculative, incomplete) example in examples/go/example.go -of what part of it might look like. It shows receiving messages concurrently -from two different connections in a single goroutine (it omits sessions). +See the package documentation for emerging details. + +Currently using a channel to receive messages, a function to send them (channels +internally) and a channel as a "future" for acknowledgements to senders. This +may change. + +## Design Questions + + +1. Error reporting and handling, esp. async. errors: + +What are common patterns for handling errors across channels? I.e. the thing at +one end of the channel blows up, how do you tell the other end? + +readers: you can close the channel, but there's no error info. You could pass a +struct { data, error } or use a second channel. Pros & cons? + +writers: you can't close without a panic so you need a second channel. Is this +a normal pattern: + + select { + data -> sendChan: sentit() + err := <- errChan: oops(err) + } + +2. 
Use of channels: + +I recently saw an interesting Go tip: "Make your API synchronous because in Go +it is simple to make a sync call async by putting it in a goroutine." + +What are the tradeoffs of exposing channels directly in the API vs. hiding them +behind methods? Exposing lets users select directly, less overhead than starting +a goroutine, creating MORE channels and selecting those. Hiding lets us wrap +patterns like the 'select {data, err}' pattern above, which is easier and less +error prone than asking users to do it themselves. + +The standard net.Conn uses blocking methods, not channels. I did as the tip says +and wrapped them in goroutines and channels. The library does expose *read* +channels e.g. time.After. Are there any *write* channels in the standard +library? I note that time.After has no errors, and suspect that may be a key +factor in the decision. -There is a tempting analogy between Go channels and AMQP links, but Go channels -are much simpler beasts than AMQP links. It is likely a link will be implemented -by a Go type that uses more than one channel. E.g. there will probably be -separate channels for messages and acknowledgments, perhaps also for flow -control status. +3. The "future" pattern for acknowledgements: super easy in Go but depends on 1. and 2. above. ## Why a separate API for Go? 
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0c11d11c/proton-c/bindings/go/src/Makefile ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/Makefile b/proton-c/bindings/go/src/Makefile index bbcaded..98baa4c 100644 --- a/proton-c/bindings/go/src/Makefile +++ b/proton-c/bindings/go/src/Makefile @@ -6,20 +6,11 @@ GENERATED=qpid.apache.org/proton/event/wrappers_gen.go test: $(GENERATED) - go test $(GOFLAGS) qpid.apache.org/proton - go test $(GOFLAGS) qpid.apache.org/proton/event + go test -v $(TESTFLAGS) $(GOFLAGS) qpid.apache.org/proton + go test -v $(TESTFLAGS) $(GOFLAGS) qpid.apache.org/proton/event + go test -v $(TESTFLAGS) $(GOFLAGS) qpid.apache.org/proton/messaging -rebuild: $(GENERATED) - go build -a $(GOFLAGS) qpid.apache.org/proton - go build -a $(GOFLAGS) qpid.apache.org/proton/event - go test $(GOFLAGS) -c -a qpid.apache.org/proton - go test $(GOFLAGS) -c -a qpid.apache.org/proton/event - -qpid.apache.org/proton/event/wrappers_gen.go: genwrap.go ../../../include/proton/*.h +$(GENERATED): genwrap.go ../../../include/proton/*.h go run genwrap.go -broker: test - go build $(GOFLAGS) ~/proton/examples/go/event/broker.go - ./broker - http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0c11d11c/proton-c/bindings/go/src/genwrap.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/genwrap.go b/proton-c/bindings/go/src/genwrap.go index 83eb34c..e269367 100644 --- a/proton-c/bindings/go/src/genwrap.go +++ b/proton-c/bindings/go/src/genwrap.go @@ -44,7 +44,11 @@ func mixedCase(s string) string { return result } -var templateFuncs = template.FuncMap{"mixedCase": mixedCase} +func mixedCaseTrim(s, prefix string) string { + return mixedCase(strings.TrimPrefix(s, prefix)) +} + +var templateFuncs = template.FuncMap{"mixedCase": mixedCase, "mixedCaseTrim": mixedCaseTrim} func doTemplate(out io.Writer, data interface{}, tmpl string) { 
panicIf(template.Must(template.New("").Funcs(templateFuncs).Parse(tmpl)).Execute(out, data)) @@ -73,7 +77,7 @@ const ({{range $values}} func (e {{mixedCase $enumName}}) String() string { switch e { {{range $values}} - case C.{{.}}: return "{{mixedCase .}}" {{end}} + case C.{{.}}: return "{{mixedCaseTrim . "PN_"}}" {{end}} } return "unknown" } @@ -131,7 +135,7 @@ type eventType struct { func newEventType(cName string) eventType { var etype eventType etype.Cname = cName - etype.Name = mixedCase(strings.TrimPrefix(cName, "PN_")) + etype.Name = mixedCaseTrim(cName, "PN_") etype.Fname = "On" + etype.Name etype.Iname = etype.Fname + "Interface" return etype @@ -141,7 +145,7 @@ var ( enumDefRe = regexp.MustCompile("typedef enum {([^}]*)} pn_([a-z_]+)_t;") enumValRe = regexp.MustCompile("PN_[A-Z_]+") skipEventRe = regexp.MustCompile("EVENT_NONE|REACTOR|SELECTABLE|TIMER") - skipFnRe = regexp.MustCompile("attach|context|class|collect|^recv$|^send$") + skipFnRe = regexp.MustCompile("attach|context|class|collect|^recv$|^send$|transport") ) // Generate event wrappers. 
@@ -183,6 +187,7 @@ type genType struct { Ctype, Gotype string ToGo func(value string) string ToC func(value string) string + Assign func(value string) string } func (g genType) printBody(out io.Writer, value string) { @@ -237,19 +242,24 @@ func mapType(ctype string) (g genType) { case "C.uint64_t": g.Gotype = "uint64" case "C.uint32_t": + g.Gotype = "uint16" + case "C.uint16_t": g.Gotype = "uint32" case "C.const char *": - g.Gotype = "string" - g.Ctype = "C.CString" + fallthrough case "C.char *": g.Gotype = "string" g.Ctype = "C.CString" + g.ToC = func(v string) string { return fmt.Sprintf("%sC", v) } + g.Assign = func(v string) string { + return fmt.Sprintf("%sC := C.CString(%s)\n defer C.free(unsafe.Pointer(%sC))\n", v, v, v) + } case "C.pn_seconds_t": g.Gotype = "time.Duration" g.ToGo = func(v string) string { return fmt.Sprintf("(time.Duration(%s) * time.Second)", v) } case "C.pn_error_t *": g.Gotype = "error" - g.ToGo = func(v string) string { return fmt.Sprintf("pnError(%s)", v) } + g.ToGo = func(v string) string { return fmt.Sprintf("internal.PnError(unsafe.Pointer(%s))", v) } default: pnId := regexp.MustCompile(" *pn_([a-z_]+)_t *\\*? *") match := pnId.FindStringSubmatch(g.Ctype) @@ -285,6 +295,7 @@ func splitArgs(argstr string) []genArg { } args := make([]genArg, 0) for _, item := range strings.Split(argstr, ",") { + item = strings.Trim(item, " \n") typeName := typeNameRe.FindStringSubmatch(item) if typeName == nil { panic(fmt.Errorf("Can't split argument type/name %#v", item)) @@ -318,6 +329,16 @@ func cArgs(args []genArg) string { return l } +func cAssigns(args []genArg) string { + l := "\n" + for _, arg := range args { + if arg.Assign != nil { + l += fmt.Sprintf("%s\n", arg.Assign(arg.Name)) + } + } + return l +} + // Return the go name of the function or "" to skip the function. func goFnName(api, fname string) string { // Skip class, context and attachment functions. 
@@ -328,13 +349,13 @@ func goFnName(api, fname string) string { case "link.get_drain": return "IsDrain" default: - return mixedCase(strings.TrimPrefix(fname, "get_")) + return mixedCaseTrim(fname, "get_") } } func apiWrapFns(api, header string, out io.Writer) { fmt.Fprintf(out, "type %s struct{pn *C.pn_%s_t}\n", mixedCase(api), api) - fmt.Fprintf(out, "func (%c %s) isNil() bool { return %c.pn == nil }\n", api[0], mixedCase(api), api[0]) + fmt.Fprintf(out, "func (%c %s) IsNil() bool { return %c.pn == nil }\n", api[0], mixedCase(api), api[0]) fn := regexp.MustCompile(fmt.Sprintf(`PN_EXTERN ([a-z0-9_ ]+ *\*?) *pn_%s_([a-z_]+)\(pn_%s_t *\*[a-z_]+ *,? *([^)]*)\)`, api, api)) for _, m := range fn.FindAllStringSubmatch(header, -1) { rtype, fname, argstr := mapType(m[1]), m[2], m[3] @@ -345,6 +366,7 @@ func apiWrapFns(api, header string, out io.Writer) { args := splitArgs(argstr) fmt.Fprintf(out, "func (%c %s) %s", api[0], mixedCase(api), gname) fmt.Fprintf(out, "(%s) %s { ", goArgs(args), rtype.Gotype) + fmt.Fprint(out, cAssigns(args)) rtype.printBody(out, fmt.Sprintf("C.pn_%s_%s(%c.pn%s)", api, fname, api[0], cArgs(args))) fmt.Fprintf(out, "}\n") } @@ -363,10 +385,13 @@ package event import ( "time" + "unsafe" + "qpid.apache.org/proton/internal" ) // #include <proton/types.h> // #include <proton/event.h> +// #include <stdlib.h> `) for _, api := range apis { fmt.Fprintf(out, "// #include <proton/%s.h>\n", api) http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0c11d11c/proton-c/bindings/go/src/qpid.apache.org/proton/doc.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/doc.go b/proton-c/bindings/go/src/qpid.apache.org/proton/doc.go index 40e1f7c..38c2d00 100644 --- a/proton-c/bindings/go/src/qpid.apache.org/proton/doc.go +++ b/proton-c/bindings/go/src/qpid.apache.org/proton/doc.go @@ -18,15 +18,17 @@ under the License. 
*/ /* -Package proton is a Go binding for the proton AMQP protocol engine. +Package proton encodes and decodes AMQP messages and data as Go types. -It alows you to construct and parse AMQP messages, and to implement AMQP -clients, servers and intermediaries that can exchange messages with any -AMQP 1.0 compliant endpoint. +It follows the standard 'encoding' libraries pattern. The mapping between AMQP +and Go types is described in the documentation of the Marshal and Unmarshal +functions. -Encoding and decoding AMQP data follows the pattern of the standard -encoding/json and encoding/xml packages.The mapping between AMQP and Go types is -described in the documentation of the Marshal and Unmarshal functions. +The sub-packages 'event' and 'messaging' provide two alternative ways to write +AMQP clients and servers. 'messaging' is easier for general purpose use. 'event' +gives complete low-level control of the underlying proton C engine. + +AMQP is an open standard for inter-operable message exchange, see <http://www.amqp.org/> */ package proton @@ -34,3 +36,5 @@ package proton import "C" // This file is just for the package comment. + +// FIXME aconway 2015-04-28: need to re-organize the package, it's not very intuitive. http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0c11d11c/proton-c/bindings/go/src/qpid.apache.org/proton/dummy.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/dummy.go b/proton-c/bindings/go/src/qpid.apache.org/proton/dummy.go deleted file mode 100644 index 2f83760..0000000 --- a/proton-c/bindings/go/src/qpid.apache.org/proton/dummy.go +++ /dev/null @@ -1,82 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. 
The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ - -package proton - -import ( - "fmt" - "net" -) - -// Placeholder definitions to allow examples to compile. - -type Connection struct { - Server bool // Server connection does protocol negotiation - // FIXME aconway 2015-04-17: Other parameters to set up SSL, SASL etc. -} - -// Map an AMQP connection using conn -func (c Connection) Connect(conn net.Conn) error { return nil } -func (c Connection) Close() error { return nil } - -func (c Connection) Receiver(addr string) (*Receiver, error) { - // FIXME aconway 2015-04-10: dummy implementation to test examples, returns endless messages. - r := &Receiver{make(chan Message), make(chan struct{})} - go func() { - for i := 0; ; i++ { - m := NewMessage() - m.SetBody(fmt.Sprintf("%v-%v", addr, i)) - select { - case r.Receive <- m: - case <-r.closed: - return - } - } - }() - return r, nil -} - -func (c Connection) Sender(addr string) (*Sender, error) { - return &Sender{}, nil -} - -type Receiver struct { - Receive chan Message - closed chan struct{} -} - -func (r Receiver) Close() error { return nil } - -type Sender struct{} - -func (s Sender) Send(m Message) error { fmt.Println(m.Body()); return nil } -func (s Sender) Close() error { return nil } - -// Connect makes a default client connection using conn. 
-// -// For more control do: -// c := Connection{} -// // set parameters on c -// c.Connect(conn) -// -func Connect(conn net.Conn) (Connection, error) { - c := Connection{} - c.Connect(conn) - return c, nil -} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0c11d11c/proton-c/bindings/go/src/qpid.apache.org/proton/error.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/error.go b/proton-c/bindings/go/src/qpid.apache.org/proton/error.go deleted file mode 100644 index 95927bc..0000000 --- a/proton-c/bindings/go/src/qpid.apache.org/proton/error.go +++ /dev/null @@ -1,111 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. 
-*/ - -package proton - -// FIXME aconway 2015-04-08: consolidate with event/errors.go - -// #include <proton/error.h> -// #include <proton/codec.h> -import "C" - -import ( - "fmt" - "reflect" - "runtime" -) - -var pnErrorNames = map[int]string{ - C.PN_EOS: "end of data", - C.PN_ERR: "error", - C.PN_OVERFLOW: "overflow", - C.PN_UNDERFLOW: "underflow", - C.PN_STATE_ERR: "bad state", - C.PN_ARG_ERR: "invalid argument", - C.PN_TIMEOUT: "timeout", - C.PN_INTR: "interrupted", - C.PN_INPROGRESS: "in progress", -} - -func pnErrorName(code int) string { - name := pnErrorNames[code] - if name != "" { - return name - } else { - return fmt.Sprintf("unknown-error(%s)", code) - } -} - -// pnError converst a pn_error_t to a Go error. Returns nil if e has 0 error status -func pnError(prefix string, e *C.pn_error_t) error { - if e == nil || int(C.pn_error_code(e)) == 0 { - return nil - } - code := int(C.pn_error_code(e)) - return errorf("%s %s: %s", pnErrorName(code), prefix, - C.GoString(C.pn_error_text(e))) -} - -type BadUnmarshal struct { - AMQPType string - GoType reflect.Type -} - -func newBadUnmarshal(pnType C.pn_type_t, v interface{}) *BadUnmarshal { - return &BadUnmarshal{pnTypeString(pnType), reflect.TypeOf(v)} -} - -func (e BadUnmarshal) Error() string { - if e.GoType.Kind() != reflect.Ptr { - return fmt.Sprintf("proton: cannot unmarshal to type %s, not a pointer", e.GoType) - } else { - return fmt.Sprintf("proton: cannot unmarshal AMQP %s to %s", e.AMQPType, e.GoType) - } -} - -// errorf creates an error with a formatted message -func errorf(format string, a ...interface{}) error { - return fmt.Errorf("proton: %s", fmt.Sprintf(format, a...)) -} - -// doRecover is called to recover from internal panics -func doRecover(err *error) { - r := recover() - switch r := r.(type) { - case nil: - return - case runtime.Error: - panic(r) - case error: - *err = r - default: - panic(r) - } -} - -// panicIf panics if condition is true, the panic value is errorf(fmt, args...) 
-func panicIf(condition bool, fmt string, args ...interface{}) { - if condition { - panic(errorf(fmt, args...)) - } -} - -func dataError(prefix string, data *C.pn_data_t) error { - return pnError(prefix, C.pn_data_error(data)) -} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0c11d11c/proton-c/bindings/go/src/qpid.apache.org/proton/event/doc.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/event/doc.go b/proton-c/bindings/go/src/qpid.apache.org/proton/event/doc.go index d55dc7d..6a1c8ac 100644 --- a/proton-c/bindings/go/src/qpid.apache.org/proton/event/doc.go +++ b/proton-c/bindings/go/src/qpid.apache.org/proton/event/doc.go @@ -18,7 +18,17 @@ under the License. */ /* -Package event provides an event-oriented API to the proton AMQP engine. +Package event provides a low-level API to the proton AMQP engine. + +For most tasks, consider instead package qpid.apache.org/proton/messaging. +It provides a higher-level, concurrent API that is easier to use. + +The API is event based. There are two alternative styles of handler. CoreHandler +provides the core proton events. MessagingHandler provides a slightly simplified +view of the event stream and automates some common tasks. + +See type Pump documentation for more details of the interaction between proton +events and goroutines. */ package event http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0c11d11c/proton-c/bindings/go/src/qpid.apache.org/proton/event/error.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/event/error.go b/proton-c/bindings/go/src/qpid.apache.org/proton/event/error.go deleted file mode 100644 index 42382a4..0000000 --- a/proton-c/bindings/go/src/qpid.apache.org/proton/event/error.go +++ /dev/null @@ -1,77 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements.
See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ - -package event // FIXME aconway 2015-03-26: duplicated from package proton, clean up - -// #include <proton/error.h> -// #include <proton/codec.h> -import "C" - -import ( - "fmt" - "runtime" -) - -var pnErrorNames = map[int]string{ - C.PN_EOS: "end of data", - C.PN_ERR: "error", - C.PN_OVERFLOW: "overflow", - C.PN_UNDERFLOW: "underflow", - C.PN_STATE_ERR: "bad state", - C.PN_ARG_ERR: "invalid argument", - C.PN_TIMEOUT: "timeout", - C.PN_INTR: "interrupted", - C.PN_INPROGRESS: "in progress", -} - -func pnErrorName(code int) string { - name := pnErrorNames[code] - if name != "" { - return name - } else { - return "unknown" - } -} - -func pnError(e *C.pn_error_t) error { - if e == nil || C.pn_error_code(e) == 0 { - return nil - } - return errorf("%s: %s", pnErrorName(int(C.pn_error_code(e))), C.GoString(C.pn_error_text(e))) -} - -// errorf creates an error with a formatted message -func errorf(format string, a ...interface{}) error { - return fmt.Errorf("proton: %s", fmt.Sprintf(format, a...)) -} - -// doRecover is called to recover from internal panics -func doRecover(err *error) { - r := recover() - switch r := r.(type) { - case nil: - return - case runtime.Error: - panic(r) - case error: - *err = r - default: - panic(r) - } -} 
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0c11d11c/proton-c/bindings/go/src/qpid.apache.org/proton/event/handlers.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/event/handlers.go b/proton-c/bindings/go/src/qpid.apache.org/proton/event/handlers.go index d1ce953..a9e1468 100644 --- a/proton-c/bindings/go/src/qpid.apache.org/proton/event/handlers.go +++ b/proton-c/bindings/go/src/qpid.apache.org/proton/event/handlers.go @@ -23,6 +23,10 @@ package event // #include <proton/handlers.h> import "C" +import ( + "qpid.apache.org/proton/internal" +) + // CoreHandler handles core proton events. type CoreHandler interface { // Handle is called with an event. @@ -40,24 +44,18 @@ func (h cHandler) Handle(e Event) error { return nil // FIXME aconway 2015-03-31: error handling } -func HandShaker() CoreHandler { - return cHandler{C.pn_handshaker()} -} - -func FlowController(prefetch int) CoreHandler { - return cHandler{C.pn_flowcontroller(C.int(prefetch))} -} - -// MessagingHandler provides a higher-level, easier-to-use interface for writing -// applications that send and receive messages. +// MessagingHandler provides an alternative interface to CoreHandler, +// it is easier to use for most applications that send and receive messages. +// +// Implement this interface and then wrap your value with a MessagingHandlerDelegator. +// MessagingHandlerDelegator implements CoreHandler and can be registered with a Pump. // -// You can implement this interface and wrap it with a MessagingHandlerDelegator type MessagingHandler interface { Handle(MessagingEventType, Event) error } -// MessagingEventType provides an easier set of event types to work with -// that the core proton EventType. 
+// MessagingEventType provides a set of events that are easier to work with than the +// core events defined by EventType // type MessagingEventType int @@ -101,13 +99,13 @@ const ( // The peer initiates the closing of the link. MLinkClosing - // The connection is closed. + // Both ends of the connection are closed. MConnectionClosed - // The session is closed. + // Both ends of the session are closed. MSessionClosed - // The link is closed. + // Both ends of the link are closed. MLinkClosed // The socket is disconnected. @@ -137,7 +135,60 @@ const ( MMessage ) -// Capture common patterns for endpoints opening/closing +func (t MessagingEventType) String() string { + switch t { + case MStart: + return "Start" + case MConnectionError: + return "ConnectionError" + case MSessionError: + return "SessionError" + case MLinkError: + return "LinkError" + case MConnectionOpening: + return "ConnectionOpening" + case MSessionOpening: + return "SessionOpening" + case MLinkOpening: + return "LinkOpening" + case MConnectionOpened: + return "ConnectionOpened" + case MSessionOpened: + return "SessionOpened" + case MLinkOpened: + return "LinkOpened" + case MConnectionClosing: + return "ConnectionClosing" + case MSessionClosing: + return "SessionClosing" + case MLinkClosing: + return "LinkClosing" + case MConnectionClosed: + return "ConnectionClosed" + case MSessionClosed: + return "SessionClosed" + case MLinkClosed: + return "LinkClosed" + case MDisconnected: + return "Disconnected" + case MSendable: + return "Sendable" + case MAccepted: + return "Accepted" + case MRejected: + return "Rejected" + case MReleased: + return "Released" + case MSettled: + return "Settled" + case MMessage: + return "Message" + default: + return "Unknown" + } +} + +// endpointDelegator captures common patterns for endpoints opening/closing type endpointDelegator struct { remoteOpen, remoteClose, localOpen, localClose EventType opening, opened, closing, closed, error MessagingEventType @@ -145,52 +196,63 
@@ type endpointDelegator struct { delegate MessagingHandler } -func (d endpointDelegator) Handle(e Event) error { +// Handle handles an open/close event for an endpoint in a generic way. +func (d endpointDelegator) Handle(e Event) (err error) { endpoint := d.endpoint(e) state := endpoint.State() switch e.Type() { case d.localOpen: - if state.RemoteOpen() { - return d.delegate.Handle(d.opened, e) + if state.Is(SRemoteActive) { + err = d.delegate.Handle(d.opened, e) } case d.remoteOpen: switch { - case state.LocalOpen(): - return d.delegate.Handle(d.opened, e) - case state.LocalUninitialized(): - err := d.delegate.Handle(d.opening, e) + case state.Is(SLocalActive): + err = d.delegate.Handle(d.opened, e) + case state.Is(SLocalUninit): + err = d.delegate.Handle(d.opening, e) if err == nil { endpoint.Open() } - return err } case d.remoteClose: - switch { - case endpoint.RemoteCondition().IsSet(): - d.delegate.Handle(d.error, e) - case state.LocalClosed(): - d.delegate.Handle(d.closed, e) - default: - d.delegate.Handle(d.closing, e) + var err1 error + if endpoint.RemoteCondition().IsSet() { + err1 = d.delegate.Handle(d.error, e) + if err1 == nil { + err1 = endpoint.RemoteCondition().Error() + } + } + if state.Is(SLocalClosed) { + err = d.delegate.Handle(d.closed, e) + } else { + err = d.delegate.Handle(d.closing, e) + endpoint.Close() + } + if err1 != nil { + err = err1 } - endpoint.Close() case d.localClose: - // Nothing to do + if state.Is(SRemoteClosed) { + err = d.delegate.Handle(d.closed, e) + } default: - panic("internal error") // We shouldn't be called with any other event type. + // We shouldn't be called with any other event type. + panic(internal.Errorf("internal error, not an open/close event: %s", e)) } - return nil + + return err } // MessagingDelegator implments a CoreHandler and delegates to a MessagingHandler. // You can modify the exported fields before you pass the MessagingDelegator to -// a Pump +// a Pump. 
type MessagingDelegator struct { delegate MessagingHandler connection, session, link endpointDelegator @@ -231,7 +293,6 @@ func NewMessagingDelegator(h MessagingHandler) CoreHandler { func(e Event) Endpoint { return e.Link() }, h, }, - handshaker: HandShaker(), flowcontroller: nil, AutoSettle: true, AutoAccept: true, @@ -248,13 +309,12 @@ func handleIf(h CoreHandler, e Event) error { } func (d *MessagingDelegator) Handle(e Event) error { - handleIf(d.handshaker, e) handleIf(d.flowcontroller, e) // FIXME aconway 2015-03-31: error handling. switch e.Type() { case EConnectionInit: - d.flowcontroller = FlowController(d.Prefetch) + d.flowcontroller = cHandler{C.pn_flowcontroller(C.int(d.Prefetch))} d.delegate.Handle(MStart, e) case EConnectionRemoteOpen, EConnectionRemoteClose, EConnectionLocalOpen, EConnectionLocalClose: @@ -277,6 +337,9 @@ func (d *MessagingDelegator) Handle(e Event) error { } else { d.outgoing(e) } + + case ETransportTailClosed: + d.delegate.Handle(MDisconnected, e) } return nil } @@ -284,7 +347,7 @@ func (d *MessagingDelegator) Handle(e Event) error { func (d *MessagingDelegator) incoming(e Event) (err error) { delivery := e.Delivery() if delivery.Readable() && !delivery.Partial() { - if e.Link().State().LocalClosed() { + if e.Link().State().Is(SLocalClosed) { e.Link().Advance() if d.AutoAccept { delivery.Release(false) http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0c11d11c/proton-c/bindings/go/src/qpid.apache.org/proton/event/message.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/event/message.go b/proton-c/bindings/go/src/qpid.apache.org/proton/event/message.go new file mode 100644 index 0000000..bd7dddd --- /dev/null +++ b/proton-c/bindings/go/src/qpid.apache.org/proton/event/message.go @@ -0,0 +1,75 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. 
See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package event + +// #include <proton/types.h> +// #include <proton/message.h> +// #include <proton/codec.h> +import "C" + +import ( + "qpid.apache.org/proton" + "qpid.apache.org/proton/internal" +) + +// DecodeMessage decodes the message contained in a delivery event. +func DecodeMessage(e Event) (m proton.Message, err error) { + defer internal.DoRecover(&err) + delivery := e.Delivery() + if !delivery.Readable() || delivery.Partial() { + return nil, internal.Errorf("attempting to get incomplete message") + } + data := make([]byte, delivery.Pending()) + result := delivery.Link().Recv(data) + if result != len(data) { + return nil, internal.Errorf("cannot receive message: %s", internal.PnErrorCode(result)) + } + return proton.DecodeMessage(data) +} + +// FIXME aconway 2015-04-08: proper handling of delivery tags. Tag counter per link. +var tags proton.UidCounter + +// Send sends a proton.Message over a Link. +// Returns a Delivery that can be used to determine the outcome of the message.
+func (link Link) Send(m proton.Message) (Delivery, error) { + if !link.IsSender() { + return Delivery{}, internal.Errorf("attempt to send message on receiving link") + } + // FIXME aconway 2015-04-08: buffering, error handling + delivery := link.Delivery(tags.Next()) + bytes, err := m.Encode(nil) + if err != nil { + return Delivery{}, internal.Errorf("cannot send mesage %s", err) + } + result := link.SendBytes(bytes) + link.Advance() + if result != len(bytes) { + if result < 0 { + return delivery, internal.Errorf("send failed %v", internal.PnErrorCode(result)) + } else { + return delivery, internal.Errorf("send incomplete %v of %v", result, len(bytes)) + } + } + if link.RemoteSndSettleMode() == PnSndSettled { // FIXME aconway 2015-04-08: enum names + delivery.Settle() + } + return delivery, nil +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0c11d11c/proton-c/bindings/go/src/qpid.apache.org/proton/event/pump.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/event/pump.go b/proton-c/bindings/go/src/qpid.apache.org/proton/event/pump.go index 17257b5..480c994 100644 --- a/proton-c/bindings/go/src/qpid.apache.org/proton/event/pump.go +++ b/proton-c/bindings/go/src/qpid.apache.org/proton/event/pump.go @@ -25,104 +25,160 @@ package event // #include <proton/reactor.h> // #include <proton/handlers.h> // #include <proton/transport.h> +// #include <proton/session.h> // #include <memory.h> +// #include <stdlib.h> // +// PN_HANDLE(REMOTE_ADDR) import "C" import ( + "fmt" + "io" "net" + "qpid.apache.org/proton/internal" "sync" "unsafe" ) -// FIXME aconway 2015-04-09: We should allow data from multiple connections to be pumped -// into a single event loop (using the proton Reactor) -// That would allow the user to decide if they want an event-loop goroutine per connection -// or if they want to handle several connections in one event loop. 
+// bufferChan manages a pair of ping-pong buffers to pass bytes through a channel. +type bufferChan struct { + buffers chan []byte + buf1, buf2 []byte +} -// Pump reads from a net.Conn, decodes AMQP events and calls the appropriate -// Handler functions. Actions taken by Handler functions (such as sending messages) -// are encoded and written to the net.Conn. -// +func newBufferChan(size int) *bufferChan { + return &bufferChan{make(chan []byte), make([]byte, size), make([]byte, size)} +} + +func (b *bufferChan) buffer() []byte { + b.buf1, b.buf2 = b.buf2, b.buf1 // Alternate buffers. + return b.buf1[:cap(b.buf1)] +} + +// FIXME aconway 2015-05-04: direct sending to Inject may block user goroutines if +// the pump stops. Make this a function that selects on running. + +// FIXME aconway 2015-05-05: for consistency should Pump be called Driver? + +/* +Pump reads from a net.Conn, decodes AMQP events and calls the appropriate +Handler functions. Actions taken by Handler functions (such as sending messages) +are encoded and written to the net.Conn. + +The proton protocol engine is single threaded (per connection). The Pump runs +proton in the goroutine that calls Pump.Run() and creates goroutines to feed +data to/from a net.Conn. You can create multiple Pumps to handle multiple +connections concurrently. + +Methods in this package can only be called in the goroutine that executes the +corresponding Pump.Run(). You implement the CoreHandler or MessagingHandler +interfaces and provide those values to NewPump(). Their Handle method will be +called in the Pump goroutine, in typical event-driven style. + +Handlers can pass values from an event (Connections, Links, Deliveries etc.) to +other goroutines, store them, or use them as map indexes. Effectively they are +just C pointers. Other goroutines cannot call their methods directly but they +can create function closures that call their methods and send those closures +to the Pump.Inject channel.
They will execute safely in the pump +goroutine. Injected functions, or your handlers, can set up channels to get +results back to other goroutines. + +You are responsible for ensuring you don't use an event value after the C object +has been deleted. The handler methods will tell you when a value is no longer +valid. For example after a MessagingHandler handles a LinkClosed event, that link +is no longer valid. If you do Link.Close() yourself (in a handler or injected +function) the link remains valid until the corresponding LinkClosed event is +received by the handler. + +Pump.Close() will take care of cleaning up any remaining values and types when +you are done with the Pump. All values associated with a pump become invalid +when you call Pump.Close() + +The qpid.apache.org/proton/messaging package will do all this for you, so unless +you are doing something fairly low-level it is probably a better choice. + +*/ type Pump struct { + // Error is set on exit from Run() if there was an error. + Error error + // Channel to inject functions to be executed in the Pump's proton event loop. + Inject chan func() + conn net.Conn transport *C.pn_transport_t connection *C.pn_connection_t collector *C.pn_collector_t - read chan []byte // Read buffers channel. Will close when pump closes. - write chan []byte // Write buffer channel. Must be closed when read closes. - waiter sync.WaitGroup // Wait for read and write goroutines to complete. - handlers []CoreHandler // Handlers for proton events. - - inject chan func() // Functions inject into the loop - closed chan struct{} // This channel will be closed when the remote end closes. + read *bufferChan // Read buffers channel. + write *bufferChan // Write buffers channel. + handlers []CoreHandler // Handlers for proton events. + running chan struct{} // This channel will be closed when the goroutines are done. } const bufferSize = 4096 -// NewPump initializes a pump with a connection and handlers..
Call `go Run()` to start it running. +var pumps map[*C.pn_connection_t]*Pump + +func init() { + pumps = make(map[*C.pn_connection_t]*Pump) +} + +// NewPump initializes a pump with a connection and handlers. To start it running: +// p := NewPump(...) +// go p.Run() +// The goroutine will exit when the pump is closed or disconnected. +// You can check for errors on Pump.Error. +// func NewPump(conn net.Conn, handlers ...CoreHandler) (*Pump, error) { + // Save the connection ID for Connection.String() p := &Pump{ + Inject: make(chan func(), 100), // FIXME aconway 2015-05-04: blocking hack conn: conn, transport: C.pn_transport(), connection: C.pn_connection(), collector: C.pn_collector(), handlers: handlers, - read: make(chan []byte), - write: make(chan []byte), - inject: make(chan func()), - closed: make(chan struct{}), + read: newBufferChan(bufferSize), + write: newBufferChan(bufferSize), + running: make(chan struct{}), } - p.handlers = append(p.handlers, handlers...) - if p.transport == nil || p.connection == nil || p.collector == nil { - return nil, errorf("failed to allocate pump") + return nil, internal.Errorf("failed to allocate pump") } pnErr := int(C.pn_transport_bind(p.transport, p.connection)) if pnErr != 0 { - return nil, errorf("cannot setup pump: %s", pnErrorName(pnErr)) + return nil, internal.Errorf("cannot setup pump: %s", internal.PnErrorCode(pnErr)) } C.pn_connection_collect(p.connection, p.collector) C.pn_connection_open(p.connection) + pumps[p.connection] = p return p, nil } -// Server puts the Pump in server mode, meaning it will auto-detect security settings on -// the incoming connnection such as use of SASL and SSL. -// Must be called before Run() -func (p *Pump) Server() { - C.pn_transport_set_server(p.transport) +func (p *Pump) String() string { + return fmt.Sprintf("(%s-%s)", p.conn.LocalAddr(), p.conn.RemoteAddr()) } -// Run the pump.
Normally called in a goroutine as: go pump.Run() -func (p *Pump) Run() { - go p.run() +func (p *Pump) Id() string { + return fmt.Sprintf("%p", &p) } -// Pump handles the connction close event to close itself. -func (p *Pump) Handle(e Event) error { - switch e.Type() { - case EConnectionLocalClose: - return p.close() +// setError sets error only if not already set +func (p *Pump) setError(e error) { + if p.Error == nil { + p.Error = e } - return nil } -// Closing the pump will also close the net.Conn and stop associated goroutines. -func (p *Pump) Close() error { - // FIXME aconway 2015-04-08: broken - - // Note this is called externally, outside the proton event loop. - // Polite AMQP close - p.inject <- func() { C.pn_connection_close(p.connection) } - _, _ = <-p.closed // Wait for remote close - return p.close() +// Server puts the Pump in server mode, meaning it will auto-detect security settings on +// the incoming connnection such as use of SASL and SSL. +// Must be called before Run() +// +func (p *Pump) Server() { + C.pn_transport_set_server(p.transport) } -// close private implementation, call in the event loop. -func (p *Pump) close() error { - p.conn.Close() - p.waiter.Wait() +func (p *Pump) free() { if p.connection != nil { C.pn_connection_free(p.connection) } @@ -138,74 +194,102 @@ func (p *Pump) close() error { C.pn_handler_free(h.pn) } } - return nil // FIXME aconway 2015-03-31: error handling } -// Start goroutines to feed the pn_transport_t from the pump. -func (c *Pump) run() error { - // FIXME aconway 2015-03-17: error handling - c.waiter.Add(2) - var readError, writeError error +// Close closes the AMQP connection, the net.Conn, and stops associated goroutines. +// It will cause Run() to return. Run() may return earlier if the network disconnects +// but you must still call Close() to clean everything up. 
+// +// Methods on values associated with the pump (Connections, Sessions, Links) will panic +// if called after Close() +// +func (p *Pump) Close() error { + // If the pump is still running, inject a close. Either way wait for it to finish. + select { + case p.Inject <- func() { C.pn_connection_close(p.connection) }: + <-p.running // Wait to finish + case <-p.running: // Wait for goroutines to finish + } + delete(pumps, p.connection) + p.free() + return p.Error +} - go func() { // Read - rbuf, rbuf2 := make([]byte, bufferSize), make([]byte, bufferSize) +// Run the pump. Normally called in a goroutine as: go pump.Run() +// An error during Run is stored on p.Error. +// +func (p *Pump) Run() { + // Signal errors from the read/write goroutines. Don't block if we don't + // read all the errors, we only care about the first. + error := make(chan error, 2) + // FIXME aconway 2015-05-04: stop := make(chan struct{}) // Closed to signal that read/write should stop. + + wait := sync.WaitGroup{} + wait.Add(2) + + go func() { // Read goroutine + defer wait.Done() for { - rbuf = rbuf[:cap(rbuf)] - n, err := c.conn.Read(rbuf) + rbuf := p.read.buffer() + n, err := p.conn.Read(rbuf) if n > 0 { - c.read <- rbuf[:n] + p.read.buffers <- rbuf[:n] + } else if err != nil { + close(p.read.buffers) + error <- err + return } - if err != nil { - readError = err - break - } - rbuf, rbuf2 = rbuf2, rbuf // Swap the buffers, fill the one not in use. 
} - close(c.read) - c.waiter.Done() }() - go func() { // Write - for wbuf := range c.write { - _, err := c.conn.Write(wbuf) + go func() { // Write goroutine + defer wait.Done() + for { + wbuf, ok := <-p.write.buffers + if !ok { + return + } + _, err := p.conn.Write(wbuf) if err != nil { - writeError = err - break + error <- err + return } } - c.waiter.Done() }() - // Proton driver loop - wbuf, wbuf2 := make([]byte, bufferSize), make([]byte, bufferSize) - wbuf = c.pop(wbuf) // First write buffer - for { // handle pn_transport_t - select { - case buf, ok := <-c.read: // Read a buffer - if !ok { // Read channel closed - break - } - c.push(buf) - - case c.write <- wbuf: // Write a buffer - wbuf, wbuf2 = wbuf2, wbuf // Swap the buffers, fill the unused one. - wbuf = c.pop(wbuf) // Next buffer to write + wbuf := p.write.buffer()[:0] +loop: + for { + if len(wbuf) == 0 { + p.pop(&wbuf) + } + // Don't set wchan unless there is something to write. + var wchan chan []byte + if len(wbuf) > 0 { + wchan = p.write.buffers + } - case f := <-c.inject: // Function injected from another goroutine + select { + case buf := <-p.read.buffers: // Read a buffer + p.push(buf) + case wchan <- wbuf: // Write a buffer + wbuf = p.write.buffer()[:0] + case f := <-p.Inject: // Function injected from another goroutine f() + case err := <-error: // Read or write error + p.setError(err) + C.pn_transport_close_tail(p.transport) + C.pn_transport_close_head(p.transport) + } + if err := p.process(); err != nil { + p.setError(err) + break loop } - c.process() // FIXME aconway 2015-03-17: error handling - } - - close(c.write) - c.waiter.Wait() // Wait for read/write goroutines to finish - switch { - case readError != nil: - return readError - case writeError != nil: - return writeError } - return nil + close(p.write.buffers) + p.conn.Close() + wait.Wait() + close(p.running) // Signal goroutines have exited and Error is set. 
} func minInt(a, b int) int { @@ -216,47 +300,58 @@ func minInt(a, b int) int { } } -func (c *Pump) pop(buf []byte) []byte { - pending := int(C.pn_transport_pending(c.transport)) - if pending == int(C.PN_EOS) { - return nil - } - if pending < 0 { - panic(errorf(pnErrorName(pending))) +func (p *Pump) pop(buf *[]byte) { + pending := int(C.pn_transport_pending(p.transport)) + switch { + case pending == int(C.PN_EOS): + *buf = (*buf)[:] + return + case pending < 0: + panic(internal.Errorf("%s", internal.PnErrorCode(pending))) } - size := minInt(pending, cap(buf)) - buf = buf[:size] + size := minInt(pending, cap(*buf)) + *buf = (*buf)[:size] if size == 0 { - return buf + return } - C.memcpy(unsafe.Pointer(&buf[0]), unsafe.Pointer(C.pn_transport_head(c.transport)), C.size_t(size)) - C.pn_transport_pop(c.transport, C.size_t(size)) - return buf + C.memcpy(unsafe.Pointer(&(*buf)[0]), unsafe.Pointer(C.pn_transport_head(p.transport)), C.size_t(size)) + C.pn_transport_pop(p.transport, C.size_t(size)) } -func (c *Pump) push(buf []byte) { +func (p *Pump) push(buf []byte) { buf2 := buf for len(buf2) > 0 { - n := int(C.pn_transport_push(c.transport, (*C.char)(unsafe.Pointer((&buf2[0]))), C.size_t(len(buf2)))) + n := int(C.pn_transport_push(p.transport, (*C.char)(unsafe.Pointer((&buf2[0]))), C.size_t(len(buf2)))) if n <= 0 { - panic(errorf("error in transport: %s", pnErrorName(n))) + panic(internal.Errorf("error in transport: %s", internal.PnErrorCode(n))) } buf2 = buf2[n:] } } -func (c *Pump) process() { - for ce := C.pn_collector_peek(c.collector); ce != nil; ce = C.pn_collector_peek(c.collector) { - e := Event{ce} - for _, h := range c.handlers { - h.Handle(e) // FIXME aconway 2015-03-18: error handling +func (p *Pump) handle(e Event) error { + for _, h := range p.handlers { + if err := h.Handle(e); err != nil { + return err } - C.pn_collector_pop(c.collector) } + if e.Type() == ETransportClosed { + return io.EOF + } + return nil } -func (c *Pump) AddHandlers(handlers 
...CoreHandler) { - c.inject <- func() { - c.handlers = append(c.handlers, handlers...) +func (p *Pump) process() error { + // FIXME aconway 2015-05-04: if a Handler returns error we should stop the pump + for ce := C.pn_collector_peek(p.collector); ce != nil; ce = C.pn_collector_peek(p.collector) { + e := Event{ce} + if err := p.handle(e); err != nil { + return err + } + C.pn_collector_pop(p.collector) } + return nil } + +// Connection gets the Pump's connection value. +func (p *Pump) Connection() Connection { return Connection{p.connection} } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0c11d11c/proton-c/bindings/go/src/qpid.apache.org/proton/event/wrappers.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/event/wrappers.go b/proton-c/bindings/go/src/qpid.apache.org/proton/event/wrappers.go index c3c0a7d..3584311 100644 --- a/proton-c/bindings/go/src/qpid.apache.org/proton/event/wrappers.go +++ b/proton-c/bindings/go/src/qpid.apache.org/proton/event/wrappers.go @@ -21,17 +21,26 @@ package event //#include <proton/codec.h> //#include <proton/connection.h> +//#include <proton/session.h> +//#include <proton/session.h> //#include <proton/delivery.h> //#include <proton/event.h> //#include <proton/transport.h> //#include <proton/link.h> +//#include <stdlib.h> import "C" import ( + "fmt" + "qpid.apache.org/proton/internal" "unsafe" ) -// Data holds a pointer to decoded AMQP data, proton.marshal/unmarshal to access it as Go data. +// FIXME aconway 2015-05-05: Documentation for generated types. + +// Data holds a pointer to decoded AMQP data. +// Use proton.marshal/unmarshal to access it as Go data types. 
+// type Data struct{ pn *C.pn_data_t } func NewData(p unsafe.Pointer) Data { return Data{(*C.pn_data_t)(p)} } @@ -40,20 +49,24 @@ func (d Data) Free() { C.pn_data_free(d.pn) } func (d Data) Pointer() unsafe.Pointer { return unsafe.Pointer(d.pn) } func (d Data) Clear() { C.pn_data_clear(d.pn) } func (d Data) Rewind() { C.pn_data_rewind(d.pn) } -func (d Data) Error() error { return pnError(C.pn_data_error(d.pn)) } +func (d Data) Error() error { + return internal.PnError(unsafe.Pointer(C.pn_data_error(d.pn))) +} // State holds the state flags for an AMQP endpoint. type State byte -func (s State) LocalUninitialized() bool { return s&C.PN_LOCAL_UNINIT != 0 } -func (s State) LocalActive() bool { return s&C.PN_LOCAL_ACTIVE != 0 } -func (s State) LocalOpen() bool { return s&C.PN_LOCAL_ACTIVE != 0 } -func (s State) LocalClosed() bool { return s&C.PN_LOCAL_CLOSED != 0 } +const ( + SLocalUninit State = C.PN_LOCAL_UNINIT + SLocalActive = C.PN_LOCAL_ACTIVE + SLocalClosed = C.PN_LOCAL_CLOSED + SRemoteUninit = C.PN_REMOTE_UNINIT + SRemoteActive = C.PN_REMOTE_ACTIVE + SRemoteClosed = C.PN_REMOTE_CLOSED +) -func (s State) RemoteUninitialized() bool { return s&C.PN_REMOTE_UNINIT != 0 } -func (s State) RemoteActive() bool { return s&C.PN_REMOTE_ACTIVE != 0 } -func (s State) RemoteOpen() bool { return s&C.PN_REMOTE_ACTIVE != 0 } -func (s State) RemoteClosed() bool { return s&C.PN_REMOTE_CLOSED != 0 } +// Is is True if bits & state is non 0. +func (s State) Is(bits State) bool { return s&bits != 0 } // Return a State containig just the local flags func (s State) Local() State { return State(s & C.PN_LOCAL_MASK) } @@ -63,14 +76,18 @@ func (s State) Remote() State { return State(s & C.PN_REMOTE_MASK) } // Endpoint is the common interface for Connection, Link and Session. type Endpoint interface { + // State is the open/closed state. State() State + // Open an endpoint. Open() + // Close an endpoint. Close() + // Condition holds a local error condition. 
Condition() Condition + // RemoteCondition holds a remote error condition. RemoteCondition() Condition } -// Disposition types const ( Received uint64 = C.PN_RECEIVED Accepted = C.PN_ACCEPTED @@ -102,8 +119,11 @@ func (d Delivery) Release(delivered bool) { } } -type Transport struct{ pn *C.pn_transport_t } -type DeliveryTag struct{ pn C.pn_delivery_tag_t } // FIXME aconway 2015-03-25: convert to string +// FIXME aconway 2015-05-05: don't expose DeliveryTag as a C pointer, just as a String? + +type DeliveryTag struct{ pn C.pn_delivery_tag_t } + +func (t DeliveryTag) String() string { return C.GoStringN(t.pn.start, C.int(t.pn.size)) } func (l Link) Recv(buf []byte) int { if len(buf) == 0 { @@ -112,7 +132,7 @@ func (l Link) Recv(buf []byte) int { return int(C.pn_link_recv(l.pn, (*C.char)(unsafe.Pointer(&buf[0])), C.size_t(len(buf)))) } -func (l Link) Send(bytes []byte) int { +func (l Link) SendBytes(bytes []byte) int { return int(C.pn_link_send(l.pn, cPtr(bytes), cLen(bytes))) } @@ -135,3 +155,70 @@ func cPtr(b []byte) *C.char { func cLen(b []byte) C.size_t { return C.size_t(len(b)) } + +func (s Session) Sender(name string) Link { + cname := C.CString(name) + defer C.free(unsafe.Pointer(cname)) + return Link{C.pn_sender(s.pn, cname)} +} + +func (s Session) Receiver(name string) Link { + cname := C.CString(name) + defer C.free(unsafe.Pointer(cname)) + return Link{C.pn_receiver(s.pn, cname)} +} + +func joinId(a, b interface{}) string { + return fmt.Sprintf("%s/%s", a, b) +} + +// Pump associated with this connection. +func (c Connection) Pump() *Pump { return pumps[c.pn] } + +// Unique (per process) string identifier for a connection, useful for debugging. +func (c Connection) String() string { return pumps[c.pn].String() } + +// Head functions don't follow the normal naming conventions so missed by the generator. 
+ +func (c Connection) LinkHead(s State) Link { + return Link{C.pn_link_head(c.pn, C.pn_state_t(s))} +} + +func (c Connection) SessionHead(s State) Session { + return Session{C.pn_session_head(c.pn, C.pn_state_t(s))} +} + +// Unique (per process) string identifier for a session, including connection identifier. +func (s Session) String() string { + return joinId(s.Connection(), fmt.Sprintf("%p", s.pn)) +} + +// Unique (per process) string identifier for a link, including session identifier. +func (l Link) String() string { + return joinId(l.Session(), l.Name()) +} + +// Error returns an error interface corresponding to Condition. +func (c Condition) Error() error { + if c.IsNil() { + return nil + } else { + return fmt.Errorf("%s: %s", c.Name(), c.Description()) + } +} + +// SetIfUnset sets name and description on a condition if it is not already set. +func (c Condition) SetIfUnset(name, description string) { + if !c.IsSet() { + c.SetName(name) + c.SetDescription(description) + } +} + +func (c Connection) Session() (Session, error) { + s := Session{C.pn_session(c.pn)} + if s.IsNil() { + return s, Connection(c).Error() + } + return s, nil +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
