This is an automated email from the ASF dual-hosted git repository.

astitcher pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-proton.git
commit a131afbc251821cdeacbba074133e9cd6dd51282 Author: Andrew Stitcher <astitc...@apache.org> AuthorDate: Wed Jan 4 11:15:12 2023 -0500 PROTON-2665: Convert C++ broker example to use lambdas We can make the code more idiomatic by using C++11 features. --- cpp/examples/broker.cpp | 37 +++++++++++++++++++++++-------------- 1 file changed, 23 insertions(+), 14 deletions(-) diff --git a/cpp/examples/broker.cpp b/cpp/examples/broker.cpp index 30b602821..bcf8ab6ae 100644 --- a/cpp/examples/broker.cpp +++ b/cpp/examples/broker.cpp @@ -141,7 +141,9 @@ class Queue { DOUT(std::cerr << "(" << current_->second << ") ";); if (current_->second>0) { DOUT(std::cerr << current_->first << " ";); - current_->first->add(make_work(&Sender::sendMsg, current_->first, messages_.front())); + auto msg = messages_.front(); + auto sender = current_->first; + sender->add([=]{sender->sendMsg(msg);}); messages_.pop_front(); --current_->second; ++current_; @@ -180,14 +182,15 @@ public: // If we're about to erase the current subscription move on if (current_ != subscriptions_.end() && current_->first==s) ++current_; subscriptions_.erase(s); - s->add(make_work(&Sender::unsubscribed, s)); + s->add([=]{s->unsubscribed();}); } }; // We have credit to send a message. void Sender::on_sendable(proton::sender &sender) { if (queue_) { - queue_->add(make_work(&Queue::flow, queue_, this, sender.credit())); + auto credit = sender.credit(); + queue_->add([=]{queue_->flow(this, credit);}); } else { pending_credit_ = sender.credit(); } @@ -195,7 +198,7 @@ void Sender::on_sendable(proton::sender &sender) { void Sender::on_sender_close(proton::sender &sender) { if (queue_) { - queue_->add(make_work(&Queue::unsubscribe, queue_, this)); + queue_->add([=]{queue_->unsubscribe(this);}); } else { // TODO: Is it possible to be closed before we get the queue allocated? 
// If so, we should have a way to mark the sender deleted, so we can delete @@ -209,13 +212,16 @@ void Sender::boundQueue(Queue* q, std::string qn) { queue_ = q; queue_name_ = qn; - q->add(make_work(&Queue::subscribe, q, this)); sender_.open(proton::sender_options() .source((proton::source_options().address(queue_name_))) .handler(*this)); - if (pending_credit_>0) { - queue_->add(make_work(&Queue::flow, queue_, this, pending_credit_)); - } + auto credit = pending_credit_; + q->add([=]{ + q->subscribe(this); + if (credit>0) { + q->flow(this, credit); + } + }); std::cout << "sending from " << queue_name_ << std::endl; } @@ -239,7 +245,8 @@ class Receiver : public proton::messaging_handler { void queueMsgs() { DOUT(std::cerr << "Receiver: " << this << " queueing " << messages_.size() << " msgs to: " << queue_ << "\n";); while (!messages_.empty()) { - queue_->add(make_work(&Queue::queueMsg, queue_, messages_.front())); + auto msg = messages_.front(); + queue_->add([=]{queue_->queueMsg(msg);}); messages_.pop_front(); } } @@ -297,7 +304,7 @@ public: } else { q = i->second; } - connection.add(make_work(&T::boundQueue, &connection, q, qn)); + connection.add([=, &connection] {connection.boundQueue(q, qn);}); } void findQueueSender(Sender* s, std::string qn) { @@ -327,14 +334,14 @@ public: std::string qn = sender.source().dynamic() ? "" : sender.source().address(); Sender* s = new Sender(sender, senders_); senders_[sender] = s; - queue_manager_.add(make_work(&QueueManager::findQueueSender, &queue_manager_, s, qn)); + queue_manager_.add([=]{queue_manager_.findQueueSender(s, qn);}); } // A receiver receives messages from a publisher to a queue. 
void on_receiver_open(proton::receiver &receiver) override { std::string qname = receiver.target().address(); Receiver* r = new Receiver(receiver); - queue_manager_.add(make_work(&QueueManager::findQueueReceiver, &queue_manager_, r, qname)); + queue_manager_.add([=]{queue_manager_.findQueueReceiver(r, qname);}); } void on_session_close(proton::session &session) override { @@ -344,7 +351,8 @@ public: if (j == senders_.end()) continue; Sender* s = j->second; if (s->queue_) { - s->queue_->add(make_work(&Queue::unsubscribe, s->queue_, s)); + auto q = s->queue_; + s->queue_->add([=]{q->unsubscribe(s);}); } senders_.erase(j); } @@ -362,7 +370,8 @@ public: if (j == senders_.end()) continue; Sender* s = j->second; if (s->queue_) { - s->queue_->add(make_work(&Queue::unsubscribe, s->queue_, s)); + auto q = s->queue_; + s->queue_->add([=]{q->unsubscribe(s);}); } } delete this; // All done. --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org