PROTON-827: Simplified examples and Connection error handling.

- Simplified non-relevant code in examples (logging, argument handling).
- Improved error handling in the API; see Connection.Error(). More work is needed on other endpoints.
- Added a -debug flag to example_test to help debug example problems.
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/e964480f Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/e964480f Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/e964480f Branch: refs/heads/proton-go Commit: e964480ff31a0584661f0dbce3961dad29b9e4d8 Parents: 377ff9b Author: Alan Conway <[email protected]> Authored: Mon May 25 17:14:22 2015 -0400 Committer: Alan Conway <[email protected]> Committed: Mon Sep 28 14:08:23 2015 -0400 ---------------------------------------------------------------------- examples/go/event_broker.go | 199 +++++++++---------- examples/go/example_test.go | 30 +-- examples/go/receive.go | 140 +++++-------- examples/go/send.go | 108 ++++------ proton-c/bindings/go/genwrap.go | 108 +++++----- .../src/qpid.apache.org/proton/go/event/pump.go | 5 +- .../qpid.apache.org/proton/go/internal/error.go | 25 +-- .../proton/go/messaging/messaging.go | 51 +++-- 8 files changed, 307 insertions(+), 359 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e964480f/examples/go/event_broker.go ---------------------------------------------------------------------- diff --git a/examples/go/event_broker.go b/examples/go/event_broker.go index 0cb4bfa..2578bd5 100644 --- a/examples/go/event_broker.go +++ b/examples/go/event_broker.go @@ -31,97 +31,32 @@ import ( "container/list" "flag" "fmt" - "io" - "io/ioutil" - "log" "net" "os" - "path" "qpid.apache.org/proton/go/amqp" "qpid.apache.org/proton/go/event" "sync" ) -// Command-line flags +// Usage and command-line flags +func usage() { + fmt.Fprintf(os.Stderr, ` +Usage: %s +A simple broker-like demo. Queues are created automatically for sender or receiver addrsses. 
+`, os.Args[0]) + flag.PrintDefaults() +} + +var debug = flag.Bool("debug", false, "Print detailed debug output") var addr = flag.String("addr", ":amqp", "Listening address") -var verbose = flag.Int("verbose", 1, "Output level, 0 means none, higher means more") var full = flag.Bool("full", false, "Print full message not just body.") func main() { - flag.Usage = func() { - fmt.Fprintf(os.Stderr, ` -Usage: %s -A simple broker-like demo. Queues are created automatically for sender or receiver addrsses. -`, os.Args[0]) - flag.PrintDefaults() - } + flag.Usage = usage flag.Parse() b := newBroker() err := b.listen(*addr) - fatalIf(err) -} - -// queue is a structure representing a queue. -type queue struct { - name string // Name of queue - messages *list.List // List of event.Message - consumers map[event.Link]bool // Set of consumer links -} - -type logLink event.Link // Wrapper to print links in format for logging - -func (ll logLink) String() string { - l := event.Link(ll) - return fmt.Sprintf("%s[%p]", l.Name(), l.Session().Connection().Pump()) -} - -func (q *queue) subscribe(link event.Link) { - debug.Printf("link %s subscribed to queue %s", logLink(link), q.name) - q.consumers[link] = true -} - -func (q *queue) unsubscribe(link event.Link) { - debug.Printf("link %s unsubscribed from queue %s", logLink(link), q.name) - delete(q.consumers, link) -} - -func (q *queue) empty() bool { - return len(q.consumers) == 0 && q.messages.Len() == 0 -} - -func (q *queue) push(context *event.Pump, message amqp.Message) { - q.messages.PushBack(message) - q.pop(context) -} - -func (q *queue) popTo(context *event.Pump, link event.Link) bool { - if q.messages.Len() != 0 && link.Credit() > 0 { - message := q.messages.Remove(q.messages.Front()).(amqp.Message) - debug.Printf("link %s <- queue %s: %s", logLink(link), q.name, formatMessage{message}) - // The first return parameter is an event.Delivery. - // The Deliver can be used to track message status, e.g. 
so we can re-delver on failure. - // This demo broker doesn't do that. - linkPump := link.Session().Connection().Pump() - if context == linkPump { - if context == nil { - log.Fatal("pop in nil context") - } - link.Send(message) // link is in the current pump, safe to call Send() direct - } else { - linkPump.Inject <- func() { // Inject to link's pump - link.Send(message) // FIXME aconway 2015-05-04: error handlig - } - } - return true - } - return false -} - -func (q *queue) pop(context *event.Pump) (popped bool) { - for c, _ := range q.consumers { - popped = popped || q.popTo(context, c) - } - return + exitIf(err) } // broker implements event.MessagingHandler and reacts to events by moving messages on or off queues. @@ -137,7 +72,7 @@ func newBroker() *broker { func (b *broker) getQueue(name string) *queue { q := b.queues[name] if q == nil { - debug.Printf("Create queue %s", name) + debugf("Create queue %s\n", name) q = &queue{name, list.New(), make(map[event.Link]bool)} b.queues[name] = q } @@ -150,7 +85,7 @@ func (b *broker) unsubscribe(l event.Link) { if q != nil { q.unsubscribe(l) if q.empty() { - debug.Printf("Delete queue %s", q.name) + debugf("Delete queue %s\n", q.name) delete(b.queues, q.name) } } @@ -181,9 +116,9 @@ func (b *broker) HandleMessagingEvent(t event.MessagingEventType, e event.Event) case event.MMessage: m, err := event.DecodeMessage(e) - fatalIf(err) + exitIf(err) qname := e.Link().RemoteTarget().Address() - debug.Printf("link %s -> queue %s: %s", logLink(e.Link()), qname, formatMessage{m}) + debugf("link %s -> queue %s: %s\n", logLink(e.Link()), qname, formatMessage(m)) b.getQueue(qname).push(e.Connection().Pump(), m) } return nil @@ -195,61 +130,111 @@ func (b *broker) listen(addr string) (err error) { if err != nil { return err } - info.Printf("Listening on %s", listener.Addr()) + fmt.Printf("Listening on %s\n", listener.Addr()) defer listener.Close() for { conn, err := listener.Accept() if err != nil { - info.Printf("Accept error: %s", 
err) + fmt.Fprintf(os.Stderr, "Accept error: %s\n", err) continue } pump, err := event.NewPump(conn, event.NewMessagingDelegator(b)) - fatalIf(err) - info.Printf("Accepted %s[%p]", pump, pump) + exitIf(err) + debugf("Accepted %s[%p]\n", pump, pump) pump.Server() go func() { pump.Run() if pump.Error == nil { - info.Printf("Closed %s", pump) + debugf("Closed %s\n", pump) } else { - info.Printf("Closed %s: %s", pump, pump.Error) + debugf("Closed %s: %s\n", pump, pump.Error) } }() } } -// Logging -func logger(prefix string, level int, w io.Writer) *log.Logger { - if *verbose >= level { - return log.New(w, prefix, 0) +// queue is a structure representing a queue. +type queue struct { + name string // Name of queue + messages *list.List // List of event.Message + consumers map[event.Link]bool // Set of consumer links +} + +type logLink event.Link // Wrapper to print links in useful format for logging + +func (ll logLink) String() string { + l := event.Link(ll) + return fmt.Sprintf("%s[%p]", l.Name(), l.Session().Connection().Pump()) +} + +func (q *queue) subscribe(link event.Link) { + debugf("link %s subscribed to queue %s\n", logLink(link), q.name) + q.consumers[link] = true +} + +func (q *queue) unsubscribe(link event.Link) { + debugf("link %s unsubscribed from queue %s\n", logLink(link), q.name) + delete(q.consumers, link) +} + +func (q *queue) empty() bool { + return len(q.consumers) == 0 && q.messages.Len() == 0 +} + +func (q *queue) push(context *event.Pump, message amqp.Message) { + q.messages.PushBack(message) + q.pop(context) +} + +func (q *queue) popTo(context *event.Pump, link event.Link) bool { + if q.messages.Len() != 0 && link.Credit() > 0 { + message := q.messages.Remove(q.messages.Front()).(amqp.Message) + debugf("link %s <- queue %s: %s\n", logLink(link), q.name, formatMessage(message)) + // The first return parameter is an event.Delivery. + // The Deliver can be used to track message status, e.g. so we can re-delver on failure. 
+ // This demo broker doesn't do that. + linkPump := link.Session().Connection().Pump() + if context == linkPump { + if context == nil { + exitIf(fmt.Errorf("pop in nil context")) + } + link.Send(message) // link is in the current pump, safe to call Send() direct + } else { + linkPump.Inject <- func() { // Inject to link's pump + link.Send(message) // FIXME aconway 2015-05-04: error handlig + } + } + return true } - return log.New(ioutil.Discard, "", 0) + return false } -var info, debug *log.Logger +func (q *queue) pop(context *event.Pump) (popped bool) { + for c, _ := range q.consumers { + popped = popped || q.popTo(context, c) + } + return +} -func init() { - flag.Parse() - name := path.Base(os.Args[0]) - log.SetFlags(0) - log.SetPrefix(fmt.Sprintf("%s: ", name)) // Log errors on stderr. - info = logger(fmt.Sprintf("%s: ", name), 1, os.Stdout) // Log info on stdout. - debug = logger(fmt.Sprintf("%s debug: ", name), 2, os.Stderr) // Log debug on stderr. +// Simple debug logging +func debugf(format string, data ...interface{}) { + if *debug { + fmt.Fprintf(os.Stderr, format, data...) + } } // Simple error handling for demo. 
-func fatalIf(err error) { +func exitIf(err error) { if err != nil { - log.Fatal(err) + fmt.Fprintln(os.Stderr, err) + os.Exit(1) } } -type formatMessage struct{ m amqp.Message } - -func (fm formatMessage) String() string { +func formatMessage(m amqp.Message) string { if *full { - return fmt.Sprintf("%#v", fm.m) + return fmt.Sprintf("%#v", m) } else { - return fmt.Sprintf("%#v", fm.m.Body()) + return fmt.Sprintf("%#v", m.Body()) } } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e964480f/examples/go/example_test.go ---------------------------------------------------------------------- diff --git a/examples/go/example_test.go b/examples/go/example_test.go index a4b4c2c..8879c38 100644 --- a/examples/go/example_test.go +++ b/examples/go/example_test.go @@ -78,13 +78,11 @@ func (b *broker) check() error { // Start the demo broker, wait till it is listening on *addr. No-op if already started. func (b *broker) start() error { - build("event_broker.go") if b.cmd == nil { // Not already started // FIXME aconway 2015-04-30: better way to pick/configure a broker port. b.addr = fmt.Sprintf("127.0.0.1:%d", rand.Intn(10000)+10000) - b.cmd = exec.Command(exepath("event_broker"), "-addr", b.addr, "-verbose", "0") + b.cmd = exampleCommand("event_broker", "-addr", b.addr) b.runerr = make(chan error) - // Change the -verbose setting above to see broker output on stdout/stderr. b.cmd.Stderr, b.cmd.Stdout = os.Stderr, os.Stdout go func() { b.runerr <- b.cmd.Run() @@ -111,7 +109,12 @@ func checkEqual(want interface{}, got interface{}) error { // runCommand returns an exec.Cmd to run an example. func exampleCommand(prog string, arg ...string) *exec.Cmd { build(prog + ".go") - cmd := exec.Command(exepath(prog), arg...) + args := []string{} + if *debug { + args = append(args, "-debug=true") + } + args = append(args, arg...) + cmd := exec.Command(exepath(prog), args...) 
cmd.Stderr = os.Stderr return cmd } @@ -149,16 +152,16 @@ func TestExampleSendReceive(t *testing.T) { } testBroker.start() err := runExampleWant( - "send: Received all 15 acknowledgements\n", + "Received all 15 acknowledgements\n", "send", - exampleArgs("-count", "5", "-verbose", "1")...) + exampleArgs("-count", "5")...) if err != nil { t.Fatal(err) } err = runExampleWant( - "receive: Listening\nreceive: Received 15 messages\n", + "Listening on 3 connections\nReceived 15 messages\n", "receive", - exampleArgs("-verbose", "1", "-count", "15")...) + exampleArgs("-count", "15")...) if err != nil { t.Fatal(err) } @@ -187,7 +190,7 @@ func goReceiveWant(errchan chan<- error, want string, arg ...string) *exec.Cmd { errchan <- err return } - listening := "receive: Listening\n" + listening := "Listening on 3 connections\n" if line != listening { errchan <- checkEqual(listening, line) return @@ -209,8 +212,8 @@ func TestExampleReceiveSend(t *testing.T) { testBroker.start() recvErr := make(chan error) recvCmd := goReceiveWant(recvErr, - "receive: Received 15 messages\n", - exampleArgs("-count", "15", "-verbose", "1")...) + "Received 15 messages\n", + exampleArgs("-count", "15")...) defer func() { recvCmd.Process.Kill() recvCmd.Wait() @@ -219,9 +222,9 @@ func TestExampleReceiveSend(t *testing.T) { t.Fatal(err) } err := runExampleWant( - "send: Received all 15 acknowledgements\n", + "Received all 15 acknowledgements\n", "send", - exampleArgs("-count", "5", "-verbose", "1")...) + exampleArgs("-count", "5")...) 
if err != nil { t.Fatal(err) } @@ -263,6 +266,7 @@ func build(prog string) { } var rpath = flag.String("rpath", "", "Runtime path for test executables") +var debug = flag.Bool("debug", false, "Debugging output from examples") func TestMain(m *testing.M) { rand.Seed(time.Now().UTC().UnixNano()) http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e964480f/examples/go/receive.go ---------------------------------------------------------------------- diff --git a/examples/go/receive.go b/examples/go/receive.go index e31862b..b1eb309 100644 --- a/examples/go/receive.go +++ b/examples/go/receive.go @@ -22,87 +22,65 @@ package main import ( "flag" "fmt" - "io" - "io/ioutil" - "log" - "math" "net" "os" - "path" "qpid.apache.org/proton/go/amqp" "qpid.apache.org/proton/go/messaging" "sync" - "time" ) -// Command-line flags -var verbose = flag.Int("verbose", 1, "Output level, 0 means none, higher means more") -var count = flag.Int64("count", 0, "Stop after receiving this many messages. 0 means unlimited.") -var timeout = flag.Int64("time", 0, "Stop after this many seconds. 0 means unlimited.") +// Usage and command-line flags +func usage() { + fmt.Fprintf(os.Stderr, `Usage: %s url [url ...] +Receive messages from all the listed URLs concurrently and print them. +`, os.Args[0]) + flag.PrintDefaults() +} + +var debug = flag.Bool("debug", false, "Print detailed debug output") +var count = flag.Uint64("count", 1, "Stop after receiving this many messages.") var full = flag.Bool("full", false, "Print full message not just body.") func main() { - // Parse flags and arguments, print usage message on error. - flag.Usage = func() { - fmt.Fprintf(os.Stderr, ` -Usage: %s url [url ...] -Receive messages from all the listed URLs concurrently and print them. 
-`, os.Args[0]) - flag.PrintDefaults() - } + flag.Usage = usage flag.Parse() + urls := flag.Args() // Non-flag arguments are URLs to receive from if len(urls) == 0 { - flag.Usage() - fmt.Fprintf(os.Stderr, "No URL provided") + fmt.Fprintln(os.Stderr, "No URL provided") + usage() os.Exit(1) } - duration := time.Duration(*timeout) * time.Second - if duration == 0 { - duration = time.Duration(math.MaxInt64) // Not forever, but 290 years is close enough. - } - if *count == 0 { - *count = math.MaxInt64 - } - - // Create a goroutine for each URL that receives messages and sends them to - // the messages channel. main() receives and prints them. messages := make(chan amqp.Message) // Channel for messages from goroutines to main() stop := make(chan struct{}) // Closing this channel means the program is stopping. + var wait sync.WaitGroup // Used by main() to wait for all goroutines to end. + wait.Add(len(urls)) // Wait for one goroutine per URL. - var wait sync.WaitGroup // Used by main() to wait for all goroutines to end. - - wait.Add(len(urls)) // Wait for one goroutine per URL. - - // Arrange to close all connections on exit - connections := make([]*messaging.Connection, len(urls)) - defer func() { - for _, c := range connections { - if c != nil { - c.Close() - } - } - }() + connections := make([]*messaging.Connection, len(urls)) // Store connctions to close on exit + // Start a goroutine to for each URL to receive messages and send them to the messages channel. + // main() receives and prints them. for i, urlStr := range urls { - debug.Printf("Connecting to %s", urlStr) - go func(urlStr string) { - defer wait.Done() // Notify main() that this goroutine is done. + debugf("debug: Connecting to %s\n", urlStr) + go func(urlStr string) { // Start the goroutine + + defer wait.Done() // Notify main() when this goroutine is done. url, err := amqp.ParseURL(urlStr) // Like net/url.Parse() but with AMQP defaults. 
- fatalIf(err) + exitIf(err) // Open a standard Go net.Conn and and AMQP connection using it. conn, err := net.Dial("tcp", url.Host) // Note net.URL.Host is actually "host:port" - fatalIf(err) + exitIf(err) pc, err := messaging.Connect(conn) // This is our AMQP connection. - fatalIf(err) - connections[i] = pc // So we can close it when main() ends + exitIf(err) + connections[i] = pc // Save connection so it will be closed when main() ends // Create a receiver using the path of the URL as the AMQP address r, err := pc.Receiver(url.Path) - fatalIf(err) + exitIf(err) + // Loop receiving messages for { var m amqp.Message select { // Receive a message or stop. @@ -118,57 +96,43 @@ Receive messages from all the listed URLs concurrently and print them. } }(urlStr) } - info.Printf("Listening") - - // time.After() returns a channel that will close when the timeout is up. - timer := time.After(duration) - - // main() prints each message and checks for count or timeout being exceeded. - for i := int64(0); i < *count; i++ { - select { - case m := <-messages: - debug.Print(formatMessage{m}) - case <-timer: // Timeout has expired - i = 0 - } + + // All goroutines are started, we are receiving messages. + fmt.Printf("Listening on %d connections\n", len(urls)) + + // print each message until the count is exceeded. + for i := uint64(0); i < *count; i++ { + debugf("%s\n", formatMessage(<-messages)) } - info.Printf("Received %d messages", *count) + fmt.Printf("Received %d messages\n", *count) close(stop) // Signal all goroutines to stop. wait.Wait() // Wait for all goroutines to finish. 
-} - -// Logging -func logger(prefix string, level int, w io.Writer) *log.Logger { - if *verbose >= level { - return log.New(w, prefix, 0) + close(messages) + for _, c := range connections { // Close all connections + if c != nil { + c.Close() + } } - return log.New(ioutil.Discard, "", 0) } -var info, debug *log.Logger - -func init() { - flag.Parse() - name := path.Base(os.Args[0]) - log.SetFlags(0) // Use default logger for errors. - log.SetPrefix(fmt.Sprintf("%s: ", name)) // Log errors on stderr. - info = logger(fmt.Sprintf("%s: ", name), 1, os.Stdout) // Log info on stdout. - debug = logger(fmt.Sprintf("%s debug: ", name), 2, os.Stderr) // Log debug on stderr. +// Simple debug logging +func debugf(format string, data ...interface{}) { + if *debug { + fmt.Fprintf(os.Stderr, format, data...) + } } // Simple error handling for demo. -func fatalIf(err error) { +func exitIf(err error) { if err != nil { - log.Fatal(err) + fmt.Fprintln(os.Stderr, err) } } -type formatMessage struct{ m amqp.Message } - -func (fm formatMessage) String() string { +func formatMessage(m amqp.Message) string { if *full { - return fmt.Sprintf("%#v", fm.m) + return fmt.Sprintf("%#v", m) } else { - return fmt.Sprintf("%#v", fm.m.Body()) + return fmt.Sprintf("%#v", m.Body()) } } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e964480f/examples/go/send.go ---------------------------------------------------------------------- diff --git a/examples/go/send.go b/examples/go/send.go index 4aaeb43..98acefa 100644 --- a/examples/go/send.go +++ b/examples/go/send.go @@ -22,21 +22,23 @@ package main import ( "flag" "fmt" - "io" - "io/ioutil" - "log" - "math" "net" "os" - "path" "qpid.apache.org/proton/go/amqp" "qpid.apache.org/proton/go/messaging" "sync" ) -// Command-line flags -var verbose = flag.Int("verbose", 1, "Output level, 0 means none, higher means more") -var count = flag.Int64("count", 1, "Send this may messages per address. 
0 means unlimited.") +// Usage and command-line flags +func usage() { + fmt.Fprintf(os.Stderr, `Usage: %s url [url ...] +Send messages to each URL concurrently with body "<url-path>-<n>" where n is the message number. +`, os.Args[0]) + flag.PrintDefaults() +} + +var debug = flag.Bool("debug", false, "Print detailed debug output") +var count = flag.Int64("count", 1, "Send this may messages per address.") // Ack associates an info string with an acknowledgement type Ack struct { @@ -45,67 +47,51 @@ type Ack struct { } func main() { - // Parse flags and arguments, print usage message on error. - flag.Usage = func() { - fmt.Fprintf(os.Stderr, ` -Usage: %s url [url ...] -Send messages to all the listed URLs concurrently. -To each URL, send the string "path-n" where n is the message number. -`, os.Args[0]) - flag.PrintDefaults() - } + flag.Usage = usage flag.Parse() + urls := flag.Args() // Non-flag arguments are URLs to receive from if len(urls) == 0 { + fmt.Fprintln(os.Stderr, "No URL provided") flag.Usage() - fmt.Fprintf(os.Stderr, "No URL provided\n") os.Exit(1) } - if *count == 0 { - *count = math.MaxInt64 - } - - // Create a channel to receive all the acknowledgements - acks := make(chan Ack) - // Create a goroutine for each URL that sends messages. + acks := make(chan Ack) // Channel to receive all the acknowledgements var wait sync.WaitGroup // Used by main() to wait for all goroutines to end. wait.Add(len(urls)) // Wait for one goroutine per URL. - // Arrange to close all connections on exit - connections := make([]*messaging.Connection, len(urls)) - defer func() { - for _, c := range connections { - if c != nil { - c.Close() - } - } - }() + connections := make([]*messaging.Connection, len(urls)) // Store connctions to close on exit + // Start a goroutine for each URL to send messages, receive the acknowledgements and + // send them to the acks channel. 
for i, urlStr := range urls { - debug.Printf("Connecting to %v", urlStr) + debugf("Connecting to %v\n", urlStr) go func(urlStr string) { + defer wait.Done() // Notify main() that this goroutine is done. url, err := amqp.ParseURL(urlStr) // Like net/url.Parse() but with AMQP defaults. - fatalIf(err) + exitIf(err) // Open a standard Go net.Conn and and AMQP connection using it. conn, err := net.Dial("tcp", url.Host) // Note net.URL.Host is actually "host:port" - fatalIf(err) + exitIf(err) pc, err := messaging.Connect(conn) // This is our AMQP connection. - fatalIf(err) - connections[i] = pc // So we can close it when main() ends + exitIf(err) + connections[i] = pc // Save connection so it will be closed when main() ends // Create a sender using the path of the URL as the AMQP address s, err := pc.Sender(url.Path) - fatalIf(err) + exitIf(err) + // Loop sending messages, receiving acknowledgements and sending them to the acks channel. for i := int64(0); i < *count; i++ { m := amqp.NewMessage() body := fmt.Sprintf("%v-%v", url.Path, i) m.SetBody(body) + // Note Send is *asynchronous*, ack is a channel that will receive the acknowledgement. ack, err := s.Send(m) - fatalIf(err) + exitIf(err) acks <- Ack{ack, body} // Send the acknowledgement to main() } }(urlStr) @@ -113,44 +99,38 @@ To each URL, send the string "path-n" where n is the message number. 
// Wait for all the acknowledgements expect := int(*count) * len(urls) - debug.Printf("Started senders, expect %v acknowledgements", expect) + debugf("Started senders, expect %v acknowledgements\n", expect) for i := 0; i < expect; i++ { ack, ok := <-acks if !ok { - info.Fatalf("acks channel closed after only %d acks\n", i) + exitIf(fmt.Errorf("acks channel closed after only %d acks\n", i)) } d := <-ack.ack - debug.Printf("acknowledgement[%v] %v", i, ack.info) + debugf("acknowledgement[%v] %v\n", i, ack.info) if d != messaging.Accepted { - info.Printf("Unexpected disposition %v", d) + fmt.Printf("Unexpected disposition %v\n", d) } } - info.Printf("Received all %v acknowledgements", expect) - wait.Wait() // Wait for all goroutines to finish. -} + fmt.Printf("Received all %v acknowledgements\n", expect) -// Logging -func logger(prefix string, level int, w io.Writer) *log.Logger { - if *verbose >= level { - return log.New(w, prefix, 0) + wait.Wait() // Wait for all goroutines to finish. + for _, c := range connections { // Close all connections + if c != nil { + c.Close() + } } - return log.New(ioutil.Discard, "", 0) } -var info, debug *log.Logger - -func init() { - flag.Parse() - name := path.Base(os.Args[0]) - log.SetFlags(0) // Use default logger for errors. - log.SetPrefix(fmt.Sprintf("%s: ", name)) // Log errors on stderr. - info = logger(fmt.Sprintf("%s: ", name), 1, os.Stdout) // Log info on stdout. - debug = logger(fmt.Sprintf("%s debug: ", name), 2, os.Stderr) // Log debug on stderr. +// Simple debug logging +func debugf(format string, data ...interface{}) { + if *debug { + fmt.Fprintf(os.Stderr, format, data...) + } } // Simple error handling for demo. 
-func fatalIf(err error) { +func exitIf(err error) { if err != nil { - log.Fatal(err) + fmt.Fprintln(os.Stderr, err) } } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e964480f/proton-c/bindings/go/genwrap.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/genwrap.go b/proton-c/bindings/go/genwrap.go index 094b196..27e5966 100644 --- a/proton-c/bindings/go/genwrap.go +++ b/proton-c/bindings/go/genwrap.go @@ -35,6 +35,59 @@ import ( "text/template" ) +var includeProton = "../../include/proton" +var outpath = "src/qpid.apache.org/proton/go/event/wrappers_gen.go" + +func main() { + flag.Parse() + out, err := os.Create(outpath) + panicIf(err) + defer out.Close() + + apis := []string{"session", "link", "delivery", "disposition", "condition", "terminus", "connection"} + fmt.Fprintln(out, copyright) + fmt.Fprint(out, ` +package event + +import ( + "time" + "unsafe" + "qpid.apache.org/proton/go/internal" +) + +// #include <proton/types.h> +// #include <proton/event.h> +// #include <stdlib.h> +`) + for _, api := range apis { + fmt.Fprintf(out, "// #include <proton/%s.h>\n", api) + } + fmt.Fprintln(out, `import "C"`) + + event(out) + + for _, api := range apis { + fmt.Fprintf(out, "// Wrappers for declarations in %s.h\n\n", api) + header := readHeader(api) + enums := findEnums(header) + for _, e := range enums { + genEnum(out, e.Name, e.Values) + } + apiWrapFns(api, header, out) + } + out.Close() + + // Run gofmt. 
+ cmd := exec.Command("gofmt", "-w", outpath) + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + err = cmd.Run() + if err != nil { + fmt.Fprintf(os.Stderr, "gofmt: %s", err) + os.Exit(1) + } +} + func mixedCase(s string) string { result := "" for _, w := range strings.Split(s, "_") { @@ -96,7 +149,7 @@ func panicIf(err error) { } func readHeader(name string) string { - file, err := os.Open(path.Join(*includeProton, name+".h")) + file, err := os.Open(path.Join(includeProton, name+".h")) panicIf(err) defer file.Close() s, err := ioutil.ReadAll(file) @@ -372,56 +425,3 @@ func apiWrapFns(api, header string, out io.Writer) { fmt.Fprintf(out, "}\n") } } - -var includeProton = flag.String("include", "", "path to proton include files, including /proton") - -func main() { - flag.Parse() - outpath := "wrappers_gen.go" - out, err := os.Create(outpath) - panicIf(err) - defer out.Close() - - apis := []string{"session", "link", "delivery", "disposition", "condition", "terminus", "connection"} - fmt.Fprintln(out, copyright) - fmt.Fprint(out, ` -package event - -import ( - "time" - "unsafe" - "qpid.apache.org/proton/go/internal" -) - -// #include <proton/types.h> -// #include <proton/event.h> -// #include <stdlib.h> -`) - for _, api := range apis { - fmt.Fprintf(out, "// #include <proton/%s.h>\n", api) - } - fmt.Fprintln(out, `import "C"`) - - event(out) - - for _, api := range apis { - fmt.Fprintf(out, "// Wrappers for declarations in %s.h\n\n", api) - header := readHeader(api) - enums := findEnums(header) - for _, e := range enums { - genEnum(out, e.Name, e.Values) - } - apiWrapFns(api, header, out) - } - out.Close() - - // Run gofmt. 
- cmd := exec.Command("gofmt", "-w", outpath) - cmd.Stdout = os.Stdout - cmd.Stderr = os.Stderr - err = cmd.Run() - if err != nil { - fmt.Fprintf(os.Stderr, "gofmt: %s", err) - os.Exit(1) - } -} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e964480f/proton-c/bindings/go/src/qpid.apache.org/proton/go/event/pump.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/go/event/pump.go b/proton-c/bindings/go/src/qpid.apache.org/proton/go/event/pump.go index db022ff..73db513 100644 --- a/proton-c/bindings/go/src/qpid.apache.org/proton/go/event/pump.go +++ b/proton-c/bindings/go/src/qpid.apache.org/proton/go/event/pump.go @@ -101,7 +101,7 @@ you are doing something fairly low-level it is probably a better choice. */ type Pump struct { // Error is set on exit from Run() if there was an error. - Error error + Error error // FIXME aconway 2015-05-26: make it a function // Channel to inject functions to be executed in the Pump's proton event loop. Inject chan func() @@ -212,6 +212,9 @@ func (p *Pump) Close() error { } delete(pumps, p.connection) p.free() + if p.Error == io.EOF { + return nil + } return p.Error } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e964480f/proton-c/bindings/go/src/qpid.apache.org/proton/go/internal/error.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/go/internal/error.go b/proton-c/bindings/go/src/qpid.apache.org/proton/go/internal/error.go index 01ba890..f3f3307 100644 --- a/proton-c/bindings/go/src/qpid.apache.org/proton/go/internal/error.go +++ b/proton-c/bindings/go/src/qpid.apache.org/proton/go/internal/error.go @@ -29,7 +29,6 @@ import ( "fmt" "runtime" "sync" - "sync/atomic" "unsafe" ) @@ -105,21 +104,23 @@ func panicIf(condition bool, fmt string, args ...interface{}) { // FirstError is a goroutine-safe error holder that keeps the first error that is set. 
type FirstError struct { - err atomic.Value - once sync.Once + err error + lock sync.Mutex } -// Set the error if not allread set. -func (e *FirstError) Set(err error) { - e.once.Do(func() { e.err.Store(err) }) +// Set the error if not already set, return the error. +func (e *FirstError) Set(err error) error { + e.lock.Lock() + defer e.lock.Unlock() + if e.err == nil { + e.err = err + } + return e.err } // Get the error. func (e *FirstError) Get() error { - v := e.err.Load() - if v != nil { - return v.(error) - } else { - return nil - } + e.lock.Lock() + defer e.lock.Unlock() + return e.err } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e964480f/proton-c/bindings/go/src/qpid.apache.org/proton/go/messaging/messaging.go ---------------------------------------------------------------------- diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/go/messaging/messaging.go b/proton-c/bindings/go/src/qpid.apache.org/proton/go/messaging/messaging.go index e653de2..e4b117d 100644 --- a/proton-c/bindings/go/src/qpid.apache.org/proton/go/messaging/messaging.go +++ b/proton-c/bindings/go/src/qpid.apache.org/proton/go/messaging/messaging.go @@ -23,11 +23,16 @@ package messaging import "C" import ( + "io" "net" "qpid.apache.org/proton/go/amqp" "qpid.apache.org/proton/go/event" + "qpid.apache.org/proton/go/internal" ) +// Closed is an alias for io.EOF. It indicates orderly closure of an endpoint. +var Closed = io.EOF + // Connection is a connection to a remote AMQP endpoint. // // You can set exported fields to configure the connection before calling @@ -42,27 +47,27 @@ type Connection struct { handler *handler pump *event.Pump session Session + err internal.FirstError } +// Error returns nil if the connection is open, messaging.Closed if was closed cleanly +// or an error value if it was closed due to an error. +func (c *Connection) Error() error { return c.err.Get() } + // Make an AMQP connection over a net.Conn connection. 
-// -// Use Connection.Close() to close the Connection, this will also close conn. -// Using conn.Close() directly will cause an abrupt disconnect rather than an -// orderly AMQP close. -// +// You must call c.Close() to close the connection and clean up its resources. func (c *Connection) Open(conn net.Conn) (err error) { c.handler = newHandler(c) c.pump, err = event.NewPump(conn, event.NewMessagingDelegator(c.handler), ) - if err != nil { - return err - } - if c.Server { - c.pump.Server() + if err == nil { + if c.Server { + c.pump.Server() + } + go c.pump.Run() } - go c.pump.Run() - return nil + return c.err.Set(err) } // Connect opens a default client connection. It is a shortcut for @@ -71,14 +76,16 @@ func (c *Connection) Open(conn net.Conn) (err error) { // func Connect(conn net.Conn) (*Connection, error) { c := &Connection{} - err := c.Open(conn) - return c, err + c.err.Set(c.Open(conn)) + return c, c.Error() } -// Close the connection. -// -// Connections must be closed to clean up resources and stop associated goroutines. -func (c *Connection) Close() error { return c.pump.Close() } +// Close cleans up resources and closes the associated net.Conn connection. +func (c *Connection) Close() error { + err := c.pump.Close() // Will be nil on close OK + c.err.Set(c.pump.Error) // Will be io.EOF on close OK + return err +} // DefaultSession returns a default session for the connection. // @@ -86,6 +93,9 @@ func (c *Connection) Close() error { return c.pump.Close() } // Use Session() for more control over creating sessions. // func (c *Connection) DefaultSession() (s Session, err error) { + if c.Error() != nil { + return Session{}, c.Error() + } if c.session.e.IsNil() { c.session, err = c.Session() } @@ -237,14 +247,15 @@ func (s *Sender) Send(m amqp.Message) (ack Acknowledgement, err error) { // Close the sender. func (s *Sender) Close() error { return nil } // FIXME aconway 2015-04-27: close/free -// Receiver receives messages via the channel Receive. 
type Receiver struct { Link - // Channel of messag + // Channel to receive messages. When it closes, check Receiver.Error() for an error. Receive <-chan amqp.Message } // FIXME aconway 2015-04-29: settlement - ReceivedMessage with Settle() method? +// FIXME aconway 2015-05-25: Close must unblock Receive() calls. + // Close the Receiver. func (r *Receiver) Close() error { return nil } // FIXME aconway 2015-04-29: close/free --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
