PROTON-827: go binding: minor example cleanup.

Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/333a2b28
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/333a2b28
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/333a2b28

Branch: refs/heads/proton-go
Commit: 333a2b28d3408e5bfe40eb0a7f412c4b03b9a684
Parents: fa784b1
Author: Alan Conway <[email protected]>
Authored: Mon May 11 17:34:13 2015 -0400
Committer: Alan Conway <[email protected]>
Committed: Mon Sep 28 14:08:22 2015 -0400

----------------------------------------------------------------------
 examples/go/receive.go | 174 ++++++++++++++++++++++++++++++++++++++++++++
 examples/go/send.go    | 156 +++++++++++++++++++++++++++++++++++++++
 2 files changed, 330 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/333a2b28/examples/go/receive.go
----------------------------------------------------------------------
diff --git a/examples/go/receive.go b/examples/go/receive.go
new file mode 100644
index 0000000..2545eab
--- /dev/null
+++ b/examples/go/receive.go
@@ -0,0 +1,174 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package main
+
+import (
+       "flag"
+       "fmt"
+       "io"
+       "io/ioutil"
+       "log"
+       "math"
+       "net"
+       "os"
+       "path"
+       "qpid.apache.org/proton"
+       "qpid.apache.org/proton/messaging"
+       "sync"
+       "time"
+)
+
+// Command-line flags.
+var (
+       // verbose selects how much is logged; see logger().
+       verbose = flag.Int("verbose", 1, "Output level, 0 means none, higher means more")
+       // count is the number of messages after which main() stops.
+       count = flag.Int64("count", 0, "Stop after receiving this many messages. 0 means unlimited.")
+       // timeout is the number of seconds after which main() stops.
+       timeout = flag.Int64("time", 0, "Stop after this many seconds. 0 means unlimited.")
+       // full makes formatMessage print the whole message, not just the body.
+       full = flag.Bool("full", false, "Print full message not just body.")
+)
+
+// main parses the command line, starts one receiving goroutine per URL and
+// logs (at debug level) every message they deliver. It stops once -count
+// messages have arrived or -time seconds have elapsed, whichever comes first,
+// then tells the goroutines to stop and waits for them.
+func main() {
+       // Parse flags and arguments, print usage message on error.
+       flag.Usage = func() {
+               fmt.Fprintf(os.Stderr, `
+Usage: %s url [url ...]
+Receive messages from all the listed URLs concurrently and print them.
+`, os.Args[0])
+               flag.PrintDefaults()
+       }
+       flag.Parse()
+       urls := flag.Args() // Non-flag arguments are URLs to receive from
+       if len(urls) == 0 {
+               flag.Usage()
+               fmt.Fprintf(os.Stderr, "No URL provided\n")
+               os.Exit(1)
+       }
+       duration := time.Duration(*timeout) * time.Second
+       if duration == 0 {
+               // Not forever, but 290 years is close enough.
+               duration = time.Duration(math.MaxInt64)
+       }
+       if *count == 0 {
+               *count = math.MaxInt64
+       }
+
+       // Create a goroutine for each URL that receives messages and sends
+       // them to the messages channel. main() receives and prints them.
+
+       // Channel for messages from goroutines to main()
+       messages := make(chan proton.Message)
+       // Closing this channel means the program is stopping.
+       stop := make(chan struct{})
+
+       // Used by main() to wait for all goroutines to end.
+       var wait sync.WaitGroup
+       wait.Add(len(urls)) // Wait for one goroutine per URL.
+
+       // Arrange to close all connections on exit
+       connections := make([]*messaging.Connection, len(urls))
+       defer func() {
+               for _, c := range connections {
+                       if c != nil {
+                               c.Close()
+                       }
+               }
+       }()
+
+       for i, urlStr := range urls {
+               debug.Printf("Connecting to %s", urlStr)
+               // i and urlStr are passed as arguments so that every goroutine
+               // works on its own copy rather than on the shared loop variables;
+               // otherwise connections[i] may be written at the wrong index.
+               go func(i int, urlStr string) {
+                       defer wait.Done() // Notify main() that this goroutine is done.
+                       // Like net/url.Parse() but with AMQP defaults.
+                       url, err := proton.ParseURL(urlStr)
+                       fatalIf(err)
+
+                       // Open a standard Go net.Conn and an AMQP connection using it.
+                       // Note net.URL.Host is actually "host:port"
+                       conn, err := net.Dial("tcp", url.Host)
+                       fatalIf(err)
+                       pc, err := messaging.Connect(conn) // This is our AMQP connection.
+                       fatalIf(err)
+                       connections[i] = pc // So we can close it when main() ends
+
+                       // Create a receiver using the path of the URL as the AMQP address
+                       r, err := pc.Receiver(url.Path)
+                       fatalIf(err)
+
+                       for {
+                               var m proton.Message
+                               select { // Receive a message or stop.
+                               case m = <-r.Receive:
+                               case <-stop: // The program is stopping.
+                                       return
+                               }
+                               select { // Send m to main() or stop
+                               case messages <- m: // Send m to main()
+                               case <-stop: // The program is stopping.
+                                       return
+                               }
+                       }
+               }(i, urlStr)
+       }
+       info.Printf("Listening")
+
+       // time.After() returns a channel that delivers a value when the timeout
+       // is up.
+       timer := time.After(duration)
+
+       // main() prints each message until count messages have been received or
+       // the timeout expires. received counts only real messages, so the final
+       // report is accurate even when the timeout ends the loop.
+       received := int64(0)
+receiving:
+       for received < *count {
+               select {
+               case m := <-messages:
+                       debug.Print(formatMessage{m})
+                       received++
+               case <-timer: // Timeout has expired, stop receiving.
+                       break receiving
+               }
+       }
+       info.Printf("Received %d messages", received)
+       close(stop) // Signal all goroutines to stop.
+       wait.Wait() // Wait for all goroutines to finish.
+}
+
+// logger returns a log.Logger that writes to w with the given prefix when the
+// -verbose flag is at least level. Below that level it returns a logger that
+// discards everything, so callers can log unconditionally.
+func logger(prefix string, level int, w io.Writer) *log.Logger {
+       if *verbose < level {
+               return log.New(ioutil.Discard, "", 0)
+       }
+       return log.New(w, prefix, 0)
+}
+
+// info and debug are the loggers used throughout the example; both are
+// created by init().
+var info, debug *log.Logger
+
+// init parses the command line and sets up logging. The flags are parsed here
+// because logger() reads -verbose when each logger is created.
+func init() {
+       flag.Parse()
+       prog := path.Base(os.Args[0])
+       plainPrefix := fmt.Sprintf("%s: ", prog)
+       debugPrefix := fmt.Sprintf("%s debug: ", prog)
+
+       // Use default logger for errors; it logs them on stderr.
+       log.SetFlags(0)
+       log.SetPrefix(plainPrefix)
+
+       info = logger(plainPrefix, 1, os.Stdout)  // Log info on stdout.
+       debug = logger(debugPrefix, 2, os.Stderr) // Log debug on stderr.
+}
+
+// fatalIf is the example's minimal error handling: a nil err is ignored, any
+// other err is passed to log.Fatal, which logs it and exits the program.
+func fatalIf(err error) {
+       if err == nil {
+               return
+       }
+       log.Fatal(err)
+}
+
+// formatMessage adapts a proton.Message for printing via its String method.
+type formatMessage struct{ m proton.Message }
+
+// String returns the Go-syntax (%#v) representation of the whole message when
+// -full is set, and of the message body alone otherwise.
+func (fm formatMessage) String() string {
+       if !*full {
+               return fmt.Sprintf("%#v", fm.m.Body())
+       }
+       return fmt.Sprintf("%#v", fm.m)
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/333a2b28/examples/go/send.go
----------------------------------------------------------------------
diff --git a/examples/go/send.go b/examples/go/send.go
new file mode 100644
index 0000000..c4db7cd
--- /dev/null
+++ b/examples/go/send.go
@@ -0,0 +1,156 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package main
+
+import (
+       "flag"
+       "fmt"
+       "io"
+       "io/ioutil"
+       "log"
+       "math"
+       "net"
+       "os"
+       "path"
+       "qpid.apache.org/proton"
+       "qpid.apache.org/proton/messaging"
+       "sync"
+)
+
+// Command-line flags.
+var (
+       // verbose selects how much is logged; see logger().
+       verbose = flag.Int("verbose", 1, "Output level, 0 means none, higher means more")
+       // count is the number of messages each sender goroutine sends.
+       count = flag.Int64("count", 1, "Send this many messages per address. 0 means unlimited.")
+)
+
+// Ack associates an info string with an acknowledgement.
+// Sender goroutines build it positionally as Ack{ack, body}, so the field
+// order matters.
+type Ack struct {
+       // ack is received from by main() to obtain the message's disposition.
+       ack  messaging.Acknowledgement
+       // info is the message body; main() logs it with the acknowledgement.
+       info string
+}
+
+// main parses the command line, starts one sending goroutine per URL, each of
+// which sends -count messages, and then collects the acknowledgement of every
+// message before waiting for the goroutines to finish.
+func main() {
+       // Parse flags and arguments, print usage message on error.
+       flag.Usage = func() {
+               fmt.Fprintf(os.Stderr, `
+Usage: %s url [url ...]
+Send messages to all the listed URLs concurrently.
+To each URL, send the string "path-n" where n is the message number.
+`, os.Args[0])
+               flag.PrintDefaults()
+       }
+       flag.Parse()
+       urls := flag.Args() // Non-flag arguments are URLs to send to
+       if len(urls) == 0 {
+               flag.Usage()
+               fmt.Fprintf(os.Stderr, "No URL provided\n")
+               os.Exit(1)
+       }
+       if *count == 0 {
+               *count = math.MaxInt64
+       }
+
+       // Create a channel to receive all the acknowledgements
+       acks := make(chan Ack)
+
+       // Create a goroutine for each URL that sends messages.
+       var wait sync.WaitGroup // Used by main() to wait for all goroutines to end.
+       wait.Add(len(urls))     // Wait for one goroutine per URL.
+
+       // Arrange to close all connections on exit
+       connections := make([]*messaging.Connection, len(urls))
+       defer func() {
+               for _, c := range connections {
+                       if c != nil {
+                               c.Close()
+                       }
+               }
+       }()
+
+       for i, urlStr := range urls {
+               debug.Printf("Connecting to %v", urlStr)
+               // i and urlStr are passed as arguments so that every goroutine
+               // works on its own copy rather than on the shared loop variables;
+               // otherwise connections[i] may be written at the wrong index.
+               go func(i int, urlStr string) {
+                       defer wait.Done() // Notify main() that this goroutine is done.
+                       // Like net/url.Parse() but with AMQP defaults.
+                       url, err := proton.ParseURL(urlStr)
+                       fatalIf(err)
+
+                       // Open a standard Go net.Conn and an AMQP connection using it.
+                       // Note net.URL.Host is actually "host:port"
+                       conn, err := net.Dial("tcp", url.Host)
+                       fatalIf(err)
+                       pc, err := messaging.Connect(conn) // This is our AMQP connection.
+                       fatalIf(err)
+                       connections[i] = pc // So we can close it when main() ends
+
+                       // Create a sender using the path of the URL as the AMQP address
+                       s, err := pc.Sender(url.Path)
+                       fatalIf(err)
+
+                       for n := int64(0); n < *count; n++ {
+                               m := proton.NewMessage()
+                               body := fmt.Sprintf("%v-%v", url.Path, n)
+                               m.SetBody(body)
+                               ack, err := s.Send(m)
+                               fatalIf(err)
+                               acks <- Ack{ack, body} // Send the acknowledgement to main()
+                       }
+               }(i, urlStr)
+       }
+
+       // Wait for all the acknowledgements. The total is computed in int64 and
+       // saturates at math.MaxInt64: count is math.MaxInt64 when "unlimited", so
+       // multiplying it by the number of URLs would otherwise overflow.
+       expect := int64(math.MaxInt64)
+       if n := int64(len(urls)); *count <= math.MaxInt64/n {
+               expect = *count * n
+       }
+       debug.Printf("Started senders, expect %v acknowledgements", expect)
+       for i := int64(0); i < expect; i++ {
+               ack, ok := <-acks
+               if !ok {
+                       info.Fatalf("acks channel closed after only %d acks\n", i)
+               }
+               d := <-ack.ack // Wait for the disposition of this message.
+               debug.Printf("acknowledgement[%v] %v", i, ack.info)
+               if d != messaging.Accepted {
+                       info.Printf("Unexpected disposition %v", d)
+               }
+       }
+       info.Printf("Received all %v acknowledgements", expect)
+       wait.Wait() // Wait for all goroutines to finish.
+}
+
+// logger returns a log.Logger that writes to w with the given prefix when the
+// -verbose flag is at least level. Below that level it returns a logger that
+// discards everything, so callers can log unconditionally.
+func logger(prefix string, level int, w io.Writer) *log.Logger {
+       if *verbose < level {
+               return log.New(ioutil.Discard, "", 0)
+       }
+       return log.New(w, prefix, 0)
+}
+
+// info and debug are the loggers used throughout the example; both are
+// created by init().
+var info, debug *log.Logger
+
+// init parses the command line and sets up logging. The flags are parsed here
+// because logger() reads -verbose when each logger is created.
+func init() {
+       flag.Parse()
+       prog := path.Base(os.Args[0])
+       plainPrefix := fmt.Sprintf("%s: ", prog)
+       debugPrefix := fmt.Sprintf("%s debug: ", prog)
+
+       // Use default logger for errors; it logs them on stderr.
+       log.SetFlags(0)
+       log.SetPrefix(plainPrefix)
+
+       info = logger(plainPrefix, 1, os.Stdout)  // Log info on stdout.
+       debug = logger(debugPrefix, 2, os.Stderr) // Log debug on stderr.
+}
+
+// fatalIf is the example's minimal error handling: a nil err is ignored, any
+// other err is passed to log.Fatal, which logs it and exits the program.
+func fatalIf(err error) {
+       if err == nil {
+               return
+       }
+       log.Fatal(err)
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to