PROTON-827: go binding: minor example cleanup.
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/333a2b28 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/333a2b28 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/333a2b28 Branch: refs/heads/proton-go Commit: 333a2b28d3408e5bfe40eb0a7f412c4b03b9a684 Parents: fa784b1 Author: Alan Conway <[email protected]> Authored: Mon May 11 17:34:13 2015 -0400 Committer: Alan Conway <[email protected]> Committed: Mon Sep 28 14:08:22 2015 -0400 ---------------------------------------------------------------------- examples/go/receive.go | 174 ++++++++++++++++++++++++++++++++++++++++++++ examples/go/send.go | 156 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 330 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/333a2b28/examples/go/receive.go ---------------------------------------------------------------------- diff --git a/examples/go/receive.go b/examples/go/receive.go new file mode 100644 index 0000000..2545eab --- /dev/null +++ b/examples/go/receive.go @@ -0,0 +1,174 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package main + +import ( + "flag" + "fmt" + "io" + "io/ioutil" + "log" + "math" + "net" + "os" + "path" + "qpid.apache.org/proton" + "qpid.apache.org/proton/messaging" + "sync" + "time" +) + +// Command-line flags +var verbose = flag.Int("verbose", 1, "Output level, 0 means none, higher means more") +var count = flag.Int64("count", 0, "Stop after receiving this many messages. 0 means unlimited.") +var timeout = flag.Int64("time", 0, "Stop after this many seconds. 0 means unlimited.") +var full = flag.Bool("full", false, "Print full message not just body.") + +func main() { + // Parse flags and arguments, print usage message on error. + flag.Usage = func() { + fmt.Fprintf(os.Stderr, ` +Usage: %s url [url ...] +Receive messages from all the listed URLs concurrently and print them. +`, os.Args[0]) + flag.PrintDefaults() + } + flag.Parse() + urls := flag.Args() // Non-flag arguments are URLs to receive from + if len(urls) == 0 { + flag.Usage() + fmt.Fprintf(os.Stderr, "No URL provided") + os.Exit(1) + } + duration := time.Duration(*timeout) * time.Second + if duration == 0 { + duration = time.Duration(math.MaxInt64) // Not forever, but 290 years is close enough. + } + if *count == 0 { + *count = math.MaxInt64 + } + + // Create a goroutine for each URL that receives messages and sends them to + // the messages channel. main() receives and prints them. + + messages := make(chan proton.Message) // Channel for messages from goroutines to main() + stop := make(chan struct{}) // Closing this channel means the program is stopping. + + var wait sync.WaitGroup // Used by main() to wait for all goroutines to end. + + wait.Add(len(urls)) // Wait for one goroutine per URL. + + // Arrange to close all connections on exit + connections := make([]*messaging.Connection, len(urls)) + defer func() { + for _, c := range connections { + if c != nil { + c.Close() + } + } + }() + + for i, urlStr := range urls { + debug.Printf("Connecting to %s", urlStr) + go func(urlStr string) { + defer wait.Done() // Notify main() that this goroutine is done. + url, err := proton.ParseURL(urlStr) // Like net/url.Parse() but with AMQP defaults. + fatalIf(err) + + // Open a standard Go net.Conn and and AMQP connection using it. + conn, err := net.Dial("tcp", url.Host) // Note net.URL.Host is actually "host:port" + fatalIf(err) + pc, err := messaging.Connect(conn) // This is our AMQP connection. + fatalIf(err) + connections[i] = pc // So we can close it when main() ends + + // Create a receiver using the path of the URL as the AMQP address + r, err := pc.Receiver(url.Path) + fatalIf(err) + + for { + var m proton.Message + select { // Receive a message or stop. + case m = <-r.Receive: + case <-stop: // The program is stopping. + return + } + select { // Send m to main() or stop + case messages <- m: // Send m to main() + case <-stop: // The program is stopping. + return + } + } + }(urlStr) + } + info.Printf("Listening") + + // time.After() returns a channel that will close when the timeout is up. + timer := time.After(duration) + + // main() prints each message and checks for count or timeout being exceeded. + for i := int64(0); i < *count; i++ { + select { + case m := <-messages: + debug.Print(formatMessage{m}) + case <-timer: // Timeout has expired + i = 0 + } + } + info.Printf("Received %d messages", *count) + close(stop) // Signal all goroutines to stop. + wait.Wait() // Wait for all goroutines to finish. +} + +// Logging +func logger(prefix string, level int, w io.Writer) *log.Logger { + if *verbose >= level { + return log.New(w, prefix, 0) + } + return log.New(ioutil.Discard, "", 0) +} + +var info, debug *log.Logger + +func init() { + flag.Parse() + name := path.Base(os.Args[0]) + log.SetFlags(0) // Use default logger for errors. + log.SetPrefix(fmt.Sprintf("%s: ", name)) // Log errors on stderr. + info = logger(fmt.Sprintf("%s: ", name), 1, os.Stdout) // Log info on stdout. + debug = logger(fmt.Sprintf("%s debug: ", name), 2, os.Stderr) // Log debug on stderr. +} + +// Simple error handling for demo. +func fatalIf(err error) { + if err != nil { + log.Fatal(err) + } +} + +type formatMessage struct{ m proton.Message } + +func (fm formatMessage) String() string { + if *full { + return fmt.Sprintf("%#v", fm.m) + } else { + return fmt.Sprintf("%#v", fm.m.Body()) + } +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/333a2b28/examples/go/send.go ---------------------------------------------------------------------- diff --git a/examples/go/send.go b/examples/go/send.go new file mode 100644 index 0000000..c4db7cd --- /dev/null +++ b/examples/go/send.go @@ -0,0 +1,156 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package main + +import ( + "flag" + "fmt" + "io" + "io/ioutil" + "log" + "math" + "net" + "os" + "path" + "qpid.apache.org/proton" + "qpid.apache.org/proton/messaging" + "sync" +) + +// Command-line flags +var verbose = flag.Int("verbose", 1, "Output level, 0 means none, higher means more") +var count = flag.Int64("count", 1, "Send this may messages per address. 0 means unlimited.") + +// Ack associates an info string with an acknowledgement +type Ack struct { + ack messaging.Acknowledgement + info string +} + +func main() { + // Parse flags and arguments, print usage message on error. + flag.Usage = func() { + fmt.Fprintf(os.Stderr, ` +Usage: %s url [url ...] +Send messages to all the listed URLs concurrently. +To each URL, send the string "path-n" where n is the message number. +`, os.Args[0]) + flag.PrintDefaults() + } + flag.Parse() + urls := flag.Args() // Non-flag arguments are URLs to receive from + if len(urls) == 0 { + flag.Usage() + fmt.Fprintf(os.Stderr, "No URL provided\n") + os.Exit(1) + } + if *count == 0 { + *count = math.MaxInt64 + } + + // Create a channel to receive all the acknowledgements + acks := make(chan Ack) + + // Create a goroutine for each URL that sends messages. + var wait sync.WaitGroup // Used by main() to wait for all goroutines to end. + wait.Add(len(urls)) // Wait for one goroutine per URL. + + // Arrange to close all connections on exit + connections := make([]*messaging.Connection, len(urls)) + defer func() { + for _, c := range connections { + if c != nil { + c.Close() + } + } + }() + + for i, urlStr := range urls { + debug.Printf("Connecting to %v", urlStr) + go func(urlStr string) { + defer wait.Done() // Notify main() that this goroutine is done. + url, err := proton.ParseURL(urlStr) // Like net/url.Parse() but with AMQP defaults. + fatalIf(err) + + // Open a standard Go net.Conn and and AMQP connection using it. + conn, err := net.Dial("tcp", url.Host) // Note net.URL.Host is actually "host:port" + fatalIf(err) + pc, err := messaging.Connect(conn) // This is our AMQP connection. + fatalIf(err) + connections[i] = pc // So we can close it when main() ends + + // Create a sender using the path of the URL as the AMQP address + s, err := pc.Sender(url.Path) + fatalIf(err) + + for i := int64(0); i < *count; i++ { + m := proton.NewMessage() + body := fmt.Sprintf("%v-%v", url.Path, i) + m.SetBody(body) + ack, err := s.Send(m) + fatalIf(err) + acks <- Ack{ack, body} // Send the acknowledgement to main() + } + }(urlStr) + } + + // Wait for all the acknowledgements + expect := int(*count) * len(urls) + debug.Printf("Started senders, expect %v acknowledgements", expect) + for i := 0; i < expect; i++ { + ack, ok := <-acks + if !ok { + info.Fatalf("acks channel closed after only %d acks\n", i) + } + d := <-ack.ack + debug.Printf("acknowledgement[%v] %v", i, ack.info) + if d != messaging.Accepted { + info.Printf("Unexpected disposition %v", d) + } + } + info.Printf("Received all %v acknowledgements", expect) + wait.Wait() // Wait for all goroutines to finish. +} + +// Logging +func logger(prefix string, level int, w io.Writer) *log.Logger { + if *verbose >= level { + return log.New(w, prefix, 0) + } + return log.New(ioutil.Discard, "", 0) +} + +var info, debug *log.Logger + +func init() { + flag.Parse() + name := path.Base(os.Args[0]) + log.SetFlags(0) // Use default logger for errors. + log.SetPrefix(fmt.Sprintf("%s: ", name)) // Log errors on stderr. + info = logger(fmt.Sprintf("%s: ", name), 1, os.Stdout) // Log info on stdout. + debug = logger(fmt.Sprintf("%s debug: ", name), 2, os.Stderr) // Log debug on stderr. +} + +// Simple error handling for demo. +func fatalIf(err error) { + if err != nil { + log.Fatal(err) + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
