PROTON-865: Message properties, Acking and Delivery
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/3c0c90f7 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/3c0c90f7 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/3c0c90f7 Branch: refs/heads/cjansen-cpp-client Commit: 3c0c90f74c053657d04cd7ba6f357f851c101999 Parents: 648f7b3 Author: Clifford Jansen <[email protected]> Authored: Thu May 7 07:06:43 2015 -0700 Committer: Alan Conway <[email protected]> Committed: Tue Jun 2 14:46:14 2015 -0400 ---------------------------------------------------------------------- proton-c/bindings/cpp/CMakeLists.txt | 2 + .../bindings/cpp/include/proton/cpp/Acking.h | 44 ++++ .../bindings/cpp/include/proton/cpp/Delivery.h | 61 +++++ .../bindings/cpp/include/proton/cpp/Message.h | 44 ++++ .../cpp/include/proton/cpp/MessagingAdapter.h | 3 +- .../cpp/include/proton/cpp/MessagingHandler.h | 15 +- proton-c/bindings/cpp/src/Acking.cpp | 49 ++++ proton-c/bindings/cpp/src/ContainerImpl.cpp | 5 +- proton-c/bindings/cpp/src/ContainerImpl.h | 2 +- proton-c/bindings/cpp/src/Delivery.cpp | 57 +++++ proton-c/bindings/cpp/src/Message.cpp | 234 ++++++++++++++++++- proton-c/bindings/cpp/src/MessagingAdapter.cpp | 15 +- proton-c/bindings/cpp/src/MessagingHandler.cpp | 9 +- 13 files changed, 521 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/3c0c90f7/proton-c/bindings/cpp/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/CMakeLists.txt b/proton-c/bindings/cpp/CMakeLists.txt index 68baad7..ba35cb1 100644 --- a/proton-c/bindings/cpp/CMakeLists.txt +++ b/proton-c/bindings/cpp/CMakeLists.txt @@ -48,6 +48,8 @@ set (qpid-proton-cpp-core src/Receiver.cpp src/Sender.cpp src/Session.cpp + src/Delivery.cpp + src/Acking.cpp src/Transport.cpp src/Logger.cpp src/contexts.cpp http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/3c0c90f7/proton-c/bindings/cpp/include/proton/cpp/Acking.h ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/cpp/Acking.h b/proton-c/bindings/cpp/include/proton/cpp/Acking.h new file mode 100644 index 0000000..d40d7d4 --- /dev/null +++ b/proton-c/bindings/cpp/include/proton/cpp/Acking.h @@ -0,0 +1,44 @@ +#ifndef PROTON_CPP_ACKING_H +#define PROTON_CPP_ACKING_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "proton/cpp/ImportExport.h" +#include "proton/cpp/Delivery.h" + +namespace proton { +namespace reactor { + + +class Acking +{ + public: + PROTON_CPP_EXTERN virtual void accept(Delivery &d); + PROTON_CPP_EXTERN virtual void reject(Delivery &d); + PROTON_CPP_EXTERN virtual void release(Delivery &d, bool delivered=true); + PROTON_CPP_EXTERN virtual void settle(Delivery &d, Delivery::state s = Delivery::REJECTED); +}; + + +}} // namespace proton::reactor + +#endif /*!PROTON_CPP_ACKING_H*/ http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/3c0c90f7/proton-c/bindings/cpp/include/proton/cpp/Delivery.h ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/cpp/Delivery.h b/proton-c/bindings/cpp/include/proton/cpp/Delivery.h new file mode 100644 index 0000000..a1965f6 --- /dev/null +++ b/proton-c/bindings/cpp/include/proton/cpp/Delivery.h @@ -0,0 +1,61 @@ +#ifndef PROTON_CPP_DELIVERY_H +#define PROTON_CPP_DELIVERY_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "proton/cpp/ImportExport.h" +#include "proton/cpp/Link.h" + +#include "ProtonImplRef.h" +#include "proton/disposition.h" + +namespace proton { +namespace reactor { + +class Delivery : public ProtonHandle<pn_delivery_t> +{ + public: + + enum state { + NONE = 0, + RECEIVED = PN_RECEIVED, + ACCEPTED = PN_ACCEPTED, + REJECTED = PN_REJECTED, + RELEASED = PN_RELEASED, + MODIFIED = PN_MODIFIED + }; // AMQP spec 3.4 Delivery State + + PROTON_CPP_EXTERN Delivery(pn_delivery_t *d); + PROTON_CPP_EXTERN Delivery(); + PROTON_CPP_EXTERN ~Delivery(); + PROTON_CPP_EXTERN Delivery(const Delivery&); + PROTON_CPP_EXTERN Delivery& operator=(const Delivery&); + PROTON_CPP_EXTERN bool settled(); + PROTON_CPP_EXTERN void settle(); + PROTON_CPP_EXTERN pn_delivery_t *getPnDelivery(); + private: + friend class ProtonImplRef<Delivery>; +}; + + +}} // namespace proton::reactor + +#endif /*!PROTON_CPP_DELIVERY_H*/ http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/3c0c90f7/proton-c/bindings/cpp/include/proton/cpp/Message.h ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/cpp/Message.h b/proton-c/bindings/cpp/include/proton/cpp/Message.h index ae29ca2..590fdd8 100644 --- a/proton-c/bindings/cpp/include/proton/cpp/Message.h +++ b/proton-c/bindings/cpp/include/proton/cpp/Message.h @@ -43,8 +43,52 @@ class Message : public ProtonHandle<pn_message_t> PROTON_CPP_EXTERN void setId(uint64_t id); PROTON_CPP_EXTERN uint64_t getId(); + PROTON_CPP_EXTERN void setId(const std::string &id); + PROTON_CPP_EXTERN std::string getStringId(); + PROTON_CPP_EXTERN void setId(const char *p, size_t len); + PROTON_CPP_EXTERN size_t getId(const char **p); + // TODO: UUID version PROTON_CPP_EXTERN pn_type_t getIdType(); + PROTON_CPP_EXTERN void setUserId(const std::string &id); + PROTON_CPP_EXTERN std::string getUserId(); + + PROTON_CPP_EXTERN void setAddress(const std::string &addr); + PROTON_CPP_EXTERN std::string getAddress(); + + PROTON_CPP_EXTERN void setSubject(const std::string &s); + PROTON_CPP_EXTERN std::string getSubject(); + + PROTON_CPP_EXTERN void setReplyTo(const std::string &s); + PROTON_CPP_EXTERN std::string getReplyTo(); + + PROTON_CPP_EXTERN void setCorrelationId(uint64_t id); + PROTON_CPP_EXTERN uint64_t getCorrelationId(); + PROTON_CPP_EXTERN void setCorrelationId(const std::string &id); + PROTON_CPP_EXTERN std::string getStringCorrelationId(); + PROTON_CPP_EXTERN void setCorrelationId(const char *p, size_t len); + PROTON_CPP_EXTERN size_t getCorrelationId(const char **p); + // TODO: UUID version + PROTON_CPP_EXTERN pn_type_t getCorrelationIdType(); + + PROTON_CPP_EXTERN void setContentType(const std::string &s); + PROTON_CPP_EXTERN std::string getContentType(); + + PROTON_CPP_EXTERN void setContentEncoding(const std::string &s); + PROTON_CPP_EXTERN std::string getContentEncoding(); + + PROTON_CPP_EXTERN void setExpiry(pn_timestamp_t t); + PROTON_CPP_EXTERN pn_timestamp_t getExpiry(); + + PROTON_CPP_EXTERN void setCreationTime(pn_timestamp_t t); + PROTON_CPP_EXTERN pn_timestamp_t getCreationTime(); + + PROTON_CPP_EXTERN void setGroupId(const std::string &s); + PROTON_CPP_EXTERN std::string getGroupId(); + + PROTON_CPP_EXTERN void setReplyToGroupId(const std::string &s); + PROTON_CPP_EXTERN std::string getReplyToGroupId(); + PROTON_CPP_EXTERN void setBody(const std::string &data); PROTON_CPP_EXTERN std::string getBody(); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/3c0c90f7/proton-c/bindings/cpp/include/proton/cpp/MessagingAdapter.h ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/cpp/MessagingAdapter.h b/proton-c/bindings/cpp/include/proton/cpp/MessagingAdapter.h index ac8b483..36a92e4 100644 --- a/proton-c/bindings/cpp/include/proton/cpp/MessagingAdapter.h +++ b/proton-c/bindings/cpp/include/proton/cpp/MessagingAdapter.h @@ -70,8 +70,7 @@ class MessagingAdapter : public MessagingHandler PROTON_CPP_EXTERN virtual void onLinkOpening(Event &e); PROTON_CPP_EXTERN virtual void onTransportTailClosed(Event &e); private: - MessagingHandler &delegate; // The actual MessagingHandler - pn_handler_t *handshaker; + MessagingHandler &delegate; // The handler for generated MessagingEvent's bool autoSettle; bool autoAccept; bool peerCloseIsError; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/3c0c90f7/proton-c/bindings/cpp/include/proton/cpp/MessagingHandler.h ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/cpp/MessagingHandler.h b/proton-c/bindings/cpp/include/proton/cpp/MessagingHandler.h index 51f679a..4f00681 100644 --- a/proton-c/bindings/cpp/include/proton/cpp/MessagingHandler.h +++ b/proton-c/bindings/cpp/include/proton/cpp/MessagingHandler.h @@ -23,6 +23,7 @@ */ #include "proton/cpp/ProtonHandler.h" +#include "proton/cpp/Acking.h" #include "proton/event.h" namespace proton { @@ -30,11 +31,11 @@ namespace reactor { class Event; -class PROTON_CPP_EXTERN MessagingHandler : public ProtonHandler +class PROTON_CPP_EXTERN MessagingHandler : public ProtonHandler , public Acking { public: - PROTON_CPP_EXTERN MessagingHandler(); -//ZZZ PROTON_CPP_EXTERN MessagingHandler(int prefetch=10, bool autoAccept=true, autoSettle=true, peerCloseIsError=false); + PROTON_CPP_EXTERN MessagingHandler(int prefetch=10, bool autoAccept=true, bool autoSettle=true, + bool peerCloseIsError=false); virtual ~MessagingHandler(); virtual void onAbort(Event &e); @@ -74,6 +75,14 @@ class PROTON_CPP_EXTERN MessagingHandler : public ProtonHandler virtual void onTransactionCommitted(Event &e); virtual void onTransactionDeclared(Event &e); virtual void onTransportClosed(Event &e); + protected: + int prefetch; + bool autoSettle; + bool autoAccept; + bool peerCloseIsError; + private: + friend class ContainerImpl; + friend class MessagingAdapter; }; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/3c0c90f7/proton-c/bindings/cpp/src/Acking.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/Acking.cpp b/proton-c/bindings/cpp/src/Acking.cpp new file mode 100644 index 0000000..62eca98 --- /dev/null +++ b/proton-c/bindings/cpp/src/Acking.cpp @@ -0,0 +1,49 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "proton/cpp/Acking.h" +#include "proton/delivery.h" + +namespace proton { +namespace reactor { + +void Acking::accept(Delivery &d) { + settle(d, Delivery::ACCEPTED); +} + +void Acking::reject(Delivery &d) { + settle(d, Delivery::REJECTED); +} + +void Acking::release(Delivery &d, bool delivered) { + if (delivered) + settle(d, Delivery::MODIFIED); + else + settle(d, Delivery::RELEASED); +} + +void Acking::settle(Delivery &d, Delivery::state state) { + if (state) + pn_delivery_update(d.getPnDelivery(), state); + d.settle(); +} + +}} // namespace proton::reactor http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/3c0c90f7/proton-c/bindings/cpp/src/ContainerImpl.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/ContainerImpl.cpp b/proton-c/bindings/cpp/src/ContainerImpl.cpp index df8c716..d1339f0 100644 --- a/proton-c/bindings/cpp/src/ContainerImpl.cpp +++ b/proton-c/bindings/cpp/src/ContainerImpl.cpp @@ -308,16 +308,15 @@ void ContainerImpl::run() { // Set our context on the reactor setContainerContext(reactor, this); - int prefetch = 10; // TODO: configurable + int prefetch = messagingHandler.prefetch; Handler *flowController = 0; - // Set the reactor's main/default handler (see note below) - MessagingAdapter messagingAdapter(messagingHandler); if (prefetch) { flowController = new CFlowController(prefetch); messagingHandler.addChildHandler(*flowController); } + MessagingAdapter messagingAdapter(messagingHandler); messagingHandler.addChildHandler(messagingAdapter); pn_handler_t *cppHandler = cpp_handler(this, &messagingHandler); pn_reactor_set_handler(reactor, cppHandler); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/3c0c90f7/proton-c/bindings/cpp/src/ContainerImpl.h ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/ContainerImpl.h b/proton-c/bindings/cpp/src/ContainerImpl.h index 8a6faba..f7b5b9e 100644 --- a/proton-c/bindings/cpp/src/ContainerImpl.h +++ b/proton-c/bindings/cpp/src/ContainerImpl.h @@ -49,7 +49,7 @@ class ContainerImpl PROTON_CPP_EXTERN Sender createSender(Connection &connection, std::string &addr); PROTON_CPP_EXTERN Sender createSender(std::string &url); PROTON_CPP_EXTERN Receiver createReceiver(Connection &connection, std::string &addr); - PROTON_CPP_EXTERN Receiver createReceiver(const std::string &url); //ZZZ + PROTON_CPP_EXTERN Receiver createReceiver(const std::string &url); PROTON_CPP_EXTERN Acceptor listen(const std::string &url); PROTON_CPP_EXTERN std::string getContainerId(); static void incref(ContainerImpl *); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/3c0c90f7/proton-c/bindings/cpp/src/Delivery.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/Delivery.cpp b/proton-c/bindings/cpp/src/Delivery.cpp new file mode 100644 index 0000000..990e394 --- /dev/null +++ b/proton-c/bindings/cpp/src/Delivery.cpp @@ -0,0 +1,57 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "proton/cpp/Delivery.h" +#include "proton/delivery.h" + +namespace proton { +namespace reactor { + +template class ProtonHandle<pn_delivery_t>; +typedef ProtonImplRef<Delivery> PI; + +Delivery::Delivery(pn_delivery_t *p) { + PI::ctor(*this, p); +} +Delivery::Delivery() { + PI::ctor(*this, 0); +} +Delivery::Delivery(const Delivery& c) : ProtonHandle<pn_delivery_t>() { + PI::copy(*this, c); +} +Delivery& Delivery::operator=(const Delivery& c) { + return PI::assign(*this, c); +} +Delivery::~Delivery() { + PI::dtor(*this); +} + +bool Delivery::settled() { + return pn_delivery_settled(impl); +} + +void Delivery::settle() { + pn_delivery_settle(impl); +} + +pn_delivery_t *Delivery::getPnDelivery() { return impl; } + +}} // namespace proton::reactor http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/3c0c90f7/proton-c/bindings/cpp/src/Message.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/Message.cpp b/proton-c/bindings/cpp/src/Message.cpp index ead6eb1..bdc8c0c 100644 --- a/proton-c/bindings/cpp/src/Message.cpp +++ b/proton-c/bindings/cpp/src/Message.cpp @@ -93,6 +93,45 @@ uint64_t Message::getId() { throw ProtonException(MSG("Message ID is not a ULONG")); } +void Message::setId(const std::string &id) { + confirm(impl); + pn_data_t *data = pn_message_id(impl); + pn_data_clear(data); + if (int err = pn_data_put_string(data, pn_bytes(id.size(), id.data()))) + throw ProtonException(MSG("setId error " << err)); +} + +std::string Message::getStringId() { + confirm(impl); + pn_data_t *data = pn_message_id(impl); + pn_data_rewind(data); + if (pn_data_size(data) == 1 && pn_data_next(data) && pn_data_type(data) == PN_STRING) { + pn_bytes_t bytes = pn_data_get_string(data); + return (std::string(bytes.start, bytes.size)); + } + throw ProtonException(MSG("Message ID is not a string value")); +} + +void Message::setId(const char *p, size_t len) { + confirm(impl); + pn_data_t *data = pn_message_id(impl); + pn_data_clear(data); + if (int err = pn_data_put_binary(data, pn_bytes(len, p))) + throw ProtonException(MSG("setId error " << err)); +} + +size_t Message::getId(const char **p) { + confirm(impl); + pn_data_t *data = pn_message_id(impl); + pn_data_rewind(data); + if (pn_data_size(data) == 1 && pn_data_next(data) && pn_data_type(data) == PN_BINARY) { + pn_bytes_t pnb = pn_data_get_binary(data); + *p = pnb.start; + return pnb.size; + } + throw ProtonException(MSG("Message ID is not a binary value")); +} + pn_type_t Message::getIdType() { confirm(impl); pn_data_t *data = pn_message_id(impl); @@ -113,6 +152,199 @@ pn_type_t Message::getIdType() { return PN_NULL; } +void Message::setUserId(const std::string &id) { + confirm(impl); + if (int err = pn_message_set_user_id(impl, pn_bytes(id.size(), id.data()))) + throw ProtonException(MSG("setUserId error " << err)); +} + +std::string Message::getUserId() { + confirm(impl); + pn_bytes_t bytes = pn_message_get_user_id(impl); + return (std::string(bytes.start, bytes.size)); +} + +void Message::setAddress(const std::string &addr) { + confirm(impl); + if (int err = pn_message_set_address(impl, addr.c_str())) + throw ProtonException(MSG("setAddress error " << err)); +} + +std::string Message::getAddress() { + confirm(impl); + const char* addr = pn_message_get_address(impl); + return addr ? std::string(addr) : std::string(); +} + +void Message::setSubject(const std::string &s) { + confirm(impl); + if (int err = pn_message_set_subject(impl, s.c_str())) + throw ProtonException(MSG("setSubject error " << err)); +} + +std::string Message::getSubject() { + confirm(impl); + const char* s = pn_message_get_subject(impl); + return s ? std::string(s) : std::string(); +} + +void Message::setReplyTo(const std::string &s) { + confirm(impl); + if (int err = pn_message_set_reply_to(impl, s.c_str())) + throw ProtonException(MSG("setReplyTo error " << err)); +} + +std::string Message::getReplyTo() { + confirm(impl); + const char* s = pn_message_get_reply_to(impl); + return s ? std::string(s) : std::string(); +} + +void Message::setCorrelationId(uint64_t id) { + confirm(impl); + pn_data_t *data = pn_message_correlation_id(impl); + pn_data_clear(data); + if (int err = pn_data_put_ulong(data, id)) + throw ProtonException(MSG("setCorrelationId error " << err)); +} + +uint64_t Message::getCorrelationId() { + confirm(impl); + pn_data_t *data = pn_message_correlation_id(impl); + pn_data_rewind(data); + if (pn_data_size(data) == 1 && pn_data_next(data) && pn_data_type(data) == PN_ULONG) { + return pn_data_get_ulong(data); + } + throw ProtonException(MSG("Correlation ID is not a ULONG")); +} + +void Message::setCorrelationId(const std::string &id) { + confirm(impl); + pn_data_t *data = pn_message_correlation_id(impl); + pn_data_clear(data); + if (int err = pn_data_put_string(data, pn_bytes(id.size(), id.data()))) + throw ProtonException(MSG("setCorrelationId error " << err)); +} + +std::string Message::getStringCorrelationId() { + confirm(impl); + pn_data_t *data = pn_message_correlation_id(impl); + pn_data_rewind(data); + if (pn_data_size(data) == 1 && pn_data_next(data) && pn_data_type(data) == PN_STRING) { + pn_bytes_t bytes = pn_data_get_string(data); + return (std::string(bytes.start, bytes.size)); + } + throw ProtonException(MSG("Message ID is not a string value")); +} + +void Message::setCorrelationId(const char *p, size_t len) { + confirm(impl); + pn_data_t *data = pn_message_correlation_id(impl); + pn_data_clear(data); + if (int err = pn_data_put_binary(data, pn_bytes(len, p))) + throw ProtonException(MSG("setCorrelationId error " << err)); +} + +size_t Message::getCorrelationId(const char **p) { + confirm(impl); + pn_data_t *data = pn_message_correlation_id(impl); + pn_data_rewind(data); + if (pn_data_size(data) == 1 && pn_data_next(data) && pn_data_type(data) == PN_BINARY) { + pn_bytes_t pnb = pn_data_get_binary(data); + *p = pnb.start; + return pnb.size; + } + throw ProtonException(MSG("Message ID is not a binary value")); +} + +pn_type_t Message::getCorrelationIdType() { + confirm(impl); + pn_data_t *data = pn_message_correlation_id(impl); + pn_data_rewind(data); + if (pn_data_size(data) == 1 && pn_data_next(data)) { + pn_type_t type = pn_data_type(data); + switch (type) { + case PN_ULONG: + case PN_STRING: + case PN_BINARY: + case PN_UUID: + return type; + break; + default: + break; + } + } + return PN_NULL; +} + +void Message::setContentType(const std::string &s) { + confirm(impl); + if (int err = pn_message_set_content_type(impl, s.c_str())) + throw ProtonException(MSG("setContentType error " << err)); +} + +std::string Message::getContentType() { + confirm(impl); + const char* s = pn_message_get_content_type(impl); + return s ? std::string(s) : std::string(); +} + +void Message::setContentEncoding(const std::string &s) { + confirm(impl); + if (int err = pn_message_set_content_encoding(impl, s.c_str())) + throw ProtonException(MSG("setContentEncoding error " << err)); +} + +std::string Message::getContentEncoding() { + confirm(impl); + const char* s = pn_message_get_content_encoding(impl); + return s ? std::string(s) : std::string(); +} + +void Message::setExpiry(pn_timestamp_t t) { + confirm(impl); + pn_message_set_expiry_time(impl, t); +} +pn_timestamp_t Message::getExpiry() { + confirm(impl); + return pn_message_get_expiry_time(impl); +} + +void Message::setCreationTime(pn_timestamp_t t) { + confirm(impl); + pn_message_set_creation_time(impl, t); +} +pn_timestamp_t Message::getCreationTime() { + confirm(impl); + return pn_message_get_creation_time(impl); +} + + +void Message::setGroupId(const std::string &s) { + confirm(impl); + if (int err = pn_message_set_group_id(impl, s.c_str())) + throw ProtonException(MSG("setGroupId error " << err)); +} + +std::string Message::getGroupId() { + confirm(impl); + const char* s = pn_message_get_group_id(impl); + return s ? std::string(s) : std::string(); +} + +void Message::setReplyToGroupId(const std::string &s) { + confirm(impl); + if (int err = pn_message_set_reply_to_group_id(impl, s.c_str())) + throw ProtonException(MSG("setReplyToGroupId error " << err)); +} + +std::string Message::getReplyToGroupId() { + confirm(impl); + const char* s = pn_message_get_reply_to_group_id(impl); + return s ? std::string(s) : std::string(); +} + + void Message::setBody(const std::string &buf) { confirm(impl); pn_data_t *body = pn_message_body(impl); @@ -127,7 +359,7 @@ void Message::getBody(std::string &str) { pn_data_rewind(body); if (pn_data_next(body) && pn_data_type(body) == PN_STRING) { - pn_bytes_t bytes= pn_data_get_string(body); + pn_bytes_t bytes = pn_data_get_string(body); if (!pn_data_next(body)) { // String data and nothing else str.resize(bytes.size); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/3c0c90f7/proton-c/bindings/cpp/src/MessagingAdapter.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/MessagingAdapter.cpp b/proton-c/bindings/cpp/src/MessagingAdapter.cpp index 1097305..f2916db 100644 --- a/proton-c/bindings/cpp/src/MessagingAdapter.cpp +++ b/proton-c/bindings/cpp/src/MessagingAdapter.cpp @@ -33,13 +33,14 @@ namespace proton { namespace reactor { -MessagingAdapter::MessagingAdapter(MessagingHandler &d) : delegate(d), handshaker(pn_handshaker()), - autoSettle(true), autoAccept(true), - peerCloseIsError(false) { -}; -MessagingAdapter::~MessagingAdapter(){ - pn_decref(handshaker); -}; +MessagingAdapter::MessagingAdapter(MessagingHandler &delegate_) : + autoSettle(delegate_.autoSettle), + autoAccept(delegate_.autoAccept), + peerCloseIsError(delegate_.peerCloseIsError), + delegate(delegate_) +{}; + +MessagingAdapter::~MessagingAdapter(){}; void MessagingAdapter::onReactorInit(Event &e) { http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/3c0c90f7/proton-c/bindings/cpp/src/MessagingHandler.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/MessagingHandler.cpp b/proton-c/bindings/cpp/src/MessagingHandler.cpp index 6066b07..7a4a5cb 100644 --- a/proton-c/bindings/cpp/src/MessagingHandler.cpp +++ b/proton-c/bindings/cpp/src/MessagingHandler.cpp @@ -19,12 +19,17 @@ * */ #include "proton/cpp/MessagingHandler.h" -#include "proton/cpp/Event.h" +#include "proton/cpp/ProtonEvent.h" +#include "proton/cpp/MessagingAdapter.h" +#include "proton/handlers.h" namespace proton { namespace reactor { -MessagingHandler::MessagingHandler(){}; +MessagingHandler::MessagingHandler(int prefetch0, bool autoAccept0, bool autoSettle0, bool peerCloseIsError0) : + prefetch(prefetch0), autoAccept(autoAccept0), autoSettle(autoSettle0), peerCloseIsError(peerCloseIsError0) +{} + MessagingHandler::~MessagingHandler(){}; void MessagingHandler::onAbort(Event &e) { onUnhandled(e); } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
