PROTON-865: sync_request_response
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/8e1757eb Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/8e1757eb Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/8e1757eb Branch: refs/heads/cjansen-cpp-client Commit: 8e1757ebcf200d1475ec927a0d8e7eb7e2a9e06e Parents: 4339b1c Author: Clifford Jansen <[email protected]> Authored: Wed Jul 29 16:54:42 2015 -0700 Committer: Clifford Jansen <[email protected]> Committed: Wed Jul 29 16:54:42 2015 -0700 ---------------------------------------------------------------------- examples/cpp/CMakeLists.txt | 2 + examples/cpp/example_test.py | 40 +++++++++ examples/cpp/options.hpp | 2 +- examples/cpp/server.cpp | 95 ++++++++++++++++++++ examples/cpp/simple_recv.cpp | 2 +- examples/cpp/sync_client.cpp | 65 ++++++++++++++ proton-c/bindings/cpp/CMakeLists.txt | 1 + .../cpp/include/proton/blocking_connection.hpp | 5 +- .../cpp/include/proton/blocking_receiver.hpp | 16 +++- proton-c/bindings/cpp/include/proton/link.hpp | 2 +- .../include/proton/sync_request_response.hpp | 58 ++++++++++++ .../bindings/cpp/src/blocking_connection.cpp | 23 +++-- .../cpp/src/blocking_connection_impl.cpp | 4 + .../cpp/src/blocking_connection_impl.hpp | 1 + proton-c/bindings/cpp/src/blocking_receiver.cpp | 54 +++++++++-- .../bindings/cpp/src/sync_request_response.cpp | 77 ++++++++++++++++ 16 files changed, 423 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8e1757eb/examples/cpp/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/examples/cpp/CMakeLists.txt b/examples/cpp/CMakeLists.txt index fde4eee..f32c38a 100644 --- a/examples/cpp/CMakeLists.txt +++ b/examples/cpp/CMakeLists.txt @@ -30,6 +30,8 @@ foreach(example simple_send direct_recv direct_send + sync_client + server encode_decode) add_executable(${example} ${example}.cpp) target_link_libraries(${example} qpid-proton-cpp) http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8e1757eb/examples/cpp/example_test.py ---------------------------------------------------------------------- diff --git a/examples/cpp/example_test.py b/examples/cpp/example_test.py index 8dd4341..38e3c2c 100644 --- a/examples/cpp/example_test.py +++ b/examples/cpp/example_test.py @@ -102,6 +102,31 @@ class Broker(object): except Exception as e: raise Exception("Error running %s: %s", cmd, e) +class Server(object): + """Run the test server""" + + @classmethod + def get(cls, addr): + if not hasattr(cls, "_server"): + cls._server = Server(addr) + return cls._server + + @classmethod + def stop(cls): + if cls.get(None) and cls._server.process: + cls._server.process.kill() + cls._server = None + + def __init__(self, addr): + self.addr = addr + cmd = [exe_name("server"), "-a", self.addr] + try: + self.process = Popen(cmd, stdout=NULL, stderr=NULL) + time.sleep(0.3) + + except Exception as e: + raise Exception("Error running %s: %s", cmd, e) + class ExampleTest(unittest.TestCase): """Run the examples, verify they behave as expected.""" @@ -161,6 +186,21 @@ class ExampleTest(unittest.TestCase): recv_expect += "".join(['{"sequence"=%s}\n' % (i+1) for i in range(100)]) self.assertEqual(recv_expect, verify(recv)) + def test_sync_request_response(self): + """Start server first, then run sync_client""" + b = Broker.get() + s = Server.get(b.addr) + expect = """ +"Twas brillig, and the slithy toves" => "TWAS BRILLIG, AND THE SLITHY TOVES" +"Did gire and gymble in the wabe." => "DID GIRE AND GYMBLE IN THE WABE." +"All mimsy were the borogroves," => "ALL MIMSY WERE THE BOROGROVES," +"And the mome raths outgrabe." => "AND THE MOME RATHS OUTGRABE." +""" + sc = "\n" + sc += execute("sync_client", "-a", b.addr) + self.assertEqual(expect, sc) + Server.stop() + def test_encode_decode(self): expect=""" == Simple values: int, string, bool http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8e1757eb/examples/cpp/options.hpp ---------------------------------------------------------------------- diff --git a/examples/cpp/options.hpp b/examples/cpp/options.hpp index 5378507..bd477b5 100644 --- a/examples/cpp/options.hpp +++ b/examples/cpp/options.hpp @@ -125,7 +125,7 @@ class options { if (arg.compare(0, long_.size(), long_) == 0 && arg[long_.size()] == '=' ) { set_value(long_, arg.substr(long_.size()+1)); return true; - } + } return false; } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8e1757eb/examples/cpp/server.cpp ---------------------------------------------------------------------- diff --git a/examples/cpp/server.cpp b/examples/cpp/server.cpp new file mode 100644 index 0000000..78b78d3 --- /dev/null +++ b/examples/cpp/server.cpp @@ -0,0 +1,95 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "options.hpp" + +#include "proton/container.hpp" +#include "proton/messaging_handler.hpp" +#include "proton/url.hpp" + +#include <iostream> +#include <map> +#include <string> + +class server : public proton::messaging_handler { + private: + typedef std::map<std::string, proton::sender> sender_map; + proton::url url; + proton::connection connection; + sender_map senders; + + public: + + server(const proton::url &u) : url(u) {} + + void on_start(proton::event &e) { + connection = e.container().connect(url); + e.container().create_receiver(connection, url.path()); + std::cout << "server listening on " << url << std::endl; + } + + std::string to_upper(const std::string &s) { + std::string uc(s); + size_t l = uc.size(); + for (size_t i=0; i<l; i++) uc[i] = std::toupper(uc[i]); + return uc; + } + + // TODO: on_connection_opened() and ANONYMOUS-RELAY + + void on_message(proton::event &e) { + proton::sender sender; + proton::message msg = e.message(); + std::cout << "Received " << msg.body() << std::endl; + std::string sender_id = msg.reply_to(); + sender_map::iterator it = senders.find(sender_id); + if (it == senders.end()) { + sender = e.container().create_sender(connection, sender_id); + senders[sender_id] = sender; + } + else { + sender = it->second; + } + proton::message reply; + reply.body(to_upper(msg.body().get<std::string>())); + reply.correlation_id(msg.correlation_id()); + reply.address(sender_id); + sender.send(reply); + } +}; + +int main(int argc, char **argv) { + // Command line options + proton::url url("amqp://127.0.0.1:5672/examples"); + options opts(argc, argv); + opts.add_value(url, 'a', "address", "listen on URL", "URL"); + try { + opts.parse(); + server server(url); + proton::container(server).run(); + return 0; + } catch (const bad_option& e) { + std::cout << opts << std::endl << e.what() << std::endl; + } catch (const std::exception& e) { + std::cerr << e.what() << std::endl; + } + return 1; +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8e1757eb/examples/cpp/simple_recv.cpp ---------------------------------------------------------------------- diff --git a/examples/cpp/simple_recv.cpp b/examples/cpp/simple_recv.cpp index b2cec63..94cd398 100644 --- a/examples/cpp/simple_recv.cpp +++ b/examples/cpp/simple_recv.cpp @@ -77,7 +77,7 @@ int main(int argc, char **argv) { opts.parse(); simple_recv recv(address, message_count); proton::container(recv).run(); - return 0; + return 0; } catch (const bad_option& e) { std::cout << opts << std::endl << e.what() << std::endl; } catch (const std::exception& e) { http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8e1757eb/examples/cpp/sync_client.cpp ---------------------------------------------------------------------- diff --git a/examples/cpp/sync_client.cpp b/examples/cpp/sync_client.cpp new file mode 100644 index 0000000..e141a3e --- /dev/null +++ b/examples/cpp/sync_client.cpp @@ -0,0 +1,65 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "options.hpp" + +#include "proton/container.hpp" +#include "proton/sync_request_response.hpp" +#include "proton/url.hpp" +#include "proton/value.hpp" +#include "proton/types.hpp" + +#include <iostream> +#include <string> + +int main(int argc, char **argv) { + // Command line options + proton::url url("127.0.0.1:5672/examples"); + uint64_t timeout(5000); + options opts(argc, argv); + opts.add_value(url, 'a', "address", "connect to URL", "URL"); + opts.add_value(timeout, 't', "timeout", "give up after this TIMEOUT (milliseconds)", "TIMEOUT"); + + std::string requests[] = { "Twas brillig, and the slithy toves", + "Did gire and gymble in the wabe.", + "All mimsy were the borogroves,", + "And the mome raths outgrabe." }; + int requests_size=4; + + try { + opts.parse(); + proton::duration d(timeout); + proton::blocking_connection conn(url, d); + proton::sync_request_response client(conn, url.path()); + for (int i=0; i<requests_size; i++) { + proton::message request; + request.body(requests[i]); + proton::message response = client.call(request); + std::cout << request.body() << " => " << response.body() << std::endl; + } + return 0; + } catch (const bad_option& e) { + std::cout << opts << std::endl << e.what() << std::endl; + } catch (const std::exception& e) { + std::cerr << e.what() << std::endl; + } + return 1; +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8e1757eb/proton-c/bindings/cpp/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/CMakeLists.txt b/proton-c/bindings/cpp/CMakeLists.txt index fcb0fb1..d1b1ebc 100644 --- a/proton-c/bindings/cpp/CMakeLists.txt +++ b/proton-c/bindings/cpp/CMakeLists.txt @@ -70,6 +70,7 @@ set(qpid-proton-cpp-source src/blocking_link.cpp src/blocking_sender.cpp src/blocking_receiver.cpp + src/sync_request_response.cpp src/contexts.cpp src/types.cpp ) http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8e1757eb/proton-c/bindings/cpp/include/proton/blocking_connection.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/blocking_connection.hpp b/proton-c/bindings/cpp/include/proton/blocking_connection.hpp index b6f0499..cba5f60 100644 --- a/proton-c/bindings/cpp/include/proton/blocking_connection.hpp +++ b/proton-c/bindings/cpp/include/proton/blocking_connection.hpp @@ -56,9 +56,10 @@ class blocking_connection : public handle<blocking_connection_impl> PN_CPP_EXTERN blocking_sender create_sender(const std::string &address, handler *h=0); PN_CPP_EXTERN blocking_receiver create_receiver(const std::string &address, int credit = 0, - bool dynamic = false, std::string name = std::string()); + bool dynamic = false, handler *h=0, std::string name = std::string()); PN_CPP_EXTERN void wait(wait_condition &condition); - PN_CPP_EXTERN void wait(wait_condition &condition, std::string &msg, duration timeout=duration::FOREVER); + PN_CPP_EXTERN void wait(wait_condition &condition, std::string &msg); + PN_CPP_EXTERN void wait(wait_condition &condition, std::string &msg, duration timeout); PN_CPP_EXTERN duration timeout(); private: friend class private_impl_ref<blocking_connection>; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8e1757eb/proton-c/bindings/cpp/include/proton/blocking_receiver.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/blocking_receiver.hpp b/proton-c/bindings/cpp/include/proton/blocking_receiver.hpp index 319fd93..dbcc9d0 100644 --- a/proton-c/bindings/cpp/include/proton/blocking_receiver.hpp +++ b/proton-c/bindings/cpp/include/proton/blocking_receiver.hpp @@ -49,9 +49,21 @@ class blocking_receiver : public blocking_link PN_CPP_EXTERN void release(bool delivered = true); PN_CPP_EXTERN void settle(); PN_CPP_EXTERN void settle(delivery::state state); + PN_CPP_EXTERN void flow(int count); + /** Credit available on the receiver link */ + PN_CPP_EXTERN int credit(); + /** Local source of the receiver link */ + PN_CPP_EXTERN terminus source(); + /** Local target of the receiver link */ + PN_CPP_EXTERN terminus target(); + /** Remote source of the receiver link */ + PN_CPP_EXTERN terminus remote_source(); + /** Remote target of the receiver link */ + PN_CPP_EXTERN terminus remote_target(); + private: - blocking_receiver(blocking_connection &c, receiver &l, fetcher &f, int credit); - fetcher &fetcher_; + blocking_receiver(blocking_connection &c, receiver &l, fetcher *f, int credit); + fetcher *fetcher_; friend class blocking_connection; }; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8e1757eb/proton-c/bindings/cpp/include/proton/link.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/link.hpp b/proton-c/bindings/cpp/include/proton/link.hpp index feabfec..46b8894 100644 --- a/proton-c/bindings/cpp/include/proton/link.hpp +++ b/proton-c/bindings/cpp/include/proton/link.hpp @@ -64,7 +64,7 @@ class link : public endpoint, public proton_handle<pn_link_t> PN_CPP_EXTERN terminus target(); /** Remote source of the link */ PN_CPP_EXTERN terminus remote_source(); - /** Remote source of the link */ + /** Remote target of the link */ PN_CPP_EXTERN terminus remote_target(); /** Link name */ PN_CPP_EXTERN std::string name(); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8e1757eb/proton-c/bindings/cpp/include/proton/sync_request_response.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/sync_request_response.hpp b/proton-c/bindings/cpp/include/proton/sync_request_response.hpp new file mode 100644 index 0000000..604af4b --- /dev/null +++ b/proton-c/bindings/cpp/include/proton/sync_request_response.hpp @@ -0,0 +1,58 @@ +#ifndef PROTON_CPP_SYNCREQUESTRESPONSE_H +#define PROTON_CPP_SYNCREQUESTRESPONSE_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "proton/export.hpp" +#include "proton/messaging_handler.hpp" +#include "proton/blocking_receiver.hpp" +#include "proton/blocking_sender.hpp" +#include "proton/wait_condition.hpp" +#include <string> + +struct pn_message_t; +struct pn_data_t; + +namespace proton { + +/// An implementation of the synchronous request-response pattern (aka RPC). +class sync_request_response : public messaging_handler +{ + public: + PN_CPP_EXTERN sync_request_response(blocking_connection &, const std::string address=std::string()); + /** Send a request message, wait for and return the response message. */ + PN_CPP_EXTERN message call(message &); + /** Return the dynamic address of our receiver. */ + PN_CPP_EXTERN std::string reply_to(); + /** Called when we receive a message for our receiver. */ + void on_message(event &e); + private: + blocking_connection connection_; + std::string address_; + blocking_sender sender_; + blocking_receiver receiver_; + message response_; + amqp_ulong correlation_id_; +}; + +} + +#endif /*!PROTON_CPP_SYNCREQUESTRESPONSE_H*/ http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8e1757eb/proton-c/bindings/cpp/src/blocking_connection.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/blocking_connection.cpp b/proton-c/bindings/cpp/src/blocking_connection.cpp index dd1aebb..c0c1477 100644 --- a/proton-c/bindings/cpp/src/blocking_connection.cpp +++ b/proton-c/bindings/cpp/src/blocking_connection.cpp @@ -50,6 +50,9 @@ blocking_connection::blocking_connection(const proton::url &url, duration d, ssl void blocking_connection::close() { impl_->close(); } void blocking_connection::wait(wait_condition &cond) { return impl_->wait(cond); } +void blocking_connection::wait(wait_condition &cond, std::string &msg) { + return impl_->wait(cond, msg); +} void blocking_connection::wait(wait_condition &cond, std::string &msg, duration timeout) { return impl_->wait(cond, msg, timeout); } @@ -61,18 +64,22 @@ blocking_sender blocking_connection::create_sender(const std::string &address, h namespace { struct fetcher_guard{ - fetcher_guard(fetcher &f) : fetcher_(f) { fetcher_.incref(); } - ~fetcher_guard() { fetcher_.decref(); } - fetcher& fetcher_; + fetcher_guard(fetcher *f) : fetcher_(f) { if (fetcher_) fetcher_->incref(); } + ~fetcher_guard() { if (fetcher_) fetcher_->decref(); } + fetcher* fetcher_; }; } blocking_receiver blocking_connection::create_receiver(const std::string &address, int credit, - bool dynamic, std::string name) { - fetcher *f = new fetcher(*this, credit); - fetcher_guard fg(*f); - receiver receiver = impl_->container_.create_receiver(impl_->connection_, address, dynamic, f); - blocking_receiver brcv(*this, receiver, *f, credit); + bool dynamic, handler *handler, std::string name) { + fetcher *f = NULL; + if (!handler) { + f = new fetcher(*this, credit); + handler = f; + } + fetcher_guard fg(f); + receiver receiver = impl_->container_.create_receiver(impl_->connection_, address, dynamic, handler); + blocking_receiver brcv(*this, receiver, f, credit); return brcv; } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8e1757eb/proton-c/bindings/cpp/src/blocking_connection_impl.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/blocking_connection_impl.cpp b/proton-c/bindings/cpp/src/blocking_connection_impl.cpp index 26d0e78..0e78541 100644 --- a/proton-c/bindings/cpp/src/blocking_connection_impl.cpp +++ b/proton-c/bindings/cpp/src/blocking_connection_impl.cpp @@ -88,6 +88,10 @@ void blocking_connection_impl::wait(wait_condition &condition) { wait(condition, empty, timeout_); } +void blocking_connection_impl::wait(wait_condition &condition, std::string &msg) { + wait(condition, msg, timeout_); +} + void blocking_connection_impl::wait(wait_condition &condition, std::string &msg, duration wait_timeout) { if (wait_timeout == duration::FOREVER) { while (!condition.achieved()) { http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8e1757eb/proton-c/bindings/cpp/src/blocking_connection_impl.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/blocking_connection_impl.hpp b/proton-c/bindings/cpp/src/blocking_connection_impl.hpp index 7e8c031..11ff4fd 100644 --- a/proton-c/bindings/cpp/src/blocking_connection_impl.hpp +++ b/proton-c/bindings/cpp/src/blocking_connection_impl.hpp @@ -42,6 +42,7 @@ class ssl_domain; PN_CPP_EXTERN ~blocking_connection_impl(); PN_CPP_EXTERN void close(); PN_CPP_EXTERN void wait(wait_condition &condition); + PN_CPP_EXTERN void wait(wait_condition &condition, std::string &msg); PN_CPP_EXTERN void wait(wait_condition &condition, std::string &msg, duration timeout); PN_CPP_EXTERN pn_connection_t *pn_blocking_connection(); duration timeout() { return timeout_; } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8e1757eb/proton-c/bindings/cpp/src/blocking_receiver.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/blocking_receiver.cpp b/proton-c/bindings/cpp/src/blocking_receiver.cpp index 042eb29..3ca53c0 100644 --- a/proton-c/bindings/cpp/src/blocking_receiver.cpp +++ b/proton-c/bindings/cpp/src/blocking_receiver.cpp @@ -39,11 +39,11 @@ struct fetcher_has_message : public wait_condition { } // namespace -blocking_receiver::blocking_receiver(blocking_connection &c, receiver &l, fetcher &f, int credit) +blocking_receiver::blocking_receiver(blocking_connection &c, receiver &l, fetcher *f, int credit) : blocking_link(&c, l.pn_link()), fetcher_(f) { std::string sa = link_.source().address(); std::string rsa = link_.remote_source().address(); - if (sa.empty() || sa.compare(rsa) != 0) { + if (!sa.empty() && sa.compare(rsa) != 0) { wait_for_closed(); link_.close(); std::string txt = "Failed to open receiver " + link_.name() + ", source does not match"; @@ -51,30 +51,38 @@ blocking_receiver::blocking_receiver(blocking_connection &c, receiver &l, fetche } if (credit) pn_link_flow(link_.pn_link(), credit); - fetcher_.incref(); + if (fetcher_) + fetcher_->incref(); } blocking_receiver::blocking_receiver(const blocking_receiver& r) : blocking_link(r), fetcher_(r.fetcher_) { - fetcher_.incref(); + if (fetcher_) + fetcher_->incref(); } blocking_receiver& blocking_receiver::operator=(const blocking_receiver& r) { if (this == &r) return *this; fetcher_ = r.fetcher_; - fetcher_.incref(); + if (fetcher_) + fetcher_->incref(); return *this; } -blocking_receiver::~blocking_receiver() { fetcher_.decref(); } +blocking_receiver::~blocking_receiver() { + if (fetcher_) + fetcher_->decref(); +} message blocking_receiver::receive(duration timeout) { + if (!fetcher_) + throw error(MSG("Can't call receive on this receiver as a handler was provided")); receiver rcv = link_; if (!rcv.credit()) rcv.flow(1); std::string txt = "Receiving on receiver " + link_.name(); - fetcher_has_message cond(fetcher_); + fetcher_has_message cond(*fetcher_); connection_.wait(cond, txt, timeout); - return fetcher_.pop(); + return fetcher_->pop(); } message blocking_receiver::receive() { @@ -98,7 +106,35 @@ void blocking_receiver::release(bool delivered) { } void blocking_receiver::settle(delivery::state state = delivery::NONE) { - fetcher_.settle(state); + if (!fetcher_) + throw error(MSG("Can't call accept/reject etc on this receiver as a handler was provided")); + fetcher_->settle(state); +} + +void blocking_receiver::flow(int count) { + receiver rcv(link_); + rcv.flow(count); +} + +int blocking_receiver::credit() { + return link_.credit(); +} + +terminus blocking_receiver::source() { + return link_.source(); } +terminus blocking_receiver::target() { + return link_.target(); +} + +terminus blocking_receiver::remote_source() { + return link_.remote_source(); +} + +terminus blocking_receiver::remote_target() { + return link_.remote_target(); +} + + } // namespace http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8e1757eb/proton-c/bindings/cpp/src/sync_request_response.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/sync_request_response.cpp b/proton-c/bindings/cpp/src/sync_request_response.cpp new file mode 100644 index 0000000..eebd62d --- /dev/null +++ b/proton-c/bindings/cpp/src/sync_request_response.cpp @@ -0,0 +1,77 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "proton/sync_request_response.hpp" +#include "proton/event.hpp" +#include "proton/error.hpp" +#include "msg.hpp" + +namespace proton { + +namespace { +amqp_ulong global_correlation_id = 0; +message null_message; + +struct response_received : public wait_condition { + response_received(message &m, amqp_ulong id) : message_(m), id_(id) {} + bool achieved() { return message_ && message_.correlation_id() == id_; } + message &message_; + value id_; +}; + +} + +sync_request_response::sync_request_response(blocking_connection &conn, const std::string addr): + connection_(conn), address_(addr), + sender_(connection_.create_sender(addr)), + receiver_(connection_.create_receiver("", 1, true, this)), // credit=1, dynamic=true + response_(null_message) +{ +} + +message sync_request_response::call(message &request) { + if (address_.empty() && request.address().empty()) + throw error(MSG("Request message has no address: " << request)); + // TODO: thread safe increment. + correlation_id_ = global_correlation_id++; + request.correlation_id(value(correlation_id_)); + request.reply_to(this->reply_to()); + sender_.send(request); + std::string txt("Waiting for response"); + response_received cond(response_, correlation_id_); + connection_.wait(cond, txt); + message resp = response_; + response_ = null_message; + receiver_.flow(1); + return resp; +} + +std::string sync_request_response::reply_to() { + return receiver_.remote_source().address(); +} + +void sync_request_response::on_message(event &e) { + response_ = e.message(); + // Wake up enclosing blocking_connection.wait() to handle the message + e.container().yield(); +} + + +} // namespace --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
