This is an automated email from the ASF dual-hosted git repository.

astitcher pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git

commit 06956b155658ff3829bfe920f386caf5c7941c46
Author: Andrew Stitcher <astitc...@apache.org>
AuthorDate: Wed Jan 4 11:29:01 2023 -0500

    PROTON-2666: Add support for message 'to' to C++ example broker

    This amounts to supporting the ANONYMOUS-RELAY capability
---
 cpp/examples/broker.cpp | 55 +++++++++++++++++++++++++++++++++++++++++-------
 1 file changed, 46 insertions(+), 9 deletions(-)

diff --git a/cpp/examples/broker.cpp b/cpp/examples/broker.cpp
index bcf8ab6ae..c09e0b0e0 100644
--- a/cpp/examples/broker.cpp
+++ b/cpp/examples/broker.cpp
@@ -225,20 +225,29 @@ void Sender::boundQueue(Queue* q, std::string qn) {
     std::cout << "sending from " << queue_name_ << std::endl;
 }

+class QueueManager;
+
 class Receiver : public proton::messaging_handler {
     friend class connection_handler;

     proton::receiver receiver_;
     proton::work_queue& work_queue_;
     Queue* queue_;
+    QueueManager& queue_manager_;
     std::deque<proton::message> messages_;

     // A message is received.
-    void on_message(proton::delivery &, proton::message &m) override {
-        messages_.push_back(m);
-
+    void on_message(proton::delivery &d, proton::message &m) override {
+        // We allow anonymous relay behaviour always even if not requested
+        auto to_address = m.to();
         if (queue_) {
+            messages_.push_back(m);
             queueMsgs();
+        } else if (!to_address.empty()) {
+            queueMsgToNamedQueue(m, to_address);
+        } else {
+            // No bound link queue, no message 'to address' - reject message
+            d.reject();
         }
     }

@@ -251,9 +260,11 @@ class Receiver : public proton::messaging_handler {
         }
     }

+    void queueMsgToNamedQueue(proton::message& m, std::string address);
+
 public:
-    Receiver(proton::receiver r) :
-        receiver_(r), work_queue_(r.work_queue()), queue_(0)
+    Receiver(proton::receiver r, QueueManager& qm) :
+        receiver_(r), work_queue_(r.work_queue()), queue_(0), queue_manager_(qm)
     {}

     bool add(proton::work f) {
@@ -297,7 +308,7 @@ public:
             qn = os.str();
         }
         Queue* q = 0;
-        queues::iterator i = queues_.find(qn);
+        auto i = queues_.find(qn);
         if (i==queues_.end()) {
             q = new Queue(container_, qn);
             queues_[qn] = q;
@@ -307,6 +318,18 @@ public:
         connection.add([=, &connection] {connection.boundQueue(q, qn);});
     }

+    void queueMessage(proton::message m, std::string address) {
+        Queue* q = 0;
+        auto i = queues_.find(address);
+        if (i==queues_.end()) {
+            q = new Queue(container_, address);
+            queues_[address] = q;
+        } else {
+            q = i->second;
+        }
+        q->add([=] {q->queueMsg(m);});
+    }
+
     void findQueueSender(Sender* s, std::string qn) {
         findQueue(*s, qn);
     }
@@ -316,6 +339,11 @@ public:
     }
 };

+void Receiver::queueMsgToNamedQueue(proton::message& m, std::string address) {
+    DOUT(std::cerr << "Receiver: " << this << " send msg to Queue: " << address << "\n";);
+    queue_manager_.add([=]{queue_manager_.queueMessage(m, address);});
+}
+
 class connection_handler : public proton::messaging_handler {
     QueueManager& queue_manager_;
     senders senders_;
@@ -326,7 +354,10 @@ public:
     {}

     void on_connection_open(proton::connection& c) override {
-        c.open(); // Accept the connection
+        // Don't check whether the peer desires ANONYMOUS-RELAY: offer it anyway.
+        // Accept the connection
+        c.open(proton::connection_options{}
+            .offered_capabilities({"ANONYMOUS-RELAY"}));
     }

     // A sender sends messages from a queue to a subscriber.
@@ -340,8 +371,14 @@ public:
     // A receiver receives messages from a publisher to a queue.
     void on_receiver_open(proton::receiver &receiver) override {
         std::string qname = receiver.target().address();
-        Receiver* r = new Receiver(receiver);
-        queue_manager_.add([=]{queue_manager_.findQueueReceiver(r, qname);});
+        Receiver* r = new Receiver(receiver, queue_manager_);
+        // Allow anonymous relay always
+        if (qname.empty()) {
+            receiver.open(proton::receiver_options{}
+                .handler(*r));
+        } else {
+            queue_manager_.add([=]{queue_manager_.findQueueReceiver(r, qname);});
+        }
     }

     void on_session_close(proton::session &session) override {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org