This is an automated email from the ASF dual-hosted git repository.

astitcher pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git


The following commit(s) were added to refs/heads/main by this push:
     new 1f3c73eb4 PROTON-2889: [C++ examples] Broker improvements
1f3c73eb4 is described below

commit 1f3c73eb4c59718e27391c8a92d7cf6863d916b0
Author: Andrew Stitcher <[email protected]>
AuthorDate: Tue Apr 8 21:58:37 2025 -0400

    PROTON-2889: [C++ examples] Broker improvements
    
    Deal with released messages - requeue up until a limit
    More use of C++17 features
---
 cpp/examples/broker.cpp | 67 ++++++++++++++++++++++++++++++++++---------------
 1 file changed, 47 insertions(+), 20 deletions(-)

diff --git a/cpp/examples/broker.cpp b/cpp/examples/broker.cpp
index 32591ebbb..7e8b9d5ee 100644
--- a/cpp/examples/broker.cpp
+++ b/cpp/examples/broker.cpp
@@ -39,9 +39,9 @@
 
 #include <deque>
 #include <iostream>
-#include <map>
 #include <string>
 #include <thread>
+#include <unordered_map>
 
 
 // This is a simplified model for a message broker, that only allows for
@@ -74,8 +74,10 @@
 
 
 // Simple debug output
-bool verbose;
-#define DOUT(x) do {if (verbose) {x};} while (false)
+bool verbose = false;
+#define DOUT(x) do {if (verbose) {x}} while (false)
+
+unsigned redelivery_limit = 5;
 
 class Queue;
 class Sender;
@@ -85,17 +87,19 @@ class Sender : public proton::messaging_handler {
 
     proton::sender sender_;
     proton::work_queue& work_queue_;
+    std::unordered_map<proton::binary, proton::message> unsettled_messages_;
     std::string queue_name_;
-    Queue* queue_;
-    int pending_credit_;
+    Queue* queue_ = nullptr;
+    int pending_credit_ = 0;
 
     // Messaging handlers
     void on_sendable(proton::sender &sender) override;
     void on_sender_close(proton::sender &sender) override;
+    void on_tracker_settle(proton::tracker& tracker) override;
 
 public:
     Sender(proton::sender s) :
-        sender_(s), work_queue_(s.work_queue()), queue_(0), pending_credit_(0)
+        sender_(s), work_queue_(s.work_queue())
     {
         s.user_data(this);
     }
@@ -110,7 +114,8 @@ public:
     void boundQueue(Queue* q, std::string qn);
     void sendMsg(proton::message m) {
         DOUT(std::cerr << "Sender:   " << this << " sending\n";);
-        sender_.send(m);
+        auto t = sender_.send(m);
+        unsettled_messages_[t.tag()] = m;
     }
     void unsubscribed() {
         DOUT(std::cerr << "Sender:   " << this << " deleting\n";);
@@ -125,9 +130,9 @@ class Queue {
     proton::work_queue work_queue_;
     const std::string name_;
     std::deque<proton::message> messages_;
-    typedef std::map<Sender*, int> subscriptions; // With credit
+    using subscriptions = std::unordered_map<Sender*, int>; // With credit
     subscriptions subscriptions_;
-    subscriptions::iterator current_;
+    subscriptions::iterator current_ = subscriptions_.end();
 
     void tryToSend() {
         DOUT(std::cerr << "Queue:    " << this << " tryToSend: " << 
subscriptions_.size(););
@@ -140,15 +145,15 @@ class Queue {
             if (current_==subscriptions_.end()) {
                 current_=subscriptions_.begin();
             }
+            auto& [sender, credit] = *current_;
             // If we have credit send the message
-            DOUT(std::cerr << "(" << current_->second << ") ";);
-            if (current_->second>0) {
-                DOUT(std::cerr << current_->first << " ";);
+            DOUT(std::cerr << "(" << credit << ") ";);
+            if (credit>0) {
+                DOUT(std::cerr << sender << " ";);
                 auto msg = messages_.front();
-                auto sender = current_->first;
                 sender->add([=]{sender->sendMsg(msg);});
                 messages_.pop_front();
-                --current_->second;
+                --credit;
                 ++current_;
             } else {
                 ++outOfCredit;
@@ -159,7 +164,7 @@ class Queue {
 
 public:
     Queue(proton::container& c, const std::string& n) :
-        work_queue_(c), name_(n), current_(subscriptions_.end())
+        work_queue_(c), name_(n)
     {}
 
     bool add(proton::work f) {
@@ -209,6 +214,28 @@ void Sender::on_sender_close(proton::sender &sender) {
     }
 }
 
+void Sender::on_tracker_settle(proton::tracker& tracker) {
+    auto tag = tracker.tag();
+    auto state = tracker.state();
+    DOUT(std::cerr << "Sender:   " << this << " on_tracker_settle: " << tag << 
": " << state << "\n";);
+    if (auto i = unsettled_messages_.find(tag); i != 
unsettled_messages_.end()) {
+        auto [_, msg] = *i;
+        if (state==PN_RELEASED || state==PN_MODIFIED) {
+            // The message was released or modified, so we need to resend it
+            if (state==PN_MODIFIED) {
+                auto delivery_count = msg.delivery_count();
+                if (delivery_count<redelivery_limit) {
+                    msg.delivery_count(delivery_count + 1);
+                    queue_->add([=] {queue_->queueMsg(msg);});
+                } else {
+                    DOUT(std::cerr << "Sender:   " << this << " 
on_tracker_settle: " << tag << ": Too many redeliveries: " << delivery_count << 
"\n";);
+                }
+            }
+        }
+        unsettled_messages_.erase(i);
+    }
+};
+
 void Sender::boundQueue(Queue* q, std::string qn) {
     DOUT(std::cerr << "Sender:   " << this << " bound to Queue: " << q <<"(" 
<< qn << ")\n";);
     queue_ = q;
@@ -234,13 +261,13 @@ class Receiver : public proton::messaging_handler {
 
     proton::receiver receiver_;
     proton::work_queue& work_queue_;
-    Queue* queue_;
+    Queue* queue_ = nullptr;
     QueueManager& queue_manager_;
     std::deque<proton::message> messages_;
 
     // A message is received.
     void on_message(proton::delivery &d, proton::message &m) override {
-        // We allow anonymous relay behaviour always even if not requested
+        // We allow anonymous relay behaviour always, even if not requested
         auto to_address = m.to();
         if (queue_) {
             messages_.push_back(m);
@@ -266,7 +293,7 @@ class Receiver : public proton::messaging_handler {
 
 public:
     Receiver(proton::receiver r, QueueManager& qm) :
-        receiver_(r), work_queue_(r.work_queue()), queue_(0), 
queue_manager_(qm)
+        receiver_(r), work_queue_(r.work_queue()), queue_manager_(qm)
     {}
 
     bool add(proton::work f) {
@@ -288,7 +315,7 @@ public:
 class QueueManager {
     proton::container& container_;
     proton::work_queue work_queue_;
-    typedef std::map<std::string, Queue*> queues;
+    typedef std::unordered_map<std::string, Queue*> queues;
     queues queues_;
     int next_id_; // Use to generate unique queue IDs.
 
@@ -453,9 +480,9 @@ int main(int argc, char **argv) {
 
     opts.add_flag(verbose, 'v', "verbose", "verbose (debugging) output");
     opts.add_value(address, 'a', "address", "listen on URL", "URL");
+    opts.add_value(redelivery_limit, 'l', "redelivery-limit", "max redelivery 
attempts", "N");
 
     try {
-        verbose = false;
         opts.parse();
         broker(address).run();
         return 0;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to