http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/83b89fe4/shims/qpid-proton-cpp/src/qpidit/amqp_types_test/Receiver.cpp ---------------------------------------------------------------------- diff --git a/shims/qpid-proton-cpp/src/qpidit/amqp_types_test/Receiver.cpp b/shims/qpid-proton-cpp/src/qpidit/amqp_types_test/Receiver.cpp new file mode 100644 index 0000000..56242ad --- /dev/null +++ b/shims/qpid-proton-cpp/src/qpidit/amqp_types_test/Receiver.cpp @@ -0,0 +1,290 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpidit/amqp_types_test/Receiver.hpp" + +#include <iostream> +#include <json/json.h> +#include "proton/connection.hpp" +#include <proton/default_container.hpp> +#include "proton/delivery.hpp" +#include "proton/receiver.hpp" +#include "proton/transport.hpp" +#include "qpidit/QpidItErrors.hpp" + +namespace qpidit +{ + namespace amqp_types_test + { + + Receiver::Receiver(const std::string& brokerUrl, + const std::string& amqpType, + uint32_t expected) : + _brokerUrl(brokerUrl), + _amqpType(amqpType), + _expected(expected), + _received(0UL), + _receivedValueList(Json::arrayValue) + {} + + Receiver::~Receiver() {} + + Json::Value& Receiver::getReceivedValueList() { + return _receivedValueList; + } + + void Receiver::on_container_start(proton::container &c) { + c.open_receiver(_brokerUrl); + } + + void Receiver::on_message(proton::delivery &d, proton::message &m) { + if (proton::get<uint64_t>(m.id()) < _received) return; // ignore duplicate + if (_received < _expected) { + if (_amqpType.compare("null") == 0) { + checkMessageType(m, proton::NULL_TYPE); + _receivedValueList.append("None"); + } else if (_amqpType.compare("boolean") == 0) { + checkMessageType(m, proton::BOOLEAN); + _receivedValueList.append(m.body().get<bool>() ? "True": "False"); + } else if (_amqpType.compare("ubyte") == 0) { + checkMessageType(m, proton::UBYTE); + _receivedValueList.append(toHexStr<uint8_t>(m.body().get<uint8_t>())); + } else if (_amqpType.compare("ushort") == 0) { + checkMessageType(m, proton::USHORT); + _receivedValueList.append(toHexStr<uint16_t>(m.body().get<uint16_t>())); + } else if (_amqpType.compare("uint") == 0) { + checkMessageType(m, proton::UINT); + _receivedValueList.append(toHexStr<uint32_t>(m.body().get<uint32_t>())); + } else if (_amqpType.compare("ulong") == 0) { + checkMessageType(m, proton::ULONG); + _receivedValueList.append(toHexStr<uint64_t>(m.body().get<uint64_t>())); + } else if (_amqpType.compare("byte") == 0) { + checkMessageType(m, proton::BYTE); + _receivedValueList.append(toHexStr<int8_t>(m.body().get<int8_t>())); + } else if (_amqpType.compare("short") == 0) { + checkMessageType(m, proton::SHORT); + _receivedValueList.append(toHexStr<int16_t>(m.body().get<int16_t>())); + } else if (_amqpType.compare("int") == 0) { + checkMessageType(m, proton::INT); + _receivedValueList.append(toHexStr<int32_t>(m.body().get<int32_t>())); + } else if (_amqpType.compare("long") == 0) { + checkMessageType(m, proton::LONG); + _receivedValueList.append(toHexStr<int64_t>(m.body().get<int64_t>())); + } else if (_amqpType.compare("float") == 0) { + checkMessageType(m, proton::FLOAT); + float f = m.body().get<float>(); + _receivedValueList.append(toHexStr<uint32_t>(*((uint32_t*)&f), true)); + } else if (_amqpType.compare("double") == 0) { + checkMessageType(m, proton::DOUBLE); + double d = m.body().get<double>(); + _receivedValueList.append(toHexStr<uint64_t>(*((uint64_t*)&d), true)); + } else if (_amqpType.compare("decimal32") == 0) { + checkMessageType(m, proton::DECIMAL32); + _receivedValueList.append(byteArrayToHexStr(m.body().get<proton::decimal32>())); + } else if (_amqpType.compare("decimal64") == 0) { + checkMessageType(m, proton::DECIMAL64); + _receivedValueList.append(byteArrayToHexStr(m.body().get<proton::decimal64>())); + } else if (_amqpType.compare("decimal128") == 0) { + checkMessageType(m, proton::DECIMAL128); + _receivedValueList.append(byteArrayToHexStr(m.body().get<proton::decimal128>())); + } else if (_amqpType.compare("char") == 0) { + checkMessageType(m, proton::CHAR); + wchar_t c = m.body().get<wchar_t>(); + std::stringstream oss; + if (c < 0x7f && std::iswprint(c)) { + oss << (char)c; + } else { + oss << "0x" << std::hex << c; + } + _receivedValueList.append(oss.str()); + } else if (_amqpType.compare("timestamp") == 0) { + checkMessageType(m, proton::TIMESTAMP); + std::ostringstream oss; + oss << "0x" << std::hex << m.body().get<proton::timestamp>().milliseconds(); + _receivedValueList.append(oss.str()); + } else if (_amqpType.compare("uuid") == 0) { + checkMessageType(m, proton::UUID); + std::ostringstream oss; + oss << m.body().get<proton::uuid>(); + _receivedValueList.append(oss.str()); + } else if (_amqpType.compare("binary") == 0) { + checkMessageType(m, proton::BINARY); + _receivedValueList.append(std::string(m.body().get<proton::binary>())); + } else if (_amqpType.compare("string") == 0) { + checkMessageType(m, proton::STRING); + _receivedValueList.append(m.body().get<std::string>()); + } else if (_amqpType.compare("symbol") == 0) { + checkMessageType(m, proton::SYMBOL); + _receivedValueList.append(m.body().get<proton::symbol>()); + } else if (_amqpType.compare("list") == 0) { + checkMessageType(m, proton::LIST); + Json::Value jsonList(Json::arrayValue); + _receivedValueList.append(getSequence(jsonList, m.body())); + } else if (_amqpType.compare("map") == 0) { + checkMessageType(m, proton::MAP); + Json::Value jsonMap(Json::objectValue); + _receivedValueList.append(getMap(jsonMap, m.body())); + } else if (_amqpType.compare("array") == 0) { + throw qpidit::UnsupportedAmqpTypeError(_amqpType); + } else { + throw qpidit::UnknownAmqpTypeError(_amqpType); + } + } + _received++; + if (_received >= _expected) { + d.receiver().close(); + d.connection().close(); + } + } + + void Receiver::on_connection_error(proton::connection &c) { + std::cerr << "AmqpReceiver::on_connection_error(): " << c.error() << std::endl; + } + + void Receiver::on_receiver_error(proton::receiver& r) { + std::cerr << "AmqpReceiver::on_receiver_error(): " << r.error() << std::endl; + } + + void Receiver::on_session_error(proton::session &s) { + std::cerr << "AmqpReceiver::on_session_error(): " << s.error() << std::endl; + } + + void Receiver::on_transport_error(proton::transport &t) { + std::cerr << "AmqpReceiver::on_transport_error(): " << t.error() << std::endl; + } + + void Receiver::on_error(const proton::error_condition &ec) { + std::cerr << "AmqpReceiver::on_error(): " << ec << std::endl; + } + + // protected + + //static + void Receiver::checkMessageType(const proton::message& msg, proton::type_id amqpType) { + if (msg.body().type() != amqpType) { + throw qpidit::IncorrectMessageBodyTypeError(amqpType, msg.body().type()); + } + } + + //static + Json::Value& Receiver::getMap(Json::Value& jsonMap, const proton::value& val) { + std::map<proton::value, proton::value> msgMap; + val.get(msgMap); + for (std::map<proton::value, proton::value>::const_iterator i = msgMap.begin(); i != msgMap.end(); ++i) { + switch (i->second.type()) { + case proton::LIST: + { + Json::Value jsonSubList(Json::arrayValue); + jsonMap[i->first.get<std::string>()] = getSequence(jsonSubList, i->second); + break; + } + case proton::MAP: + { + Json::Value jsonSubMap(Json::objectValue); + jsonMap[i->first.get<std::string>()] = getMap(jsonSubMap, i->second); + break; + } + case proton::ARRAY: + break; + case proton::STRING: + jsonMap[i->first.get<std::string>()] = Json::Value(i->second.get<std::string>()); + break; + default: + throw qpidit::IncorrectValueTypeError(i->second); + } + } + return jsonMap; + } + + //static + Json::Value& Receiver::getSequence(Json::Value& jsonList, const proton::value& val) { + std::vector<proton::value> msgList; + val.get(msgList); + for (std::vector<proton::value>::const_iterator i=msgList.begin(); i!=msgList.end(); ++i) { + switch ((*i).type()) { + case proton::LIST: + { + Json::Value jsonSubList(Json::arrayValue); + jsonList.append(getSequence(jsonSubList, *i)); + break; + } + case proton::MAP: + { + Json::Value jsonSubMap(Json::objectValue); + jsonList.append(getMap(jsonSubMap, *i)); + break; + } + case proton::ARRAY: + break; + case proton::STRING: + jsonList.append(Json::Value((*i).get<std::string>())); + break; + default: + throw qpidit::IncorrectValueTypeError(*i); + } + } + return jsonList; + } + + //static + std::string Receiver::stringToHexStr(const std::string& str) { + std::ostringstream oss; + oss << "0x" << std::hex; + for (std::string::const_iterator i=str.begin(); i!=str.end(); ++i) { + oss << std::setw(2) << std::setfill('0') << ((int)*i & 0xff); + } + return oss.str(); + } + + } /* namespace amqp_types_test */ +} /* namespace qpidit */ + + +/* + * --- main --- + * Args: 1: Broker address (ip-addr:port) + * 2: Queue name + * 3: AMQP type + * 4: Expected number of test values to receive + */ + +int main(int argc, char** argv) { + // TODO: improve arg management a little... + if (argc != 5) { + throw qpidit::ArgumentError("Incorrect number of arguments"); + } + + std::ostringstream oss; + oss << argv[1] << "/" << argv[2]; + + try { + qpidit::amqp_types_test::Receiver receiver(oss.str(), argv[3], std::strtoul(argv[4], NULL, 0)); + proton::default_container(receiver).run(); + + std::cout << argv[3] << std::endl; + Json::FastWriter fw; + std::cout << fw.write(receiver.getReceivedValueList()); + } catch (const std::exception& e) { + std::cerr << "AmqpReceiver error: " << e.what() << std::endl; + exit(-1); + } + exit(0); +}
http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/83b89fe4/shims/qpid-proton-cpp/src/qpidit/amqp_types_test/Receiver.hpp ---------------------------------------------------------------------- diff --git a/shims/qpid-proton-cpp/src/qpidit/amqp_types_test/Receiver.hpp b/shims/qpid-proton-cpp/src/qpidit/amqp_types_test/Receiver.hpp new file mode 100644 index 0000000..5ed67be --- /dev/null +++ b/shims/qpid-proton-cpp/src/qpidit/amqp_types_test/Receiver.hpp @@ -0,0 +1,85 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#ifndef SRC_QPIDIT_AMQP_TYPES_TEST_RECEIVER_HPP_ +#define SRC_QPIDIT_AMQP_TYPES_TEST_RECEIVER_HPP_ + +#include <iomanip> +#include <json/value.h> +#include "proton/messaging_handler.hpp" +#include "proton/types.hpp" +#include <sstream> + +namespace qpidit +{ + namespace amqp_types_test + { + + class Receiver : public proton::messaging_handler + { + protected: + const std::string _brokerUrl; + const std::string _amqpType; + uint32_t _expected; + uint32_t _received; + Json::Value _receivedValueList; + public: + Receiver(const std::string& brokerUrl, const std::string& amqpType, uint32_t exptected); + virtual ~Receiver(); + Json::Value& getReceivedValueList(); + void on_container_start(proton::container &c); + void on_message(proton::delivery &d, proton::message &m); + + void on_connection_error(proton::connection &c); + void on_receiver_error(proton::receiver& r); + void on_session_error(proton::session &s); + void on_transport_error(proton::transport &t); + void on_error(const proton::error_condition &c); + protected: + static void checkMessageType(const proton::message& msg, proton::type_id msgType); + static Json::Value& getMap(Json::Value& jsonMap, const proton::value& val); + static Json::Value& getSequence(Json::Value& jsonList, const proton::value& val); + static std::string stringToHexStr(const std::string& str); + + // Format signed numbers in negative hex format, ie -0xNNNN, positive numbers in 0xNNNN format + template<typename T> static std::string toHexStr(T val, bool fillFlag = false) { + std::ostringstream oss; + bool neg = val < 0; + if (neg) val = -val; + oss << (neg ? "-" : "") << "0x" << std::hex; + if (fillFlag) { + oss << std::setw(sizeof(T)*2) << std::setfill('0'); + } + oss << (sizeof(T) == 1 ? (int)val & 0xff : sizeof(T) == 2 ? val & 0xffff : sizeof(T) == 4 ? val & 0xffffffff : val); + return oss.str(); + } + + template<size_t N> static std::string byteArrayToHexStr(const proton::byte_array<N>& val) { + std::ostringstream oss; + oss << val; + return oss.str(); + } + }; + + } /* namespace amqp_types_test */ +} /* namespace qpidit */ + +#endif /* SRC_QPIDIT_AMQP_TYPES_TEST_RECEIVER_HPP_ */ http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/83b89fe4/shims/qpid-proton-cpp/src/qpidit/amqp_types_test/Sender.cpp ---------------------------------------------------------------------- diff --git a/shims/qpid-proton-cpp/src/qpidit/amqp_types_test/Sender.cpp b/shims/qpid-proton-cpp/src/qpidit/amqp_types_test/Sender.cpp new file mode 100644 index 0000000..353bf46 --- /dev/null +++ b/shims/qpid-proton-cpp/src/qpidit/amqp_types_test/Sender.cpp @@ -0,0 +1,368 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpidit/amqp_types_test/Sender.hpp" + +#include <iostream> +#include <json/json.h> +#include "proton/connection.hpp" +#include "proton/default_container.hpp" +#include "proton/sender.hpp" +#include "proton/tracker.hpp" +#include "proton/transport.hpp" + +namespace qpidit +{ + namespace amqp_types_test + { + + Sender::Sender(const std::string& brokerUrl, + const std::string& amqpType, + const Json::Value& testValues) : + _brokerUrl(brokerUrl), + _amqpType(amqpType), + _testValues(testValues), + _msgsSent(0), + _msgsConfirmed(0), + _totalMsgs(testValues.size()) + {} + + Sender::~Sender() {} + + void Sender::on_container_start(proton::container &c) { + c.open_sender(_brokerUrl); + } + + void Sender::on_sendable(proton::sender &s) { + if (_totalMsgs == 0) { + s.connection().close(); + } else if (_msgsSent == 0) { + for (Json::Value::const_iterator i=_testValues.begin(); i!=_testValues.end(); ++i) { + if (s.credit()) { + proton::message msg; + s.send(setMessage(msg, *i)); + _msgsSent++; + } + } + } else { + // do nothing + } + } + + void Sender::on_tracker_accept(proton::tracker &t) { + _msgsConfirmed++; + if (_msgsConfirmed == _totalMsgs) { + t.connection().close(); + } + } + + void Sender::on_transport_close(proton::transport &t) { + _msgsSent = _msgsConfirmed; + } + + void Sender::on_connection_error(proton::connection &c) { + std::cerr << "AmqpSender::on_connection_error(): " << c.error() << std::endl; + } + + void Sender::on_sender_error(proton::sender &s) { + std::cerr << "AmqpSender::on_sender_error(): " << s.error() << std::endl; + } + + void Sender::on_session_error(proton::session &s) { + std::cerr << "AmqpSender::on_session_error(): " << s.error() << std::endl; + } + + void Sender::on_transport_error(proton::transport &t) { + std::cerr << "AmqpSender::on_transport_error(): " << t.error() << std::endl; + } + + void Sender::on_error(const proton::error_condition &ec) { + std::cerr << "AmqpSender::on_error(): " << ec << std::endl; + } + + // protected + + proton::message& Sender::setMessage(proton::message& msg, const Json::Value& testValue) { + msg.id(_msgsSent + 1); + if (_amqpType.compare("null") == 0) { + std::string testValueStr(testValue.asString()); + if (testValueStr.compare("None") != 0) { throw qpidit::InvalidTestValueError(_amqpType, testValueStr); } + proton::value v; + msg.body(v); + } else if (_amqpType.compare("boolean") == 0) { + std::string testValueStr(testValue.asString()); + if (testValueStr.compare("True") == 0) { + msg.body(true); + } else if (testValueStr.compare("False") == 0) { + msg.body(false); + } else { + throw qpidit::InvalidTestValueError(_amqpType, testValueStr); + } + } else if (_amqpType.compare("ubyte") == 0) { + setIntegralValue<uint8_t>(msg, testValue.asString(), true); + } else if (_amqpType.compare("ushort") == 0) { + setIntegralValue<uint16_t>(msg, testValue.asString(), true); + } else if (_amqpType.compare("uint") == 0) { + setIntegralValue<uint32_t>(msg, testValue.asString(), true); + } else if (_amqpType.compare("ulong") == 0) { + setIntegralValue<uint64_t>(msg, testValue.asString(), true); + } else if (_amqpType.compare("byte") == 0) { + setIntegralValue<int8_t>(msg, testValue.asString(), false); + } else if (_amqpType.compare("short") == 0) { + setIntegralValue<int16_t>(msg, testValue.asString(), false); + } else if (_amqpType.compare("int") == 0) { + setIntegralValue<int32_t>(msg, testValue.asString(), false); + } else if (_amqpType.compare("long") == 0) { + setIntegralValue<int64_t>(msg, testValue.asString(), false); + } else if (_amqpType.compare("float") == 0) { + setFloatValue<float, uint32_t>(msg, testValue.asString()); + } else if (_amqpType.compare("double") == 0) { + setFloatValue<double, uint64_t>(msg, testValue.asString()); + } else if (_amqpType.compare("decimal32") == 0) { + proton::decimal32 val; + hexStringToBytearray(val, testValue.asString().substr(2)); + msg.body(val); + } else if (_amqpType.compare("decimal64") == 0) { + proton::decimal64 val; + hexStringToBytearray(val, testValue.asString().substr(2)); + msg.body(val); + } else if (_amqpType.compare("decimal128") == 0) { + proton::decimal128 val; + hexStringToBytearray(val, testValue.asString().substr(2)); + msg.body(val); + } else if (_amqpType.compare("char") == 0) { + std::string charStr = testValue.asString(); + wchar_t val; + if (charStr.size() == 1) { // Single char "a" + val = charStr[0]; + } else if (charStr.size() >= 3 && charStr.size() <= 10) { // Format "0xN" through "0xNNNNNNNN" + val = std::strtoul(charStr.data(), NULL, 16); + } else { + //TODO throw format error + } + msg.body(val); + } else if (_amqpType.compare("timestamp") == 0) { + proton::timestamp val(std::strtoul(testValue.asString().data(), NULL, 16)); + msg.body(val); + } else if (_amqpType.compare("uuid") == 0) { + proton::uuid val; + std::string uuidStr(testValue.asString()); + // Expected format: "00000000-0000-0000-0000-000000000000" + // ^ ^ ^ ^ ^ + // start index -> 0 9 14 19 24 + hexStringToBytearray(val, uuidStr.substr(0, 8), 0, 4); + hexStringToBytearray(val, uuidStr.substr(9, 4), 4, 2); + hexStringToBytearray(val, uuidStr.substr(14, 4), 6, 2); + hexStringToBytearray(val, uuidStr.substr(19, 4), 8, 2); + hexStringToBytearray(val, uuidStr.substr(24, 12), 10, 6); + msg.body(val); + } else if (_amqpType.compare("binary") == 0) { + //setStringValue<proton::amqp_binary>(msg, testValue.asString()); + proton::binary val(testValue.asString()); + msg.body(val); + } else if (_amqpType.compare("string") == 0) { + //setStringValue<proton::amqp_string>(msg, testValue.asString()); + std::string val(testValue.asString()); + msg.body(val); + } else if (_amqpType.compare("symbol") == 0) { + //setStringValue<proton::amqp_symbol>(msg, testValue.asString()); + proton::symbol val(testValue.asString()); + msg.body(val); + } else if (_amqpType.compare("list") == 0) { + std::vector<proton::value> list; + processList(list, testValue); + msg.body(list); + } else if (_amqpType.compare("map") == 0) { + std::map<std::string, proton::value> map; + processMap(map, testValue); + msg.body(map); + } else if (_amqpType.compare("array") == 0) { +/* + std::vector<proton::value> array; + processArray(array, testValue); + msg.body(proton::as<proton::ARRAY>(array)); +*/ + throw qpidit::UnsupportedAmqpTypeError(_amqpType); + } else { + throw qpidit::UnknownAmqpTypeError(_amqpType); + } + return msg; + } + + //static + std::string Sender::bytearrayToHexStr(const char* src, int len) { + std::ostringstream oss; + oss << "0x" << std::hex; + for (int i=0; i<len; ++i) { + oss << std::setw(2) << std::setfill('0') << ((int)src[i] & 0xff); + } + return oss.str(); + } + + //static + proton::value Sender::extractProtonValue(const Json::Value& val) { + switch (val.type()) { + case Json::nullValue: + { + proton::value v; //proton::null n; + return v; //proton::value(n); + } + case Json::intValue: + return proton::value(val.asInt()); + case Json::uintValue: + return proton::value(val.asUInt()); + case Json::realValue: + return proton::value(val.asDouble()); + case Json::stringValue: + return proton::value(val.asString()); + case Json::booleanValue: + return proton::value(val.asBool()); + default:; + } + } + +// //static +// Json::Value::ValueType getArrayType(const Json::Value& val) { +// if (val.size()) > 0) { +// return val[0].type(); +// } else { +// return Json::Value::nullValue; // TODO: find a way to represent empty array +// } +// } + + //static + void Sender::processArray(std::vector<proton::value>& array, const Json::Value& testValues) { + for (Json::Value::const_iterator i = testValues.begin(); i != testValues.end(); ++i) { + if ((*i).isArray()) { + std::vector<proton::value> subArray; + processArray(subArray, *i); + array.push_back(proton::value(subArray)); + } else if ((*i).isObject()) { + std::map<std::string, proton::value> subMap; + processMap(subMap, *i); + array.push_back(proton::value(subMap)); + } else { + proton::value v; + if ((*i).isNull()) + ; + else if ((*i).isBool()) + v = (*i).asBool(); + else if ((*i).isInt()) + v = (*i).asInt(); + else if ((*i).isUInt()) + v = (*i).asUInt(); + else if ((*i).isDouble()) + v = (*i).asDouble(); + else if ((*i).isString()) + v = (*i).asString(); + else + ; // TODO handle this case + array.push_back(v); + } + } + } + + //static + void Sender::processList(std::vector<proton::value>& list, const Json::Value& testValues) { + for (Json::Value::const_iterator i = testValues.begin(); i != testValues.end(); ++i) { + if ((*i).isArray()) { + std::vector<proton::value> subList; + processList(subList, *i); + list.push_back(proton::value(subList)); + } else if ((*i).isObject()) { + std::map<std::string, proton::value> subMap; + processMap(subMap, *i); + list.push_back(proton::value(subMap)); + } else { + list.push_back(extractProtonValue(*i)); + } + } + //std::cout << std::endl; + } + + //static + void Sender::processMap(std::map<std::string, proton::value>& map, const Json::Value& testValues) { + Json::Value::Members keys = testValues.getMemberNames(); + for (std::vector<std::string>::const_iterator i=keys.begin(); i!=keys.end(); ++i) { + Json::Value mapVal = testValues[*i]; + if (mapVal.isArray()) { + std::vector<proton::value> subList; + processList(subList, mapVal); + map[*i] = subList; + } else if (mapVal.isObject()) { + std::map<std::string, proton::value> subMap; + processMap(subMap, mapVal); + map[*i] = subMap; + } else { + map[*i] = extractProtonValue(mapVal); + } + } + } + + //static + void Sender::revMemcpy(char* dest, const char* src, int n) { + for (int i = 0; i < n; ++i) { + *(dest + i) = *(src + n - i - 1); + } + } + + //static + void Sender::uint64ToChar16(char* dest, uint64_t upper, uint64_t lower) { + revMemcpy(dest, (const char*)&upper, sizeof(uint64_t)); + revMemcpy(dest + 8, (const char*)&lower, sizeof(uint64_t)); + } + + } /* namespace amqp_types_test */ +} /* namespace qpidit */ + + +/* + * --- main --- + * Args: 1: Broker address (ip-addr:port) + * 2: Queue name + * 3: AMQP type + * 4: Test value(s) as JSON string + */ + +int main(int argc, char** argv) { + // TODO: improve arg management a little... + if (argc != 5) { + throw qpidit::ArgumentError("Incorrect number of arguments"); + } + + std::ostringstream oss; + oss << argv[1] << "/" << argv[2]; + + try { + Json::Value testValues; + Json::Reader jsonReader; + if (not jsonReader.parse(argv[4], testValues, false)) { + throw qpidit::JsonParserError(jsonReader); + } + + qpidit::amqp_types_test::Sender sender(oss.str(), argv[3], testValues); + proton::default_container(sender).run(); + } catch (const std::exception& e) { + std::cerr << "AmqpSender error: " << e.what() << std::endl; + exit(1); + } + exit(0); +} http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/83b89fe4/shims/qpid-proton-cpp/src/qpidit/amqp_types_test/Sender.hpp ---------------------------------------------------------------------- diff --git a/shims/qpid-proton-cpp/src/qpidit/amqp_types_test/Sender.hpp b/shims/qpid-proton-cpp/src/qpidit/amqp_types_test/Sender.hpp new file mode 100644 index 0000000..da53a2e --- /dev/null +++ b/shims/qpid-proton-cpp/src/qpidit/amqp_types_test/Sender.hpp @@ -0,0 +1,104 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#ifndef SRC_QPIDIT_AMQP_TYPES_TEST_SENDER_HPP_ +#define SRC_QPIDIT_AMQP_TYPES_TEST_SENDER_HPP_ + +#include <iomanip> +#include <json/value.h> +#include "proton/messaging_handler.hpp" +#include "proton/message.hpp" +#include "qpidit/QpidItErrors.hpp" + +namespace qpidit +{ + namespace amqp_types_test + { + + class Sender : public proton::messaging_handler + { + protected: + const std::string _brokerUrl; + const std::string _amqpType; + const Json::Value _testValues; + uint32_t _msgsSent; + uint32_t _msgsConfirmed; + uint32_t _totalMsgs; + public: + Sender(const std::string& brokerUrl, const std::string& amqpType, const Json::Value& testValues); + virtual ~Sender(); + void on_container_start(proton::container &c); + void on_sendable(proton::sender &s); + void on_tracker_accept(proton::tracker &t); + void on_transport_close(proton::transport &t); + + void on_connection_error(proton::connection &c); + void on_session_error(proton::session &s); + void on_sender_error(proton::sender& s); + void on_transport_error(proton::transport &t); + void on_error(const proton::error_condition &c); + protected: + proton::message& setMessage(proton::message& msg, const Json::Value& testValue); + + static std::string bytearrayToHexStr(const char* src, int len); + static void revMemcpy(char* dest, const char* src, int n); + static void uint64ToChar16(char* dest, uint64_t upper, uint64_t lower); + + static proton::value extractProtonValue(const Json::Value& val); + //static Json::Value::ValueType getArrayType(const Json::Value& val); + static void processArray(std::vector<proton::value>& array, const Json::Value& testValues); + static void processList(std::vector<proton::value>& list, const Json::Value& testValues); + static void processMap(std::map<std::string, proton::value>& map, const Json::Value& testValues); + + template<size_t N> static void hexStringToBytearray(proton::byte_array<N>& ba, const std::string s, size_t fromArrayIndex = 0, size_t arrayLen = N) { + for (size_t i=0; i<arrayLen; ++i) { + ba[fromArrayIndex + i] = (char)std::strtoul(s.substr(2*i, 2).c_str(), NULL, 16); + } + } + + // Set message body to floating type T through integral type U + // Used to convert a hex string representation of a float or double to a float or double + template<typename T, typename U> void setFloatValue(proton::message& msg, const std::string& testValueStr) { + try { + U ival(std::strtoul(testValueStr.data(), NULL, 16)); + msg.body(T(*reinterpret_cast<T*>(&ival))); + } catch (const std::exception& e) { throw qpidit::InvalidTestValueError(_amqpType, testValueStr); } + } + + template<typename T> void setIntegralValue(proton::message& msg, const std::string& testValueStr, bool unsignedVal) { + try { + T val(unsignedVal ? std::strtoul(testValueStr.data(), NULL, 16) : std::strtol(testValueStr.data(), NULL, 16)); + msg.body(val); + } catch (const std::exception& e) { throw qpidit::InvalidTestValueError(_amqpType, testValueStr); } + } + + template<typename T> void setStringValue(proton::message& msg, const std::string& testValueStr) { + try { + T val(testValueStr); + msg.body(val); + } catch (const std::exception& e) { throw qpidit::InvalidTestValueError(_amqpType, testValueStr); } + } + }; + + } /* namespace amqp_types_test */ +} /* namespace qpidit */ + +#endif /* SRC_QPIDIT_AMQP_TYPES_TEST_SENDER_HPP_ */ http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/83b89fe4/shims/qpid-proton-cpp/src/qpidit/jms_messages_test/JmsDefinitions.hpp ---------------------------------------------------------------------- diff --git a/shims/qpid-proton-cpp/src/qpidit/jms_messages_test/JmsDefinitions.hpp b/shims/qpid-proton-cpp/src/qpidit/jms_messages_test/JmsDefinitions.hpp new file mode 100644 index 0000000..6f99813 --- /dev/null +++ b/shims/qpid-proton-cpp/src/qpidit/jms_messages_test/JmsDefinitions.hpp @@ -0,0 +1,46 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#ifndef SRC_QPIDIT_JMS_MESSAGES_TEST_JMSDEFINTIONS_HPP_ +#define SRC_QPIDIT_JMS_MESSAGES_TEST_JMSDEFINTIONS_HPP_ + +namespace qpidit +{ + namespace jms_messages_test + { + typedef enum {JMS_QUEUE = 0, + JMS_TOPIC, + JMS_TMEP_QUEUE, + JMS_TEMP_TOPIC} + jmsDestinationType_t; + + typedef enum {JMS_MESSAGE_TYPE=0, + JMS_OBJECTMESSAGE_TYPE, + JMS_MAPMESSAGE_TYPE, + JMS_BYTESMESSAGE_TYPE, + JMS_STREAMMESSAGE_TYPE, + JMS_TEXTMESSAGE_TYPE} + jmsMessageType_t; + + } +} + +#endif /* SRC_QPIDIT_JMS_MESSAGES_TEST_JMSDEFINTIONS_HPP_ */ http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/83b89fe4/shims/qpid-proton-cpp/src/qpidit/jms_messages_test/Receiver.cpp ---------------------------------------------------------------------- diff --git a/shims/qpid-proton-cpp/src/qpidit/jms_messages_test/Receiver.cpp b/shims/qpid-proton-cpp/src/qpidit/jms_messages_test/Receiver.cpp new file mode 100644 index 0000000..ed1dfa5 --- /dev/null +++ b/shims/qpid-proton-cpp/src/qpidit/jms_messages_test/Receiver.cpp @@ -0,0 +1,432 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpidit/jms_messages_test/Receiver.hpp" + +#include <iostream> +#include <json/json.h> +#include "proton/connection.hpp" +#include "proton/default_container.hpp" +#include "proton/delivery.hpp" +#include "proton/transport.hpp" +#include "qpidit/QpidItErrors.hpp" + +namespace qpidit +{ + namespace jms_messages_test + { + //static + proton::symbol Receiver::s_jmsMessageTypeAnnotationKey("x-opt-jms-msg-type"); + std::map<std::string, int8_t>Receiver::s_jmsMessageTypeAnnotationValues = initializeJmsMessageTypeAnnotationMap(); + + + Receiver::Receiver(const std::string& brokerUrl, + const std::string& jmsMessageType, + const Json::Value& testNumberMap, + const Json::Value& flagMap): + _brokerUrl(brokerUrl), + _jmsMessageType(jmsMessageType), + _testNumberMap(testNumberMap), + _flagMap(flagMap), + _subTypeList(testNumberMap.getMemberNames()), + _subTypeIndex(0), + _expected(getTotalNumExpectedMsgs(testNumberMap)), + _received(0UL), + _receivedSubTypeList(Json::arrayValue), + _receivedValueMap(Json::objectValue), + _receivedHeadersMap(Json::objectValue), + _receivedPropertiesMap(Json::objectValue) + {} + + Receiver::~Receiver() {} + + Json::Value& Receiver::getReceivedValueMap() { + return _receivedValueMap; + } + + Json::Value& Receiver::getReceivedHeadersMap() { + return _receivedHeadersMap; + } + + Json::Value& Receiver::getReceivedPropertiesMap() { + return _receivedPropertiesMap; + } + + void Receiver::on_container_start(proton::container &c) { + c.open_receiver(_brokerUrl); + } + + void Receiver::on_message(proton::delivery &d, proton::message &m) { + try { + if (_received < _expected) { + int8_t t = JMS_MESSAGE_TYPE; + try {t = m.message_annotations().get(proton::symbol("x-opt-jms-msg-type")).get<int8_t>();} + catch (const std::exception& e) { + std::cout << "JmsReceiver::on_message(): Missing annotation \"x-opt-jms-msg-type\"" << std::endl; + throw; + } + switch (t) { + case JMS_MESSAGE_TYPE: + receiveJmsMessage(m); + break; + case JMS_OBJECTMESSAGE_TYPE: + receiveJmsObjectMessage(m); + break; + case JMS_MAPMESSAGE_TYPE: + receiveJmsMapMessage(m); + break; + case JMS_BYTESMESSAGE_TYPE: + receiveJmsBytesMessage(m); + break; + case JMS_STREAMMESSAGE_TYPE: + receiveJmsStreamMessage(m); + break; + case JMS_TEXTMESSAGE_TYPE: + receiveJmsTextMessage(m); + break; + default:; + // TODO: handle error - no known JMS message type + } + + processMessageHeaders(m); + processMessageProperties(m); + + std::string subType(_subTypeList[_subTypeIndex]); + // Increment the subtype if the required number of messages have been received + if (_receivedSubTypeList.size() >= _testNumberMap[subType].asInt() && + _subTypeIndex < _testNumberMap.size()) { + _receivedValueMap[subType] = _receivedSubTypeList; + _receivedSubTypeList.clear(); + ++_subTypeIndex; + } + _received++; + if (_received >= _expected) { + d.receiver().close(); + d.connection().close(); + } + } + } catch (const std::exception&) { + d.receiver().close(); + d.connection().close(); + throw; + } + } + + void Receiver::on_connection_error(proton::connection &c) { + std::cerr << "JmsReceiver::on_connection_error(): " << c.error() << std::endl; + } + + void Receiver::on_receiver_error(proton::receiver& r) { + std::cerr << "JmsReceiver::on_receiver_error(): " << r.error() << std::endl; + } + + void Receiver::on_session_error(proton::session &s) { + std::cerr << "JmsReceiver::on_session_error(): " << s.error() << std::endl; + } + + void Receiver::on_transport_error(proton::transport &t) { + std::cerr << "JmsReceiver::on_transport_error(): " << t.error() << std::endl; + } + + void Receiver::on_error(const proton::error_condition &ec) { + std::cerr << "JmsReceiver::on_error(): " << ec << std::endl; + } + + //static + uint32_t Receiver::getTotalNumExpectedMsgs(const Json::Value testNumberMap) { + uint32_t total(0UL); + for (Json::Value::const_iterator i=testNumberMap.begin(); i!=testNumberMap.end(); ++i) { + total += (*i).asUInt(); + } + return total; + + } + + // protected + + void Receiver::receiveJmsMessage(const proton::message& msg) { + _receivedSubTypeList.append(Json::Value()); + } + + void Receiver::receiveJmsObjectMessage(const proton::message& msg) { + // TODO + } + + void Receiver::receiveJmsMapMessage(const proton::message& msg) { + if(_jmsMessageType.compare("JMS_MAPMESSAGE_TYPE") != 0) { + throw qpidit::IncorrectMessageBodyTypeError(_jmsMessageType, "JMS_MAPMESSAGE_TYPE"); + } + std::string subType(_subTypeList[_subTypeIndex]); + std::map<std::string, proton::value> m; + msg.body().get(m); + for (std::map<std::string, proton::value>::const_iterator i=m.begin(); i!=m.end(); ++i) { + std::string key = i->first; + if (subType.compare(key.substr(0, key.size()-3)) != 0) { + throw qpidit::IncorrectJmsMapKeyPrefixError(subType, key); + } + proton::value val = i->second; + if (subType.compare("boolean") == 0) { + _receivedSubTypeList.append(val.get<bool>() ? Json::Value("True") : Json::Value("False")); + } else if (subType.compare("byte") == 0) { + _receivedSubTypeList.append(Json::Value(toHexStr<int8_t>(val.get<int8_t>()))); + } else if (subType.compare("bytes") == 0) { + _receivedSubTypeList.append(Json::Value(std::string(val.get<proton::binary>()))); + } else if (subType.compare("char") == 0) { + std::ostringstream oss; + oss << (char)val.get<wchar_t>(); + _receivedSubTypeList.append(Json::Value(oss.str())); + } else if (subType.compare("double") == 0) { + double d = val.get<double>(); + _receivedSubTypeList.append(Json::Value(toHexStr<int64_t>(*((int64_t*)&d), true, false))); + } else if (subType.compare("float") == 0) { + float f = val.get<float>(); + _receivedSubTypeList.append(Json::Value(toHexStr<int32_t>(*((int32_t*)&f), true, false))); + } else if (subType.compare("int") == 0) { + _receivedSubTypeList.append(Json::Value(toHexStr<int32_t>(val.get<int32_t>()))); + } else if (subType.compare("long") == 0) { + _receivedSubTypeList.append(Json::Value(toHexStr<int64_t>(val.get<int64_t>()))); + } else if (subType.compare("short") == 0) { + _receivedSubTypeList.append(Json::Value(toHexStr<int16_t>(val.get<int16_t>()))); + } else if (subType.compare("string") == 0) { + _receivedSubTypeList.append(Json::Value(val.get<std::string>())); + } else { + throw qpidit::UnknownJmsMessageSubTypeError(subType); + } + } + } + + void Receiver::receiveJmsBytesMessage(const proton::message& msg) { + if(_jmsMessageType.compare("JMS_BYTESMESSAGE_TYPE") != 0) { + throw qpidit::IncorrectMessageBodyTypeError(_jmsMessageType, "JMS_BYTESMESSAGE_TYPE"); + } + std::string subType(_subTypeList[_subTypeIndex]); + proton::binary body = msg.body().get<proton::binary>(); + if (subType.compare("boolean") == 0) { + if (body.size() != 1) throw IncorrectMessageBodyLengthError("JmsReceiver::receiveJmsBytesMessage, subType=boolean", 1, body.size()); + _receivedSubTypeList.append(body[0] ? Json::Value("True") : Json::Value("False")); + } else if (subType.compare("byte") == 0) { + if (body.size() != sizeof(int8_t)) throw IncorrectMessageBodyLengthError("JmsReceiver::receiveJmsBytesMessage, subType=byte", sizeof(int8_t), body.size()); + int8_t val = *((int8_t*)body.data()); + _receivedSubTypeList.append(Json::Value(toHexStr<int8_t>(val))); + } else if (subType.compare("bytes") == 0) { + _receivedSubTypeList.append(Json::Value(std::string(body))); + } else if (subType.compare("char") == 0) { + if (body.size() != sizeof(uint16_t)) throw IncorrectMessageBodyLengthError("JmsReceiver::receiveJmsBytesMessage, subType=char", sizeof(uint16_t), body.size()); + // TODO: This is ugly: ignoring first byte - handle UTF-16 correctly + char c = body[1]; + std::ostringstream oss; + oss << c; + _receivedSubTypeList.append(Json::Value(oss.str())); + } else if (subType.compare("double") == 0) { + if (body.size() != sizeof(int64_t)) throw IncorrectMessageBodyLengthError("JmsReceiver::receiveJmsBytesMessage, subType=double", sizeof(int64_t), body.size()); + int64_t val = be64toh(*((int64_t*)body.data())); + _receivedSubTypeList.append(Json::Value(toHexStr<int64_t>(val, true, false))); + } else if (subType.compare("float") == 0) { + if (body.size() != sizeof(int32_t)) throw IncorrectMessageBodyLengthError("JmsReceiver::receiveJmsBytesMessage, subType=float", sizeof(int32_t), body.size()); + int32_t val = be32toh(*((int32_t*)body.data())); + _receivedSubTypeList.append(Json::Value(toHexStr<int32_t>(val, true, false))); + } else if (subType.compare("long") == 0) { + if (body.size() != sizeof(int64_t)) throw IncorrectMessageBodyLengthError("JmsReceiver::receiveJmsBytesMessage, subType=long", sizeof(int64_t), body.size()); + int64_t val = be64toh(*((int64_t*)body.data())); + _receivedSubTypeList.append(Json::Value(toHexStr<int64_t>(val))); + } else if (subType.compare("int") == 0) { + if (body.size() != sizeof(int32_t)) throw IncorrectMessageBodyLengthError("JmsReceiver::receiveJmsBytesMessage, subType=int", sizeof(int32_t), body.size()); + int32_t val = be32toh(*((int32_t*)body.data())); + _receivedSubTypeList.append(Json::Value(toHexStr<int32_t>(val))); + } else if (subType.compare("short") == 0) { + if (body.size() != sizeof(int16_t)) throw IncorrectMessageBodyLengthError("JmsReceiver::receiveJmsBytesMessage, subType=short", sizeof(int16_t), body.size()); + int16_t val = be16toh(*((int16_t*)body.data())); + _receivedSubTypeList.append(Json::Value(toHexStr<int16_t>(val))); + } else if (subType.compare("string") == 0) { + // TODO: decode string size in first two bytes and check string size + _receivedSubTypeList.append(Json::Value(std::string(body).substr(2))); + } else { + throw qpidit::UnknownJmsMessageSubTypeError(subType); + } + } + + void Receiver::receiveJmsStreamMessage(const proton::message& msg) { + if(_jmsMessageType.compare("JMS_STREAMMESSAGE_TYPE") != 0) { + throw qpidit::IncorrectMessageBodyTypeError(_jmsMessageType, "JMS_STREAMMESSAGE_TYPE"); + } + std::string subType(_subTypeList[_subTypeIndex]); + std::vector<proton::value> l; + msg.body().get(l); + for (std::vector<proton::value>::const_iterator i=l.begin(); i!=l.end(); ++i) { + if (subType.compare("boolean") == 0) { + _receivedSubTypeList.append(i->get<bool>() ? Json::Value("True") : Json::Value("False")); + } else if (subType.compare("byte") == 0) { + _receivedSubTypeList.append(Json::Value(toHexStr<int8_t>(i->get<int8_t>()))); + } else if (subType.compare("bytes") == 0) { + _receivedSubTypeList.append(Json::Value(std::string(i->get<proton::binary>()))); + } else if (subType.compare("char") == 0) { + std::ostringstream oss; + oss << (char)i->get<wchar_t>(); + _receivedSubTypeList.append(Json::Value(oss.str())); + } else if (subType.compare("double") == 0) { + double d = i->get<double>(); + _receivedSubTypeList.append(Json::Value(toHexStr<int64_t>(*((int64_t*)&d), true, false))); + } else if (subType.compare("float") == 0) { + float f = i->get<float>(); + _receivedSubTypeList.append(Json::Value(toHexStr<int32_t>(*((int32_t*)&f), true, false))); + } else if (subType.compare("int") == 0) { + _receivedSubTypeList.append(Json::Value(toHexStr<int32_t>(i->get<int32_t>()))); + } else if (subType.compare("long") == 0) { + _receivedSubTypeList.append(Json::Value(toHexStr<int64_t>(i->get<int64_t>()))); + } else if (subType.compare("short") == 0) { + _receivedSubTypeList.append(Json::Value(toHexStr<int16_t>(i->get<int16_t>()))); + } else if (subType.compare("string") == 0) { + _receivedSubTypeList.append(Json::Value(i->get<std::string>())); + } else { + throw qpidit::UnknownJmsMessageSubTypeError(subType); + } + } + + } + + void Receiver::receiveJmsTextMessage(const proton::message& msg) { + if(_jmsMessageType.compare("JMS_TEXTMESSAGE_TYPE") != 0) { + throw qpidit::IncorrectMessageBodyTypeError(_jmsMessageType, "JMS_TEXTMESSAGE_TYPE"); + } + _receivedSubTypeList.append(Json::Value(msg.body().get<std::string>())); + } + + void Receiver::processMessageHeaders(const proton::message& msg) { + addMessageHeaderString("JMS_TYPE_HEADER", msg.subject()); + if (_flagMap.isMember("JMS_CORRELATIONID_AS_BYTES") && _flagMap["JMS_CORRELATIONID_AS_BYTES"].asBool()) { + addMessageHeaderByteArray("JMS_CORRELATIONID_HEADER", proton::get<proton::binary>(msg.correlation_id())); + } else { + try { + addMessageHeaderString("JMS_CORRELATIONID_HEADER", proton::get<std::string>(msg.correlation_id())); + } catch (const std::exception& e) {} // TODO: UGLY, how do you check if there _is_ a correlation id? + } + + std::string reply_to = msg.reply_to(); + // Some brokers prepend 'queue://' and 'topic://' to reply_to addresses, strip these when present + if (_flagMap.isMember("JMS_REPLYTO_AS_TOPIC") && _flagMap["JMS_REPLYTO_AS_TOPIC"].asBool()) { + if (reply_to.find("topic://") == 0) { + addMessageHeaderDestination("JMS_REPLYTO_HEADER", JMS_TOPIC, reply_to.substr(8)); + } else { + addMessageHeaderDestination("JMS_REPLYTO_HEADER", JMS_TOPIC, reply_to); + } + } else { + if (reply_to.find("queue://") == 0) { + addMessageHeaderDestination("JMS_REPLYTO_HEADER", JMS_QUEUE, reply_to.substr(8)); + } else { + addMessageHeaderDestination("JMS_REPLYTO_HEADER", JMS_QUEUE, reply_to); + } + } + } + + void Receiver::addMessageHeaderString(const char* headerName, const std::string& value) { + if (!value.empty()) { // TODO: Remove this test when PROTON-1288 is fixed as empty strings are allowed in headers + Json::Value valueMap(Json::objectValue); + valueMap["string"] = value; + _receivedHeadersMap[headerName] = valueMap; + } + } + + void Receiver::addMessageHeaderByteArray(const std::string& headerName, const proton::binary ba) { + if (!ba.empty()) { // TODO: Remove this test when PROTON-1288 is fixed as empty binaries are allowed in headers + Json::Value valueMap(Json::objectValue); + valueMap["bytes"] = std::string(ba); + _receivedHeadersMap[headerName] = valueMap; + } + } + + void Receiver::addMessageHeaderDestination(const std::string& headerName, jmsDestinationType_t dt, const std::string& d) { + if (!d.empty()) { + Json::Value valueMap(Json::objectValue); + switch (dt) { + case JMS_QUEUE: + valueMap["queue"] = d; + break; + case JMS_TOPIC: + valueMap["topic"] = d; + break; + default: + ; // TODO: Handle error: remaining JMS destinations not handled. + } + _receivedHeadersMap[headerName] = valueMap; + } + } + + void Receiver::processMessageProperties(const proton::message& msg) { + // TODO: Add this function when PROTON-1284 is fixed +// std::map<proton::value, proton::value> props; +// msg.properties().value() >> props; + } + + //static + std::map<std::string, int8_t> Receiver::initializeJmsMessageTypeAnnotationMap() { + std::map<std::string, int8_t> m; + m["JMS_MESSAGE_TYPE"] = JMS_MESSAGE_TYPE; + m["JMS_OBJECTMESSAGE_TYPE"] = JMS_OBJECTMESSAGE_TYPE; + m["JMS_MAPMESSAGE_TYPE"] = JMS_MAPMESSAGE_TYPE; + m["JMS_BYTESMESSAGE_TYPE"] = JMS_BYTESMESSAGE_TYPE; + m["JMS_STREAMMESSAGE_TYPE"] = JMS_STREAMMESSAGE_TYPE; + m["JMS_TEXTMESSAGE_TYPE"] = JMS_TEXTMESSAGE_TYPE; + return m; + } + + + } /* namespace jms_messages_test */ +} /* namespace qpidit */ + +/* --- main --- + * Args: 1: Broker address (ip-addr:port) + * 2: Queue name + * 3: JMS message type + * 4: JSON Test parameters containing 2 maps: [testValuesMap, flagMap] + */ +int main(int argc, char** argv) { + /* + for (int i=0; i<argc; ++i) { + std::cout << "*** argv[" << i << "] : " << argv[i] << std::endl; + } + */ + // TODO: improve arg management a little... + if (argc != 5) { + throw qpidit::ArgumentError("Incorrect number of arguments"); + } + + std::ostringstream oss; + oss << argv[1] << "/" << argv[2]; + + try { + Json::Value testParams; + Json::Reader jsonReader; + if (not jsonReader.parse(argv[4], testParams, false)) { + throw qpidit::JsonParserError(jsonReader); + } + + qpidit::jms_messages_test::Receiver receiver(oss.str(), argv[3], testParams[0], testParams[1]); + proton::default_container(receiver).run(); + + Json::FastWriter fw; + std::cout << argv[3] << std::endl; + std::cout << fw.write(receiver.getReceivedValueMap()); + std::cout << fw.write(receiver.getReceivedHeadersMap()); + std::cout << fw.write(receiver.getReceivedPropertiesMap()); + } catch (const std::exception& e) { + std::cout << "JmsReceiver error: " << e.what() << std::endl; + } +} http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/83b89fe4/shims/qpid-proton-cpp/src/qpidit/jms_messages_test/Receiver.hpp ---------------------------------------------------------------------- diff --git a/shims/qpid-proton-cpp/src/qpidit/jms_messages_test/Receiver.hpp b/shims/qpid-proton-cpp/src/qpidit/jms_messages_test/Receiver.hpp new file mode 100644 index 0000000..696b9dd --- /dev/null +++ b/shims/qpid-proton-cpp/src/qpidit/jms_messages_test/Receiver.hpp @@ -0,0 +1,111 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#ifndef SRC_QPIDIT_JMS_MESSAGES_TEST_RECEIVER_HPP_ +#define SRC_QPIDIT_JMS_MESSAGES_TEST_RECEIVER_HPP_ + +#include <iomanip> +#include <json/value.h> +#include "proton/messaging_handler.hpp" +#include "proton/types.hpp" +#include "qpidit/jms_messages_test/JmsDefinitions.hpp" +#include <sstream> + +namespace qpidit +{ + namespace jms_messages_test + { + + class Receiver : public proton::messaging_handler + { + protected: + static proton::symbol s_jmsMessageTypeAnnotationKey; + static std::map<std::string, int8_t>s_jmsMessageTypeAnnotationValues; + + const std::string _brokerUrl; + const std::string _jmsMessageType; + const Json::Value _testNumberMap; + const Json::Value _flagMap; + Json::Value::Members _subTypeList; + int _subTypeIndex; + uint32_t _expected; + uint32_t _received; + Json::Value _receivedSubTypeList; + Json::Value _receivedValueMap; + Json::Value _receivedHeadersMap; + Json::Value _receivedPropertiesMap; + public: + Receiver(const std::string& brokerUrl, + const std::string& jmsMessageType, + const Json::Value& testNumberMap, + const Json::Value& flagMap); + virtual ~Receiver(); + Json::Value& getReceivedValueMap(); + Json::Value& getReceivedHeadersMap(); + Json::Value& getReceivedPropertiesMap(); + void on_container_start(proton::container &c); + void on_message(proton::delivery &d, proton::message &m); + + void on_connection_error(proton::connection &c); + void on_receiver_error(proton::receiver& r); + void on_session_error(proton::session &s); + void on_transport_error(proton::transport &t); + void on_error(const proton::error_condition &c); + + static uint32_t getTotalNumExpectedMsgs(const Json::Value testNumberMap); + + protected: + void receiveJmsMessage(const proton::message& msg); + void receiveJmsObjectMessage(const proton::message& msg); + void receiveJmsMapMessage(const proton::message& msg); + void receiveJmsBytesMessage(const proton::message& msg); + void receiveJmsStreamMessage(const proton::message& msg); + void receiveJmsTextMessage(const proton::message& msg); + + void processMessageHeaders(const proton::message& msg); + void addMessageHeaderString(const char* headerName, const std::string& value); + void addMessageHeaderByteArray(const std::string& headerName, const proton::binary ba); + void addMessageHeaderDestination(const std::string& headerName, jmsDestinationType_t dt, const std::string& d); + void processMessageProperties(const proton::message& msg); + + static std::map<std::string, int8_t> initializeJmsMessageTypeAnnotationMap(); + + // Format signed numbers in negative hex format if signedFlag is true, ie -0xNNNN, positive numbers in 0xNNNN format + template<typename T> static std::string toHexStr(T val, bool fillFlag = false, bool signedFlag = true) { + std::ostringstream oss; + bool neg = false; + if (signedFlag) { + neg = val < 0; + if (neg) val = -val; + } + oss << (neg ? "-" : "") << "0x" << std::hex; + if (fillFlag) { + oss << std::setw(sizeof(T)*2) << std::setfill('0'); + } + oss << (sizeof(T) == 1 ? (int)val & 0xff : sizeof(T) == 2 ? val & 0xffff : sizeof(T) == 4 ? val & 0xffffffff : val); + return oss.str(); + } + }; + + } /* namespace jms_messages_test */ +} /* namespace qpidit */ + +#endif /* SRC_QPIDIT_JMS_MESSAGES_TEST_RECEIVER_HPP_ */ --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
