http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/83b89fe4/shims/qpid-proton-cpp/src/qpidit/jms_messages_test/Sender.cpp ---------------------------------------------------------------------- diff --git a/shims/qpid-proton-cpp/src/qpidit/jms_messages_test/Sender.cpp b/shims/qpid-proton-cpp/src/qpidit/jms_messages_test/Sender.cpp new file mode 100644 index 0000000..dbe2c0d --- /dev/null +++ b/shims/qpid-proton-cpp/src/qpidit/jms_messages_test/Sender.cpp @@ -0,0 +1,487 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpidit/jms_messages_test/Sender.hpp" + +#include <cerrno> +#include <iomanip> +#include <iostream> +#include <json/json.h> +#include "proton/connection.hpp" +#include "proton/default_container.hpp" +#include "proton/tracker.hpp" +#include "proton/transport.hpp" +#include <stdio.h> + +namespace qpidit +{ + namespace jms_messages_test + { + //static + proton::symbol Sender::s_jmsMessageTypeAnnotationKey("x-opt-jms-msg-type"); + std::map<std::string, int8_t>Sender::s_jmsMessageTypeAnnotationValues = initializeJmsMessageTypeAnnotationMap(); + + Sender::Sender(const std::string& brokerUrl, + const std::string& jmsMessageType, + const Json::Value& testParams) : + _brokerUrl(brokerUrl), + _jmsMessageType(jmsMessageType), + _testValueMap(testParams[0]), + _testHeadersMap(testParams[1]), + _testPropertiesMap(testParams[2]), + _msgsSent(0), + _msgsConfirmed(0), + _totalMsgs(getTotalNumMessages(_testValueMap)) + { + if (_testValueMap.type() != Json::objectValue) { + throw qpidit::InvalidJsonRootNodeError(Json::objectValue, _testValueMap.type()); + } + } + + Sender::~Sender() {} + + void Sender::on_container_start(proton::container &c) { + c.open_sender(_brokerUrl); + } + + void Sender::on_sendable(proton::sender &s) { + if (_totalMsgs == 0) { + s.connection().close(); + } else if (_msgsSent == 0) { + Json::Value::Members subTypes = _testValueMap.getMemberNames(); + std::sort(subTypes.begin(), subTypes.end()); + for (std::vector<std::string>::const_iterator i=subTypes.begin(); i!=subTypes.end(); ++i) { + sendMessages(s, *i, _testValueMap[*i]); + } + } + } + + void Sender::on_tracker_accept(proton::tracker &t) { + _msgsConfirmed++; + if (_msgsConfirmed == _totalMsgs) { + t.connection().close(); + } + } + + void Sender::on_transport_close(proton::transport &t) { + _msgsSent = _msgsConfirmed; + } + + void Sender::on_connection_error(proton::connection &c) { + std::cerr << "JmsSender::on_connection_error(): " << c.error() << std::endl; + } + + void Sender::on_sender_error(proton::sender &s) { + std::cerr << "JmsSender::on_sender_error(): " << s.error() << std::endl; + } + + void Sender::on_session_error(proton::session &s) { + std::cerr << "JmsSender::on_session_error(): " << s.error() << std::endl; + } + + void Sender::on_transport_error(proton::transport &t) { + std::cerr << "JmsSender::on_transport_error(): " << t.error() << std::endl; + } + + void Sender::on_error(const proton::error_condition &ec) { + std::cerr << "JmsSender::on_error(): " << ec << std::endl; + } + + // protected + + void Sender::sendMessages(proton::sender &s, const std::string& subType, const Json::Value& testValues) { + uint32_t valueNumber = 0; + for (Json::Value::const_iterator i=testValues.begin(); i!=testValues.end(); ++i) { + if (s.credit()) { + proton::message msg; + if (_jmsMessageType.compare("JMS_MESSAGE_TYPE") == 0) { + setMessage(msg, subType, (*i).asString()); + } else if (_jmsMessageType.compare("JMS_BYTESMESSAGE_TYPE") == 0) { + setBytesMessage(msg, subType, (*i).asString()); + } else if (_jmsMessageType.compare("JMS_MAPMESSAGE_TYPE") == 0) { + setMapMessage(msg, subType, (*i).asString(), valueNumber); + } else if (_jmsMessageType.compare("JMS_OBJECTMESSAGE_TYPE") == 0) { + setObjectMessage(msg, subType, *i); + } else if (_jmsMessageType.compare("JMS_STREAMMESSAGE_TYPE") == 0) { + setStreamMessage(msg, subType, (*i).asString()); + } else if (_jmsMessageType.compare("JMS_TEXTMESSAGE_TYPE") == 0) { + setTextMessage(msg, *i); + } else { + throw qpidit::UnknownJmsMessageTypeError(_jmsMessageType); + } + addMessageHeaders(msg); + addMessageProperties(msg); + s.send(msg); + _msgsSent += 1; + valueNumber += 1; + } + } + + } + + proton::message& Sender::setMessage(proton::message& msg, const std::string& subType, const std::string& testValueStr) { + if (subType.compare("none") != 0) { + throw qpidit::UnknownJmsMessageSubTypeError(subType); + } + if (testValueStr.size() != 0) { + throw InvalidTestValueError(subType, testValueStr); + } + msg.content_type(proton::symbol("application/octet-stream")); + msg.message_annotations().put(proton::symbol("x-opt-jms-msg-type"), s_jmsMessageTypeAnnotationValues["JMS_MESSAGE_TYPE"]); + return msg; + } + + proton::message& Sender::setBytesMessage(proton::message& msg, const std::string& subType, const std::string& testValueStr) { + proton::binary bin; + if (subType.compare("boolean") == 0) { + if (testValueStr.compare("False") == 0) bin.push_back(char(0)); + else if (testValueStr.compare("True") == 0) bin.push_back(char(1)); + else throw InvalidTestValueError(subType, testValueStr); + } else if (subType.compare("byte") == 0) { + uint8_t val = getIntegralValue<int8_t>(testValueStr); + bin.push_back(char(val)); + } else if (subType.compare("bytes") == 0) { + bin.assign(testValueStr.begin(), testValueStr.end()); + } else if (subType.compare("char") == 0) { + bin.push_back(char(0)); + if (testValueStr[0] == '\\') { // Format: '\xNN' + bin.push_back(getIntegralValue<char>(testValueStr.substr(2))); + } else { // Format: 'c' + bin.push_back(testValueStr[0]); + } + } else if (subType.compare("double") == 0) { + uint64_t val; + try { + val = htobe64(std::strtoul(testValueStr.data(), NULL, 16)); + } catch (const std::exception& e) { throw qpidit::InvalidTestValueError("double", testValueStr); } + numToBinary(val, bin); + //for (int i=0; i<sizeof(val); ++i) { + // bin.push_back(* ((char*)&val + i)); + // } + } else if (subType.compare("float") == 0) { + uint32_t val; + try { + val = htobe32((uint32_t)std::strtoul(testValueStr.data(), NULL, 16)); + } catch (const std::exception& e) { throw qpidit::InvalidTestValueError("float", testValueStr); } + numToBinary(val, bin); + //for (int i=0; i<sizeof(val); ++i) { + // bin.push_back(* ((char*)&val + i)); + //} + } else if (subType.compare("long") == 0) { + uint64_t val = htobe64(getIntegralValue<uint64_t>(testValueStr)); + numToBinary(val, bin); + //bin.assign(sizeof(val), val); + } else if (subType.compare("int") == 0) { + uint32_t val = htobe32(getIntegralValue<uint32_t>(testValueStr)); + numToBinary(val, bin); + //bin.assign(sizeof(val), val); + } else if (subType.compare("short") == 0) { + uint16_t val = htobe16(getIntegralValue<int16_t>(testValueStr)); + numToBinary(val, bin); + //bin.assign(sizeof(val), val); + } else if (subType.compare("string") == 0) { + std::ostringstream oss; + uint16_t strlen = htobe16((uint16_t)testValueStr.size()); + oss.write((char*)&strlen, sizeof(strlen)); + oss << testValueStr; + std::string os = oss.str(); + bin.assign(os.begin(), os.end()); + } else { + throw qpidit::UnknownJmsMessageSubTypeError(subType); + } + msg.body(bin); + msg.inferred(true); + msg.content_type(proton::symbol("application/octet-stream")); + msg.message_annotations().put(proton::symbol("x-opt-jms-msg-type"), s_jmsMessageTypeAnnotationValues["JMS_BYTESMESSAGE_TYPE"]); + return msg; + } + + proton::message& Sender::setMapMessage(proton::message& msg, const std::string& subType, const std::string& testValueStr, uint32_t valueNumber) { + std::ostringstream oss; + oss << subType << std::setw(3) << std::setfill('0') << valueNumber; + std::string mapKey(oss.str()); + std::map<std::string, proton::value> m; + if (subType.compare("boolean") == 0) { + if (testValueStr.compare("False") == 0) m[mapKey] = false; + else if (testValueStr.compare("True") == 0) m[mapKey] = true; + else throw InvalidTestValueError(subType, testValueStr); + } else if (subType.compare("byte") == 0) { + m[mapKey] = int8_t(getIntegralValue<int8_t>(testValueStr)); + } else if (subType.compare("bytes") == 0) { + m[mapKey] = proton::binary(testValueStr); + } else if (subType.compare("char") == 0) { + wchar_t val; + if (testValueStr[0] == '\\') { // Format: '\xNN' + val = (wchar_t)getIntegralValue<wchar_t>(testValueStr.substr(2)); + } else { // Format: 'c' + val = testValueStr[0]; + } + m[mapKey] = val; + } else if (subType.compare("double") == 0) { + m[mapKey] = getFloatValue<double, uint64_t>(testValueStr); + } else if (subType.compare("float") == 0) { + m[mapKey] = getFloatValue<float, uint32_t>(testValueStr); + } else if (subType.compare("int") == 0) { + m[mapKey] = getIntegralValue<int32_t>(testValueStr); + } else if (subType.compare("long") == 0) { + m[mapKey] = getIntegralValue<int64_t>(testValueStr); + } else if (subType.compare("short") == 0) { + m[mapKey] = getIntegralValue<int16_t>(testValueStr); + } else if (subType.compare("string") == 0) { + m[mapKey] = testValueStr; + } else { + throw qpidit::UnknownJmsMessageSubTypeError(subType); + } + msg.inferred(false); + msg.body(m); + msg.message_annotations().put(proton::symbol("x-opt-jms-msg-type"), s_jmsMessageTypeAnnotationValues["JMS_MAPMESSAGE_TYPE"]); + return msg; + } + + proton::message& Sender::setObjectMessage(proton::message& msg, const std::string& subType, const Json::Value& testValue) { + msg.body(getJavaObjectBinary(subType, testValue.asString())); + msg.inferred(true); + msg.content_type(proton::symbol("application/x-java-serialized-object")); + msg.message_annotations().put(proton::symbol("x-opt-jms-msg-type"), s_jmsMessageTypeAnnotationValues["JMS_OBJECTMESSAGE_TYPE"]); + return msg; + } + + proton::message& Sender::setStreamMessage(proton::message& msg, const std::string& subType, const std::string& testValueStr) { + std::vector<proton::value> l; + if (subType.compare("boolean") == 0) { + if (testValueStr.compare("False") == 0) l.push_back(false); + else if (testValueStr.compare("True") == 0) l.push_back(true); + else throw InvalidTestValueError(subType, testValueStr); + } else if (subType.compare("byte") == 0) { + l.push_back(int8_t(getIntegralValue<int8_t>(testValueStr))); + } else if (subType.compare("bytes") == 0) { + l.push_back(proton::binary(testValueStr)); + } else if (subType.compare("char") == 0) { + wchar_t val; + if (testValueStr[0] == '\\') { // Format: '\xNN' + val = (wchar_t)getIntegralValue<wchar_t>(testValueStr.substr(2)); + } else { // Format: 'c' + val = testValueStr[0]; + } + l.push_back(val); + } else if (subType.compare("double") == 0) { + l.push_back(getFloatValue<double, uint64_t>(testValueStr)); + } else if (subType.compare("float") == 0) { + l.push_back(getFloatValue<float, uint32_t>(testValueStr)); + } else if (subType.compare("int") == 0) { + l.push_back(getIntegralValue<int32_t>(testValueStr)); + } else if (subType.compare("long") == 0) { + l.push_back(getIntegralValue<int64_t>(testValueStr)); + } else if (subType.compare("short") == 0) { + l.push_back(getIntegralValue<int16_t>(testValueStr)); + } else if (subType.compare("string") == 0) { + l.push_back(testValueStr); + } else { + throw qpidit::UnknownJmsMessageSubTypeError(subType); + } + msg.body(l); + msg.inferred(true); + msg.message_annotations().put(proton::symbol("x-opt-jms-msg-type"), s_jmsMessageTypeAnnotationValues["JMS_STREAMMESSAGE_TYPE"]); + return msg; + } + + proton::message& Sender::setTextMessage(proton::message& msg, const Json::Value& testValue) { + msg.body(testValue.asString()); + msg.inferred(false); + msg.message_annotations().put(proton::symbol("x-opt-jms-msg-type"), s_jmsMessageTypeAnnotationValues["JMS_TEXTMESSAGE_TYPE"]); + return msg; + } + + proton::message& Sender::addMessageHeaders(proton::message& msg) { + Json::Value::Members headerNames = _testHeadersMap.getMemberNames(); + for (std::vector<std::string>::const_iterator i=headerNames.begin(); i!=headerNames.end(); ++i) { + const Json::Value _subMap = _testHeadersMap[*i]; + const std::string headerValueType = _subMap.getMemberNames()[0]; // There is always only one entry in map + std::string val = _subMap[headerValueType].asString(); + if (i->compare("JMS_TYPE_HEADER") == 0) { + setJmsTypeHeader(msg, val); + } else if (i->compare("JMS_CORRELATIONID_HEADER") == 0) { + if (headerValueType.compare("bytes") == 0) { + setJmsCorrelationId(msg, proton::binary(val)); + } else { + setJmsCorrelationId(msg, val); + } + } else if (i->compare("JMS_REPLYTO_HEADER") == 0) { + setJmsReplyTo(msg, headerValueType, val); + } else { + throw qpidit::UnknownJmsHeaderTypeError(*i); + } + } + return msg; + } + + //static + proton::message& Sender::setJmsTypeHeader(proton::message& msg, const std::string& t) { + msg.subject(t); + return msg; + } + + //static + proton::message& Sender::setJmsCorrelationId(proton::message& msg, const std::string& cid) { + proton::message_id mid(cid); + msg.correlation_id(mid); + msg.message_annotations().put(proton::symbol("x-opt-app-correlation-id"), true); + return msg; + } + + //static + proton::message& Sender::setJmsCorrelationId(proton::message& msg, const proton::binary cid) { + proton::message_id mid(cid); + msg.correlation_id(cid); + msg.message_annotations().put(proton::symbol("x-opt-app-correlation-id"), true); + return msg; + } + + //static + proton::message& Sender::setJmsReplyTo(proton::message& msg, const std::string& dts, const std::string& d) { + if (dts.compare("queue") == 0) { + msg.reply_to(/*std::string("queue://") + */d); + msg.message_annotations().put(proton::symbol("x-opt-jms-reply-to"), int8_t(JMS_QUEUE)); + } else if (dts.compare("temp_queue") == 0) { + msg.reply_to(/*std::string("queue://") + */d); + msg.message_annotations().put(proton::symbol("x-opt-jms-reply-to"), int8_t(JMS_TMEP_QUEUE)); + } else if (dts.compare("topic") == 0) { + msg.reply_to(/*std::string("topic://") + */d); + msg.message_annotations().put(proton::symbol("x-opt-jms-reply-to"), int8_t(JMS_TOPIC)); + } else if (dts.compare("temp_topic") == 0) { + msg.reply_to(/*std::string("topic://") + */d); + msg.message_annotations().put(proton::symbol("x-opt-jms-reply-to"), int8_t(JMS_TEMP_TOPIC)); + } else { + throw qpidit::UnknownJmsDestinationTypeError(dts); + } + return msg; + } + + proton::message& Sender::addMessageProperties(proton::message& msg) { + Json::Value::Members propertyNames = _testPropertiesMap.getMemberNames(); + for (std::vector<std::string>::const_iterator i=propertyNames.begin(); i!=propertyNames.end(); ++i) { + const Json::Value _subMap = _testPropertiesMap[*i]; + const std::string propertyValueType = _subMap.getMemberNames()[0]; // There is always only one entry in map + std::string val = _subMap[propertyValueType].asString(); + if (propertyValueType.compare("boolean") == 0) { + if (val.compare("False") == 0) setMessageProperty(msg, *i, false); + else if (val.compare("True") == 0) setMessageProperty(msg, *i, true); + else throw InvalidTestValueError(propertyValueType, val); + } else if (propertyValueType.compare("byte") == 0) { + setMessageProperty(msg, *i, getIntegralValue<int8_t>(val)); + } else if (propertyValueType.compare("double") == 0) { + setMessageProperty(msg, *i, getFloatValue<double, uint64_t>(val)); + } else if (propertyValueType.compare("float") == 0) { + setMessageProperty(msg, *i, getFloatValue<float, uint64_t>(val)); + } else if (propertyValueType.compare("int") == 0) { + setMessageProperty(msg, *i, getIntegralValue<int32_t>(val)); + } else if (propertyValueType.compare("long") == 0) { + setMessageProperty(msg, *i, getIntegralValue<int64_t>(val)); + } else if (propertyValueType.compare("short") == 0) { + setMessageProperty(msg, *i, getIntegralValue<int16_t>(val)); + } else if (propertyValueType.compare("string") == 0) { + setMessageProperty(msg, *i, val); + } else { + throw qpidit::UnknownJmsPropertyTypeError(propertyValueType); + } + } + return msg; + } + + //static + proton::binary Sender::getJavaObjectBinary(const std::string& javaClassName, const std::string& valAsString) { + proton::binary javaObjectBinary; + char buf[1024]; + int bytesRead; + FILE* fp = ::popen("java -cp target/JavaObjUtils.jar org.apache.qpid.interop_test.obj_util.JavaObjToBytes javaClassStr", "rb"); + if (fp == NULL) { throw qpidit::PopenError(errno); } + do { + bytesRead = ::fread(buf, 1, sizeof(buf), fp); + javaObjectBinary.insert(javaObjectBinary.end(), &buf[0], &buf[bytesRead-1]); + } while (bytesRead == sizeof(buf)); + int status = ::pclose(fp); + if (status == -1) { + throw qpidit::PcloseError(errno); + } + return javaObjectBinary; + } + + // static + uint32_t Sender::getTotalNumMessages(const Json::Value& testValueMap) { + uint32_t tot = 0; + for (Json::Value::const_iterator i = testValueMap.begin(); i != testValueMap.end(); ++i) { + tot += (*i).size(); + } + return tot; + } + + //static + std::map<std::string, int8_t> Sender::initializeJmsMessageTypeAnnotationMap() { + std::map<std::string, int8_t> m; + m["JMS_MESSAGE_TYPE"] = JMS_MESSAGE_TYPE; + m["JMS_OBJECTMESSAGE_TYPE"] = JMS_OBJECTMESSAGE_TYPE; + m["JMS_MAPMESSAGE_TYPE"] = JMS_MAPMESSAGE_TYPE; + m["JMS_BYTESMESSAGE_TYPE"] = JMS_BYTESMESSAGE_TYPE; + m["JMS_STREAMMESSAGE_TYPE"] = JMS_STREAMMESSAGE_TYPE; + m["JMS_TEXTMESSAGE_TYPE"] = JMS_TEXTMESSAGE_TYPE; + return m; + } + + } /* namespace jms_messages_test */ +} /* namespace qpidit */ + + + +/* + * --- main --- + * Args: 1: Broker address (ip-addr:port) + * 2: Queue name + * 3: AMQP type + * 4: JSON Test parameters containing 3 maps: [testValueMap, testHeadersMap, testPropertiesMap] + */ + +int main(int argc, char** argv) { +/* + for (int i=0; i<argc; ++i) { + std::cout << "*** argv[" << i << "] : " << argv[i] << std::endl; + } +*/ + // TODO: improve arg management a little... + if (argc != 5) { + throw qpidit::ArgumentError("Incorrect number of arguments"); + } + + std::ostringstream oss; + oss << argv[1] << "/" << argv[2]; + + try { + Json::Value testParams; + Json::Reader jsonReader; + if (not jsonReader.parse(argv[4], testParams, false)) { + throw qpidit::JsonParserError(jsonReader); + } + + qpidit::jms_messages_test::Sender sender(oss.str(), argv[3], testParams); + proton::default_container(sender).run(); + } catch (const std::exception& e) { + std::cout << "JmsSender error: " << e.what() << std::endl; + } +}
http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/83b89fe4/shims/qpid-proton-cpp/src/qpidit/jms_messages_test/Sender.hpp ---------------------------------------------------------------------- diff --git a/shims/qpid-proton-cpp/src/qpidit/jms_messages_test/Sender.hpp b/shims/qpid-proton-cpp/src/qpidit/jms_messages_test/Sender.hpp new file mode 100644 index 0000000..5e41120 --- /dev/null +++ b/shims/qpid-proton-cpp/src/qpidit/jms_messages_test/Sender.hpp @@ -0,0 +1,120 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#ifndef SRC_QPIDIT_JMS_MESSAGES_TEST_SENDER_HPP_ +#define SRC_QPIDIT_JMS_MESSAGES_TEST_SENDER_HPP_ + +#include "json/value.h" +#include "proton/message.hpp" +#include "proton/messaging_handler.hpp" +#include "qpidit/QpidItErrors.hpp" +#include "qpidit/jms_messages_test/JmsDefinitions.hpp" +#include <typeinfo> + +namespace proton { + class message; +} + +namespace qpidit +{ + namespace jms_messages_test + { + + class Sender : public proton::messaging_handler + { + protected: + static proton::symbol s_jmsMessageTypeAnnotationKey; + static std::map<std::string, int8_t>s_jmsMessageTypeAnnotationValues; + + const std::string _brokerUrl; + const std::string _jmsMessageType; + const Json::Value _testValueMap; + const Json::Value _testHeadersMap; + const Json::Value _testPropertiesMap; + uint32_t _msgsSent; + uint32_t _msgsConfirmed; + uint32_t _totalMsgs; + public: + Sender(const std::string& brokerUrl, const std::string& jmsMessageType, const Json::Value& testParams); + virtual ~Sender(); + + void on_container_start(proton::container &c); + void on_sendable(proton::sender &s); + void on_tracker_accept(proton::tracker &t); + void on_transport_close(proton::transport &t); + + void on_connection_error(proton::connection &c); + void on_session_error(proton::session &s); + void on_sender_error(proton::sender& s); + void on_transport_error(proton::transport &t); + void on_error(const proton::error_condition &c); + protected: + void sendMessages(proton::sender &s, const std::string& subType, const Json::Value& testValueMap); + proton::message& setMessage(proton::message& msg, const std::string& subType, const std::string& testValueStr); + proton::message& setBytesMessage(proton::message& msg, const std::string& subType, const std::string& testValueStr); + proton::message& setMapMessage(proton::message& msg, const std::string& subType, const std::string& testValueStr, uint32_t valueNumber); + proton::message& setObjectMessage(proton::message& msg, const std::string& subType, const Json::Value& testValue); + proton::message& setStreamMessage(proton::message& msg, const std::string& subType, const std::string& testValue); + proton::message& setTextMessage(proton::message& msg, const Json::Value& testValue); + + proton::message& addMessageHeaders(proton::message& msg); + static proton::message& setJmsTypeHeader(proton::message& msg, const std::string& t); + static proton::message& setJmsCorrelationId(proton::message& msg, const std::string& cid); + static proton::message& setJmsCorrelationId(proton::message& msg, const proton::binary cid); + static proton::message& setJmsReplyTo(proton::message& msg, const std::string& dt, const std::string& d); + + proton::message& addMessageProperties(proton::message& msg); + template<typename T> proton::message& setMessageProperty(proton::message& msg, const std::string& propertyName, T val) { + msg.properties().put(propertyName, val); + return msg; + } + + static proton::binary getJavaObjectBinary(const std::string& javaClassName, const std::string& valAsString); + static uint32_t getTotalNumMessages(const Json::Value& testValueMap); + + static std::map<std::string, int8_t> initializeJmsMessageTypeAnnotationMap(); + + template<typename T> static T numToBinary(T n, proton::binary& b) { + for (int i=0; i<sizeof(n); ++i) { + b.push_back(* ((char*)&n + i)); + } + } + + // Set message body to floating type T through integral type U + // Used to convert a hex string representation of a float or double to a float or double + template<typename T, typename U> T getFloatValue(const std::string& testValueStr) { + try { + U ival(std::strtoul(testValueStr.data(), NULL, 16)); + return T(*reinterpret_cast<T*>(&ival)); + } catch (const std::exception& e) { throw qpidit::InvalidTestValueError(typeid(T).name(), testValueStr); } + } + + template<typename T> T getIntegralValue(const std::string& testValueStr) { + try { + return T(std::strtol(testValueStr.data(), NULL, 16)); + } catch (const std::exception& e) { throw qpidit::InvalidTestValueError(typeid(T).name(), testValueStr); } + } + }; + + } /* namespace jms_messages_test */ +} /* namespace qpidit */ + +#endif /* SRC_QPIDIT_JMS_MESSAGES_TEST_SENDER_HPP_ */ http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/83b89fe4/shims/qpid-proton-cpp/src/qpidit/shim/AmqpReceiver.cpp ---------------------------------------------------------------------- diff --git a/shims/qpid-proton-cpp/src/qpidit/shim/AmqpReceiver.cpp b/shims/qpid-proton-cpp/src/qpidit/shim/AmqpReceiver.cpp deleted file mode 100644 index e50ffb6..0000000 --- a/shims/qpid-proton-cpp/src/qpidit/shim/AmqpReceiver.cpp +++ /dev/null @@ -1,290 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpidit/shim/AmqpReceiver.hpp" - -#include <iostream> -#include <json/json.h> -#include "proton/connection.hpp" -#include <proton/default_container.hpp> -#include "proton/delivery.hpp" -#include "proton/receiver.hpp" -#include "proton/transport.hpp" -#include "qpidit/QpidItErrors.hpp" - -namespace qpidit -{ - namespace shim - { - - AmqpReceiver::AmqpReceiver(const std::string& brokerUrl, - const std::string& amqpType, - uint32_t expected) : - _brokerUrl(brokerUrl), - _amqpType(amqpType), - _expected(expected), - _received(0UL), - _receivedValueList(Json::arrayValue) - {} - - AmqpReceiver::~AmqpReceiver() {} - - Json::Value& AmqpReceiver::getReceivedValueList() { - return _receivedValueList; - } - - void AmqpReceiver::on_container_start(proton::container &c) { - c.open_receiver(_brokerUrl); - } - - void AmqpReceiver::on_message(proton::delivery &d, proton::message &m) { - if (proton::get<uint64_t>(m.id()) < _received) return; // ignore duplicate - if (_received < _expected) { - if (_amqpType.compare("null") == 0) { - checkMessageType(m, proton::NULL_TYPE); - _receivedValueList.append("None"); - } else if (_amqpType.compare("boolean") == 0) { - checkMessageType(m, proton::BOOLEAN); - _receivedValueList.append(m.body().get<bool>() ? "True": "False"); - } else if (_amqpType.compare("ubyte") == 0) { - checkMessageType(m, proton::UBYTE); - _receivedValueList.append(toHexStr<uint8_t>(m.body().get<uint8_t>())); - } else if (_amqpType.compare("ushort") == 0) { - checkMessageType(m, proton::USHORT); - _receivedValueList.append(toHexStr<uint16_t>(m.body().get<uint16_t>())); - } else if (_amqpType.compare("uint") == 0) { - checkMessageType(m, proton::UINT); - _receivedValueList.append(toHexStr<uint32_t>(m.body().get<uint32_t>())); - } else if (_amqpType.compare("ulong") == 0) { - checkMessageType(m, proton::ULONG); - _receivedValueList.append(toHexStr<uint64_t>(m.body().get<uint64_t>())); - } else if (_amqpType.compare("byte") == 0) { - checkMessageType(m, proton::BYTE); - _receivedValueList.append(toHexStr<int8_t>(m.body().get<int8_t>())); - } else if (_amqpType.compare("short") == 0) { - checkMessageType(m, proton::SHORT); - _receivedValueList.append(toHexStr<int16_t>(m.body().get<int16_t>())); - } else if (_amqpType.compare("int") == 0) { - checkMessageType(m, proton::INT); - _receivedValueList.append(toHexStr<int32_t>(m.body().get<int32_t>())); - } else if (_amqpType.compare("long") == 0) { - checkMessageType(m, proton::LONG); - _receivedValueList.append(toHexStr<int64_t>(m.body().get<int64_t>())); - } else if (_amqpType.compare("float") == 0) { - checkMessageType(m, proton::FLOAT); - float f = m.body().get<float>(); - _receivedValueList.append(toHexStr<uint32_t>(*((uint32_t*)&f), true)); - } else if (_amqpType.compare("double") == 0) { - checkMessageType(m, proton::DOUBLE); - double d = m.body().get<double>(); - _receivedValueList.append(toHexStr<uint64_t>(*((uint64_t*)&d), true)); - } else if (_amqpType.compare("decimal32") == 0) { - checkMessageType(m, proton::DECIMAL32); - _receivedValueList.append(byteArrayToHexStr(m.body().get<proton::decimal32>())); - } else if (_amqpType.compare("decimal64") == 0) { - checkMessageType(m, proton::DECIMAL64); - _receivedValueList.append(byteArrayToHexStr(m.body().get<proton::decimal64>())); - } else if (_amqpType.compare("decimal128") == 0) { - checkMessageType(m, proton::DECIMAL128); - _receivedValueList.append(byteArrayToHexStr(m.body().get<proton::decimal128>())); - } else if (_amqpType.compare("char") == 0) { - checkMessageType(m, proton::CHAR); - wchar_t c = m.body().get<wchar_t>(); - std::stringstream oss; - if (c < 0x7f && std::iswprint(c)) { - oss << (char)c; - } else { - oss << "0x" << std::hex << c; - } - _receivedValueList.append(oss.str()); - } else if (_amqpType.compare("timestamp") == 0) { - checkMessageType(m, proton::TIMESTAMP); - std::ostringstream oss; - oss << "0x" << std::hex << m.body().get<proton::timestamp>().milliseconds(); - _receivedValueList.append(oss.str()); - } else if (_amqpType.compare("uuid") == 0) { - checkMessageType(m, proton::UUID); - std::ostringstream oss; - oss << m.body().get<proton::uuid>(); - _receivedValueList.append(oss.str()); - } else if (_amqpType.compare("binary") == 0) { - checkMessageType(m, proton::BINARY); - _receivedValueList.append(std::string(m.body().get<proton::binary>())); - } else if (_amqpType.compare("string") == 0) { - checkMessageType(m, proton::STRING); - _receivedValueList.append(m.body().get<std::string>()); - } else if (_amqpType.compare("symbol") == 0) { - checkMessageType(m, proton::SYMBOL); - _receivedValueList.append(m.body().get<proton::symbol>()); - } else if (_amqpType.compare("list") == 0) { - checkMessageType(m, proton::LIST); - Json::Value jsonList(Json::arrayValue); - _receivedValueList.append(getSequence(jsonList, m.body())); - } else if (_amqpType.compare("map") == 0) { - checkMessageType(m, proton::MAP); - Json::Value jsonMap(Json::objectValue); - _receivedValueList.append(getMap(jsonMap, m.body())); - } else if (_amqpType.compare("array") == 0) { - throw qpidit::UnsupportedAmqpTypeError(_amqpType); - } else { - throw qpidit::UnknownAmqpTypeError(_amqpType); - } - } - _received++; - if (_received >= _expected) { - d.receiver().close(); - d.connection().close(); - } - } - - void AmqpReceiver::on_connection_error(proton::connection &c) { - std::cerr << "AmqpReceiver::on_connection_error(): " << c.error() << std::endl; - } - - void AmqpReceiver::on_receiver_error(proton::receiver& r) { - std::cerr << "AmqpReceiver::on_receiver_error(): " << r.error() << std::endl; - } - - void AmqpReceiver::on_session_error(proton::session &s) { - std::cerr << "AmqpReceiver::on_session_error(): " << s.error() << std::endl; - } - - void AmqpReceiver::on_transport_error(proton::transport &t) { - std::cerr << "AmqpReceiver::on_transport_error(): " << t.error() << std::endl; - } - - void AmqpReceiver::on_error(const proton::error_condition &ec) { - std::cerr << "AmqpReceiver::on_error(): " << ec << std::endl; - } - - // protected - - //static - void AmqpReceiver::checkMessageType(const proton::message& msg, proton::type_id amqpType) { - if (msg.body().type() != amqpType) { - throw qpidit::IncorrectMessageBodyTypeError(amqpType, msg.body().type()); - } - } - - //static - Json::Value& AmqpReceiver::getMap(Json::Value& jsonMap, const proton::value& val) { - std::map<proton::value, proton::value> msgMap; - val.get(msgMap); - for (std::map<proton::value, proton::value>::const_iterator i = msgMap.begin(); i != msgMap.end(); ++i) { - switch (i->second.type()) { - case proton::LIST: - { - Json::Value jsonSubList(Json::arrayValue); - jsonMap[i->first.get<std::string>()] = getSequence(jsonSubList, i->second); - break; - } - case proton::MAP: - { - Json::Value jsonSubMap(Json::objectValue); - jsonMap[i->first.get<std::string>()] = getMap(jsonSubMap, i->second); - break; - } - case proton::ARRAY: - break; - case proton::STRING: - jsonMap[i->first.get<std::string>()] = Json::Value(i->second.get<std::string>()); - break; - default: - throw qpidit::IncorrectValueTypeError(i->second); - } - } - return jsonMap; - } - - //static - Json::Value& AmqpReceiver::getSequence(Json::Value& jsonList, const proton::value& val) { - std::vector<proton::value> msgList; - val.get(msgList); - for (std::vector<proton::value>::const_iterator i=msgList.begin(); i!=msgList.end(); ++i) { - switch ((*i).type()) { - case proton::LIST: - { - Json::Value jsonSubList(Json::arrayValue); - jsonList.append(getSequence(jsonSubList, *i)); - break; - } - case proton::MAP: - { - Json::Value jsonSubMap(Json::objectValue); - jsonList.append(getMap(jsonSubMap, *i)); - break; - } - case proton::ARRAY: - break; - case proton::STRING: - jsonList.append(Json::Value((*i).get<std::string>())); - break; - default: - throw qpidit::IncorrectValueTypeError(*i); - } - } - return jsonList; - } - - //static - std::string AmqpReceiver::stringToHexStr(const std::string& str) { - std::ostringstream oss; - oss << "0x" << std::hex; - for (std::string::const_iterator i=str.begin(); i!=str.end(); ++i) { - oss << std::setw(2) << std::setfill('0') << ((int)*i & 0xff); - } - return oss.str(); - } - - } /* namespace shim */ -} /* namespace qpidit */ - - -/* - * --- main --- - * Args: 1: Broker address (ip-addr:port) - * 2: Queue name - * 3: AMQP type - * 4: Expected number of test values to receive - */ - -int main(int argc, char** argv) { - // TODO: improve arg management a little... - if (argc != 5) { - throw qpidit::ArgumentError("Incorrect number of arguments"); - } - - std::ostringstream oss; - oss << argv[1] << "/" << argv[2]; - - try { - qpidit::shim::AmqpReceiver receiver(oss.str(), argv[3], std::strtoul(argv[4], NULL, 0)); - proton::default_container(receiver).run(); - - std::cout << argv[3] << std::endl; - Json::FastWriter fw; - std::cout << fw.write(receiver.getReceivedValueList()); - } catch (const std::exception& e) { - std::cerr << "AmqpReceiver error: " << e.what() << std::endl; - exit(-1); - } - exit(0); -} http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/83b89fe4/shims/qpid-proton-cpp/src/qpidit/shim/AmqpReceiver.hpp ---------------------------------------------------------------------- diff --git a/shims/qpid-proton-cpp/src/qpidit/shim/AmqpReceiver.hpp b/shims/qpid-proton-cpp/src/qpidit/shim/AmqpReceiver.hpp deleted file mode 100644 index fdd5287..0000000 --- a/shims/qpid-proton-cpp/src/qpidit/shim/AmqpReceiver.hpp +++ /dev/null @@ -1,85 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#ifndef SRC_QPIDIT_SHIM_AMQP_RECEIVER_HPP_ -#define SRC_QPIDIT_SHIM_AMQP_RECEIVER_HPP_ - -#include <iomanip> -#include <json/value.h> -#include "proton/messaging_handler.hpp" -#include "proton/types.hpp" -#include <sstream> - -namespace qpidit -{ - namespace shim - { - - class AmqpReceiver : public proton::messaging_handler - { - protected: - const std::string _brokerUrl; - const std::string _amqpType; - uint32_t _expected; - uint32_t _received; - Json::Value _receivedValueList; - public: - AmqpReceiver(const std::string& brokerUrl, const std::string& amqpType, uint32_t exptected); - virtual ~AmqpReceiver(); - Json::Value& getReceivedValueList(); - void on_container_start(proton::container &c); - void on_message(proton::delivery &d, proton::message &m); - - void on_connection_error(proton::connection &c); - void on_receiver_error(proton::receiver& r); - void on_session_error(proton::session &s); - void on_transport_error(proton::transport &t); - void on_error(const proton::error_condition &c); - protected: - static void checkMessageType(const proton::message& msg, proton::type_id msgType); - static Json::Value& getMap(Json::Value& jsonMap, const proton::value& val); - static Json::Value& getSequence(Json::Value& jsonList, const proton::value& val); - static std::string stringToHexStr(const std::string& str); - - // Format signed numbers in negative hex format, ie -0xNNNN, positive numbers in 0xNNNN format - template<typename T> static std::string toHexStr(T val, bool fillFlag = false) { - std::ostringstream oss; - bool neg = val < 0; - if (neg) val = -val; - oss << (neg ? "-" : "") << "0x" << std::hex; - if (fillFlag) { - oss << std::setw(sizeof(T)*2) << std::setfill('0'); - } - oss << (sizeof(T) == 1 ? (int)val & 0xff : sizeof(T) == 2 ? val & 0xffff : sizeof(T) == 4 ? val & 0xffffffff : val); - return oss.str(); - } - - template<size_t N> static std::string byteArrayToHexStr(const proton::byte_array<N>& val) { - std::ostringstream oss; - oss << val; - return oss.str(); - } - }; - - } /* namespace shim */ -} /* namespace qpidit */ - -#endif /* SRC_QPIDIT_SHIM_AMQP_RECEIVER_HPP_ */ http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/83b89fe4/shims/qpid-proton-cpp/src/qpidit/shim/AmqpSender.cpp ---------------------------------------------------------------------- diff --git a/shims/qpid-proton-cpp/src/qpidit/shim/AmqpSender.cpp b/shims/qpid-proton-cpp/src/qpidit/shim/AmqpSender.cpp deleted file mode 100644 index a81e235..0000000 --- a/shims/qpid-proton-cpp/src/qpidit/shim/AmqpSender.cpp +++ /dev/null @@ -1,368 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpidit/shim/AmqpSender.hpp" - -#include <iostream> -#include <json/json.h> -#include "proton/connection.hpp" -#include "proton/default_container.hpp" -#include "proton/sender.hpp" -#include "proton/tracker.hpp" -#include "proton/transport.hpp" - -namespace qpidit -{ - namespace shim - { - - AmqpSender::AmqpSender(const std::string& brokerUrl, - const std::string& amqpType, - const Json::Value& testValues) : - _brokerUrl(brokerUrl), - _amqpType(amqpType), - _testValues(testValues), - _msgsSent(0), - _msgsConfirmed(0), - _totalMsgs(testValues.size()) - {} - - AmqpSender::~AmqpSender() {} - - void AmqpSender::on_container_start(proton::container &c) { - c.open_sender(_brokerUrl); - } - - void AmqpSender::on_sendable(proton::sender &s) { - if (_totalMsgs == 0) { - s.connection().close(); - } else if (_msgsSent == 0) { - for (Json::Value::const_iterator i=_testValues.begin(); i!=_testValues.end(); ++i) { - if (s.credit()) { - proton::message msg; - s.send(setMessage(msg, *i)); - _msgsSent++; - } - } - } else { - // do nothing - } - } - - void AmqpSender::on_tracker_accept(proton::tracker &t) { - _msgsConfirmed++; - if (_msgsConfirmed == _totalMsgs) { - t.connection().close(); - } - } - - void AmqpSender::on_transport_close(proton::transport &t) { - _msgsSent = _msgsConfirmed; - } - - void AmqpSender::on_connection_error(proton::connection &c) { - std::cerr << "AmqpSender::on_connection_error(): " << c.error() << std::endl; - } - - void AmqpSender::on_sender_error(proton::sender &s) { - std::cerr << "AmqpSender::on_sender_error(): " << s.error() << std::endl; - } - - void AmqpSender::on_session_error(proton::session &s) { - std::cerr << "AmqpSender::on_session_error(): " << s.error() << std::endl; - } - - void AmqpSender::on_transport_error(proton::transport &t) { - std::cerr << "AmqpSender::on_transport_error(): " << t.error() << std::endl; - } - - void AmqpSender::on_error(const proton::error_condition &ec) { - std::cerr << "AmqpSender::on_error(): " << ec << std::endl; - } - - // protected - - proton::message& AmqpSender::setMessage(proton::message& msg, const Json::Value& testValue) { - msg.id(_msgsSent + 1); - if (_amqpType.compare("null") == 0) { - std::string testValueStr(testValue.asString()); - if (testValueStr.compare("None") != 0) { throw qpidit::InvalidTestValueError(_amqpType, testValueStr); } - proton::value v; - msg.body(v); - } else if (_amqpType.compare("boolean") == 0) { - std::string testValueStr(testValue.asString()); - if (testValueStr.compare("True") == 0) { - msg.body(true); - } else if (testValueStr.compare("False") == 0) { - msg.body(false); - } else { - throw qpidit::InvalidTestValueError(_amqpType, testValueStr); - } - } else if (_amqpType.compare("ubyte") == 0) { - setIntegralValue<uint8_t>(msg, testValue.asString(), true); - } else if (_amqpType.compare("ushort") == 0) { - setIntegralValue<uint16_t>(msg, testValue.asString(), true); - } else if (_amqpType.compare("uint") == 0) { - setIntegralValue<uint32_t>(msg, testValue.asString(), true); - } else if (_amqpType.compare("ulong") == 0) { - setIntegralValue<uint64_t>(msg, testValue.asString(), true); - } else if (_amqpType.compare("byte") == 0) { - setIntegralValue<int8_t>(msg, testValue.asString(), false); - } else if (_amqpType.compare("short") == 0) { - setIntegralValue<int16_t>(msg, testValue.asString(), false); - } else if (_amqpType.compare("int") == 0) { - setIntegralValue<int32_t>(msg, testValue.asString(), false); - } else if (_amqpType.compare("long") == 0) { - setIntegralValue<int64_t>(msg, testValue.asString(), false); - } else if (_amqpType.compare("float") == 0) { - setFloatValue<float, uint32_t>(msg, testValue.asString()); - } else if (_amqpType.compare("double") == 0) { - setFloatValue<double, uint64_t>(msg, testValue.asString()); - } else if (_amqpType.compare("decimal32") == 0) { - proton::decimal32 val; - hexStringToBytearray(val, testValue.asString().substr(2)); - msg.body(val); - } else if (_amqpType.compare("decimal64") == 0) { - proton::decimal64 val; - hexStringToBytearray(val, testValue.asString().substr(2)); - msg.body(val); - } else if (_amqpType.compare("decimal128") == 0) { - proton::decimal128 val; - hexStringToBytearray(val, testValue.asString().substr(2)); - msg.body(val); - } else if (_amqpType.compare("char") == 0) { - std::string charStr = testValue.asString(); - wchar_t val; - if (charStr.size() == 1) { // Single char "a" - val = charStr[0]; - } else if (charStr.size() >= 3 && charStr.size() <= 10) { // Format "0xN" through "0xNNNNNNNN" - val = std::strtoul(charStr.data(), NULL, 16); - } else { - //TODO throw format error - } - msg.body(val); - } else if (_amqpType.compare("timestamp") == 0) { - proton::timestamp val(std::strtoul(testValue.asString().data(), NULL, 16)); - msg.body(val); - } else if (_amqpType.compare("uuid") == 0) { - proton::uuid val; - std::string uuidStr(testValue.asString()); - // Expected format: "00000000-0000-0000-0000-000000000000" - // ^ ^ ^ ^ ^ - // start index -> 0 9 14 19 24 - hexStringToBytearray(val, uuidStr.substr(0, 8), 0, 4); - hexStringToBytearray(val, uuidStr.substr(9, 4), 4, 2); - hexStringToBytearray(val, uuidStr.substr(14, 4), 6, 2); - hexStringToBytearray(val, uuidStr.substr(19, 4), 8, 2); - hexStringToBytearray(val, uuidStr.substr(24, 12), 10, 6); - msg.body(val); - } else if (_amqpType.compare("binary") == 0) { - //setStringValue<proton::amqp_binary>(msg, testValue.asString()); - proton::binary val(testValue.asString()); - msg.body(val); - } else if (_amqpType.compare("string") == 0) { - //setStringValue<proton::amqp_string>(msg, testValue.asString()); - std::string val(testValue.asString()); - msg.body(val); - } else if (_amqpType.compare("symbol") == 0) { - //setStringValue<proton::amqp_symbol>(msg, testValue.asString()); - proton::symbol val(testValue.asString()); - msg.body(val); - } else if (_amqpType.compare("list") == 0) { - std::vector<proton::value> list; - processList(list, testValue); - msg.body(list); - } else if (_amqpType.compare("map") == 0) { - std::map<std::string, proton::value> map; - processMap(map, testValue); - msg.body(map); - } else if (_amqpType.compare("array") == 0) { -/* - std::vector<proton::value> array; - processArray(array, testValue); - msg.body(proton::as<proton::ARRAY>(array)); -*/ - throw qpidit::UnsupportedAmqpTypeError(_amqpType); - } else { - throw qpidit::UnknownAmqpTypeError(_amqpType); - } - return msg; - } - - //static - std::string AmqpSender::bytearrayToHexStr(const char* src, int len) { - std::ostringstream oss; - oss << "0x" << std::hex; - for (int i=0; i<len; ++i) { - oss << std::setw(2) << std::setfill('0') << ((int)src[i] & 0xff); - } - return oss.str(); - } - - //static - proton::value AmqpSender::extractProtonValue(const Json::Value& val) { - switch (val.type()) { - case Json::nullValue: - { - proton::value v; //proton::null n; - return v; //proton::value(n); - } - case Json::intValue: - return proton::value(val.asInt()); - case Json::uintValue: - return proton::value(val.asUInt()); - case Json::realValue: - return proton::value(val.asDouble()); - case Json::stringValue: - return proton::value(val.asString()); - case Json::booleanValue: - return proton::value(val.asBool()); - default:; - } - } - -// //static -// Json::Value::ValueType getArrayType(const Json::Value& val) { -// if (val.size()) > 0) { -// return val[0].type(); -// } else { -// return Json::Value::nullValue; // TODO: find a way to represent empty array -// } -// } - - //static - void AmqpSender::processArray(std::vector<proton::value>& array, const Json::Value& testValues) { - for (Json::Value::const_iterator i = testValues.begin(); i != testValues.end(); ++i) { - if ((*i).isArray()) { - std::vector<proton::value> subArray; - processArray(subArray, *i); - array.push_back(proton::value(subArray)); - } else if ((*i).isObject()) { - std::map<std::string, proton::value> subMap; - processMap(subMap, *i); - array.push_back(proton::value(subMap)); - } else { - proton::value v; - if ((*i).isNull()) - ; - else if ((*i).isBool()) - v = (*i).asBool(); - else if ((*i).isInt()) - v = (*i).asInt(); - else if ((*i).isUInt()) - v = (*i).asUInt(); - else if ((*i).isDouble()) - v = (*i).asDouble(); - else if ((*i).isString()) - v = (*i).asString(); - else - ; // TODO handle this case - array.push_back(v); - } - } - } - - //static - void AmqpSender::processList(std::vector<proton::value>& list, const Json::Value& testValues) { - for (Json::Value::const_iterator i = testValues.begin(); i != testValues.end(); ++i) { - if ((*i).isArray()) { - std::vector<proton::value> subList; - processList(subList, *i); - list.push_back(proton::value(subList)); - } else if ((*i).isObject()) { - std::map<std::string, proton::value> subMap; - processMap(subMap, *i); - list.push_back(proton::value(subMap)); - } else { - list.push_back(extractProtonValue(*i)); - } - } - //std::cout << std::endl; - } - - //static - void AmqpSender::processMap(std::map<std::string, proton::value>& map, const Json::Value& testValues) { - Json::Value::Members keys = testValues.getMemberNames(); - for (std::vector<std::string>::const_iterator i=keys.begin(); i!=keys.end(); ++i) { - Json::Value mapVal = testValues[*i]; - if (mapVal.isArray()) { - std::vector<proton::value> subList; - processList(subList, mapVal); - map[*i] = subList; - } else if (mapVal.isObject()) { - std::map<std::string, proton::value> subMap; - processMap(subMap, mapVal); - map[*i] = subMap; - } else { - map[*i] = extractProtonValue(mapVal); - } - } - } - - //static - void AmqpSender::revMemcpy(char* dest, const char* src, int n) { - for (int i = 0; i < n; ++i) { - *(dest + i) = *(src + n - i - 1); - } - } - - //static - void AmqpSender::uint64ToChar16(char* dest, uint64_t upper, uint64_t lower) { - revMemcpy(dest, (const char*)&upper, sizeof(uint64_t)); - revMemcpy(dest + 8, (const char*)&lower, sizeof(uint64_t)); - } - - } /* namespace shim */ -} /* namespace qpidit */ - - -/* - * --- main --- - * Args: 1: Broker address (ip-addr:port) - * 2: Queue name - * 3: AMQP type - * 4: Test value(s) as JSON string - */ - -int main(int argc, char** argv) { - // TODO: improve arg management a little... - if (argc != 5) { - throw qpidit::ArgumentError("Incorrect number of arguments"); - } - - std::ostringstream oss; - oss << argv[1] << "/" << argv[2]; - - try { - Json::Value testValues; - Json::Reader jsonReader; - if (not jsonReader.parse(argv[4], testValues, false)) { - throw qpidit::JsonParserError(jsonReader); - } - - qpidit::shim::AmqpSender sender(oss.str(), argv[3], testValues); - proton::default_container(sender).run(); - } catch (const std::exception& e) { - std::cerr << "AmqpSender error: " << e.what() << std::endl; - exit(1); - } - exit(0); -} http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/83b89fe4/shims/qpid-proton-cpp/src/qpidit/shim/AmqpSender.hpp ---------------------------------------------------------------------- diff --git a/shims/qpid-proton-cpp/src/qpidit/shim/AmqpSender.hpp b/shims/qpid-proton-cpp/src/qpidit/shim/AmqpSender.hpp deleted file mode 100644 index 8ad4e52..0000000 --- a/shims/qpid-proton-cpp/src/qpidit/shim/AmqpSender.hpp +++ /dev/null @@ -1,104 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#ifndef SRC_QPIDIT_SHIM_AMQPSENDER_HPP_ -#define SRC_QPIDIT_SHIM_AMQPSENDER_HPP_ - -#include <iomanip> -#include <json/value.h> -#include "proton/messaging_handler.hpp" -#include "proton/message.hpp" -#include "qpidit/QpidItErrors.hpp" - -namespace qpidit -{ - namespace shim - { - - class AmqpSender : public proton::messaging_handler - { - protected: - const std::string _brokerUrl; - const std::string _amqpType; - const Json::Value _testValues; - uint32_t _msgsSent; - uint32_t _msgsConfirmed; - uint32_t _totalMsgs; - public: - AmqpSender(const std::string& brokerUrl, const std::string& amqpType, const Json::Value& testValues); - virtual ~AmqpSender(); - void on_container_start(proton::container &c); - void on_sendable(proton::sender &s); - void on_tracker_accept(proton::tracker &t); - void on_transport_close(proton::transport &t); - - void on_connection_error(proton::connection &c); - void on_session_error(proton::session &s); - void on_sender_error(proton::sender& s); - void on_transport_error(proton::transport &t); - void on_error(const proton::error_condition &c); - protected: - proton::message& setMessage(proton::message& msg, const Json::Value& testValue); - - static std::string bytearrayToHexStr(const char* src, int len); - static void revMemcpy(char* dest, const char* src, int n); - static void uint64ToChar16(char* dest, uint64_t upper, uint64_t lower); - - static proton::value extractProtonValue(const Json::Value& val); - //static Json::Value::ValueType getArrayType(const Json::Value& val); - static void processArray(std::vector<proton::value>& array, const Json::Value& testValues); - static void processList(std::vector<proton::value>& list, const Json::Value& testValues); - static void processMap(std::map<std::string, proton::value>& map, const Json::Value& testValues); - - template<size_t N> static void hexStringToBytearray(proton::byte_array<N>& ba, const std::string s, size_t fromArrayIndex = 0, size_t arrayLen = N) { - for (size_t i=0; i<arrayLen; ++i) { - ba[fromArrayIndex + i] = (char)std::strtoul(s.substr(2*i, 2).c_str(), NULL, 16); - } - } - - // Set message body to floating type T through integral type U - // Used to convert a hex string representation of a float or double to a float or double - template<typename T, typename U> void setFloatValue(proton::message& msg, const std::string& testValueStr) { - try { - U ival(std::strtoul(testValueStr.data(), NULL, 16)); - msg.body(T(*reinterpret_cast<T*>(&ival))); - } catch (const std::exception& e) { throw qpidit::InvalidTestValueError(_amqpType, testValueStr); } - } - - template<typename T> void setIntegralValue(proton::message& msg, const std::string& testValueStr, bool unsignedVal) { - try { - T val(unsignedVal ? std::strtoul(testValueStr.data(), NULL, 16) : std::strtol(testValueStr.data(), NULL, 16)); - msg.body(val); - } catch (const std::exception& e) { throw qpidit::InvalidTestValueError(_amqpType, testValueStr); } - } - - template<typename T> void setStringValue(proton::message& msg, const std::string& testValueStr) { - try { - T val(testValueStr); - msg.body(val); - } catch (const std::exception& e) { throw qpidit::InvalidTestValueError(_amqpType, testValueStr); } - } - }; - - } /* namespace shim */ -} /* namespace qpidit */ - -#endif /* SRC_QPIDIT_SHIM_AMQPSENDER_HPP_ */ http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/83b89fe4/shims/qpid-proton-cpp/src/qpidit/shim/JmsDefinitions.hpp ---------------------------------------------------------------------- diff --git a/shims/qpid-proton-cpp/src/qpidit/shim/JmsDefinitions.hpp b/shims/qpid-proton-cpp/src/qpidit/shim/JmsDefinitions.hpp deleted file mode 100644 index 0a1f8cd..0000000 --- a/shims/qpid-proton-cpp/src/qpidit/shim/JmsDefinitions.hpp +++ /dev/null @@ -1,46 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#ifndef SRC_QPIDIT_SHIM_JMSDEFINTIONS_HPP_ -#define SRC_QPIDIT_SHIM_JMSDEFINTIONS_HPP_ - -namespace qpidit -{ - namespace shim - { - typedef enum {JMS_QUEUE = 0, - JMS_TOPIC, - JMS_TMEP_QUEUE, - JMS_TEMP_TOPIC} - jmsDestinationType_t; - - typedef enum {JMS_MESSAGE_TYPE=0, - JMS_OBJECTMESSAGE_TYPE, - JMS_MAPMESSAGE_TYPE, - JMS_BYTESMESSAGE_TYPE, - JMS_STREAMMESSAGE_TYPE, - JMS_TEXTMESSAGE_TYPE} - jmsMessageType_t; - - } -} - -#endif /* SRC_QPIDIT_SHIM_JMSDEFINTIONS_HPP_ */ --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
