QPIDIT-21: Add shims for C++ client
Project: http://git-wip-us.apache.org/repos/asf/qpid-interop-test/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-interop-test/commit/5f7ac58b Tree: http://git-wip-us.apache.org/repos/asf/qpid-interop-test/tree/5f7ac58b Diff: http://git-wip-us.apache.org/repos/asf/qpid-interop-test/diff/5f7ac58b Branch: refs/heads/master Commit: 5f7ac58b029c7fe2294b0bbd1097bf2750b91653 Parents: 6c8ab3b Author: Kim van der Riet <[email protected]> Authored: Tue Dec 15 15:58:44 2015 -0500 Committer: Kim van der Riet <[email protected]> Committed: Tue Dec 15 15:58:44 2015 -0500 ---------------------------------------------------------------------- QUICKSTART | 2 +- shims/qpid-proton-cpp/CMakeLists.txt | 24 ++ shims/qpid-proton-cpp/src/CMakeLists.txt | 78 +++++ .../qpid-proton-cpp/src/qpidit/QpidItErrors.cpp | 197 +++++++++++ .../qpid-proton-cpp/src/qpidit/QpidItErrors.hpp | 165 +++++++++ .../src/qpidit/shim/AmqpReceiver.cpp | 279 +++++++++++++++ .../src/qpidit/shim/AmqpReceiver.hpp | 75 +++++ .../src/qpidit/shim/AmqpSender.cpp | 329 ++++++++++++++++++ .../src/qpidit/shim/AmqpSender.hpp | 90 +++++ .../src/qpidit/shim/JmsReceiver.cpp | 311 +++++++++++++++++ .../src/qpidit/shim/JmsReceiver.hpp | 88 +++++ .../src/qpidit/shim/JmsSender.cpp | 336 +++++++++++++++++++ .../src/qpidit/shim/JmsSender.hpp | 85 +++++ shims/qpid-proton-python/src/JmsSenderShim.py | 15 +- .../qpid-proton-python/src/TypesReceiverShim.py | 13 +- shims/qpid-proton-python/src/TypesSenderShim.py | 11 +- .../qpid-interop-test/jms/jms_message_tests.py | 110 +++--- .../types/simple_type_tests.py | 109 +++--- 18 files changed, 2217 insertions(+), 100 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/5f7ac58b/QUICKSTART ---------------------------------------------------------------------- diff --git a/QUICKSTART b/QUICKSTART index d94fc2a..f13d101 100644 --- a/QUICKSTART +++ b/QUICKSTART @@ -45,7 +45,7 @@ Note that installation is still to be completed, this section will change to reflect installation details when complete. Assuming proton's make install has been run, from top level qpid-interop-test directory: -export PYTHONPATH=/usr/lib64/python2.7/site-packages:src/py/qpid-interop-test +export PYTHONPATH=/usr/local/lib64/proton/bindings/python:src/py/qpid-interop-test export LD_LIBRARY_PATH=/usr/local/lib64 export QPID_INTEROP_TEST_HOME=<abs path to top level qpid-interop-test directory> http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/5f7ac58b/shims/qpid-proton-cpp/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/shims/qpid-proton-cpp/CMakeLists.txt b/shims/qpid-proton-cpp/CMakeLists.txt new file mode 100644 index 0000000..d4d3cf3 --- /dev/null +++ b/shims/qpid-proton-cpp/CMakeLists.txt @@ -0,0 +1,24 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +project (qpid-interop-test-cpp-shims) + +add_subdirectory(src) + +cmake_minimum_required(VERSION 2.6) http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/5f7ac58b/shims/qpid-proton-cpp/src/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/shims/qpid-proton-cpp/src/CMakeLists.txt b/shims/qpid-proton-cpp/src/CMakeLists.txt new file mode 100644 index 0000000..cece52c --- /dev/null +++ b/shims/qpid-proton-cpp/src/CMakeLists.txt @@ -0,0 +1,78 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +include_directories(/usr/include/jsoncpp) +include_directories(${CMAKE_CURRENT_SOURCE_DIR}) +set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11") + +# --- Common files and libs --- + +set(Common_SOURCES + qpidit/QpidItErrors.hpp + qpidit/QpidItErrors.cpp +) + +set(Common_LIBS + qpid-proton-cpp + jsoncpp +) + +# --- AmqpSender --- + +set(AmqpSender_SOURCES + ${Common_SOURCES} + qpidit/shim/AmqpSender.hpp + qpidit/shim/AmqpSender.cpp +) + +add_executable(AmqpSender ${AmqpSender_SOURCES}) +target_link_libraries(AmqpSender ${Common_LIBS}) + +# --- AmqpReceiver --- + +set(AmqpReceiver_SOURCES + ${Common_SOURCES} + qpidit/shim/AmqpReceiver.hpp + qpidit/shim/AmqpReceiver.cpp +) + +add_executable(AmqpReceiver ${AmqpReceiver_SOURCES}) +target_link_libraries(AmqpReceiver ${Common_LIBS}) + +# --- JmsSender --- + +set(JmsSender_SOURCES + ${Common_SOURCES} + qpidit/shim/JmsSender.hpp + qpidit/shim/JmsSender.cpp +) + +add_executable(JmsSender ${JmsSender_SOURCES}) +target_link_libraries(JmsSender ${Common_LIBS}) + +# --- JmsReceiver --- + +set(JmsReceiver_SOURCES + ${Common_SOURCES} + qpidit/shim/JmsReceiver.hpp + qpidit/shim/JmsReceiver.cpp +) + +add_executable(JmsReceiver ${JmsReceiver_SOURCES}) +target_link_libraries(JmsReceiver ${Common_LIBS}) http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/5f7ac58b/shims/qpid-proton-cpp/src/qpidit/QpidItErrors.cpp ---------------------------------------------------------------------- diff --git a/shims/qpid-proton-cpp/src/qpidit/QpidItErrors.cpp b/shims/qpid-proton-cpp/src/qpidit/QpidItErrors.cpp new file mode 100644 index 0000000..dadcbde --- /dev/null +++ b/shims/qpid-proton-cpp/src/qpidit/QpidItErrors.cpp @@ -0,0 +1,197 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpidit/QpidItErrors.hpp" + +#include <json/reader.h> + +namespace qpidit +{ + + // --- ErrorMessage --- + + Message::Message() : oss() {} + + Message::Message(const Message& e) : oss(e.toString()) {} + + std::string Message::toString() const { return oss.str(); } + + Message::operator std::string() const { return toString(); } + + std::ostream& operator<<(std::ostream& out, const Message& m) { return out << m.toString(); } + + + // --- ArgumentError --- + + ArgumentError::ArgumentError(const std::string& msg) : std::runtime_error(msg) {} + + ArgumentError::~ArgumentError() throw() {} + + // --- ErrnoError --- + + ErrnoError::ErrnoError(const std::string& funcName, int errorNum) : + std::runtime_error(MSG(funcName << "() returned " << errorNum << " (" << strerror(errorNum) << ")")) + {} + + ErrnoError::~ErrnoError() {} + + // --- IncorrectJmsMapKeyPrefixError --- + + IncorrectJmsMapKeyPrefixError::IncorrectJmsMapKeyPrefixError(const std::string& expected, const std::string& key) : + std::runtime_error(MSG("Incorrect JMS map key: expected \"" << expected << "\", found \"" + << key.substr(0, key.size()-3) << "\"")) + {} + + IncorrectJmsMapKeyPrefixError::~IncorrectJmsMapKeyPrefixError() {} + + // --- IncorrectMessageBodyLengthError --- + + IncorrectMessageBodyLengthError::IncorrectMessageBodyLengthError(int expected, int found) : + std::runtime_error(MSG("Incorrect body length found in message body: expected: " << expected + << "; found " << found)) + {} + + IncorrectMessageBodyLengthError::~IncorrectMessageBodyLengthError() {} + + // --- IncorrectMessageBodyTypeError --- + + IncorrectMessageBodyTypeError::IncorrectMessageBodyTypeError(proton::type_id expected, proton::type_id found) : + std::runtime_error(MSG("Incorrect AMQP type found in message body: expected: " << expected + << "; found: " << found)) + {} + + IncorrectMessageBodyTypeError::IncorrectMessageBodyTypeError(const std::string& expected, const std::string& found) : + std::runtime_error(MSG("Incorrect JMS message type found: expected: " << expected + << "; found: " << found)) + {} + + IncorrectMessageBodyTypeError::~IncorrectMessageBodyTypeError() {} + + + // --- IncorrectValueTypeError --- + // TODO: Consolidate with IncorrectMessageBodyTypeError? + + IncorrectValueTypeError::IncorrectValueTypeError(const proton::value& val) : + std::runtime_error(MSG("Incorrect value type received: " << val.type())) + {} + + IncorrectValueTypeError::~IncorrectValueTypeError() {} + + + // --- InvalidJsonRootNodeError --- + + //static + std::map<Json::ValueType, std::string> InvalidJsonRootNodeError::s_JsonValueTypeNames = { + {Json::nullValue, "Json::nullValue"}, + {Json::intValue, "Json::intValue"}, + {Json::uintValue, "Json::uintValue"}, + {Json::realValue, "Json::realValue"}, + {Json::stringValue, "Json::stringValue"}, + {Json::booleanValue, "Json::booleanValue"}, + {Json::arrayValue, "Json::arrayValue"}, + {Json::objectValue, "Json::objectValue"}, + }; + + InvalidJsonRootNodeError::InvalidJsonRootNodeError(const Json::ValueType& expected, const Json::ValueType& actual) : + std::runtime_error(MSG("Invalid JSON root node: Expected type " << formatJsonValueType(expected) + << ", received type " << formatJsonValueType(actual))) + {} + + InvalidJsonRootNodeError::~InvalidJsonRootNodeError() {} + + // protected + + //static + std::string InvalidJsonRootNodeError::formatJsonValueType(const Json::ValueType& valueType) { + std::ostringstream oss; + oss << valueType << " (" << s_JsonValueTypeNames[valueType] << ")"; + return oss.str(); + } + + // --- InvalidTestValueError --- + + InvalidTestValueError::InvalidTestValueError(const std::string& type, const std::string& valueStr) : + std::runtime_error(MSG("Invalid test value: \"" << valueStr << "\" is not valid for type " << type)) + {} + + InvalidTestValueError::~InvalidTestValueError() throw() {} + + + // --- JsonParserError --- + + JsonParserError::JsonParserError(const Json::Reader& jsonReader) : + std::runtime_error(MSG("JSON test values failed to parse: " << jsonReader.getFormattedErrorMessages())) + {} + + JsonParserError::~JsonParserError() throw() {} + + + // --- PcloseError --- + + PcloseError::PcloseError(int errorNum) : ErrnoError("pclose", errorNum) {} + + PcloseError::~PcloseError() {} + + + // --- PopenError --- + + PopenError::PopenError(int errorNum) : ErrnoError("popen", errorNum) {} + + PopenError::~PopenError() {} + + + // --- UnknownAmqpTypeError --- + + UnknownAmqpTypeError::UnknownAmqpTypeError(const std::string& amqpType) : + std::runtime_error(MSG("Unknown AMQP type \"" << amqpType << "\"")) + {} + + UnknownAmqpTypeError::~UnknownAmqpTypeError() throw() {} + + + // --- UnknownJmsMessageSubTypeError --- + + UnknownJmsMessageSubTypeError::UnknownJmsMessageSubTypeError(const std::string& jmsMessageSubType) : + std::runtime_error(MSG("Unknown JMS sub-type \"" << jmsMessageSubType << "\"")) + {} + + UnknownJmsMessageSubTypeError::~UnknownJmsMessageSubTypeError() {} + + + // --- UnknownJmsMessageTypeError --- + + UnknownJmsMessageTypeError::UnknownJmsMessageTypeError(const std::string& jmsMessageType) : + std::runtime_error(MSG("Unknown JMS message type \"" << jmsMessageType << "\"")) + {} + + UnknownJmsMessageTypeError::~UnknownJmsMessageTypeError() {} + + + // --- UnsupportedAmqpTypeError --- + + UnsupportedAmqpTypeError::UnsupportedAmqpTypeError(const std::string& amqpType) : + std::runtime_error(MSG("Unsupported AMQP type \"" << amqpType << "\"")) + {} + + UnsupportedAmqpTypeError::~UnsupportedAmqpTypeError() throw() {} + + +} /* namespace qpidit */ http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/5f7ac58b/shims/qpid-proton-cpp/src/qpidit/QpidItErrors.hpp ---------------------------------------------------------------------- diff --git a/shims/qpid-proton-cpp/src/qpidit/QpidItErrors.hpp b/shims/qpid-proton-cpp/src/qpidit/QpidItErrors.hpp new file mode 100644 index 0000000..23833a6 --- /dev/null +++ b/shims/qpid-proton-cpp/src/qpidit/QpidItErrors.hpp @@ -0,0 +1,165 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#ifndef SRC_QPIDIT_QPIDITERRORS_HPP_ +#define SRC_QPIDIT_QPIDITERRORS_HPP_ + +#include <json/value.h> +#include <sstream> +#include <map> +#include <proton/types.hpp> +#include <proton/value.hpp> + +namespace Json +{ + class Reader; +} + +namespace qpidit +{ + + class Message + { + protected: + std::ostringstream oss; + public: + Message(); + Message(const Message& e); + std::string toString() const; + operator std::string() const; + template <class T> Message& operator<<(const T& t) { oss << t; return *this; } + }; + +#define MSG(message) (::qpidit::Message() << message) + + class ArgumentError: public std::runtime_error + { + public: + explicit ArgumentError(const std::string& msg); + virtual ~ArgumentError() throw(); + }; + + class ErrnoError: public std::runtime_error + { + public: + ErrnoError(const std::string& funcName, int errorNum); + virtual ~ErrnoError(); + }; + + class IncorrectJmsMapKeyPrefixError: public std::runtime_error + { + public: + IncorrectJmsMapKeyPrefixError(const std::string& expected, const std::string& key); + virtual ~IncorrectJmsMapKeyPrefixError(); + }; + + class IncorrectMessageBodyLengthError: public std::runtime_error + { + public: + IncorrectMessageBodyLengthError(int expected, int found); + virtual ~IncorrectMessageBodyLengthError(); + }; + + class IncorrectMessageBodyTypeError: public std::runtime_error + { + public: + IncorrectMessageBodyTypeError(proton::type_id expected, proton::type_id found); // AMQP type errors + IncorrectMessageBodyTypeError(const std::string& expected, const std::string& found); // JMS message type errors + virtual ~IncorrectMessageBodyTypeError(); + }; + + class IncorrectValueTypeError: public std::runtime_error + { + public: + IncorrectValueTypeError(const proton::value& val); + virtual ~IncorrectValueTypeError(); + }; + + class InvalidJsonRootNodeError: public std::runtime_error + { + protected: + static std::map<Json::ValueType, std::string> s_JsonValueTypeNames; + public: + InvalidJsonRootNodeError(const Json::ValueType& expected, const Json::ValueType& actual); + virtual ~InvalidJsonRootNodeError(); + protected: + static std::string formatJsonValueType(const Json::ValueType& valueType); + }; + + class InvalidTestValueError: public std::runtime_error + { + public: + InvalidTestValueError(const std::string& type, const std::string& valueStr); + virtual ~InvalidTestValueError() throw(); + }; + + class JsonParserError: public std::runtime_error + { + public: + explicit JsonParserError(const Json::Reader& jsonReader); + virtual ~JsonParserError() throw(); + }; + + class PcloseError: public ErrnoError + { + public: + PcloseError(int errorNum); + virtual ~PcloseError(); + }; + + class PopenError: public ErrnoError + { + public: + PopenError(int errorNum); + virtual ~PopenError(); + }; + + class UnknownAmqpTypeError: public std::runtime_error + { + public: + explicit UnknownAmqpTypeError(const std::string& amqpType); + virtual ~UnknownAmqpTypeError() throw(); + }; + + class UnknownJmsMessageSubTypeError: public std::runtime_error + { + public: + explicit UnknownJmsMessageSubTypeError(const std::string& jmsMessageSubType); + virtual ~UnknownJmsMessageSubTypeError(); + }; + + class UnknownJmsMessageTypeError: public std::runtime_error + { + public: + explicit UnknownJmsMessageTypeError(const std::string& jmsMessageType); + virtual ~UnknownJmsMessageTypeError(); + }; + + class UnsupportedAmqpTypeError: public std::runtime_error + { + public: + explicit UnsupportedAmqpTypeError(const std::string& amqpType); + virtual ~UnsupportedAmqpTypeError() throw(); + }; + +} /* namespace qpidit */ + +#endif /* SRC_QPIDIT_QPIDITERRORS_HPP_ */ http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/5f7ac58b/shims/qpid-proton-cpp/src/qpidit/shim/AmqpReceiver.cpp ---------------------------------------------------------------------- diff --git a/shims/qpid-proton-cpp/src/qpidit/shim/AmqpReceiver.cpp b/shims/qpid-proton-cpp/src/qpidit/shim/AmqpReceiver.cpp new file mode 100644 index 0000000..de02c77 --- /dev/null +++ b/shims/qpid-proton-cpp/src/qpidit/shim/AmqpReceiver.cpp @@ -0,0 +1,279 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpidit/shim/AmqpReceiver.hpp" + +#include <iostream> +#include <json/json.h> +#include "proton/container.hpp" +#include "qpidit/QpidItErrors.hpp" + +namespace qpidit +{ + namespace shim + { + + AmqpReceiver::AmqpReceiver(const std::string& brokerUrl, + const std::string& amqpType, + uint32_t expected) : + _brokerUrl(brokerUrl), + _amqpType(amqpType), + _expected(expected), + _received(0UL), + _receivedValueList(Json::arrayValue) + {} + + AmqpReceiver::~AmqpReceiver() {} + + Json::Value& AmqpReceiver::getReceivedValueList() { + return _receivedValueList; + } + + void AmqpReceiver::on_start(proton::event &e) { + _receiver = e.container().open_receiver(_brokerUrl); + } + + void AmqpReceiver::on_message(proton::event &e) { + proton::message& msg = e.message(); + if (!msg.id().empty() && msg.id().get<uint64_t>() < _received) return; // ignore duplicate + if (_received < _expected) { + if (_amqpType.compare("null") == 0) { + checkMessageType(msg, proton::NULL_); + _receivedValueList.append("None"); + } else if (_amqpType.compare("boolean") == 0) { + checkMessageType(msg, proton::BOOLEAN); + _receivedValueList.append(msg.body().get<proton::amqp_boolean>() ? "True": "False"); + } else if (_amqpType.compare("ubyte") == 0) { + checkMessageType(msg, proton::UBYTE); + _receivedValueList.append(toHexStr<proton::amqp_ubyte>(msg.body().get<proton::amqp_ubyte>())); + } else if (_amqpType.compare("ushort") == 0) { + checkMessageType(msg, proton::USHORT); + _receivedValueList.append(toHexStr<proton::amqp_ushort>(msg.body().get<proton::amqp_ushort>())); + } else if (_amqpType.compare("uint") == 0) { + checkMessageType(msg, proton::UINT); + _receivedValueList.append(toHexStr<proton::amqp_uint>(msg.body().get<proton::amqp_uint>())); + } else if (_amqpType.compare("ulong") == 0) { + checkMessageType(msg, proton::ULONG); + _receivedValueList.append(toHexStr<proton::amqp_ulong>(msg.body().get<proton::amqp_ulong>())); + } else if (_amqpType.compare("byte") == 0) { + checkMessageType(msg, proton::BYTE); + _receivedValueList.append(toHexStr<proton::amqp_byte>(msg.body().get<proton::amqp_byte>())); + } else if (_amqpType.compare("short") == 0) { + checkMessageType(msg, proton::SHORT); + _receivedValueList.append(toHexStr<proton::amqp_short>(msg.body().get<proton::amqp_short>())); + } else if (_amqpType.compare("int") == 0) { + checkMessageType(msg, proton::INT); + _receivedValueList.append(toHexStr<proton::amqp_int>(msg.body().get<proton::amqp_int>())); + } else if (_amqpType.compare("long") == 0) { + checkMessageType(msg, proton::LONG); + _receivedValueList.append(toHexStr<proton::amqp_long>(msg.body().get<proton::amqp_long>())); + } else if (_amqpType.compare("float") == 0) { + checkMessageType(msg, proton::FLOAT); + proton::amqp_float f = msg.body().get<proton::amqp_float>(); + _receivedValueList.append(toHexStr<uint32_t>(*((uint32_t*)&f), true)); + } else if (_amqpType.compare("double") == 0) { + checkMessageType(msg, proton::DOUBLE); + proton::amqp_double d = msg.body().get<proton::amqp_double>(); + _receivedValueList.append(toHexStr<uint64_t>(*((uint64_t*)&d), true)); + } else if (_amqpType.compare("decimal32") == 0) { + checkMessageType(msg, proton::DECIMAL32); + _receivedValueList.append(toHexStr<uint32_t>(msg.body().get<proton::amqp_decimal32>(), true)); + } else if (_amqpType.compare("decimal64") == 0) { + checkMessageType(msg, proton::DECIMAL64); + _receivedValueList.append(toHexStr<uint64_t>(msg.body().get<proton::amqp_decimal64>(), true)); + } else if (_amqpType.compare("decimal128") == 0) { + checkMessageType(msg, proton::DECIMAL128); + proton::amqp_decimal128 d128(msg.body().get<proton::amqp_decimal128>()); + _receivedValueList.append(stringToHexStr(std::string(d128.value.bytes, 16))); + } else if (_amqpType.compare("char") == 0) { + checkMessageType(msg, proton::CHAR); + wchar_t c = msg.body().get<proton::amqp_char>(); + std::stringstream oss; + if (c < 0x7f && std::iswprint(c)) { + oss << (char)c; + } else { + oss << "0x" << std::hex << c; + } + _receivedValueList.append(oss.str()); + } else if (_amqpType.compare("timestamp") == 0) { + checkMessageType(msg, proton::TIMESTAMP); + std::ostringstream oss; + oss << "0x" << std::hex << msg.body().get<proton::amqp_timestamp>().milliseconds; + _receivedValueList.append(oss.str()); + } else if (_amqpType.compare("uuid") == 0) { + checkMessageType(msg, proton::UUID); + std::ostringstream oss; + oss << msg.body().get<proton::amqp_uuid>(); + _receivedValueList.append(oss.str()); + } else if (_amqpType.compare("binary") == 0) { + checkMessageType(msg, proton::BINARY); + _receivedValueList.append(msg.body().get<proton::amqp_binary>()); + } else if (_amqpType.compare("string") == 0) { + checkMessageType(msg, proton::STRING); + _receivedValueList.append(msg.body().get<proton::amqp_string>()); + } else if (_amqpType.compare("symbol") == 0) { + checkMessageType(msg, proton::SYMBOL); + _receivedValueList.append(msg.body().get<proton::amqp_symbol>()); + } else if (_amqpType.compare("list") == 0) { + checkMessageType(msg, proton::LIST); + Json::Value jsonList(Json::arrayValue); + _receivedValueList.append(getSequence(jsonList, msg.body())); + } else if (_amqpType.compare("map") == 0) { + checkMessageType(msg, proton::MAP); + Json::Value jsonMap(Json::objectValue); + _receivedValueList.append(getMap(jsonMap, msg.body())); + } else if (_amqpType.compare("array") == 0) { + throw qpidit::UnsupportedAmqpTypeError(_amqpType); + } else { + throw qpidit::UnknownAmqpTypeError(_amqpType); + } + } + _received++; + if (_received >= _expected) { + e.receiver().close(); + e.connection().close(); + } + } + + // protected + + //static + void AmqpReceiver::checkMessageType(const proton::message& msg, proton::type_id amqpType) { + if (msg.body().type() != amqpType) { + throw qpidit::IncorrectMessageBodyTypeError(amqpType, msg.body().type()); + } + } + + //static + Json::Value& AmqpReceiver::getMap(Json::Value& jsonMap, const proton::data& dat) { + const proton::value v(dat); + return getMap(jsonMap, v); + } + + //static + Json::Value& AmqpReceiver::getMap(Json::Value& jsonMap, const proton::value& val) { + std::map<proton::value, proton::value> msgMap; + val.decoder() >> proton::to_map(msgMap); + for (std::map<proton::value, proton::value>::const_iterator i = msgMap.begin(); i != msgMap.end(); ++i) { + switch (i->second.type()) { + case proton::LIST: + { + Json::Value jsonSubList(Json::arrayValue); + jsonMap[i->first.get<std::string>()] = getSequence(jsonSubList, i->second); + break; + } + case proton::MAP: + { + Json::Value jsonSubMap(Json::objectValue); + jsonMap[i->first.get<std::string>()] = getMap(jsonSubMap, i->second); + break; + } + case proton::ARRAY: + break; + case proton::STRING: + jsonMap[i->first.get<std::string>()] = Json::Value(i->second.get<std::string>()); + break; + default: + throw qpidit::IncorrectValueTypeError(i->second); + } + } + return jsonMap; + } + + //static + Json::Value& AmqpReceiver::getSequence(Json::Value& jsonList, const proton::data& dat) { + const proton::value v(dat); + return getSequence(jsonList, v); + } + + //static + Json::Value& AmqpReceiver::getSequence(Json::Value& jsonList, const proton::value& val) { + std::vector<proton::value> msgList; + val.decoder() >> proton::to_sequence(msgList); + for (std::vector<proton::value>::const_iterator i=msgList.begin(); i!=msgList.end(); ++i) { + switch ((*i).type()) { + case proton::LIST: + { + Json::Value jsonSubList(Json::arrayValue); + jsonList.append(getSequence(jsonSubList, *i)); + break; + } + case proton::MAP: + { + Json::Value jsonSubMap(Json::objectValue); + jsonList.append(getMap(jsonSubMap, *i)); + break; + } + case proton::ARRAY: + break; + case proton::STRING: + jsonList.append(Json::Value((*i).get<std::string>())); + break; + default: + throw qpidit::IncorrectValueTypeError(*i); + } + } + return jsonList; + } + + //static + std::string AmqpReceiver::stringToHexStr(const std::string& str) { + std::ostringstream oss; + oss << "0x" << std::hex; + for (std::string::const_iterator i=str.begin(); i!=str.end(); ++i) { + oss << std::setw(2) << std::setfill('0') << ((int)*i & 0xff); + } + return oss.str(); + } + + } /* namespace shim */ +} /* namespace qpidit */ + + +/* + * --- main --- + * Args: 1: Broker address (ip-addr:port) + * 2: Queue name + * 3: AMQP type + * 4: Expected number of test values to receive + */ + +int main(int argc, char** argv) { + // TODO: improve arg management a little... + if (argc != 5) { + throw qpidit::ArgumentError("Incorrect number of arguments"); + } + + std::ostringstream oss; + oss << argv[1] << "/" << argv[2]; + + try { + qpidit::shim::AmqpReceiver receiver(oss.str(), argv[3], std::stoul(argv[4])); + proton::container(receiver).run(); + + std::cout << argv[3] << std::endl; + std::cout << receiver.getReceivedValueList(); + } catch (const std::exception& e) { + std::cerr << "AmqpReceiver error: " << e.what() << std::endl; + exit(-1); + } + exit(0); +} http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/5f7ac58b/shims/qpid-proton-cpp/src/qpidit/shim/AmqpReceiver.hpp ---------------------------------------------------------------------- diff --git a/shims/qpid-proton-cpp/src/qpidit/shim/AmqpReceiver.hpp b/shims/qpid-proton-cpp/src/qpidit/shim/AmqpReceiver.hpp new file mode 100644 index 0000000..0e5e5e8 --- /dev/null +++ b/shims/qpid-proton-cpp/src/qpidit/shim/AmqpReceiver.hpp @@ -0,0 +1,75 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#ifndef SRC_QPIDIT_SHIM_AMQP_RECEIVER_HPP_ +#define SRC_QPIDIT_SHIM_AMQP_RECEIVER_HPP_ + +#include <iomanip> +#include <json/value.h> +#include "proton/messaging_handler.hpp" +#include <sstream> + +namespace qpidit +{ + namespace shim + { + + class AmqpReceiver : public proton::messaging_handler + { + protected: + const std::string _brokerUrl; + const std::string _amqpType; + proton::receiver _receiver; + uint32_t _expected; + uint32_t _received; + Json::Value _receivedValueList; + public: + AmqpReceiver(const std::string& brokerUrl, const std::string& amqpType, uint32_t exptected); + virtual ~AmqpReceiver(); + Json::Value& getReceivedValueList(); + void on_start(proton::event &e); + void on_message(proton::event &e); + protected: + static void checkMessageType(const proton::message& msg, proton::type_id msgType); + static Json::Value& getMap(Json::Value& jsonMap, const proton::data& datl); + static Json::Value& getMap(Json::Value& jsonMap, const proton::value& val); + static Json::Value& getSequence(Json::Value& jsonList, const proton::data& dat); + static Json::Value& getSequence(Json::Value& jsonList, const proton::value& val); + static std::string stringToHexStr(const std::string& str); + + // Format signed numbers in negative hex format, ie -0xNNNN, positive numbers in 0xNNNN format + template<typename T> static std::string toHexStr(T val, bool fillFlag = false) { + std::ostringstream oss; + bool neg = val < 0; + if (neg) val = -val; + oss << (neg ? "-" : "") << "0x" << std::hex; + if (fillFlag) { + oss << std::setw(sizeof(T)*2) << std::setfill('0'); + } + oss << (sizeof(T) == 1 ? (int)val & 0xff : sizeof(T) == 2 ? val & 0xffff : sizeof(T) == 4 ? val & 0xffffffff : val); + return oss.str(); + } + }; + + } /* namespace shim */ +} /* namespace qpidit */ + +#endif /* SRC_QPIDIT_SHIM_AMQP_RECEIVER_HPP_ */ http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/5f7ac58b/shims/qpid-proton-cpp/src/qpidit/shim/AmqpSender.cpp ---------------------------------------------------------------------- diff --git a/shims/qpid-proton-cpp/src/qpidit/shim/AmqpSender.cpp b/shims/qpid-proton-cpp/src/qpidit/shim/AmqpSender.cpp new file mode 100644 index 0000000..1d07126 --- /dev/null +++ b/shims/qpid-proton-cpp/src/qpidit/shim/AmqpSender.cpp @@ -0,0 +1,329 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpidit/shim/AmqpSender.hpp" + +#include <iomanip> +#include <iostream> +#include <json/json.h> +#include "proton/container.hpp" + +namespace qpidit +{ + namespace shim + { + + AmqpSender::AmqpSender(const std::string& brokerUrl, + const std::string& amqpType, + const Json::Value& testValues) : + _brokerUrl(brokerUrl), + _amqpType(amqpType), + _testValues(testValues), + _msgsSent(0), + _msgsConfirmed(0), + _totalMsgs(testValues.size()) + {} + + AmqpSender::~AmqpSender() {} + + void AmqpSender::on_start(proton::event &e) { + e.container().open_sender(_brokerUrl); + } + + void AmqpSender::on_sendable(proton::event &e) { + if (_totalMsgs == 0) { + e.sender().connection().close(); + } else if (_msgsSent == 0) { + for (Json::Value::const_iterator i=_testValues.begin(); i!=_testValues.end(); ++i) { + if (e.sender().credit()) { + proton::message msg; + e.sender().send(setMessage(msg, *i)); + _msgsSent++; + } + } + } + } + + void AmqpSender::on_accepted(proton::event &e) { + _msgsConfirmed++; + if (_msgsConfirmed == _totalMsgs) { + e.connection().close(); + } + } + + void AmqpSender::on_disconnected(proton::event &e) { + _msgsSent = _msgsConfirmed; + } + + // protected + + proton::message& AmqpSender::setMessage(proton::message& msg, const Json::Value& testValue) { + msg.id(_msgsSent + 1); + if (_amqpType.compare("null") == 0) { + std::string testValueStr(testValue.asString()); + if (testValueStr.compare("None") != 0) { throw qpidit::InvalidTestValueError(_amqpType, testValueStr); } + msg.body(proton::amqp_null()); + } else if (_amqpType.compare("boolean") == 0) { + std::string testValueStr(testValue.asString()); + if (testValueStr.compare("True") == 0) { + msg.body(proton::amqp_boolean(true)); + } else if (testValueStr.compare("False") == 0) { + msg.body(proton::amqp_boolean(false)); + } else { + throw qpidit::InvalidTestValueError(_amqpType, testValueStr); + } + } else if (_amqpType.compare("ubyte") == 0) { + setIntegralValue<proton::amqp_ubyte>(msg, testValue.asString(), true); + } else if (_amqpType.compare("ushort") == 0) { + setIntegralValue<proton::amqp_ushort>(msg, testValue.asString(), true); + } else if (_amqpType.compare("uint") == 0) { + setIntegralValue<proton::amqp_uint>(msg, testValue.asString(), true); + } else if (_amqpType.compare("ulong") == 0) { + setIntegralValue<proton::amqp_ulong>(msg, testValue.asString(), true); + } else if (_amqpType.compare("byte") == 0) { + setIntegralValue<proton::amqp_byte>(msg, testValue.asString(), false); + } else if (_amqpType.compare("short") == 0) { + setIntegralValue<proton::amqp_short>(msg, testValue.asString(), false); + } else if (_amqpType.compare("int") == 0) { + setIntegralValue<proton::amqp_int>(msg, testValue.asString(), false); + } else if (_amqpType.compare("long") == 0) { + setIntegralValue<proton::amqp_long>(msg, testValue.asString(), false); + } else if (_amqpType.compare("float") == 0) { + setFloatValue<proton::amqp_float, uint32_t>(msg, testValue.asString()); + } else if (_amqpType.compare("double") == 0) { + setFloatValue<proton::amqp_double, uint64_t>(msg, testValue.asString()); + } else if (_amqpType.compare("decimal32") == 0) { + proton::amqp_decimal32 val; + val.value = std::stoul(testValue.asString(), nullptr, 16); + msg.body(val); + } else if (_amqpType.compare("decimal64") == 0) { + proton::amqp_decimal64 val; + val.value = std::stoul(testValue.asString(), nullptr, 16); + msg.body(val); + } else if (_amqpType.compare("decimal128") == 0) { + std::string testValueStr(testValue.asString()); + if (testValueStr.size() != 34) { throw qpidit::InvalidTestValueError(_amqpType, testValueStr); } + + const std::string s1 = testValueStr.substr(2, 16); + uint64_t p1 = std::stoul(s1, nullptr, 16); + const std::string s2 = testValueStr.substr(18, 16); + uint64_t p2 = std::stoul(s2, nullptr, 16); + + proton::amqp_decimal128 val; + uint64ToChar16((char*)val.value.bytes, p1, p2); + msg.body(val); + } else if (_amqpType.compare("char") == 0) { + std::string charStr = testValue.asString(); + wchar_t val; + if (charStr.size() == 1) { // Single char "a" + val = charStr[0]; + } else if (charStr.size() >= 3 && charStr.size() <= 10) { // Format "0xN" through "0xNNNNNNNN" + val = std::stoul(charStr, nullptr, 16); + } else { + //TODO throw format error + } + msg.body(proton::amqp_char(val)); + } else if (_amqpType.compare("timestamp") == 0) { + proton::amqp_timestamp val; + val.milliseconds = std::stoul(testValue.asString(), nullptr, 16); + msg.body(val); + } else if (_amqpType.compare("uuid") == 0) { + std::string testValueStr(testValue.asString()); + if (testValueStr.size() != 36) { throw qpidit::InvalidTestValueError(_amqpType, testValueStr); } + // Expected format: "00000000-0000-0000-0000-000000000000" + std::ostringstream oss1; + oss1 << testValueStr.substr(0, 8) << testValueStr.substr(9, 4) << testValueStr.substr(14, 4); + uint64_t p1 = std::stoul(oss1.str(), nullptr, 16); + std::ostringstream oss2; + oss2 << testValueStr.substr(19, 4) << testValueStr.substr(24); + uint64_t p2 = std::stoul(oss2.str(), nullptr, 16); + + proton::amqp_uuid val; + uint64ToChar16((char*)val.value.bytes, p1, p2); + msg.body(val); + } else if (_amqpType.compare("binary") == 0) { + setStringValue<proton::amqp_binary>(msg, testValue.asString()); + } else if (_amqpType.compare("string") == 0) { + setStringValue<proton::amqp_string>(msg, testValue.asString()); + } else if (_amqpType.compare("symbol") == 0) { + setStringValue<proton::amqp_symbol>(msg, testValue.asString()); + } else if (_amqpType.compare("list") == 0) { + std::vector<proton::value> list; + processList(list, testValue); + msg.body(proton::as<proton::LIST>(list)); + } else if (_amqpType.compare("map") == 0) { + std::map<std::string, proton::value> map; + processMap(map, testValue); + msg.body(proton::as<proton::MAP>(map)); + } else if (_amqpType.compare("array") == 0) { +/* + std::vector<proton::value> array; + processArray(array, testValue); + msg.body(proton::as<proton::ARRAY>(array)); +*/ + throw qpidit::UnsupportedAmqpTypeError(_amqpType); + } else { + throw qpidit::UnknownAmqpTypeError(_amqpType); + } + return msg; + } + + //static + std::string AmqpSender::bytearrayToHexStr(const char* src, int len) { + std::ostringstream oss; + oss << "0x" << std::hex; + for (int i=0; i<len; ++i) { + oss << std::setw(2) << std::setfill('0') << ((int)src[i] & 0xff); + } + return oss.str(); + } + + //static + proton::value AmqpSender::extractProtonValue(const Json::Value& val) { + switch (val.type()) { + case Json::nullValue: + return proton::amqp_null(); + case Json::intValue: + return proton::amqp_int(val.asInt()); + case Json::uintValue: + return proton::amqp_uint(val.asUInt()); + case Json::realValue: + return proton::amqp_double(val.asDouble()); + case Json::stringValue: + return proton::amqp_string(val.asString()); + case Json::booleanValue: + return proton::amqp_boolean(val.asBool()); + default:; + } + } + +// //static +// Json::Value::ValueType getArrayType(const Json::Value& val) { +// if (val.size()) > 0) { +// return val[0].type(); +// } else { +// return Json::Value::nullValue; // TODO: find a way to represent empty array +// } +// } + + //static + void AmqpSender::processArray(std::vector<proton::value>& array, const Json::Value& testValues) { + for (Json::Value::const_iterator i = testValues.begin(); i != testValues.end(); ++i) { + if ((*i).isArray()) { + std::vector<proton::value> subArray; + processArray(subArray, *i); + array.push_back(subArray); + } else if ((*i).isObject()) { + std::map<std::string, proton::value> subMap; + processMap(subMap, *i); + array.push_back(subMap); + } else { + array.push_back(*i); + } + } + } + + //static + void AmqpSender::processList(std::vector<proton::value>& list, const Json::Value& testValues) { + for (Json::Value::const_iterator i = testValues.begin(); i != testValues.end(); ++i) { + if ((*i).isArray()) { + std::vector<proton::value> subList; + processList(subList, *i); + list.push_back(subList); + } else if ((*i).isObject()) { + std::map<std::string, proton::value> subMap; + processMap(subMap, *i); + list.push_back(subMap); + } else { + list.push_back(extractProtonValue(*i)); + } + } + //std::cout << std::endl; + } + + //static + void AmqpSender::processMap(std::map<std::string, proton::value>& map, const Json::Value& testValues) { + Json::Value::Members keys = testValues.getMemberNames(); + for (std::vector<std::string>::const_iterator i=keys.begin(); i!=keys.end(); ++i) { + Json::Value mapVal = testValues[*i]; + if (mapVal.isArray()) { + std::vector<proton::value> subList; + processList(subList, mapVal); + map[*i] = subList; + } else if (mapVal.isObject()) { + std::map<std::string, proton::value> subMap; + processMap(subMap, mapVal); + map[*i] = subMap; + } else { + map[*i] = extractProtonValue(mapVal); + } + } + } + + //static + void AmqpSender::revMemcpy(char* dest, const char* src, int n) { + for (int i = 0; i < n; ++i) { + *(dest + i) = *(src + n - i - 1); + } + } + + //static + void AmqpSender::uint64ToChar16(char* dest, uint64_t upper, uint64_t lower) { + revMemcpy(dest, (const char*)&upper, sizeof(uint64_t)); + revMemcpy(dest + 8, (const char*)&lower, sizeof(uint64_t)); + } + + } /* namespace shim */ +} /* namespace qpidit */ + + +/* + * --- main --- + * Args: 1: Broker address (ip-addr:port) + * 2: Queue name + * 3: AMQP type + * 4: Test value(s) as JSON string + */ + +int main(int argc, char** argv) { + // TODO: improve arg management a little... + if (argc != 5) { + throw qpidit::ArgumentError("Incorrect number of arguments"); + } + + std::ostringstream oss; + oss << argv[1] << "/" << argv[2]; + + try { + Json::Value testValues; + Json::Reader jsonReader; + if (not jsonReader.parse(argv[4], testValues, false)) { + throw qpidit::JsonParserError(jsonReader); + } + + qpidit::shim::AmqpSender sender(oss.str(), argv[3], testValues); + proton::container(sender).run(); + } catch (const std::exception& e) { + std::cerr << "AmqpSender error: " << e.what() << std::endl; + exit(1); + } + exit(0); +} http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/5f7ac58b/shims/qpid-proton-cpp/src/qpidit/shim/AmqpSender.hpp ---------------------------------------------------------------------- diff --git a/shims/qpid-proton-cpp/src/qpidit/shim/AmqpSender.hpp b/shims/qpid-proton-cpp/src/qpidit/shim/AmqpSender.hpp new file mode 100644 index 0000000..96ed391 --- /dev/null +++ b/shims/qpid-proton-cpp/src/qpidit/shim/AmqpSender.hpp @@ -0,0 +1,90 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#ifndef SRC_QPIDIT_SHIM_AMQPSENDER_HPP_ +#define SRC_QPIDIT_SHIM_AMQPSENDER_HPP_ + +#include <json/value.h> +#include "proton/messaging_handler.hpp" +#include "qpidit/QpidItErrors.hpp" + +namespace qpidit +{ + namespace shim + { + + class AmqpSender : public proton::messaging_handler + { + protected: + const std::string _brokerUrl; + const std::string _amqpType; + const Json::Value _testValues; + uint32_t _msgsSent; + uint32_t _msgsConfirmed; + uint32_t _totalMsgs; + public: + AmqpSender(const std::string& brokerUrl, const std::string& amqpType, const Json::Value& testValues); + virtual ~AmqpSender(); + void on_start(proton::event &e); + void on_sendable(proton::event &e); + void on_accepted(proton::event &e); + void on_disconnected(proton::event &e); + protected: + proton::message& setMessage(proton::message& msg, const Json::Value& testValue); + + static std::string bytearrayToHexStr(const char* src, int len); + static void revMemcpy(char* dest, const char* src, int n); + static void uint64ToChar16(char* dest, uint64_t upper, uint64_t lower); + + static proton::value extractProtonValue(const Json::Value& val); + //static Json::Value::ValueType getArrayType(const Json::Value& val); + static void processArray(std::vector<proton::value>& array, const Json::Value& testValues); + static void processList(std::vector<proton::value>& list, const Json::Value& testValues); + static void processMap(std::map<std::string, proton::value>& map, const Json::Value& testValues); + + // Set message body to floating type T through integral type U + // Used to convert a hex string representation of a float or double to a float or double + template<typename T, typename U> void setFloatValue(proton::message& msg, const std::string& testValueStr) { + try { + U ival(std::stoul(testValueStr, nullptr, 16)); + msg.body(T(*reinterpret_cast<T*>(&ival))); + } catch (const std::exception& e) { throw qpidit::InvalidTestValueError(_amqpType, testValueStr); } + } + + template<typename T> void setIntegralValue(proton::message& msg, const std::string& testValueStr, bool unsignedVal) { + try { + T val(unsignedVal ? std::stoul(testValueStr, nullptr, 16) : std::stol(testValueStr, nullptr, 16)); + msg.body(val); + } catch (const std::exception& e) { throw qpidit::InvalidTestValueError(_amqpType, testValueStr); } + } + + template<typename T> void setStringValue(proton::message& msg, const std::string& testValueStr) { + try { + T val(testValueStr); + msg.body(val); + } catch (const std::exception& e) { throw qpidit::InvalidTestValueError(_amqpType, testValueStr); } + } + }; + + } /* namespace shim */ +} /* namespace qpidit */ + +#endif /* SRC_QPIDIT_SHIM_AMQPSENDER_HPP_ */ http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/5f7ac58b/shims/qpid-proton-cpp/src/qpidit/shim/JmsReceiver.cpp ---------------------------------------------------------------------- diff --git a/shims/qpid-proton-cpp/src/qpidit/shim/JmsReceiver.cpp b/shims/qpid-proton-cpp/src/qpidit/shim/JmsReceiver.cpp new file mode 100644 index 0000000..8cb86ba --- /dev/null +++ b/shims/qpid-proton-cpp/src/qpidit/shim/JmsReceiver.cpp @@ -0,0 +1,311 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpidit/shim/JmsReceiver.hpp" + +#include <iostream> +#include <json/json.h> +#include <map> +#include "proton/container.hpp" +#include "qpidit/QpidItErrors.hpp" + +namespace qpidit +{ + namespace shim + { + typedef enum {JMS_MESSAGE_TYPE=0, JMS_OBJECTMESSAGE_TYPE, JMS_MAPMESSAGE_TYPE, JMS_BYTESMESSAGE_TYPE, JMS_STREAMMESSAGE_TYPE, JMS_TEXTMESSAGE_TYPE} jmsMessageType_t; + //static + proton::amqp_symbol JmsReceiver::s_jmsMessageTypeAnnotationKey("x-opt-jms-msg-type"); + std::map<std::string, proton::amqp_byte>JmsReceiver::s_jmsMessageTypeAnnotationValues = { + {"JMS_MESSAGE_TYPE", JMS_MESSAGE_TYPE}, + {"JMS_OBJECTMESSAGE_TYPE", JMS_OBJECTMESSAGE_TYPE}, + {"JMS_MAPMESSAGE_TYPE", JMS_MAPMESSAGE_TYPE}, + {"JMS_BYTESMESSAGE_TYPE", JMS_BYTESMESSAGE_TYPE}, + {"JMS_STREAMMESSAGE_TYPE", JMS_STREAMMESSAGE_TYPE}, + {"JMS_TEXTMESSAGE_TYPE", JMS_TEXTMESSAGE_TYPE}}; + + + JmsReceiver::JmsReceiver(const std::string& brokerUrl, + const std::string& jmsMessageType, + const Json::Value& testNumberMap): + _brokerUrl(brokerUrl), + _jmsMessageType(jmsMessageType), + _testNumberMap(testNumberMap), + _receiver(), + _subTypeList(testNumberMap.getMemberNames()), + _subTypeIndex(0), + _expected(getTotalNumExpectedMsgs(testNumberMap)), + _received(0UL), + _receivedSubTypeList(Json::arrayValue), + _receivedValueMap(Json::objectValue) + {} + + JmsReceiver::~JmsReceiver() {} + + Json::Value& JmsReceiver::getReceivedValueMap() { + return _receivedValueMap; + } + + void JmsReceiver::on_start(proton::event &e) { + _receiver = e.container().open_receiver(_brokerUrl); + } + + void JmsReceiver::on_message(proton::event &e) { + proton::message& msg = e.message(); + if (_received < _expected) { + switch (msg.annotation(proton::amqp_symbol("x-opt-jms-msg-type")).get<proton::amqp_byte>()) { + case JMS_MESSAGE_TYPE: + receiveJmsMessage(msg); + break; + case JMS_OBJECTMESSAGE_TYPE: + receiveJmsObjectMessage(msg); + break; + case JMS_MAPMESSAGE_TYPE: + receiveJmsMapMessage(msg); + break; + case JMS_BYTESMESSAGE_TYPE: + receiveJmsBytesMessage(msg); + break; + case JMS_STREAMMESSAGE_TYPE: + receiveJmsStreamMessage(msg); + break; + case JMS_TEXTMESSAGE_TYPE: + receiveJmsTextMessage(msg); + break; + default:; + // TODO: handle error - no known JMS message type + } + std::string subType(_subTypeList[_subTypeIndex]); + // Increment the subtype if the required number of messages have been received + if (_receivedSubTypeList.size() >= _testNumberMap[subType].asInt() && + _subTypeIndex < _testNumberMap.size()) { + _receivedValueMap[subType] = _receivedSubTypeList; + _receivedSubTypeList.clear(); + ++_subTypeIndex; + } + _received++; + if (_received >= _expected) { + _receiver.close(); + e.connection().close(); + } + } + } + + //static + uint32_t JmsReceiver::getTotalNumExpectedMsgs(const Json::Value testNumberMap) { + uint32_t total(0UL); + for (Json::Value::const_iterator i=testNumberMap.begin(); i!=testNumberMap.end(); ++i) { + total += (*i).asUInt(); + } + return total; + + } + + // protected + + void JmsReceiver::receiveJmsMessage(const proton::message& msg) { + // TODO: use this format for testing message JMS properties + } + + void JmsReceiver::receiveJmsObjectMessage(const proton::message& msg) { + // TODO + } + + void JmsReceiver::receiveJmsMapMessage(const proton::message& msg) { + if(_jmsMessageType.compare("JMS_MAPMESSAGE_TYPE") != 0) { + throw qpidit::IncorrectMessageBodyTypeError(_jmsMessageType, "JMS_MAPMESSAGE_TYPE"); + } + std::string subType(_subTypeList[_subTypeIndex]); + std::map<std::string, proton::value> m; + msg.body().get(m); + for (std::map<std::string, proton::value>::const_iterator i=m.begin(); i!=m.end(); ++i) { + std::string key = i->first; + if (subType.compare(key.substr(0, key.size()-3)) != 0) { + throw qpidit::IncorrectJmsMapKeyPrefixError(subType, key); + } + proton::value val = i->second; + if (subType.compare("boolean") == 0) { + _receivedSubTypeList.append(val.get<proton::amqp_boolean>() ? Json::Value("True") : Json::Value("False")); + } else if (subType.compare("byte") == 0) { + _receivedSubTypeList.append(Json::Value(toHexStr<int8_t>(val.get<proton::amqp_byte>()))); + } else if (subType.compare("bytes") == 0) { + _receivedSubTypeList.append(Json::Value(val.get<proton::amqp_binary>())); + } else if (subType.compare("char") == 0) { + std::ostringstream oss; + oss << (char)val.get<proton::amqp_char>(); + _receivedSubTypeList.append(Json::Value(oss.str())); + } else if (subType.compare("double") == 0) { + proton::amqp_double d = val.get<proton::amqp_double>(); + _receivedSubTypeList.append(Json::Value(toHexStr<int64_t>(*((int64_t*)&d), true, false))); + } else if (subType.compare("float") == 0) { + proton::amqp_float f = val.get<proton::amqp_float>(); + _receivedSubTypeList.append(Json::Value(toHexStr<int32_t>(*((int32_t*)&f), true, false))); + } else if (subType.compare("int") == 0) { + _receivedSubTypeList.append(Json::Value(toHexStr<int32_t>(val.get<proton::amqp_int>()))); + } else if (subType.compare("long") == 0) { + _receivedSubTypeList.append(Json::Value(toHexStr<int64_t>(val.get<proton::amqp_long>()))); + } else if (subType.compare("short") == 0) { + _receivedSubTypeList.append(Json::Value(toHexStr<int16_t>(val.get<proton::amqp_short>()))); + } else if (subType.compare("string") == 0) { + _receivedSubTypeList.append(Json::Value(val.get<proton::amqp_string>())); + } else { + throw qpidit::UnknownJmsMessageSubTypeError(subType); + } + } + } + + void JmsReceiver::receiveJmsBytesMessage(const proton::message& msg) { + if(_jmsMessageType.compare("JMS_BYTESMESSAGE_TYPE") != 0) { + throw qpidit::IncorrectMessageBodyTypeError(_jmsMessageType, "JMS_BYTESMESSAGE_TYPE"); + } + std::string subType(_subTypeList[_subTypeIndex]); + proton::amqp_binary body = msg.body().get<proton::amqp_binary>(); + if (subType.compare("boolean") == 0) { + if (body.size() != 1) throw IncorrectMessageBodyLengthError(1, body.size()); + _receivedSubTypeList.append(body[0] ? Json::Value("True") : Json::Value("False")); + } else if (subType.compare("byte") == 0) { + if (body.size() != sizeof(int8_t)) throw IncorrectMessageBodyLengthError(sizeof(int8_t), body.size()); + int8_t val = *((int8_t*)body.data()); + _receivedSubTypeList.append(Json::Value(toHexStr<int8_t>(val))); + } else if (subType.compare("bytes") == 0) { + _receivedSubTypeList.append(Json::Value(body)); + } else if (subType.compare("char") == 0) { + if (body.size() != sizeof(char16_t)) throw IncorrectMessageBodyLengthError(sizeof(char16_t), body.size()); + // TODO: This is ugly: ignoring first byte - handle UTF-16 correctly + char c = body[1]; + std::ostringstream oss; + oss << c; + _receivedSubTypeList.append(Json::Value(oss.str())); + } else if (subType.compare("double") == 0) { + if (body.size() != sizeof(int64_t)) throw IncorrectMessageBodyLengthError(sizeof(int64_t), body.size()); + int64_t val = be64toh(*((int64_t*)body.data())); + _receivedSubTypeList.append(Json::Value(toHexStr<int64_t>(val, true, false))); + } else if (subType.compare("float") == 0) { + if (body.size() != sizeof(int32_t)) throw IncorrectMessageBodyLengthError(sizeof(int32_t), body.size()); + int32_t val = be32toh(*((int32_t*)body.data())); + _receivedSubTypeList.append(Json::Value(toHexStr<int32_t>(val, true, false))); + } else if (subType.compare("long") == 0) { + if (body.size() != sizeof(int64_t)) throw IncorrectMessageBodyLengthError(sizeof(int64_t), body.size()); + int64_t val = be64toh(*((int64_t*)body.data())); + _receivedSubTypeList.append(Json::Value(toHexStr<int64_t>(val))); + } else if (subType.compare("int") == 0) { + if (body.size() != sizeof(int32_t)) throw IncorrectMessageBodyLengthError(sizeof(int32_t), body.size()); + int32_t val = be32toh(*((int32_t*)body.data())); + _receivedSubTypeList.append(Json::Value(toHexStr<int32_t>(val))); + } else if (subType.compare("short") == 0) { + if (body.size() != sizeof(int16_t)) throw IncorrectMessageBodyLengthError(sizeof(int16_t), body.size()); + int16_t val = be16toh(*((int16_t*)body.data())); + _receivedSubTypeList.append(Json::Value(toHexStr<int16_t>(val))); + } else if (subType.compare("string") == 0) { + // TODO: decode string size in first two bytes and check string size + _receivedSubTypeList.append(Json::Value(body.substr(2))); + } else { + throw qpidit::UnknownJmsMessageSubTypeError(subType); + } + } + + void JmsReceiver::receiveJmsStreamMessage(const proton::message& msg) { + if(_jmsMessageType.compare("JMS_STREAMMESSAGE_TYPE") != 0) { + throw qpidit::IncorrectMessageBodyTypeError(_jmsMessageType, "JMS_STREAMMESSAGE_TYPE"); + } + std::string subType(_subTypeList[_subTypeIndex]); + std::vector<proton::value> l; + msg.body().get(l); + for (std::vector<proton::value>::const_iterator i=l.begin(); i!=l.end(); ++i) { + if (subType.compare("boolean") == 0) { + _receivedSubTypeList.append(i->get<proton::amqp_boolean>() ? Json::Value("True") : Json::Value("False")); + } else if (subType.compare("byte") == 0) { + _receivedSubTypeList.append(Json::Value(toHexStr<proton::amqp_byte>(i->get<proton::amqp_byte>()))); + } else if (subType.compare("bytes") == 0) { + _receivedSubTypeList.append(Json::Value(i->get<proton::amqp_binary>())); + } else if (subType.compare("char") == 0) { + std::ostringstream oss; + oss << (char)i->get<proton::amqp_char>(); + _receivedSubTypeList.append(Json::Value(oss.str())); + } else if (subType.compare("double") == 0) { + proton::amqp_double d = i->get<proton::amqp_double>(); + _receivedSubTypeList.append(Json::Value(toHexStr<int64_t>(*((int64_t*)&d), true, false))); + } else if (subType.compare("float") == 0) { + proton::amqp_float f = i->get<proton::amqp_float>(); + _receivedSubTypeList.append(Json::Value(toHexStr<int32_t>(*((int32_t*)&f), true, false))); + } else if (subType.compare("int") == 0) { + _receivedSubTypeList.append(Json::Value(toHexStr<proton::amqp_int>(i->get<proton::amqp_int>()))); + } else if (subType.compare("long") == 0) { + _receivedSubTypeList.append(Json::Value(toHexStr<proton::amqp_long>(i->get<proton::amqp_long>()))); + } else if (subType.compare("short") == 0) { + _receivedSubTypeList.append(Json::Value(toHexStr<proton::amqp_short>(i->get<proton::amqp_short>()))); + } else if (subType.compare("string") == 0) { + _receivedSubTypeList.append(Json::Value(i->get<proton::amqp_string>())); + } else { + throw qpidit::UnknownJmsMessageSubTypeError(subType); + } + } + + } + + void JmsReceiver::receiveJmsTextMessage(const proton::message& msg) { + if(_jmsMessageType.compare("JMS_TEXTMESSAGE_TYPE") != 0) { + throw qpidit::IncorrectMessageBodyTypeError(_jmsMessageType, "JMS_TEXTMESSAGE_TYPE"); + } + _receivedSubTypeList.append(Json::Value(msg.body().get<proton::amqp_string>())); + } + + } /* namespace shim */ +} /* namespace qpidit */ + +/* --- main --- + * Args: 1: Broker address (ip-addr:port) + * 2: Queue name + * 3: JMS message type + * 4: JSON string of map containing number of test values to receive for each type/subtype + */ +int main(int argc, char** argv) { + /* + for (int i=0; i<argc; ++i) { + std::cout << "*** argv[" << i << "] : " << argv[i] << std::endl; + } + */ + // TODO: improve arg management a little... + if (argc != 5) { + throw qpidit::ArgumentError("Incorrect number of arguments"); + } + + std::ostringstream oss; + oss << argv[1] << "/" << argv[2]; + +// try { + Json::Value testNumberMap; + Json::Reader jsonReader; + if (not jsonReader.parse(argv[4], testNumberMap, false)) { + throw qpidit::JsonParserError(jsonReader); + } + + qpidit::shim::JmsReceiver receiver(oss.str(), argv[3], testNumberMap); + proton::container(receiver).run(); + + std::cout << argv[3] << std::endl; + std::cout << receiver.getReceivedValueMap(); +// } catch (const std::exception& e) { +// std::cerr << "JmsReceiver error: " << e.what() << std::endl; +// exit(1); +// } + exit(0); +} http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/5f7ac58b/shims/qpid-proton-cpp/src/qpidit/shim/JmsReceiver.hpp ---------------------------------------------------------------------- diff --git a/shims/qpid-proton-cpp/src/qpidit/shim/JmsReceiver.hpp b/shims/qpid-proton-cpp/src/qpidit/shim/JmsReceiver.hpp new file mode 100644 index 0000000..b713882 --- /dev/null +++ b/shims/qpid-proton-cpp/src/qpidit/shim/JmsReceiver.hpp @@ -0,0 +1,88 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#ifndef SRC_QPIDIT_SHIM_JMSRECEIVER_HPP_ +#define SRC_QPIDIT_SHIM_JMSRECEIVER_HPP_ + +#include <iomanip> +#include <json/value.h> +#include "proton/messaging_handler.hpp" +#include <sstream> + +namespace qpidit +{ + namespace shim + { + + class JmsReceiver : public proton::messaging_handler + { + protected: + static proton::amqp_symbol s_jmsMessageTypeAnnotationKey; + static std::map<std::string, proton::amqp_byte>s_jmsMessageTypeAnnotationValues; + + const std::string _brokerUrl; + const std::string _jmsMessageType; + const Json::Value _testNumberMap; + proton::receiver _receiver; + Json::Value::Members _subTypeList; + int _subTypeIndex; + uint32_t _expected; + uint32_t _received; + Json::Value _receivedSubTypeList; + Json::Value _receivedValueMap; + public: + JmsReceiver(const std::string& brokerUrl, const std::string& jmsMessageType, const Json::Value& testNumberMap); + virtual ~JmsReceiver(); + Json::Value& getReceivedValueMap(); + void on_start(proton::event &e); + void on_message(proton::event &e); + + static uint32_t getTotalNumExpectedMsgs(const Json::Value testNumberMap); + + protected: + void receiveJmsMessage(const proton::message& msg); + void receiveJmsObjectMessage(const proton::message& msg); + void receiveJmsMapMessage(const proton::message& msg); + void receiveJmsBytesMessage(const proton::message& msg); + void receiveJmsStreamMessage(const proton::message& msg); + void receiveJmsTextMessage(const proton::message& msg); + + // Format signed numbers in negative hex format if signedFlag is true, ie -0xNNNN, positive numbers in 0xNNNN format + template<typename T> static std::string toHexStr(T val, bool fillFlag = false, bool signedFlag = true) { + std::ostringstream oss; + bool neg = false; + if (signedFlag) { + neg = val < 0; + if (neg) val = -val; + } + oss << (neg ? "-" : "") << "0x" << std::hex; + if (fillFlag) { + oss << std::setw(sizeof(T)*2) << std::setfill('0'); + } + oss << (sizeof(T) == 1 ? (int)val & 0xff : sizeof(T) == 2 ? val & 0xffff : sizeof(T) == 4 ? val & 0xffffffff : val); + return oss.str(); + } + }; + + } /* namespace shim */ +} /* namespace qpidit */ + +#endif /* SRC_QPIDIT_SHIM_JMSRECEIVER_HPP_ */ --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
