This is an automated email from the ASF dual-hosted git repository.
astitcher pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git
The following commit(s) were added to refs/heads/main by this push:
new 1f3c73eb4 PROTON-2889: [C++ examples] Broker improvements
1f3c73eb4 is described below
commit 1f3c73eb4c59718e27391c8a92d7cf6863d916b0
Author: Andrew Stitcher <[email protected]>
AuthorDate: Tue Apr 8 21:58:37 2025 -0400
PROTON-2889: [C++ examples] Broker improvements
Deal with released messages - requeue up until a limit
More use of C++17 features
---
cpp/examples/broker.cpp | 67 ++++++++++++++++++++++++++++++++++---------------
1 file changed, 47 insertions(+), 20 deletions(-)
diff --git a/cpp/examples/broker.cpp b/cpp/examples/broker.cpp
index 32591ebbb..7e8b9d5ee 100644
--- a/cpp/examples/broker.cpp
+++ b/cpp/examples/broker.cpp
@@ -39,9 +39,9 @@
#include <deque>
#include <iostream>
-#include <map>
#include <string>
#include <thread>
+#include <unordered_map>
// This is a simplified model for a message broker, that only allows for
@@ -74,8 +74,10 @@
// Simple debug output
-bool verbose;
-#define DOUT(x) do {if (verbose) {x};} while (false)
+bool verbose = false;
+#define DOUT(x) do {if (verbose) {x}} while (false)
+
+unsigned redelivery_limit = 5;
class Queue;
class Sender;
@@ -85,17 +87,19 @@ class Sender : public proton::messaging_handler {
proton::sender sender_;
proton::work_queue& work_queue_;
+ std::unordered_map<proton::binary, proton::message> unsettled_messages_;
std::string queue_name_;
- Queue* queue_;
- int pending_credit_;
+ Queue* queue_ = nullptr;
+ int pending_credit_ = 0;
// Messaging handlers
void on_sendable(proton::sender &sender) override;
void on_sender_close(proton::sender &sender) override;
+ void on_tracker_settle(proton::tracker& tracker) override;
public:
Sender(proton::sender s) :
- sender_(s), work_queue_(s.work_queue()), queue_(0), pending_credit_(0)
+ sender_(s), work_queue_(s.work_queue())
{
s.user_data(this);
}
@@ -110,7 +114,8 @@ public:
void boundQueue(Queue* q, std::string qn);
void sendMsg(proton::message m) {
DOUT(std::cerr << "Sender: " << this << " sending\n";);
- sender_.send(m);
+ auto t = sender_.send(m);
+ unsettled_messages_[t.tag()] = m;
}
void unsubscribed() {
DOUT(std::cerr << "Sender: " << this << " deleting\n";);
@@ -125,9 +130,9 @@ class Queue {
proton::work_queue work_queue_;
const std::string name_;
std::deque<proton::message> messages_;
- typedef std::map<Sender*, int> subscriptions; // With credit
+ using subscriptions = std::unordered_map<Sender*, int>; // With credit
subscriptions subscriptions_;
- subscriptions::iterator current_;
+ subscriptions::iterator current_ = subscriptions_.end();
void tryToSend() {
DOUT(std::cerr << "Queue: " << this << " tryToSend: " <<
subscriptions_.size(););
@@ -140,15 +145,15 @@ class Queue {
if (current_==subscriptions_.end()) {
current_=subscriptions_.begin();
}
+ auto& [sender, credit] = *current_;
// If we have credit send the message
- DOUT(std::cerr << "(" << current_->second << ") ";);
- if (current_->second>0) {
- DOUT(std::cerr << current_->first << " ";);
+ DOUT(std::cerr << "(" << credit << ") ";);
+ if (credit>0) {
+ DOUT(std::cerr << sender << " ";);
auto msg = messages_.front();
- auto sender = current_->first;
sender->add([=]{sender->sendMsg(msg);});
messages_.pop_front();
- --current_->second;
+ --credit;
++current_;
} else {
++outOfCredit;
@@ -159,7 +164,7 @@ class Queue {
public:
Queue(proton::container& c, const std::string& n) :
- work_queue_(c), name_(n), current_(subscriptions_.end())
+ work_queue_(c), name_(n)
{}
bool add(proton::work f) {
@@ -209,6 +214,28 @@ void Sender::on_sender_close(proton::sender &sender) {
}
}
+// Handle settlement of a delivery previously sent by this Sender.
+//
+// The copy of the message kept in unsettled_messages_ is always forgotten.
+// What happens next depends on the tracker state:
+//  - PN_RELEASED: the message is put back on the bound queue unchanged.
+//  - PN_MODIFIED: the delivery count is incremented and the message is put
+//    back on the bound queue, unless the count has already reached
+//    redelivery_limit, in which case the message is dropped.
+//  - anything else: nothing more is done.
+// If no queue is bound (queue_ is null) the message is simply dropped.
+void Sender::on_tracker_settle(proton::tracker& tracker) {
+    auto tag = tracker.tag();
+    auto state = tracker.state();
+    DOUT(std::cerr << "Sender: " << this << " on_tracker_settle: " << tag << ": " << state << "\n";);
+    auto i = unsettled_messages_.find(tag);
+    if (i == unsettled_messages_.end()) return;
+    // Take a private copy before erasing; it may need a new delivery count
+    auto msg = i->second;
+    unsettled_messages_.erase(i);
+    // Only released/modified messages go back, and only if a queue is bound
+    if ((state!=PN_RELEASED && state!=PN_MODIFIED) || !queue_) return;
+    if (state==PN_MODIFIED) {
+        auto delivery_count = msg.delivery_count();
+        if (delivery_count>=redelivery_limit) {
+            DOUT(std::cerr << "Sender: " << this << " on_tracker_settle: " << tag << ": Too many redeliveries: " << delivery_count << "\n";);
+            return;
+        }
+        msg.delivery_count(delivery_count + 1);
+    }
+    // Capture the queue pointer by value: the work runs later on the queue's
+    // work queue and must not read members of this Sender at that point
+    queue_->add([queue = queue_, msg] {queue->queueMsg(msg);});
+}
+
void Sender::boundQueue(Queue* q, std::string qn) {
DOUT(std::cerr << "Sender: " << this << " bound to Queue: " << q <<"("
<< qn << ")\n";);
queue_ = q;
@@ -234,13 +261,13 @@ class Receiver : public proton::messaging_handler {
proton::receiver receiver_;
proton::work_queue& work_queue_;
- Queue* queue_;
+ Queue* queue_ = nullptr;
QueueManager& queue_manager_;
std::deque<proton::message> messages_;
// A message is received.
void on_message(proton::delivery &d, proton::message &m) override {
- // We allow anonymous relay behaviour always even if not requested
+ // We allow anonymous relay behaviour always, even if not requested
auto to_address = m.to();
if (queue_) {
messages_.push_back(m);
@@ -266,7 +293,7 @@ class Receiver : public proton::messaging_handler {
public:
Receiver(proton::receiver r, QueueManager& qm) :
- receiver_(r), work_queue_(r.work_queue()), queue_(0),
queue_manager_(qm)
+ receiver_(r), work_queue_(r.work_queue()), queue_manager_(qm)
{}
bool add(proton::work f) {
@@ -288,7 +315,7 @@ public:
class QueueManager {
proton::container& container_;
proton::work_queue work_queue_;
- typedef std::map<std::string, Queue*> queues;
+ typedef std::unordered_map<std::string, Queue*> queues;
queues queues_;
int next_id_; // Use to generate unique queue IDs.
@@ -453,9 +480,9 @@ int main(int argc, char **argv) {
opts.add_flag(verbose, 'v', "verbose", "verbose (debugging) output");
opts.add_value(address, 'a', "address", "listen on URL", "URL");
+ opts.add_value(redelivery_limit, 'l', "redelivery-limit", "max redelivery
attempts", "N");
try {
- verbose = false;
opts.parse();
broker(address).run();
return 0;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]