http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/examples/go/proton/broker.go ---------------------------------------------------------------------- diff --git a/examples/go/proton/broker.go b/examples/go/proton/broker.go deleted file mode 100644 index 8f0efda..0000000 --- a/examples/go/proton/broker.go +++ /dev/null @@ -1,331 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ - -// -// This is a simple AMQP broker implemented using the event-driven proton package. -// -// It maintains a set of named in-memory queues of messages. Clients can send -// messages to queues or subscribe to receive messages from them. -// - -// TODO: show how to handle acknowledgments from receivers and put rejected or -// un-acknowledged messages back on their queues. - -package main - -import ( - "../util" - "flag" - "fmt" - "log" - "net" - "os" - "qpid.apache.org/amqp" - "qpid.apache.org/proton" -) - -// Usage and command-line flags -func usage() { - fmt.Fprintf(os.Stderr, ` -Usage: %s -A simple broker-like demo. Queues are created automatically for sender or receiver addrsses. 
-`, os.Args[0]) - flag.PrintDefaults() -} - -var addr = flag.String("addr", ":amqp", "Listening address") -var credit = flag.Int("credit", 100, "Receiver credit window") -var qsize = flag.Int("qsize", 1000, "Max queue size") - -func main() { - flag.Usage = usage - flag.Parse() - b := &broker{util.MakeQueues(*qsize)} - if err := b.run(); err != nil { - log.Fatal(err) - } -} - -// State for the broker -type broker struct { - queues util.Queues -} - -// Listens for connections and starts a proton.Engine for each one. -func (b *broker) run() error { - listener, err := net.Listen("tcp", *addr) - if err != nil { - return err - } - defer listener.Close() - fmt.Printf("Listening on %s\n", listener.Addr()) - for { - conn, err := listener.Accept() - if err != nil { - util.Debugf("Accept error: %v", err) - continue - } - adapter := proton.NewMessagingAdapter(newHandler(&b.queues)) - // We want to accept messages when they are enqueued, not just when they - // are received, so we turn off auto-accept and prefetch by the adapter. - adapter.Prefetch = 0 - adapter.AutoAccept = false - engine, err := proton.NewEngine(conn, adapter) - if err != nil { - util.Debugf("Connection error: %v", err) - continue - } - engine.Server() // Enable server-side protocol negotiation. - util.Debugf("Accepted connection %s", engine) - go func() { // Start goroutine to run the engine event loop - engine.Run() - util.Debugf("Closed %s", engine) - }() - } -} - -// handler handles AMQP events. There is one handler per connection. The -// handler does not need to be concurrent-safe as proton.Engine will serialize -// all calls to the handler. We use channels to communicate between the handler -// goroutine and other goroutines sending and receiving messages. 
-type handler struct { - queues *util.Queues - receivers map[proton.Link]*receiver - senders map[proton.Link]*sender - injecter proton.Injecter -} - -func newHandler(queues *util.Queues) *handler { - return &handler{ - queues: queues, - receivers: make(map[proton.Link]*receiver), - senders: make(map[proton.Link]*sender), - } -} - -// HandleMessagingEvent handles an event, called in the handler goroutine. -func (h *handler) HandleMessagingEvent(t proton.MessagingEvent, e proton.Event) { - switch t { - - case proton.MStart: - h.injecter = e.Injecter() - - case proton.MLinkOpening: - if e.Link().IsReceiver() { - h.startReceiver(e) - } else { - h.startSender(e) - } - - case proton.MLinkClosed: - h.linkClosed(e.Link(), e.Link().RemoteCondition().Error()) - - case proton.MSendable: - if s, ok := h.senders[e.Link()]; ok { - s.sendable() // Signal the send goroutine that we have credit. - } else { - proton.CloseError(e.Link(), amqp.Errorf(amqp.NotFound, "link %s sender not found", e.Link())) - } - - case proton.MMessage: - m, err := e.Delivery().Message() // Message() must be called while handling the MMessage event. - if err != nil { - proton.CloseError(e.Link(), err) - break - } - r, ok := h.receivers[e.Link()] - if !ok { - proton.CloseError(e.Link(), amqp.Errorf(amqp.NotFound, "link %s receiver not found", e.Link())) - break - } - // This will not block as AMQP credit is set to the buffer capacity. - r.buffer <- receivedMessage{e.Delivery(), m} - util.Debugf("link %s received %s", e.Link(), util.FormatMessage(m)) - - case proton.MConnectionClosed, proton.MDisconnected: - for l, _ := range h.receivers { - h.linkClosed(l, nil) - } - for l, _ := range h.senders { - h.linkClosed(l, nil) - } - } -} - -// linkClosed is called when a link has been closed by both ends. -// It removes the link from the handlers maps and stops its goroutine. 
-func (h *handler) linkClosed(l proton.Link, err error) { - if s, ok := h.senders[l]; ok { - s.stop() - delete(h.senders, l) - } else if r, ok := h.receivers[l]; ok { - r.stop() - delete(h.receivers, l) - } -} - -// link has some common data and methods that are used by the sender and receiver types. -// -// An active link is represented by a sender or receiver value and a goroutine -// running its run() method. The run() method communicates with the handler via -// channels. -type link struct { - l proton.Link - q util.Queue - h *handler -} - -func makeLink(l proton.Link, q util.Queue, h *handler) link { - lnk := link{l: l, q: q, h: h} - return lnk -} - -// receiver has a channel to buffer messages that have been received by the -// handler and are waiting to go on the queue. AMQP credit ensures that the -// handler does not overflow the buffer and block. -type receiver struct { - link - buffer chan receivedMessage -} - -// receivedMessage holds a message and a Delivery so that the message can be -// acknowledged when it is put on the queue. -type receivedMessage struct { - delivery proton.Delivery - message amqp.Message -} - -// startReceiver creates a receiver and a goroutine for its run() method. -func (h *handler) startReceiver(e proton.Event) { - q := h.queues.Get(e.Link().RemoteTarget().Address()) - r := &receiver{ - link: makeLink(e.Link(), q, h), - buffer: make(chan receivedMessage, *credit), - } - h.receivers[r.l] = r - r.l.Flow(cap(r.buffer)) // Give credit to fill the buffer to capacity. - go r.run() -} - -// run runs in a separate goroutine. It moves messages from the buffer to the -// queue for a receiver link, and injects a handler function to acknowledge the -// message and send a credit. -func (r *receiver) run() { - for rm := range r.buffer { - r.q <- rm.message - d := rm.delivery - // We are not in the handler goroutine so we Inject the accept function as a closure. 
- r.h.injecter.Inject(func() { - // Check that the receiver is still open, it may have been closed by the remote end. - if r == r.h.receivers[r.l] { - d.Accept() // Accept the delivery - r.l.Flow(1) // Add one credit - } - }) - } -} - -// stop closes the buffer channel, which ends the run() goroutine's loop; it does not wait for the goroutine to exit. -func (r *receiver) stop() { - close(r.buffer) -} - -// sender has a channel that is used to signal when there is credit to send messages. -type sender struct { - link - credit chan struct{} // Channel to signal availability of credit. -} - -// startSender creates a sender and starts a goroutine for sender.run() -func (h *handler) startSender(e proton.Event) { - q := h.queues.Get(e.Link().RemoteSource().Address()) - s := &sender{ - link: makeLink(e.Link(), q, h), - credit: make(chan struct{}, 1), // Capacity of 1 for signalling. - } - h.senders[e.Link()] = s - go s.run() -} - -// stop closes the credit channel, which makes the run() goroutine return; it does not wait for the goroutine to exit. -func (s *sender) stop() { - close(s.credit) -} - -// sendable signals that the sender has credit, it does not block. -// sender.credit has capacity 1, if it is already full we carry on. -func (s *sender) sendable() { - select { // Non-blocking - case s.credit <- struct{}{}: - default: - } -} - -// run runs in a separate goroutine. It monitors the queue for messages and injects -// a function to send them when there is credit -func (s *sender) run() { - var q util.Queue // q is nil initially as we have no credit. - for { - select { - case _, ok := <-s.credit: - if !ok { // sender closed - return - } - q = s.q // We have credit, enable selecting on the queue. - - case m, ok := <-q: // q is only enabled when we have credit. - if !ok { // queue closed - return - } - q = nil // Assume all credit will be used; we will be signaled otherwise. - s.h.injecter.Inject(func() { // Inject handler function to actually send - if s.h.senders[s.l] != s { // The sender has been closed by the remote end. 
- q.PutBack(m) // Put the message back on the queue but don't block - return - } - if s.sendOne(m) != nil { - return - } - // Send as many more messages as we can without blocking - for s.l.Credit() > 0 { - select { // Non blocking receive from q - case m, ok := <-s.q: - if ok { - s.sendOne(m) - } - default: // Queue is empty but we have credit, signal the run() goroutine. - s.sendable() - } - } - }) - } - } -} - -// sendOne runs in the handler goroutine. It sends a single message. -func (s *sender) sendOne(m amqp.Message) error { - delivery, err := s.l.Send(m) - if err == nil { - delivery.Settle() // Pre-settled, unreliable. - util.Debugf("link %s sent %s", s.l, util.FormatMessage(m)) - } else { - s.q.PutBack(m) // Put the message back on the queue, don't block - } - return err -}
http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/examples/go/util/queue.go ---------------------------------------------------------------------- diff --git a/examples/go/util/queue.go b/examples/go/util/queue.go deleted file mode 100644 index 2eaba72..0000000 --- a/examples/go/util/queue.go +++ /dev/null @@ -1,61 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ - -package util - -import ( - "qpid.apache.org/amqp" - "sync" -) - -// Use a buffered channel as a very simple queue. -type Queue chan amqp.Message - -// Put a message back on the queue, does not block. -func (q Queue) PutBack(m amqp.Message) { - select { - case q <- m: - default: - // Not an efficient implementation but ensures we don't block the caller. - go func() { q <- m }() - } -} - -// Concurrent-safe map of queues. -type Queues struct { - queueSize int - m map[string]Queue - lock sync.Mutex -} - -func MakeQueues(queueSize int) Queues { - return Queues{queueSize: queueSize, m: make(map[string]Queue)} -} - -// Create a queue if not found. 
-func (qs *Queues) Get(name string) Queue { - qs.lock.Lock() - defer qs.lock.Unlock() - q := qs.m[name] - if q == nil { - q = make(Queue, qs.queueSize) - qs.m[name] = q - } - return q -} http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/examples/go/util/util.go ---------------------------------------------------------------------- diff --git a/examples/go/util/util.go b/examples/go/util/util.go deleted file mode 100644 index 20f2192..0000000 --- a/examples/go/util/util.go +++ /dev/null @@ -1,68 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ - -// util contains utility types and functions to simplify parts of the example -// code that are not related to the use of proton. -package util - -import ( - "flag" - "fmt" - "log" - "os" - "path" - "qpid.apache.org/amqp" -) - -// Debug flag "-debug" enables debug output with Debugf -var Debug = flag.Bool("debug", false, "Print detailed debug output") - -// Full flag "-full" enables full message output by FormatMessage -var Full = flag.Bool("full", false, "Print full message not just body.") - -// Debugf logs debug messages if "-debug" flag is set. -func Debugf(format string, data ...interface{}) { - if *Debug { - log.Printf(format, data...) - } -} - -// Simple error handling for demo. 
-func ExitIf(err error) { - if err != nil { - log.Fatal(err) - } -} - -// FormatMessage formats a message as a string, just the body by default or -// the full message (with properties etc.) if "-full" flag is set. -func FormatMessage(m amqp.Message) string { - if *Full { - return fmt.Sprintf("%#v", m) - } else { - return fmt.Sprintf("%#v", m.Body()) - } -} - -// For example programs, use the program name as the log prefix. -func init() { - log.SetFlags(0) - _, prog := path.Split(os.Args[0]) - log.SetPrefix(fmt.Sprintf("%s(%d): ", prog, os.Getpid())) -} http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/examples/javascript/messenger/client.js ---------------------------------------------------------------------- diff --git a/examples/javascript/messenger/client.js b/examples/javascript/messenger/client.js deleted file mode 100755 index d128503..0000000 --- a/examples/javascript/messenger/client.js +++ /dev/null @@ -1,103 +0,0 @@ -#!/usr/bin/env node -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -// Simple client for use with server.js illustrating request/response - -// Check if the environment is Node.js and if not log an error and exit. 
-if (typeof process === 'object' && typeof require === 'function') { - var proton = require("qpid-proton-messenger"); - - var address = "amqp://0.0.0.0"; - var subject = "UK.WEATHER"; - var replyTo = "~/replies"; - var msgtext = "Hello World!"; - var tracker = null; - - var message = new proton.Message(); - var messenger = new proton.Messenger(); - - var pumpData = function() { - while (messenger.incoming()) { - var t = messenger.get(message); - - console.log("Reply:"); - console.log("Address: " + message.getAddress()); - console.log("Subject: " + message.getSubject()); - - // body is the body as a native JavaScript Object, useful for most real cases. - //console.log("Content: " + message.body); - - // data is the body as a proton.Data Object, used in this case because - // format() returns exactly the same representation as recv.c - console.log("Content: " + message.data.format()); - - messenger.accept(t); - messenger.stop(); - } - - if (messenger.isStopped()) { - message.free(); - messenger.free(); - } - }; - - var args = process.argv.slice(2); - if (args.length > 0) { - if (args[0] === '-h' || args[0] === '--help') { - console.log("Usage: node client.js [-r replyTo] [-s subject] <addr> (default " + address + ")"); - console.log("Options:"); - console.log(" -r <reply to> The message replyTo (default " + replyTo + ")"); - console.log(" -s <subject> The message subject (default " + subject + ")"); - process.exit(0); - } - - for (var i = 0; i < args.length; i++) { - var arg = args[i]; - if (arg.charAt(0) === '-') { - i++; - var val = args[i]; - if (arg === '-r') { - replyTo = val; - } else if (arg === '-s') { - subject = val; - } - } else { - address = arg; - } - } - } - - messenger.on('error', function(error) {console.log(error);}); - messenger.on('work', pumpData); - messenger.setOutgoingWindow(1024); - messenger.recv(); // Receive as many messages as messenger can buffer. 
- messenger.start(); - - message.setAddress(address); - message.setSubject(subject); - message.setReplyTo(replyTo); - message.body = msgtext; - - tracker = messenger.put(message); -} else { - console.error("client.js should be run in Node.js"); -} - http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/examples/javascript/messenger/drain.js ---------------------------------------------------------------------- diff --git a/examples/javascript/messenger/drain.js b/examples/javascript/messenger/drain.js deleted file mode 100755 index 1df6fd4..0000000 --- a/examples/javascript/messenger/drain.js +++ /dev/null @@ -1,70 +0,0 @@ -#!/usr/bin/env node -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -// Check if the environment is Node.js and if not log an error and exit. 
-if (typeof process === 'object' && typeof require === 'function') { - var proton = require("qpid-proton-messenger"); - - console.log("drain not implemented yet"); - process.exit(0); - - var address = "amqp://~0.0.0.0"; - var message = new proton.Message(); - var messenger = new proton.Messenger(); - - var pumpData = function() { - while (messenger.incoming()) { - var t = messenger.get(message); - - console.log("Address: " + message.getAddress()); - console.log("Subject: " + message.getSubject()); - - // body is the body as a native JavaScript Object, useful for most real cases. - //console.log("Content: " + message.body); - - // data is the body as a proton.Data Object, used in this case because - // format() returns exactly the same representation as recv.c - console.log("Content: " + message.data.format()); - - messenger.accept(t); - } - }; - - var args = process.argv.slice(2); - if (args.length > 0) { - if (args[0] === '-h' || args[0] === '--help') { - console.log("Usage: recv <addr> (default " + address + ")."); - process.exit(0); - } - - address = args[0]; - } - - messenger.on('error', function(error) {console.log(error);}); - messenger.on('work', pumpData); - messenger.recv(); // Receive as many messages as messenger can buffer. - messenger.start(); - - messenger.subscribe(address); -} else { - console.error("drain.js should be run in Node.js"); -} - http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/examples/javascript/messenger/proxy.js ---------------------------------------------------------------------- diff --git a/examples/javascript/messenger/proxy.js b/examples/javascript/messenger/proxy.js deleted file mode 100755 index cac5cf5..0000000 --- a/examples/javascript/messenger/proxy.js +++ /dev/null @@ -1,105 +0,0 @@ -#!/usr/bin/env node -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -/** - * proxy.js is a simple node.js command line application that uses the ws2tcp.js - * library to proxy from a WebSocket to a TCP Socket or vice versa. - * <p> - * Usage: node proxy.js [options] - * Options: - * -p <listen port>, --port <listen port> (default 5673 for ws2tcp - * 5672 for tcp2ws) - * -t <target port>, --tport <target port> (default listen port - 1 for ws2tcp - * listen port + 1 for tcp2ws) - * -h <target host>, --thost <target host> (default 0.0.0.0) - * -m <ws2tcp or tcp2ws>, --method <ws2tcp or tcp2ws> (default ws2tcp) - * @author Fraser Adams - * @file - */ - -// Check if the environment is Node.js and if not log an error and exit. 
-if (typeof process === 'object' && typeof require === 'function') { - var proxy = require('./ws2tcp.js'); - - var lport = 5673; - var tport = lport - 1; - var thost = '0.0.0.0'; - var method = 'ws2tcp'; - - var args = process.argv.slice(2); - if (args.length > 0) { - if (args[0] === '-h' || args[0] === '--help') { - console.log("Usage: node proxy.js [options]"); - console.log("Options:"); - console.log(" -p <listen port>, --port <listen port> (default " + lport + " for ws2tcp"); - console.log(" " + tport + " for tcp2ws)"); - console.log(" -t <target port>, --tport <target port> (default listen port - 1 for ws2tcp"); - console.log(" listen port + 1 for tcp2ws)"); - console.log(" -h <target host>, --thost <target host> (default " + thost + ")"); - console.log(" -m <ws2tcp or tcp2ws>, --method <ws2tcp or tcp2ws> (default " + method + ")"); - process.exit(0); - } - - var lportSet = false; - var tportSet = false; - for (var i = 0; i < args.length; i++) { - var arg = args[i]; - if (arg.charAt(0) === '-') { - i++; - var val = args[i]; - if (arg === '-p' || arg === '--port') { - lport = val; - lportSet = true; - } else if (arg === '-t' || arg === '--tport') { - tport = val; - tportSet = true; - } else if (arg === '-h' || arg === '--thost') { - thost = val; - } else if (arg === '-m' || arg === '--method') { - method = val; - } - } - } - - if (method === 'tcp2ws' && !lportSet) { - lport--; - } - - if (!tportSet) { - tport = (method === 'ws2tcp') ? 
lport - 1 : +lport + 1; - } - } - - if (method === 'tcp2ws') { - console.log("Proxying tcp -> ws"); - console.log("Forwarding port " + lport + " to " + thost + ":" + tport); - proxy.tcp2ws(lport, thost, tport, 'AMQPWSB10'); - } else if (method === 'ws2tcp') { - console.log("Proxying ws -> tcp"); - console.log("Forwarding port " + lport + " to " + thost + ":" + tport); - proxy.ws2tcp(lport, thost, tport); - } else { - console.error("Method must be either ws2tcp or tcp2ws."); - } -} else { - console.error("proxy.js should be run in Node.js"); -} - --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
