This is an automated email from the ASF dual-hosted git repository. astitcher pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-proton.git
The following commit(s) were added to refs/heads/main by this push: new 51583a516 PROTON-2657: [cpp] Accessors for user data on endpoints and deliveries 51583a516 is described below commit 51583a51652074586d4672b116224ec1b48de387 Author: Rakhi Kumari <rakhi.c...@gmail.com> AuthorDate: Mon Dec 5 15:20:13 2022 +0530 PROTON-2657: [cpp] Accessors for user data on endpoints and deliveries --- cpp/examples/broker.cpp | 29 +++---- cpp/include/proton/connection.hpp | 6 ++ cpp/include/proton/link.hpp | 6 ++ cpp/include/proton/listener.hpp | 6 ++ cpp/include/proton/session.hpp | 6 ++ cpp/include/proton/transfer.hpp | 6 ++ cpp/src/connection.cpp | 10 +++ cpp/src/context_test.cpp | 169 ++++++++++++++++++++++++++++++++++++++ cpp/src/contexts.cpp | 8 +- cpp/src/contexts.hpp | 13 +++ cpp/src/link.cpp | 13 +++ cpp/src/listener.cpp | 10 +++ cpp/src/session.cpp | 12 +++ cpp/src/transfer.cpp | 11 +++ cpp/tests.cmake | 1 + 15 files changed, 287 insertions(+), 19 deletions(-) diff --git a/cpp/examples/broker.cpp b/cpp/examples/broker.cpp index c09e0b0e0..6c4e24785 100644 --- a/cpp/examples/broker.cpp +++ b/cpp/examples/broker.cpp @@ -80,13 +80,10 @@ bool verbose; class Queue; class Sender; -typedef std::map<proton::sender, Sender*> senders; - class Sender : public proton::messaging_handler { friend class connection_handler; proton::sender sender_; - senders& senders_; proton::work_queue& work_queue_; std::string queue_name_; Queue* queue_; @@ -97,15 +94,19 @@ class Sender : public proton::messaging_handler { void on_sender_close(proton::sender &sender) override; public: - Sender(proton::sender s, senders& ss) : - sender_(s), senders_(ss), work_queue_(s.work_queue()), queue_(0), pending_credit_(0) - {} + Sender(proton::sender s) : + sender_(s), work_queue_(s.work_queue()), queue_(0), pending_credit_(0) + { + s.user_data(this); + } bool add(proton::work f) { return work_queue_.add(f); } - + static Sender* get(const proton::sender& s) { + return reinterpret_cast<Sender*>(s.user_data()); + } void boundQueue(Queue* q, std::string qn); void sendMsg(proton::message m) { DOUT(std::cerr << "Sender: " << this << " sending\n";); @@ -204,7 +205,6 @@ void Sender::on_sender_close(proton::sender &sender) { // If so, we should have a way to mark the sender deleted, so we can delete // on queue binding } - senders_.erase(sender); } void Sender::boundQueue(Queue* q, std::string qn) { @@ -346,7 +346,6 @@ void Receiver::queueMsgToNamedQueue(proton::message& m, std::string address) { class connection_handler : public proton::messaging_handler { QueueManager& queue_manager_; - senders senders_; public: connection_handler(QueueManager& qm) : @@ -363,8 +362,7 @@ public: // A sender sends messages from a queue to a subscriber. void on_sender_open(proton::sender &sender) override { std::string qn = sender.source().dynamic() ? "" : sender.source().address(); - Sender* s = new Sender(sender, senders_); - senders_[sender] = s; + Sender* s = new Sender(sender); queue_manager_.add([=]{queue_manager_.findQueueSender(s, qn);}); } @@ -384,14 +382,11 @@ public: void on_session_close(proton::session &session) override { // Unsubscribe all senders that belong to session. for (proton::sender_iterator i = session.senders().begin(); i != session.senders().end(); ++i) { - senders::iterator j = senders_.find(*i); - if (j == senders_.end()) continue; - Sender* s = j->second; + Sender* s = Sender::get(*i); if (s->queue_) { auto q = s->queue_; s->queue_->add([=]{q->unsubscribe(s);}); } - senders_.erase(j); } } @@ -403,9 +398,7 @@ public: void on_transport_close(proton::transport& t) override { // Unsubscribe all senders. for (proton::sender_iterator i = t.connection().senders().begin(); i != t.connection().senders().end(); ++i) { - senders::iterator j = senders_.find(*i); - if (j == senders_.end()) continue; - Sender* s = j->second; + Sender* s = Sender::get(*i); if (s->queue_) { auto q = s->queue_; s->queue_->add([=]{q->unsubscribe(s);}); diff --git a/cpp/include/proton/connection.hpp b/cpp/include/proton/connection.hpp index bff8e21fb..ed590542d 100644 --- a/cpp/include/proton/connection.hpp +++ b/cpp/include/proton/connection.hpp @@ -202,6 +202,12 @@ PN_CPP_CLASS_EXTERN connection : public internal::object<pn_connection_t>, publi /// existing parameters as if `connection_options::update()` was used. PN_CPP_EXTERN void update_options(const connection_options&); + /// Set user data on this connection. + PN_CPP_EXTERN void user_data(void* user_data) const; + + /// Get user data from this connection. + PN_CPP_EXTERN void* user_data() const; + /// @cond INTERNAL friend class internal::factory<connection>; friend class container; diff --git a/cpp/include/proton/link.hpp b/cpp/include/proton/link.hpp index c7d55b2e9..ebd81c30b 100644 --- a/cpp/include/proton/link.hpp +++ b/cpp/include/proton/link.hpp @@ -94,6 +94,12 @@ PN_CPP_CLASS_EXTERN link : public internal::object<pn_link_t> , public endpoint /// **Unsettled API** - Properties supplied by the remote link endpoint. PN_CPP_EXTERN std::map<symbol, value> properties() const; + /// Set user data on this link. + PN_CPP_EXTERN void user_data(void* user_data) const; + + /// Get user data from this link. + PN_CPP_EXTERN void* user_data() const; + protected: /// @cond INTERNAL diff --git a/cpp/include/proton/listener.hpp b/cpp/include/proton/listener.hpp index d5d0aba0b..d7e04892d 100644 --- a/cpp/include/proton/listener.hpp +++ b/cpp/include/proton/listener.hpp @@ -64,6 +64,12 @@ class PN_CPP_CLASS_EXTERN listener { /// @throw proton::error if this listener is not managed by a container. PN_CPP_EXTERN class container& container() const; + /// Set user data on this listener. + PN_CPP_EXTERN void user_data(void* user_data) const; + + /// Get user data from this listener. + PN_CPP_EXTERN void* user_data() const; + private: pn_listener_t* listener_; diff --git a/cpp/include/proton/session.hpp b/cpp/include/proton/session.hpp index 568c6d087..60522c817 100644 --- a/cpp/include/proton/session.hpp +++ b/cpp/include/proton/session.hpp @@ -99,6 +99,12 @@ PN_CPP_CLASS_EXTERN session : public internal::object<pn_session_t>, public endp /// Return the receivers on this session. PN_CPP_EXTERN receiver_range receivers() const; + /// Set user data on this session. + PN_CPP_EXTERN void user_data(void* user_data) const; + + /// Get user data from this session. + PN_CPP_EXTERN void* user_data() const; + /// @cond INTERNAL friend class internal::factory<session>; friend class session_iterator; diff --git a/cpp/include/proton/transfer.hpp b/cpp/include/proton/transfer.hpp index 61657fe3b..cf0474a75 100644 --- a/cpp/include/proton/transfer.hpp +++ b/cpp/include/proton/transfer.hpp @@ -77,6 +77,12 @@ class transfer : public internal::object<pn_delivery_t> { /// Return true if the transfer has been settled. PN_CPP_EXTERN bool settled() const; + /// Set user data on this transfer. + PN_CPP_EXTERN void user_data(void* user_data) const; + + /// Get user data from this transfer. + PN_CPP_EXTERN void* user_data() const; + /// @cond INTERNAL friend class internal::factory<transfer>; /// @endcond diff --git a/cpp/src/connection.cpp b/cpp/src/connection.cpp index 6293779c8..128e71b0d 100644 --- a/cpp/src/connection.cpp +++ b/cpp/src/connection.cpp @@ -223,4 +223,14 @@ void connection::update_options(const connection_options& options) { cc.connection_options_->update(options); } +void connection::user_data(void* user_data) const { + connection_context& cc = connection_context::get(pn_object()); + cc.user_data_ = user_data; +} + +void* connection::user_data() const { + connection_context& cc = connection_context::get(pn_object()); + return cc.user_data_; +} + } // namespace proton diff --git a/cpp/src/context_test.cpp b/cpp/src/context_test.cpp new file mode 100644 index 000000000..f53a61cee --- /dev/null +++ b/cpp/src/context_test.cpp @@ -0,0 +1,169 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <proton/connection.hpp> +#include <proton/connection_options.hpp> +#include <proton/container.hpp> +#include <proton/delivery.h> +#include <proton/delivery.hpp> +#include <proton/link.hpp> +#include <proton/listen_handler.hpp> +#include <proton/listener.hpp> +#include <proton/message.hpp> +#include <proton/message_id.hpp> +#include <proton/messaging_handler.hpp> +#include <proton/tracker.hpp> +#include <proton/types.h> +#include <proton/types.hpp> +#include <proton/value.hpp> + +#include "proton/error_condition.hpp" +#include "proton/receiver_options.hpp" +#include "proton/transport.hpp" +#include "proton/work_queue.hpp" +#include "test_bits.hpp" + +#include <condition_variable> +#include <mutex> +#include <thread> + +namespace { +std::mutex m; +std::condition_variable cv; +bool listener_ready = false; +int listener_port; +} // namespace + +class test_server : public proton::messaging_handler { + private: + class listener_ready_handler : public proton::listen_handler { + void on_open(proton::listener &l) override { + { + std::lock_guard<std::mutex> lk(m); + listener_port = l.port(); + listener_ready = true; + } + cv.notify_one(); + } + }; + + std::string url; + proton::listener listener; + listener_ready_handler listen_handler; + + public: + test_server (const std::string &s) : url(s) {} + + void on_container_start(proton::container &c) override { + listener = c.listen(url, listen_handler); + + std::string data = "listener-user-data"; + // Set user context 'data' on listener. + listener.user_data(&data); + ASSERT_EQUAL(&data, listener.user_data()); + } + + void on_message(proton::delivery &d, proton::message &msg) override { + std::string data = "delivery-user-data"; + // Set user context 'data' on delivery. + d.user_data(&data); + ASSERT_EQUAL(&data, d.user_data()); + + d.receiver().close(); + d.connection().close(); + listener.stop(); + } +}; + +class test_client : public proton::messaging_handler { + private: + std::string url; + proton::sender sender; + + public: + test_client (const std::string &s) : url(s) {} + + void on_container_start(proton::container &c) override { + proton::connection_options co; + sender = c.open_sender(url, co); + } + + void on_connection_open(proton::connection& c) override { + // Get default session + proton::session s = c.default_session(); + + std::string data_ssn = "session-user-data"; + // Set user context 'data' on default session. + s.user_data(&data_ssn); + ASSERT_EQUAL(&data_ssn, s.user_data()); + + std::string data_con = "connection-user-data"; + // Set user context 'data' on current connection. + c.user_data(&data_con); + ASSERT_EQUAL(&data_con, c.user_data()); + } + + void on_sendable(proton::sender &s) override { + proton::message msg; + msg.body("message"); + proton::tracker t = s.send(msg); + + std::string data = "sender-user-data"; + // Set user context 'data' on sender. + s.user_data(&data); + ASSERT_EQUAL(&data, s.user_data()); + + s.connection().close(); + } + + void on_tracker_accept(proton::tracker &t) override { + std::string data = "tracker-user-data"; + // Set user context 'data' on tracker. + t.user_data(&data); + ASSERT_EQUAL(&data, t.user_data()); + } +}; + +int test_user_context() { + + std::string recv_address("127.0.0.1:0/test"); + test_server recv(recv_address); + proton::container c(recv); + std::thread thread_recv([&c]() -> void { c.run(); }); + + // wait until listener is ready + std::unique_lock<std::mutex> lk(m); + cv.wait(lk, [] { return listener_ready; }); + + std::string send_address = + "127.0.0.1:" + std::to_string(listener_port) + "/test"; + test_client send(send_address); + proton::container(send).run(); + thread_recv.join(); + + return 0; +} + +int main(int argc, char **argv) { + int failed = 0; + RUN_ARGV_TEST(failed, test_user_context()); + return failed; +} diff --git a/cpp/src/contexts.cpp b/cpp/src/contexts.cpp index 9dc13dbf0..4f85ace93 100644 --- a/cpp/src/contexts.cpp +++ b/cpp/src/contexts.cpp @@ -30,10 +30,11 @@ #include "proton/reconnect_options.hpp" #include <proton/connection.h> -#include <proton/object.h> +#include <proton/delivery.h> #include <proton/link.h> #include <proton/listener.h> #include <proton/message.h> +#include <proton/object.h> #include <proton/session.h> #include <typeinfo> @@ -49,6 +50,7 @@ PN_HANDLE(CONNECTION_CONTEXT) PN_HANDLE(LISTENER_CONTEXT) PN_HANDLE(SESSION_CONTEXT) PN_HANDLE(LINK_CONTEXT) +PN_HANDLE(TRANSFER_CONTEXT) template <class T> T* get_context(pn_record_t* record, pn_handle_t handle) { @@ -89,4 +91,8 @@ session_context& session_context::get(pn_session_t* s) { return ref<session_context>(id(pn_session_attachments(s), SESSION_CONTEXT)); } +transfer_context& transfer_context::get(pn_delivery_t* s) { + return ref<transfer_context>(id(pn_delivery_attachments(s), TRANSFER_CONTEXT)); +} + } diff --git a/cpp/src/contexts.hpp b/cpp/src/contexts.hpp index 4694ca969..02ea76083 100644 --- a/cpp/src/contexts.hpp +++ b/cpp/src/contexts.hpp @@ -103,6 +103,7 @@ class connection_context : public context { listener_context* listener_context_; work_queue work_queue_; std::string active_url_; + void* user_data_; }; class reconnect_options_base; @@ -127,6 +128,7 @@ class listener_context : public context { listen_handler* listen_handler_; std::unique_ptr<const connection_options> connection_options_; + void* user_data_; }; class link_context : public context { @@ -140,6 +142,7 @@ class link_context : public context { bool auto_accept; bool auto_settle; bool draining; + void* user_data_; }; class session_context : public context { @@ -148,6 +151,16 @@ class session_context : public context { static session_context& get(pn_session_t* s); messaging_handler* handler; + void* user_data_; +}; + +class transfer_context : public context { + public: + transfer_context() : handler(0) {} + static transfer_context& get(pn_delivery_t* s); + + messaging_handler* handler; + void* user_data_; }; } diff --git a/cpp/src/link.cpp b/cpp/src/link.cpp index c94fb4468..c6f58f48d 100644 --- a/cpp/src/link.cpp +++ b/cpp/src/link.cpp @@ -95,4 +95,17 @@ std::map<symbol, value> link::properties() const { error_condition link::error() const { return make_wrapper(pn_link_remote_condition(pn_object())); } + +void link::user_data(void* user_data) const { + pn_link_t* lnk = pn_object(); + link_context& lctx = link_context::get(lnk); + lctx.user_data_ = user_data; +} + +void* link::user_data() const { + pn_link_t* lnk = pn_object(); + link_context& lctx = link_context::get(lnk); + return lctx.user_data_; +} + } diff --git a/cpp/src/listener.cpp b/cpp/src/listener.cpp index 931600aec..91d0350d7 100644 --- a/cpp/src/listener.cpp +++ b/cpp/src/listener.cpp @@ -60,6 +60,16 @@ class container& listener::container() const { return *reinterpret_cast<class container*>(c); } +void listener::user_data(void* user_data) const { + listener_context& lc = listener_context::get(listener_); + lc.user_data_ = user_data; +} + +void* listener::user_data() const { + listener_context& lc = listener_context::get(listener_); + return lc.user_data_; +} + // Listen handler listen_handler::~listen_handler() = default; void listen_handler::on_open(listener&) {} diff --git a/cpp/src/session.cpp b/cpp/src/session.cpp index 7ba809b82..b8f777a00 100644 --- a/cpp/src/session.cpp +++ b/cpp/src/session.cpp @@ -136,4 +136,16 @@ session_iterator session_iterator::operator++() { return *this; } +void session::user_data(void* user_data) const { + pn_session_t* ssn = pn_object(); + session_context& sctx = session_context::get(ssn); + sctx.user_data_ = user_data; +} + +void* session::user_data() const { + pn_session_t* ssn = pn_object(); + session_context& sctx = session_context::get(ssn); + return sctx.user_data_; +} + } // namespace proton diff --git a/cpp/src/transfer.cpp b/cpp/src/transfer.cpp index a024abf61..063254267 100644 --- a/cpp/src/transfer.cpp +++ b/cpp/src/transfer.cpp @@ -49,4 +49,15 @@ enum transfer::state transfer::state() const { return static_cast<enum state>(pn std::string to_string(enum transfer::state s) { return pn_disposition_type_name(s); } std::ostream& operator<<(std::ostream& o, const enum transfer::state s) { return o << to_string(s); } + +void transfer::user_data(void* user_data) const { + transfer_context& cc = transfer_context::get(pn_object()); + cc.user_data_ = user_data; +} + +void* transfer::user_data() const { + transfer_context& cc = transfer_context::get(pn_object()); + return cc.user_data_; +} + } diff --git a/cpp/tests.cmake b/cpp/tests.cmake index ee43b3c6e..fd515eabf 100644 --- a/cpp/tests.cmake +++ b/cpp/tests.cmake @@ -62,6 +62,7 @@ add_cpp_test(reconnect_test) add_cpp_test(link_test) add_cpp_test(credit_test) add_cpp_test(delivery_test) +add_cpp_test(context_test) if (ENABLE_JSONCPP) add_cpp_test(connect_config_test) target_link_libraries(connect_config_test qpid-proton-core) # For pn_sasl_enabled --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org