PROTON-1390: Get rid of relative ../util.go import, simplify examples. gccgo does not support relative imports, simplify the examples to remove the need for a common library.
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/0f156d72 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/0f156d72 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/0f156d72 Branch: refs/heads/master Commit: 0f156d721f56d265d1d36814f1334fd959ab0d09 Parents: 9fc393a Author: Alan Conway <[email protected]> Authored: Tue Jan 17 19:12:55 2017 -0500 Committer: Alan Conway <[email protected]> Committed: Tue Jan 17 19:42:54 2017 -0500 ---------------------------------------------------------------------- examples/go/electron/broker.go | 70 ++++++++++++++++++++++++++++-------- examples/go/electron/receive.go | 56 +++++++++++++++-------------- examples/go/electron/send.go | 51 +++++++++++++------------- examples/go/proton/broker.go | 69 +++++++++++++++++++++++++++-------- examples/go/util/queue.go | 61 ------------------------------- examples/go/util/util.go | 68 ----------------------------------- 6 files changed, 167 insertions(+), 208 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0f156d72/examples/go/electron/broker.go ---------------------------------------------------------------------- diff --git a/examples/go/electron/broker.go b/examples/go/electron/broker.go index 2078d1e..9228195 100644 --- a/examples/go/electron/broker.go +++ b/examples/go/electron/broker.go @@ -27,7 +27,6 @@ under the License. package main import ( - "../util" "flag" "fmt" "log" @@ -35,6 +34,7 @@ import ( "os" "qpid.apache.org/amqp" "qpid.apache.org/electron" + "sync" ) // Usage and command-line flags @@ -49,12 +49,19 @@ A simple broker-like demo. Queues are created automatically for sender or receiv var addr = flag.String("addr", ":amqp", "Listening address") var credit = flag.Int("credit", 100, "Receiver credit window") var qsize = flag.Int("qsize", 1000, "Max queue size") +var debug = flag.Bool("debug", false, "Print detailed debug output") +var debugf = func(format string, data ...interface{}) {} // Default no debugging output func main() { flag.Usage = usage flag.Parse() + + if *debug { + debugf = func(format string, data ...interface{}) { log.Printf(format, data...) } + } + b := &broker{ - queues: util.MakeQueues(*qsize), + queues: makeQueues(*qsize), container: electron.NewContainer(fmt.Sprintf("broker[%v]", os.Getpid())), acks: make(chan electron.Outcome), sent: make(chan sentMessage), @@ -66,7 +73,7 @@ func main() { // State for the broker type broker struct { - queues util.Queues // A collection of queues. + queues queues // A collection of queues. container electron.Container // electron.Container manages AMQP connections. sent chan sentMessage // Channel to record sent messages. acks chan electron.Outcome // Channel to receive the Outcome of sent messages. @@ -76,7 +83,7 @@ type broker struct { // If a message is rejected or not acknowledged due to a failure, we will put it back on the queue. type sentMessage struct { m amqp.Message - q util.Queue + q queue } // run listens for incoming net.Conn connections and starts an electron.Connection for each one. @@ -94,12 +101,12 @@ func (b *broker) run() error { for { c, err := b.container.Accept(listener) if err != nil { - util.Debugf("Accept error: %v", err) + debugf("Accept error: %v", err) continue } cc := &connection{b, c} go cc.run() // Handle the connection - util.Debugf("Accepted %v", c) + debugf("Accepted %v", c) } } @@ -113,7 +120,7 @@ type connection struct { // and start goroutines to service them. func (c *connection) run() { for in := range c.connection.Incoming() { - util.Debugf("incoming %v", in) + debugf("incoming %v", in) switch in := in.(type) { @@ -131,7 +138,7 @@ func (c *connection) run() { in.Accept() // Accept sessions unconditionally } } - util.Debugf("incoming closed: %v", c.connection) + debugf("incoming closed: %v", c.connection) } // receiver receives messages and pushes to a queue. @@ -139,11 +146,11 @@ func (c *connection) receiver(receiver electron.Receiver) { q := c.broker.queues.Get(receiver.Target()) for { if rm, err := receiver.Receive(); err == nil { - util.Debugf("%v: received %v", receiver, util.FormatMessage(rm.Message)) + debugf("%v: received %v %#v", receiver, rm.Message) q <- rm.Message rm.Accept() } else { - util.Debugf("%v error: %v", receiver, err) + debugf("%v error: %v", receiver, err) break } } @@ -154,13 +161,13 @@ func (c *connection) sender(sender electron.Sender) { q := c.broker.queues.Get(sender.Source()) for { if sender.Error() != nil { - util.Debugf("%v closed: %v", sender, sender.Error()) + debugf("%v closed: %v", sender, sender.Error()) return } select { case m := <-q: - util.Debugf("%v: sent %v", sender, util.FormatMessage(m)) + debugf("%v: sent %#v", sender, m) sm := sentMessage{m, q} c.broker.sent <- sm // Record sent message sender.SendAsync(m, c.broker.acks, sm) // Receive outcome on c.broker.acks with Value sm @@ -191,9 +198,44 @@ func (b *broker) acknowledgements() { delete(sentMap, sm) if outcome.Status != electron.Accepted { // Error, release or rejection sm.q.PutBack(sm.m) // Put the message back on the queue. - util.Debugf("message %v put back, status %v, error %v", - util.FormatMessage(sm.m), outcome.Status, outcome.Error) + debugf("message %#v put back, status %v, error %v", sm.m, outcome.Status, outcome.Error) } } } } + +// Use a buffered channel as a very simple queue. +type queue chan amqp.Message + +// Put a message back on the queue, does not block. +func (q queue) PutBack(m amqp.Message) { + select { + case q <- m: + default: + // Not an efficient implementation but ensures we don't block the caller. + go func() { q <- m }() + } +} + +// Concurrent-safe map of queues. +type queues struct { + queueSize int + m map[string]queue + lock sync.Mutex +} + +func makeQueues(queueSize int) queues { + return queues{queueSize: queueSize, m: make(map[string]queue)} +} + +// Create a queue if not found. +func (qs *queues) Get(name string) queue { + qs.lock.Lock() + defer qs.lock.Unlock() + q := qs.m[name] + if q == nil { + q = make(queue, qs.queueSize) + qs.m[name] = q + } + return q +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0f156d72/examples/go/electron/receive.go ---------------------------------------------------------------------- diff --git a/examples/go/electron/receive.go b/examples/go/electron/receive.go index 3bbe327..161e911 100644 --- a/examples/go/electron/receive.go +++ b/examples/go/electron/receive.go @@ -20,7 +20,6 @@ under the License. package main import ( - "../util" "flag" "fmt" "log" @@ -39,11 +38,16 @@ Receive messages from all the listed URLs concurrently and print them. } var count = flag.Uint64("count", 1, "Stop after receiving this many messages.") +var debug = flag.Bool("debug", false, "Print detailed debug output") +var debugf = func(format string, data ...interface{}) {} // Default no debugging output func main() { flag.Usage = usage flag.Parse() + if *debug { + debugf = func(format string, data ...interface{}) { log.Printf(format, data...) } + } urls := flag.Args() // Non-flag arguments are URLs to receive from if len(urls) == 0 { log.Println("No URL provided") @@ -63,33 +67,31 @@ func main() { // Start a goroutine to for each URL to receive messages and send them to the messages channel. // main() receives and prints them. for _, urlStr := range urls { - util.Debugf("Connecting to %s\n", urlStr) + debugf("Connecting to %s\n", urlStr) go func(urlStr string) { // Start the goroutine - - defer wait.Done() // Notify main() when this goroutine is done. - url, err := amqp.ParseURL(urlStr) // Like net/url.Parse() but with AMQP defaults. - util.ExitIf(err) - - // Open a new connection - c, err := container.Dial("tcp", url.Host) // Note net.URL.Host is actually "host:port" - util.ExitIf(err) - connections <- c // Save connection so we can Close() when main() ends - - // Create a Receiver using the path of the URL as the source address - r, err := c.Receiver(electron.Source(url.Path)) - util.ExitIf(err) - - // Loop receiving messages and sending them to the main() goroutine - for { - if rm, err := r.Receive(); err == nil { - rm.Accept() - messages <- rm.Message - } else if err == electron.Closed { - return - } else { - log.Fatalf("receive error %v: %v", urlStr, err) + defer wait.Done() // Notify main() when this goroutine is done. + var err error + if url, err := amqp.ParseURL(urlStr); err == nil { + if c, err := container.Dial("tcp", url.Host); err == nil { + connections <- c // Save connection so we can Close() when main() ends + if r, err := c.Receiver(electron.Source(url.Path)); err == nil { + // Loop receiving messages and sending them to the main() goroutine + for { + if rm, err := r.Receive(); err == nil { + rm.Accept() + messages <- rm.Message + } else if err == electron.Closed { + return + } else { + log.Fatal("receive error %v: %v", urlStr, err) + } + } + } } } + if err != nil { + log.Fatal(err) + } }(urlStr) } @@ -99,7 +101,7 @@ func main() { // print each message until the count is exceeded. for i := uint64(0); i < *count; i++ { m := <-messages - util.Debugf("%s\n", util.FormatMessage(m)) + debugf("%#v\n", m) } fmt.Printf("Received %d messages\n", *count) @@ -107,7 +109,7 @@ func main() { // with electron.Closed. for i := 0; i < len(urls); i++ { c := <-connections - util.Debugf("close %s", c) + debugf("close %s", c) c.Close(nil) } wait.Wait() // Wait for all goroutines to finish. http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0f156d72/examples/go/electron/send.go ---------------------------------------------------------------------- diff --git a/examples/go/electron/send.go b/examples/go/electron/send.go index 68b8b2e..9ab5f1c 100644 --- a/examples/go/electron/send.go +++ b/examples/go/electron/send.go @@ -20,7 +20,6 @@ under the License. package main import ( - "../util" "flag" "fmt" "log" @@ -39,11 +38,17 @@ Send messages to each URL concurrently with body "<url-path>-<n>" where n is the } var count = flag.Int64("count", 1, "Send this may messages per address.") +var debug = flag.Bool("debug", false, "Print detailed debug output") +var Debugf = func(format string, data ...interface{}) {} // Default no debugging output func main() { flag.Usage = usage flag.Parse() + if *debug { + Debugf = func(format string, data ...interface{}) { log.Printf(format, data...) } + } + urls := flag.Args() // Non-flag arguments are URLs to receive from if len(urls) == 0 { log.Println("No URL provided") @@ -61,41 +66,39 @@ func main() { // Start a goroutine for each URL to send messages. for _, urlStr := range urls { - util.Debugf("Connecting to %v\n", urlStr) + Debugf("Connecting to %v\n", urlStr) go func(urlStr string) { - - defer wait.Done() // Notify main() that this goroutine is done. - url, err := amqp.ParseURL(urlStr) // Like net/url.Parse() but with AMQP defaults. - util.ExitIf(err) - - // Open a new connection - c, err := container.Dial("tcp", url.Host) // Note net.URL.Host is actually "host:port" - util.ExitIf(err) - connections <- c // Save connection so we can Close() when main() ends - - // Create a Sender using the path of the URL as the AMQP address - s, err := c.Sender(electron.Target(url.Path)) - util.ExitIf(err) - - // Loop sending messages. - for i := int64(0); i < *count; i++ { - m := amqp.NewMessage() - body := fmt.Sprintf("%v-%v", url.Path, i) - m.Marshal(body) - s.SendAsync(m, sentChan, body) // Outcome will be sent to sentChan + defer wait.Done() // Notify main() when this goroutine is done. + var err error + if url, err := amqp.ParseURL(urlStr); err == nil { + if c, err := container.Dial("tcp", url.Host); err == nil { + connections <- c // Save connection so we can Close() when main() ends + if s, err := c.Sender(electron.Target(url.Path)); err == nil { + // Loop sending messages. + for i := int64(0); i < *count; i++ { + m := amqp.NewMessage() + body := fmt.Sprintf("%v-%v", url.Path, i) + m.Marshal(body) + s.SendAsync(m, sentChan, body) // Outcome will be sent to sentChan + } + } + } + } + if err != nil { + log.Fatal(err) } }(urlStr) } // Wait for all the acknowledgements expect := int(*count) * len(urls) - util.Debugf("Started senders, expect %v acknowledgements\n", expect) + Debugf("Started senders, expect %v acknowledgements\n", expect) for i := 0; i < expect; i++ { out := <-sentChan // Outcome of async sends. if out.Error != nil { log.Fatalf("acknowledgement[%v] %v error: %v\n", i, out.Value, out.Error) } else { - util.Debugf("acknowledgement[%v] %v (%v)\n", i, out.Value, out.Status) + Debugf("acknowledgement[%v] %v (%v)\n", i, out.Value, out.Status) } } fmt.Printf("Received all %v acknowledgements\n", expect) http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0f156d72/examples/go/proton/broker.go ---------------------------------------------------------------------- diff --git a/examples/go/proton/broker.go b/examples/go/proton/broker.go index 8f0efda..8291ca9 100644 --- a/examples/go/proton/broker.go +++ b/examples/go/proton/broker.go @@ -30,7 +30,6 @@ under the License. package main import ( - "../util" "flag" "fmt" "log" @@ -38,6 +37,7 @@ import ( "os" "qpid.apache.org/amqp" "qpid.apache.org/proton" + "sync" ) // Usage and command-line flags @@ -52,11 +52,16 @@ A simple broker-like demo. Queues are created automatically for sender or receiv var addr = flag.String("addr", ":amqp", "Listening address") var credit = flag.Int("credit", 100, "Receiver credit window") var qsize = flag.Int("qsize", 1000, "Max queue size") +var debug = flag.Bool("debug", false, "Print detailed debug output") +var debugf = func(format string, data ...interface{}) {} // Default no debugging output func main() { flag.Usage = usage flag.Parse() - b := &broker{util.MakeQueues(*qsize)} + if *debug { + debugf = func(format string, data ...interface{}) { log.Printf(format, data...) } + } + b := &broker{makeQueues(*qsize)} if err := b.run(); err != nil { log.Fatal(err) } @@ -64,7 +69,7 @@ func main() { // State for the broker type broker struct { - queues util.Queues + queues queues } // Listens for connections and starts a proton.Engine for each one. @@ -78,7 +83,7 @@ func (b *broker) run() error { for { conn, err := listener.Accept() if err != nil { - util.Debugf("Accept error: %v", err) + debugf("Accept error: %v", err) continue } adapter := proton.NewMessagingAdapter(newHandler(&b.queues)) @@ -88,14 +93,14 @@ func (b *broker) run() error { adapter.AutoAccept = false engine, err := proton.NewEngine(conn, adapter) if err != nil { - util.Debugf("Connection error: %v", err) + debugf("Connection error: %v", err) continue } engine.Server() // Enable server-side protocol negotiation. - util.Debugf("Accepted connection %s", engine) + debugf("Accepted connection %s", engine) go func() { // Start goroutine to run the engine event loop engine.Run() - util.Debugf("Closed %s", engine) + debugf("Closed %s", engine) }() } } @@ -105,13 +110,13 @@ func (b *broker) run() error { // all calls to the handler. We use channels to communicate between the handler // goroutine and other goroutines sending and receiving messages. type handler struct { - queues *util.Queues + queues *queues receivers map[proton.Link]*receiver senders map[proton.Link]*sender injecter proton.Injecter } -func newHandler(queues *util.Queues) *handler { +func newHandler(queues *queues) *handler { return &handler{ queues: queues, receivers: make(map[proton.Link]*receiver), @@ -156,7 +161,7 @@ func (h *handler) HandleMessagingEvent(t proton.MessagingEvent, e proton.Event) } // This will not block as AMQP credit is set to the buffer capacity. r.buffer <- receivedMessage{e.Delivery(), m} - util.Debugf("link %s received %s", e.Link(), util.FormatMessage(m)) + debugf("link %s received %#v", e.Link(), m) case proton.MConnectionClosed, proton.MDisconnected: for l, _ := range h.receivers { @@ -187,11 +192,11 @@ func (h *handler) linkClosed(l proton.Link, err error) { // channels. type link struct { l proton.Link - q util.Queue + q queue h *handler } -func makeLink(l proton.Link, q util.Queue, h *handler) link { +func makeLink(l proton.Link, q queue, h *handler) link { lnk := link{l: l, q: q, h: h} return lnk } @@ -280,7 +285,7 @@ func (s *sender) sendable() { // run runs in a separate goroutine. It monitors the queue for messages and injects // a function to send them when there is credit func (s *sender) run() { - var q util.Queue // q is nil initially as we have no credit. + var q queue // q is nil initially as we have no credit. for { select { case _, ok := <-s.credit: @@ -323,9 +328,45 @@ func (s *sender) sendOne(m amqp.Message) error { delivery, err := s.l.Send(m) if err == nil { delivery.Settle() // Pre-settled, unreliable. - util.Debugf("link %s sent %s", s.l, util.FormatMessage(m)) + debugf("link %s sent %#v", s.l, m) } else { s.q.PutBack(m) // Put the message back on the queue, don't block } return err } + +// Use a buffered channel as a very simple queue. +type queue chan amqp.Message + +// Put a message back on the queue, does not block. +func (q queue) PutBack(m amqp.Message) { + select { + case q <- m: + default: + // Not an efficient implementation but ensures we don't block the caller. + go func() { q <- m }() + } +} + +// Concurrent-safe map of queues. +type queues struct { + queueSize int + m map[string]queue + lock sync.Mutex +} + +func makeQueues(queueSize int) queues { + return queues{queueSize: queueSize, m: make(map[string]queue)} +} + +// Create a queue if not found. +func (qs *queues) Get(name string) queue { + qs.lock.Lock() + defer qs.lock.Unlock() + q := qs.m[name] + if q == nil { + q = make(queue, qs.queueSize) + qs.m[name] = q + } + return q +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0f156d72/examples/go/util/queue.go ---------------------------------------------------------------------- diff --git a/examples/go/util/queue.go b/examples/go/util/queue.go deleted file mode 100644 index 2eaba72..0000000 --- a/examples/go/util/queue.go +++ /dev/null @@ -1,61 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ - -package util - -import ( - "qpid.apache.org/amqp" - "sync" -) - -// Use a buffered channel as a very simple queue. -type Queue chan amqp.Message - -// Put a message back on the queue, does not block. -func (q Queue) PutBack(m amqp.Message) { - select { - case q <- m: - default: - // Not an efficient implementation but ensures we don't block the caller. - go func() { q <- m }() - } -} - -// Concurrent-safe map of queues. -type Queues struct { - queueSize int - m map[string]Queue - lock sync.Mutex -} - -func MakeQueues(queueSize int) Queues { - return Queues{queueSize: queueSize, m: make(map[string]Queue)} -} - -// Create a queue if not found. -func (qs *Queues) Get(name string) Queue { - qs.lock.Lock() - defer qs.lock.Unlock() - q := qs.m[name] - if q == nil { - q = make(Queue, qs.queueSize) - qs.m[name] = q - } - return q -} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0f156d72/examples/go/util/util.go ---------------------------------------------------------------------- diff --git a/examples/go/util/util.go b/examples/go/util/util.go deleted file mode 100644 index 20f2192..0000000 --- a/examples/go/util/util.go +++ /dev/null @@ -1,68 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ - -// util contains utility types and functions to simplify parts of the example -// code that are not related to the use of proton. -package util - -import ( - "flag" - "fmt" - "log" - "os" - "path" - "qpid.apache.org/amqp" -) - -// Debug flag "-debug" enables debug output with Debugf -var Debug = flag.Bool("debug", false, "Print detailed debug output") - -// Full flag "-full" enables full message output by FormatMessage -var Full = flag.Bool("full", false, "Print full message not just body.") - -// Debugf logs debug messages if "-debug" flag is set. -func Debugf(format string, data ...interface{}) { - if *Debug { - log.Printf(format, data...) - } -} - -// Simple error handling for demo. -func ExitIf(err error) { - if err != nil { - log.Fatal(err) - } -} - -// FormatMessage formats a message as a string, just the body by default or -// the full message (with properties etc.) if "-full" flag is set. -func FormatMessage(m amqp.Message) string { - if *Full { - return fmt.Sprintf("%#v", m) - } else { - return fmt.Sprintf("%#v", m.Body()) - } -} - -// For example programs, use the program name as the log prefix. -func init() { - log.SetFlags(0) - _, prog := path.Split(os.Args[0]) - log.SetPrefix(fmt.Sprintf("%s(%d): ", prog, os.Getpid())) -} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
