http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/83b89fe4/shims/qpid-proton-cpp/src/qpidit/shim/JmsReceiver.cpp ---------------------------------------------------------------------- diff --git a/shims/qpid-proton-cpp/src/qpidit/shim/JmsReceiver.cpp b/shims/qpid-proton-cpp/src/qpidit/shim/JmsReceiver.cpp deleted file mode 100644 index 8cbe515..0000000 --- a/shims/qpid-proton-cpp/src/qpidit/shim/JmsReceiver.cpp +++ /dev/null @@ -1,432 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpidit/shim/JmsReceiver.hpp" - -#include <iostream> -#include <json/json.h> -#include "proton/connection.hpp" -#include "proton/default_container.hpp" -#include "proton/delivery.hpp" -#include "proton/transport.hpp" -#include "qpidit/QpidItErrors.hpp" - -namespace qpidit -{ - namespace shim - { - //static - proton::symbol JmsReceiver::s_jmsMessageTypeAnnotationKey("x-opt-jms-msg-type"); - std::map<std::string, int8_t>JmsReceiver::s_jmsMessageTypeAnnotationValues = initializeJmsMessageTypeAnnotationMap(); - - - JmsReceiver::JmsReceiver(const std::string& brokerUrl, - const std::string& jmsMessageType, - const Json::Value& testNumberMap, - const Json::Value& flagMap): - _brokerUrl(brokerUrl), - _jmsMessageType(jmsMessageType), - _testNumberMap(testNumberMap), - _flagMap(flagMap), - _subTypeList(testNumberMap.getMemberNames()), - _subTypeIndex(0), - _expected(getTotalNumExpectedMsgs(testNumberMap)), - _received(0UL), - _receivedSubTypeList(Json::arrayValue), - _receivedValueMap(Json::objectValue), - _receivedHeadersMap(Json::objectValue), - _receivedPropertiesMap(Json::objectValue) - {} - - JmsReceiver::~JmsReceiver() {} - - Json::Value& JmsReceiver::getReceivedValueMap() { - return _receivedValueMap; - } - - Json::Value& JmsReceiver::getReceivedHeadersMap() { - return _receivedHeadersMap; - } - - Json::Value& JmsReceiver::getReceivedPropertiesMap() { - return _receivedPropertiesMap; - } - - void JmsReceiver::on_container_start(proton::container &c) { - c.open_receiver(_brokerUrl); - } - - void JmsReceiver::on_message(proton::delivery &d, proton::message &m) { - try { - if (_received < _expected) { - int8_t t = JMS_MESSAGE_TYPE; - try {t = m.message_annotations().get(proton::symbol("x-opt-jms-msg-type")).get<int8_t>();} - catch (const std::exception& e) { - std::cout << "JmsReceiver::on_message(): Missing annotation \"x-opt-jms-msg-type\"" << std::endl; - throw; - } - switch (t) { - case JMS_MESSAGE_TYPE: - receiveJmsMessage(m); - break; - case JMS_OBJECTMESSAGE_TYPE: - receiveJmsObjectMessage(m); - break; - case JMS_MAPMESSAGE_TYPE: - receiveJmsMapMessage(m); - break; - case JMS_BYTESMESSAGE_TYPE: - receiveJmsBytesMessage(m); - break; - case JMS_STREAMMESSAGE_TYPE: - receiveJmsStreamMessage(m); - break; - case JMS_TEXTMESSAGE_TYPE: - receiveJmsTextMessage(m); - break; - default:; - // TODO: handle error - no known JMS message type - } - - processMessageHeaders(m); - processMessageProperties(m); - - std::string subType(_subTypeList[_subTypeIndex]); - // Increment the subtype if the required number of messages have been received - if (_receivedSubTypeList.size() >= _testNumberMap[subType].asInt() && - _subTypeIndex < _testNumberMap.size()) { - _receivedValueMap[subType] = _receivedSubTypeList; - _receivedSubTypeList.clear(); - ++_subTypeIndex; - } - _received++; - if (_received >= _expected) { - d.receiver().close(); - d.connection().close(); - } - } - } catch (const std::exception&) { - d.receiver().close(); - d.connection().close(); - throw; - } - } - - void JmsReceiver::on_connection_error(proton::connection &c) { - std::cerr << "JmsReceiver::on_connection_error(): " << c.error() << std::endl; - } - - void JmsReceiver::on_receiver_error(proton::receiver& r) { - std::cerr << "JmsReceiver::on_receiver_error(): " << r.error() << std::endl; - } - - void JmsReceiver::on_session_error(proton::session &s) { - std::cerr << "JmsReceiver::on_session_error(): " << s.error() << std::endl; - } - - void JmsReceiver::on_transport_error(proton::transport &t) { - std::cerr << "JmsReceiver::on_transport_error(): " << t.error() << std::endl; - } - - void JmsReceiver::on_error(const proton::error_condition &ec) { - std::cerr << "JmsReceiver::on_error(): " << ec << std::endl; - } - - //static - uint32_t JmsReceiver::getTotalNumExpectedMsgs(const Json::Value testNumberMap) { - uint32_t total(0UL); - for (Json::Value::const_iterator i=testNumberMap.begin(); i!=testNumberMap.end(); ++i) { - total += (*i).asUInt(); - } - return total; - - } - - // protected - - void JmsReceiver::receiveJmsMessage(const proton::message& msg) { - _receivedSubTypeList.append(Json::Value()); - } - - void JmsReceiver::receiveJmsObjectMessage(const proton::message& msg) { - // TODO - } - - void JmsReceiver::receiveJmsMapMessage(const proton::message& msg) { - if(_jmsMessageType.compare("JMS_MAPMESSAGE_TYPE") != 0) { - throw qpidit::IncorrectMessageBodyTypeError(_jmsMessageType, "JMS_MAPMESSAGE_TYPE"); - } - std::string subType(_subTypeList[_subTypeIndex]); - std::map<std::string, proton::value> m; - msg.body().get(m); - for (std::map<std::string, proton::value>::const_iterator i=m.begin(); i!=m.end(); ++i) { - std::string key = i->first; - if (subType.compare(key.substr(0, key.size()-3)) != 0) { - throw qpidit::IncorrectJmsMapKeyPrefixError(subType, key); - } - proton::value val = i->second; - if (subType.compare("boolean") == 0) { - _receivedSubTypeList.append(val.get<bool>() ? Json::Value("True") : Json::Value("False")); - } else if (subType.compare("byte") == 0) { - _receivedSubTypeList.append(Json::Value(toHexStr<int8_t>(val.get<int8_t>()))); - } else if (subType.compare("bytes") == 0) { - _receivedSubTypeList.append(Json::Value(std::string(val.get<proton::binary>()))); - } else if (subType.compare("char") == 0) { - std::ostringstream oss; - oss << (char)val.get<wchar_t>(); - _receivedSubTypeList.append(Json::Value(oss.str())); - } else if (subType.compare("double") == 0) { - double d = val.get<double>(); - _receivedSubTypeList.append(Json::Value(toHexStr<int64_t>(*((int64_t*)&d), true, false))); - } else if (subType.compare("float") == 0) { - float f = val.get<float>(); - _receivedSubTypeList.append(Json::Value(toHexStr<int32_t>(*((int32_t*)&f), true, false))); - } else if (subType.compare("int") == 0) { - _receivedSubTypeList.append(Json::Value(toHexStr<int32_t>(val.get<int32_t>()))); - } else if (subType.compare("long") == 0) { - _receivedSubTypeList.append(Json::Value(toHexStr<int64_t>(val.get<int64_t>()))); - } else if (subType.compare("short") == 0) { - _receivedSubTypeList.append(Json::Value(toHexStr<int16_t>(val.get<int16_t>()))); - } else if (subType.compare("string") == 0) { - _receivedSubTypeList.append(Json::Value(val.get<std::string>())); - } else { - throw qpidit::UnknownJmsMessageSubTypeError(subType); - } - } - } - - void JmsReceiver::receiveJmsBytesMessage(const proton::message& msg) { - if(_jmsMessageType.compare("JMS_BYTESMESSAGE_TYPE") != 0) { - throw qpidit::IncorrectMessageBodyTypeError(_jmsMessageType, "JMS_BYTESMESSAGE_TYPE"); - } - std::string subType(_subTypeList[_subTypeIndex]); - proton::binary body = msg.body().get<proton::binary>(); - if (subType.compare("boolean") == 0) { - if (body.size() != 1) throw IncorrectMessageBodyLengthError("JmsReceiver::receiveJmsBytesMessage, subType=boolean", 1, body.size()); - _receivedSubTypeList.append(body[0] ? Json::Value("True") : Json::Value("False")); - } else if (subType.compare("byte") == 0) { - if (body.size() != sizeof(int8_t)) throw IncorrectMessageBodyLengthError("JmsReceiver::receiveJmsBytesMessage, subType=byte", sizeof(int8_t), body.size()); - int8_t val = *((int8_t*)body.data()); - _receivedSubTypeList.append(Json::Value(toHexStr<int8_t>(val))); - } else if (subType.compare("bytes") == 0) { - _receivedSubTypeList.append(Json::Value(std::string(body))); - } else if (subType.compare("char") == 0) { - if (body.size() != sizeof(uint16_t)) throw IncorrectMessageBodyLengthError("JmsReceiver::receiveJmsBytesMessage, subType=char", sizeof(uint16_t), body.size()); - // TODO: This is ugly: ignoring first byte - handle UTF-16 correctly - char c = body[1]; - std::ostringstream oss; - oss << c; - _receivedSubTypeList.append(Json::Value(oss.str())); - } else if (subType.compare("double") == 0) { - if (body.size() != sizeof(int64_t)) throw IncorrectMessageBodyLengthError("JmsReceiver::receiveJmsBytesMessage, subType=double", sizeof(int64_t), body.size()); - int64_t val = be64toh(*((int64_t*)body.data())); - _receivedSubTypeList.append(Json::Value(toHexStr<int64_t>(val, true, false))); - } else if (subType.compare("float") == 0) { - if (body.size() != sizeof(int32_t)) throw IncorrectMessageBodyLengthError("JmsReceiver::receiveJmsBytesMessage, subType=float", sizeof(int32_t), body.size()); - int32_t val = be32toh(*((int32_t*)body.data())); - _receivedSubTypeList.append(Json::Value(toHexStr<int32_t>(val, true, false))); - } else if (subType.compare("long") == 0) { - if (body.size() != sizeof(int64_t)) throw IncorrectMessageBodyLengthError("JmsReceiver::receiveJmsBytesMessage, subType=long", sizeof(int64_t), body.size()); - int64_t val = be64toh(*((int64_t*)body.data())); - _receivedSubTypeList.append(Json::Value(toHexStr<int64_t>(val))); - } else if (subType.compare("int") == 0) { - if (body.size() != sizeof(int32_t)) throw IncorrectMessageBodyLengthError("JmsReceiver::receiveJmsBytesMessage, subType=int", sizeof(int32_t), body.size()); - int32_t val = be32toh(*((int32_t*)body.data())); - _receivedSubTypeList.append(Json::Value(toHexStr<int32_t>(val))); - } else if (subType.compare("short") == 0) { - if (body.size() != sizeof(int16_t)) throw IncorrectMessageBodyLengthError("JmsReceiver::receiveJmsBytesMessage, subType=short", sizeof(int16_t), body.size()); - int16_t val = be16toh(*((int16_t*)body.data())); - _receivedSubTypeList.append(Json::Value(toHexStr<int16_t>(val))); - } else if (subType.compare("string") == 0) { - // TODO: decode string size in first two bytes and check string size - _receivedSubTypeList.append(Json::Value(std::string(body).substr(2))); - } else { - throw qpidit::UnknownJmsMessageSubTypeError(subType); - } - } - - void JmsReceiver::receiveJmsStreamMessage(const proton::message& msg) { - if(_jmsMessageType.compare("JMS_STREAMMESSAGE_TYPE") != 0) { - throw qpidit::IncorrectMessageBodyTypeError(_jmsMessageType, "JMS_STREAMMESSAGE_TYPE"); - } - std::string subType(_subTypeList[_subTypeIndex]); - std::vector<proton::value> l; - msg.body().get(l); - for (std::vector<proton::value>::const_iterator i=l.begin(); i!=l.end(); ++i) { - if (subType.compare("boolean") == 0) { - _receivedSubTypeList.append(i->get<bool>() ? Json::Value("True") : Json::Value("False")); - } else if (subType.compare("byte") == 0) { - _receivedSubTypeList.append(Json::Value(toHexStr<int8_t>(i->get<int8_t>()))); - } else if (subType.compare("bytes") == 0) { - _receivedSubTypeList.append(Json::Value(std::string(i->get<proton::binary>()))); - } else if (subType.compare("char") == 0) { - std::ostringstream oss; - oss << (char)i->get<wchar_t>(); - _receivedSubTypeList.append(Json::Value(oss.str())); - } else if (subType.compare("double") == 0) { - double d = i->get<double>(); - _receivedSubTypeList.append(Json::Value(toHexStr<int64_t>(*((int64_t*)&d), true, false))); - } else if (subType.compare("float") == 0) { - float f = i->get<float>(); - _receivedSubTypeList.append(Json::Value(toHexStr<int32_t>(*((int32_t*)&f), true, false))); - } else if (subType.compare("int") == 0) { - _receivedSubTypeList.append(Json::Value(toHexStr<int32_t>(i->get<int32_t>()))); - } else if (subType.compare("long") == 0) { - _receivedSubTypeList.append(Json::Value(toHexStr<int64_t>(i->get<int64_t>()))); - } else if (subType.compare("short") == 0) { - _receivedSubTypeList.append(Json::Value(toHexStr<int16_t>(i->get<int16_t>()))); - } else if (subType.compare("string") == 0) { - _receivedSubTypeList.append(Json::Value(i->get<std::string>())); - } else { - throw qpidit::UnknownJmsMessageSubTypeError(subType); - } - } - - } - - void JmsReceiver::receiveJmsTextMessage(const proton::message& msg) { - if(_jmsMessageType.compare("JMS_TEXTMESSAGE_TYPE") != 0) { - throw qpidit::IncorrectMessageBodyTypeError(_jmsMessageType, "JMS_TEXTMESSAGE_TYPE"); - } - _receivedSubTypeList.append(Json::Value(msg.body().get<std::string>())); - } - - void JmsReceiver::processMessageHeaders(const proton::message& msg) { - addMessageHeaderString("JMS_TYPE_HEADER", msg.subject()); - if (_flagMap.isMember("JMS_CORRELATIONID_AS_BYTES") && _flagMap["JMS_CORRELATIONID_AS_BYTES"].asBool()) { - addMessageHeaderByteArray("JMS_CORRELATIONID_HEADER", proton::get<proton::binary>(msg.correlation_id())); - } else { - try { - addMessageHeaderString("JMS_CORRELATIONID_HEADER", proton::get<std::string>(msg.correlation_id())); - } catch (const std::exception& e) {} // TODO: UGLY, how do you check if there _is_ a correlation id? - } - - std::string reply_to = msg.reply_to(); - // Some brokers prepend 'queue://' and 'topic://' to reply_to addresses, strip these when present - if (_flagMap.isMember("JMS_REPLYTO_AS_TOPIC") && _flagMap["JMS_REPLYTO_AS_TOPIC"].asBool()) { - if (reply_to.find("topic://") == 0) { - addMessageHeaderDestination("JMS_REPLYTO_HEADER", JMS_TOPIC, reply_to.substr(8)); - } else { - addMessageHeaderDestination("JMS_REPLYTO_HEADER", JMS_TOPIC, reply_to); - } - } else { - if (reply_to.find("queue://") == 0) { - addMessageHeaderDestination("JMS_REPLYTO_HEADER", JMS_QUEUE, reply_to.substr(8)); - } else { - addMessageHeaderDestination("JMS_REPLYTO_HEADER", JMS_QUEUE, reply_to); - } - } - } - - void JmsReceiver::addMessageHeaderString(const char* headerName, const std::string& value) { - if (!value.empty()) { // TODO: Remove this test when PROTON-1288 is fixed as empty strings are allowed in headers - Json::Value valueMap(Json::objectValue); - valueMap["string"] = value; - _receivedHeadersMap[headerName] = valueMap; - } - } - - void JmsReceiver::addMessageHeaderByteArray(const std::string& headerName, const proton::binary ba) { - if (!ba.empty()) { // TODO: Remove this test when PROTON-1288 is fixed as empty binaries are allowed in headers - Json::Value valueMap(Json::objectValue); - valueMap["bytes"] = std::string(ba); - _receivedHeadersMap[headerName] = valueMap; - } - } - - void JmsReceiver::addMessageHeaderDestination(const std::string& headerName, jmsDestinationType_t dt, const std::string& d) { - if (!d.empty()) { - Json::Value valueMap(Json::objectValue); - switch (dt) { - case JMS_QUEUE: - valueMap["queue"] = d; - break; - case JMS_TOPIC: - valueMap["topic"] = d; - break; - default: - ; // TODO: Handle error: remaining JMS destinations not handled. - } - _receivedHeadersMap[headerName] = valueMap; - } - } - - void JmsReceiver::processMessageProperties(const proton::message& msg) { - // TODO: Add this function when PROTON-1284 is fixed -// std::map<proton::value, proton::value> props; -// msg.properties().value() >> props; - } - - //static - std::map<std::string, int8_t> JmsReceiver::initializeJmsMessageTypeAnnotationMap() { - std::map<std::string, int8_t> m; - m["JMS_MESSAGE_TYPE"] = JMS_MESSAGE_TYPE; - m["JMS_OBJECTMESSAGE_TYPE"] = JMS_OBJECTMESSAGE_TYPE; - m["JMS_MAPMESSAGE_TYPE"] = JMS_MAPMESSAGE_TYPE; - m["JMS_BYTESMESSAGE_TYPE"] = JMS_BYTESMESSAGE_TYPE; - m["JMS_STREAMMESSAGE_TYPE"] = JMS_STREAMMESSAGE_TYPE; - m["JMS_TEXTMESSAGE_TYPE"] = JMS_TEXTMESSAGE_TYPE; - return m; - } - - - } /* namespace shim */ -} /* namespace qpidit */ - -/* --- main --- - * Args: 1: Broker address (ip-addr:port) - * 2: Queue name - * 3: JMS message type - * 4: JSON Test parameters containing 2 maps: [testValuesMap, flagMap] - */ -int main(int argc, char** argv) { - /* - for (int i=0; i<argc; ++i) { - std::cout << "*** argv[" << i << "] : " << argv[i] << std::endl; - } - */ - // TODO: improve arg management a little... - if (argc != 5) { - throw qpidit::ArgumentError("Incorrect number of arguments"); - } - - std::ostringstream oss; - oss << argv[1] << "/" << argv[2]; - - try { - Json::Value testParams; - Json::Reader jsonReader; - if (not jsonReader.parse(argv[4], testParams, false)) { - throw qpidit::JsonParserError(jsonReader); - } - - qpidit::shim::JmsReceiver receiver(oss.str(), argv[3], testParams[0], testParams[1]); - proton::default_container(receiver).run(); - - Json::FastWriter fw; - std::cout << argv[3] << std::endl; - std::cout << fw.write(receiver.getReceivedValueMap()); - std::cout << fw.write(receiver.getReceivedHeadersMap()); - std::cout << fw.write(receiver.getReceivedPropertiesMap()); - } catch (const std::exception& e) { - std::cout << "JmsReceiver error: " << e.what() << std::endl; - } -}
http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/83b89fe4/shims/qpid-proton-cpp/src/qpidit/shim/JmsReceiver.hpp ---------------------------------------------------------------------- diff --git a/shims/qpid-proton-cpp/src/qpidit/shim/JmsReceiver.hpp b/shims/qpid-proton-cpp/src/qpidit/shim/JmsReceiver.hpp deleted file mode 100644 index 23c60fc..0000000 --- a/shims/qpid-proton-cpp/src/qpidit/shim/JmsReceiver.hpp +++ /dev/null @@ -1,111 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#ifndef SRC_QPIDIT_SHIM_JMSRECEIVER_HPP_ -#define SRC_QPIDIT_SHIM_JMSRECEIVER_HPP_ - -#include <iomanip> -#include <json/value.h> -#include "proton/messaging_handler.hpp" -#include "proton/types.hpp" -#include "qpidit/shim/JmsDefinitions.hpp" -#include <sstream> - -namespace qpidit -{ - namespace shim - { - - class JmsReceiver : public proton::messaging_handler - { - protected: - static proton::symbol s_jmsMessageTypeAnnotationKey; - static std::map<std::string, int8_t>s_jmsMessageTypeAnnotationValues; - - const std::string _brokerUrl; - const std::string _jmsMessageType; - const Json::Value _testNumberMap; - const Json::Value _flagMap; - Json::Value::Members _subTypeList; - int _subTypeIndex; - uint32_t _expected; - uint32_t _received; - Json::Value _receivedSubTypeList; - Json::Value _receivedValueMap; - Json::Value _receivedHeadersMap; - Json::Value _receivedPropertiesMap; - public: - JmsReceiver(const std::string& brokerUrl, - const std::string& jmsMessageType, - const Json::Value& testNumberMap, - const Json::Value& flagMap); - virtual ~JmsReceiver(); - Json::Value& getReceivedValueMap(); - Json::Value& getReceivedHeadersMap(); - Json::Value& getReceivedPropertiesMap(); - void on_container_start(proton::container &c); - void on_message(proton::delivery &d, proton::message &m); - - void on_connection_error(proton::connection &c); - void on_receiver_error(proton::receiver& r); - void on_session_error(proton::session &s); - void on_transport_error(proton::transport &t); - void on_error(const proton::error_condition &c); - - static uint32_t getTotalNumExpectedMsgs(const Json::Value testNumberMap); - - protected: - void receiveJmsMessage(const proton::message& msg); - void receiveJmsObjectMessage(const proton::message& msg); - void receiveJmsMapMessage(const proton::message& msg); - void receiveJmsBytesMessage(const proton::message& msg); - void receiveJmsStreamMessage(const proton::message& msg); - void receiveJmsTextMessage(const proton::message& msg); - - void processMessageHeaders(const proton::message& msg); - void addMessageHeaderString(const char* headerName, const std::string& value); - void addMessageHeaderByteArray(const std::string& headerName, const proton::binary ba); - void addMessageHeaderDestination(const std::string& headerName, jmsDestinationType_t dt, const std::string& d); - void processMessageProperties(const proton::message& msg); - - static std::map<std::string, int8_t> initializeJmsMessageTypeAnnotationMap(); - - // Format signed numbers in negative hex format if signedFlag is true, ie -0xNNNN, positive numbers in 0xNNNN format - template<typename T> static std::string toHexStr(T val, bool fillFlag = false, bool signedFlag = true) { - std::ostringstream oss; - bool neg = false; - if (signedFlag) { - neg = val < 0; - if (neg) val = -val; - } - oss << (neg ? "-" : "") << "0x" << std::hex; - if (fillFlag) { - oss << std::setw(sizeof(T)*2) << std::setfill('0'); - } - oss << (sizeof(T) == 1 ? (int)val & 0xff : sizeof(T) == 2 ? val & 0xffff : sizeof(T) == 4 ? val & 0xffffffff : val); - return oss.str(); - } - }; - - } /* namespace shim */ -} /* namespace qpidit */ - -#endif /* SRC_QPIDIT_SHIM_JMSRECEIVER_HPP_ */ http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/83b89fe4/shims/qpid-proton-cpp/src/qpidit/shim/JmsSender.cpp ---------------------------------------------------------------------- diff --git a/shims/qpid-proton-cpp/src/qpidit/shim/JmsSender.cpp b/shims/qpid-proton-cpp/src/qpidit/shim/JmsSender.cpp deleted file mode 100644 index 6576732..0000000 --- a/shims/qpid-proton-cpp/src/qpidit/shim/JmsSender.cpp +++ /dev/null @@ -1,487 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpidit/shim/JmsSender.hpp" - -#include <cerrno> -#include <iomanip> -#include <iostream> -#include <json/json.h> -#include "proton/connection.hpp" -#include "proton/default_container.hpp" -#include "proton/tracker.hpp" -#include "proton/transport.hpp" -#include <stdio.h> - -namespace qpidit -{ - namespace shim - { - //static - proton::symbol JmsSender::s_jmsMessageTypeAnnotationKey("x-opt-jms-msg-type"); - std::map<std::string, int8_t>JmsSender::s_jmsMessageTypeAnnotationValues = initializeJmsMessageTypeAnnotationMap(); - - JmsSender::JmsSender(const std::string& brokerUrl, - const std::string& jmsMessageType, - const Json::Value& testParams) : - _brokerUrl(brokerUrl), - _jmsMessageType(jmsMessageType), - _testValueMap(testParams[0]), - _testHeadersMap(testParams[1]), - _testPropertiesMap(testParams[2]), - _msgsSent(0), - _msgsConfirmed(0), - _totalMsgs(getTotalNumMessages(_testValueMap)) - { - if (_testValueMap.type() != Json::objectValue) { - throw qpidit::InvalidJsonRootNodeError(Json::objectValue, _testValueMap.type()); - } - } - - JmsSender::~JmsSender() {} - - void JmsSender::on_container_start(proton::container &c) { - c.open_sender(_brokerUrl); - } - - void JmsSender::on_sendable(proton::sender &s) { - if (_totalMsgs == 0) { - s.connection().close(); - } else if (_msgsSent == 0) { - Json::Value::Members subTypes = _testValueMap.getMemberNames(); - std::sort(subTypes.begin(), subTypes.end()); - for (std::vector<std::string>::const_iterator i=subTypes.begin(); i!=subTypes.end(); ++i) { - sendMessages(s, *i, _testValueMap[*i]); - } - } - } - - void JmsSender::on_tracker_accept(proton::tracker &t) { - _msgsConfirmed++; - if (_msgsConfirmed == _totalMsgs) { - t.connection().close(); - } - } - - void JmsSender::on_transport_close(proton::transport &t) { - _msgsSent = _msgsConfirmed; - } - - void JmsSender::on_connection_error(proton::connection &c) { - std::cerr << "JmsSender::on_connection_error(): " << c.error() << std::endl; - } - - void JmsSender::on_sender_error(proton::sender &s) { - std::cerr << "JmsSender::on_sender_error(): " << s.error() << std::endl; - } - - void JmsSender::on_session_error(proton::session &s) { - std::cerr << "JmsSender::on_session_error(): " << s.error() << std::endl; - } - - void JmsSender::on_transport_error(proton::transport &t) { - std::cerr << "JmsSender::on_transport_error(): " << t.error() << std::endl; - } - - void JmsSender::on_error(const proton::error_condition &ec) { - std::cerr << "JmsSender::on_error(): " << ec << std::endl; - } - - // protected - - void JmsSender::sendMessages(proton::sender &s, const std::string& subType, const Json::Value& testValues) { - uint32_t valueNumber = 0; - for (Json::Value::const_iterator i=testValues.begin(); i!=testValues.end(); ++i) { - if (s.credit()) { - proton::message msg; - if (_jmsMessageType.compare("JMS_MESSAGE_TYPE") == 0) { - setMessage(msg, subType, (*i).asString()); - } else if (_jmsMessageType.compare("JMS_BYTESMESSAGE_TYPE") == 0) { - setBytesMessage(msg, subType, (*i).asString()); - } else if (_jmsMessageType.compare("JMS_MAPMESSAGE_TYPE") == 0) { - setMapMessage(msg, subType, (*i).asString(), valueNumber); - } else if (_jmsMessageType.compare("JMS_OBJECTMESSAGE_TYPE") == 0) { - setObjectMessage(msg, subType, *i); - } else if (_jmsMessageType.compare("JMS_STREAMMESSAGE_TYPE") == 0) { - setStreamMessage(msg, subType, (*i).asString()); - } else if (_jmsMessageType.compare("JMS_TEXTMESSAGE_TYPE") == 0) { - setTextMessage(msg, *i); - } else { - throw qpidit::UnknownJmsMessageTypeError(_jmsMessageType); - } - addMessageHeaders(msg); - addMessageProperties(msg); - s.send(msg); - _msgsSent += 1; - valueNumber += 1; - } - } - - } - - proton::message& JmsSender::setMessage(proton::message& msg, const std::string& subType, const std::string& testValueStr) { - if (subType.compare("none") != 0) { - throw qpidit::UnknownJmsMessageSubTypeError(subType); - } - if (testValueStr.size() != 0) { - throw InvalidTestValueError(subType, testValueStr); - } - msg.content_type(proton::symbol("application/octet-stream")); - msg.message_annotations().put(proton::symbol("x-opt-jms-msg-type"), s_jmsMessageTypeAnnotationValues["JMS_MESSAGE_TYPE"]); - return msg; - } - - proton::message& JmsSender::setBytesMessage(proton::message& msg, const std::string& subType, const std::string& testValueStr) { - proton::binary bin; - if (subType.compare("boolean") == 0) { - if (testValueStr.compare("False") == 0) bin.push_back(char(0)); - else if (testValueStr.compare("True") == 0) bin.push_back(char(1)); - else throw InvalidTestValueError(subType, testValueStr); - } else if (subType.compare("byte") == 0) { - uint8_t val = getIntegralValue<int8_t>(testValueStr); - bin.push_back(char(val)); - } else if (subType.compare("bytes") == 0) { - bin.assign(testValueStr.begin(), testValueStr.end()); - } else if (subType.compare("char") == 0) { - bin.push_back(char(0)); - if (testValueStr[0] == '\\') { // Format: '\xNN' - bin.push_back(getIntegralValue<char>(testValueStr.substr(2))); - } else { // Format: 'c' - bin.push_back(testValueStr[0]); - } - } else if (subType.compare("double") == 0) { - uint64_t val; - try { - val = htobe64(std::strtoul(testValueStr.data(), NULL, 16)); - } catch (const std::exception& e) { throw qpidit::InvalidTestValueError("double", testValueStr); } - numToBinary(val, bin); - //for (int i=0; i<sizeof(val); ++i) { - // bin.push_back(* ((char*)&val + i)); - // } - } else if (subType.compare("float") == 0) { - uint32_t val; - try { - val = htobe32((uint32_t)std::strtoul(testValueStr.data(), NULL, 16)); - } catch (const std::exception& e) { throw qpidit::InvalidTestValueError("float", testValueStr); } - numToBinary(val, bin); - //for (int i=0; i<sizeof(val); ++i) { - // bin.push_back(* ((char*)&val + i)); - //} - } else if (subType.compare("long") == 0) { - uint64_t val = htobe64(getIntegralValue<uint64_t>(testValueStr)); - numToBinary(val, bin); - //bin.assign(sizeof(val), val); - } else if (subType.compare("int") == 0) { - uint32_t val = htobe32(getIntegralValue<uint32_t>(testValueStr)); - numToBinary(val, bin); - //bin.assign(sizeof(val), val); - } else if (subType.compare("short") == 0) { - uint16_t val = htobe16(getIntegralValue<int16_t>(testValueStr)); - numToBinary(val, bin); - //bin.assign(sizeof(val), val); - } else if (subType.compare("string") == 0) { - std::ostringstream oss; - uint16_t strlen = htobe16((uint16_t)testValueStr.size()); - oss.write((char*)&strlen, sizeof(strlen)); - oss << testValueStr; - std::string os = oss.str(); - bin.assign(os.begin(), os.end()); - } else { - throw qpidit::UnknownJmsMessageSubTypeError(subType); - } - msg.body(bin); - msg.inferred(true); - msg.content_type(proton::symbol("application/octet-stream")); - msg.message_annotations().put(proton::symbol("x-opt-jms-msg-type"), s_jmsMessageTypeAnnotationValues["JMS_BYTESMESSAGE_TYPE"]); - return msg; - } - - proton::message& JmsSender::setMapMessage(proton::message& msg, const std::string& subType, const std::string& testValueStr, uint32_t valueNumber) { - std::ostringstream oss; - oss << subType << std::setw(3) << std::setfill('0') << valueNumber; - std::string mapKey(oss.str()); - std::map<std::string, proton::value> m; - if (subType.compare("boolean") == 0) { - if (testValueStr.compare("False") == 0) m[mapKey] = false; - else if (testValueStr.compare("True") == 0) m[mapKey] = true; - else throw InvalidTestValueError(subType, testValueStr); - } else if (subType.compare("byte") == 0) { - m[mapKey] = int8_t(getIntegralValue<int8_t>(testValueStr)); - } else if (subType.compare("bytes") == 0) { - m[mapKey] = proton::binary(testValueStr); - } else if (subType.compare("char") == 0) { - wchar_t val; - if (testValueStr[0] == '\\') { // Format: '\xNN' - val = (wchar_t)getIntegralValue<wchar_t>(testValueStr.substr(2)); - } else { // Format: 'c' - val = testValueStr[0]; - } - m[mapKey] = val; - } else if (subType.compare("double") == 0) { - m[mapKey] = getFloatValue<double, uint64_t>(testValueStr); - } else if (subType.compare("float") == 0) { - m[mapKey] = getFloatValue<float, uint32_t>(testValueStr); - } else if (subType.compare("int") == 0) { - m[mapKey] = getIntegralValue<int32_t>(testValueStr); - } else if (subType.compare("long") == 0) { - m[mapKey] = getIntegralValue<int64_t>(testValueStr); - } else if (subType.compare("short") == 0) { - m[mapKey] = getIntegralValue<int16_t>(testValueStr); - } else if (subType.compare("string") == 0) { - m[mapKey] = testValueStr; - } else { - throw qpidit::UnknownJmsMessageSubTypeError(subType); - } - msg.inferred(false); - msg.body(m); - msg.message_annotations().put(proton::symbol("x-opt-jms-msg-type"), s_jmsMessageTypeAnnotationValues["JMS_MAPMESSAGE_TYPE"]); - return msg; - } - - proton::message& JmsSender::setObjectMessage(proton::message& msg, const std::string& subType, const Json::Value& testValue) { - msg.body(getJavaObjectBinary(subType, testValue.asString())); - msg.inferred(true); - msg.content_type(proton::symbol("application/x-java-serialized-object")); - msg.message_annotations().put(proton::symbol("x-opt-jms-msg-type"), s_jmsMessageTypeAnnotationValues["JMS_OBJECTMESSAGE_TYPE"]); - return msg; - } - - proton::message& JmsSender::setStreamMessage(proton::message& msg, const std::string& subType, const std::string& testValueStr) { - std::vector<proton::value> l; - if (subType.compare("boolean") == 0) { - if (testValueStr.compare("False") == 0) l.push_back(false); - else if (testValueStr.compare("True") == 0) l.push_back(true); - else throw InvalidTestValueError(subType, testValueStr); - } else if (subType.compare("byte") == 0) { - l.push_back(int8_t(getIntegralValue<int8_t>(testValueStr))); - } else if (subType.compare("bytes") == 0) { - l.push_back(proton::binary(testValueStr)); - } else if (subType.compare("char") == 0) { - wchar_t val; - if (testValueStr[0] == '\\') { // Format: '\xNN' - val = (wchar_t)getIntegralValue<wchar_t>(testValueStr.substr(2)); - } else { // Format: 'c' - val = testValueStr[0]; - } - l.push_back(val); - } else if (subType.compare("double") == 0) { - l.push_back(getFloatValue<double, uint64_t>(testValueStr)); - } else if (subType.compare("float") == 0) { - l.push_back(getFloatValue<float, uint32_t>(testValueStr)); - } else if (subType.compare("int") == 0) { - l.push_back(getIntegralValue<int32_t>(testValueStr)); - } else if (subType.compare("long") == 0) { - l.push_back(getIntegralValue<int64_t>(testValueStr)); - } else if (subType.compare("short") == 0) { - l.push_back(getIntegralValue<int16_t>(testValueStr)); - } else if (subType.compare("string") == 0) { - l.push_back(testValueStr); - } else { - throw qpidit::UnknownJmsMessageSubTypeError(subType); - } - msg.body(l); - msg.inferred(true); - msg.message_annotations().put(proton::symbol("x-opt-jms-msg-type"), s_jmsMessageTypeAnnotationValues["JMS_STREAMMESSAGE_TYPE"]); - return msg; - } - - proton::message& JmsSender::setTextMessage(proton::message& msg, const Json::Value& testValue) { - msg.body(testValue.asString()); - msg.inferred(false); - msg.message_annotations().put(proton::symbol("x-opt-jms-msg-type"), s_jmsMessageTypeAnnotationValues["JMS_TEXTMESSAGE_TYPE"]); - return msg; - } - - proton::message& JmsSender::addMessageHeaders(proton::message& msg) { - Json::Value::Members headerNames = _testHeadersMap.getMemberNames(); - for (std::vector<std::string>::const_iterator i=headerNames.begin(); i!=headerNames.end(); ++i) { - const Json::Value _subMap = _testHeadersMap[*i]; - const std::string headerValueType = _subMap.getMemberNames()[0]; // There is always only one entry in map - std::string val = _subMap[headerValueType].asString(); - if (i->compare("JMS_TYPE_HEADER") == 0) { - setJmsTypeHeader(msg, val); - } else if (i->compare("JMS_CORRELATIONID_HEADER") == 0) { - if (headerValueType.compare("bytes") == 0) { - setJmsCorrelationId(msg, proton::binary(val)); - } else { - setJmsCorrelationId(msg, val); - } - } else if (i->compare("JMS_REPLYTO_HEADER") == 0) { - setJmsReplyTo(msg, headerValueType, val); - } else { - throw qpidit::UnknownJmsHeaderTypeError(*i); - } - } - return msg; - } - - //static - proton::message& JmsSender::setJmsTypeHeader(proton::message& msg, const std::string& t) { - msg.subject(t); - return msg; - } - - //static - proton::message& JmsSender::setJmsCorrelationId(proton::message& msg, const std::string& cid) { - proton::message_id mid(cid); - msg.correlation_id(mid); - msg.message_annotations().put(proton::symbol("x-opt-app-correlation-id"), true); - return msg; - } - - //static - proton::message& JmsSender::setJmsCorrelationId(proton::message& msg, const proton::binary cid) { - proton::message_id mid(cid); - msg.correlation_id(cid); - msg.message_annotations().put(proton::symbol("x-opt-app-correlation-id"), true); - return msg; - } - - //static - proton::message& JmsSender::setJmsReplyTo(proton::message& msg, const std::string& dts, const std::string& d) { - if (dts.compare("queue") == 0) { - msg.reply_to(/*std::string("queue://") + */d); - msg.message_annotations().put(proton::symbol("x-opt-jms-reply-to"), int8_t(JMS_QUEUE)); - } else if (dts.compare("temp_queue") == 0) { - msg.reply_to(/*std::string("queue://") + */d); - msg.message_annotations().put(proton::symbol("x-opt-jms-reply-to"), int8_t(JMS_TMEP_QUEUE)); - } else if (dts.compare("topic") == 0) { - msg.reply_to(/*std::string("topic://") + */d); - msg.message_annotations().put(proton::symbol("x-opt-jms-reply-to"), int8_t(JMS_TOPIC)); - } else if (dts.compare("temp_topic") == 0) { - msg.reply_to(/*std::string("topic://") + */d); - msg.message_annotations().put(proton::symbol("x-opt-jms-reply-to"), int8_t(JMS_TEMP_TOPIC)); - } else { - throw qpidit::UnknownJmsDestinationTypeError(dts); - } - return msg; - } - - proton::message& JmsSender::addMessageProperties(proton::message& msg) { - Json::Value::Members propertyNames = _testPropertiesMap.getMemberNames(); - for (std::vector<std::string>::const_iterator i=propertyNames.begin(); i!=propertyNames.end(); ++i) { - const Json::Value _subMap = _testPropertiesMap[*i]; - const std::string propertyValueType = _subMap.getMemberNames()[0]; // There is always only one entry in map - std::string val = _subMap[propertyValueType].asString(); - if (propertyValueType.compare("boolean") == 0) { - if (val.compare("False") == 0) setMessageProperty(msg, *i, false); - else if (val.compare("True") == 0) setMessageProperty(msg, *i, true); - else throw InvalidTestValueError(propertyValueType, val); - } else if (propertyValueType.compare("byte") == 0) { - setMessageProperty(msg, *i, getIntegralValue<int8_t>(val)); - } else if (propertyValueType.compare("double") == 0) { - setMessageProperty(msg, *i, getFloatValue<double, uint64_t>(val)); - } else if (propertyValueType.compare("float") == 0) { - setMessageProperty(msg, *i, getFloatValue<float, uint64_t>(val)); - } else if (propertyValueType.compare("int") == 0) { - setMessageProperty(msg, *i, getIntegralValue<int32_t>(val)); - } else if (propertyValueType.compare("long") == 0) { - setMessageProperty(msg, *i, getIntegralValue<int64_t>(val)); - } else if (propertyValueType.compare("short") == 0) { - setMessageProperty(msg, *i, getIntegralValue<int16_t>(val)); - } else if (propertyValueType.compare("string") == 0) { - setMessageProperty(msg, *i, val); - } else { - throw qpidit::UnknownJmsPropertyTypeError(propertyValueType); - } - } - return msg; - } - - //static - proton::binary JmsSender::getJavaObjectBinary(const std::string& javaClassName, const std::string& valAsString) { - proton::binary javaObjectBinary; - char buf[1024]; - int bytesRead; - FILE* fp = ::popen("java -cp target/JavaObjUtils.jar org.apache.qpid.interop_test.obj_util.JavaObjToBytes javaClassStr", "rb"); - if (fp == NULL) { throw qpidit::PopenError(errno); } - do { - bytesRead = ::fread(buf, 1, sizeof(buf), fp); - javaObjectBinary.insert(javaObjectBinary.end(), &buf[0], &buf[bytesRead-1]); - } while (bytesRead == sizeof(buf)); - int status = ::pclose(fp); - if (status == -1) { - throw qpidit::PcloseError(errno); - } - return javaObjectBinary; - } - - // static - uint32_t JmsSender::getTotalNumMessages(const Json::Value& testValueMap) { - uint32_t tot = 0; - for (Json::Value::const_iterator i = testValueMap.begin(); i != testValueMap.end(); ++i) { - tot += (*i).size(); - } - return tot; - } - - //static - std::map<std::string, int8_t> JmsSender::initializeJmsMessageTypeAnnotationMap() { - std::map<std::string, int8_t> m; - m["JMS_MESSAGE_TYPE"] = JMS_MESSAGE_TYPE; - m["JMS_OBJECTMESSAGE_TYPE"] = JMS_OBJECTMESSAGE_TYPE; - m["JMS_MAPMESSAGE_TYPE"] = JMS_MAPMESSAGE_TYPE; - m["JMS_BYTESMESSAGE_TYPE"] = JMS_BYTESMESSAGE_TYPE; - m["JMS_STREAMMESSAGE_TYPE"] = JMS_STREAMMESSAGE_TYPE; - m["JMS_TEXTMESSAGE_TYPE"] = JMS_TEXTMESSAGE_TYPE; - return m; - } - - } /* namespace shim */ -} /* namespace qpidit */ - - - -/* - * --- main --- - * Args: 1: Broker address (ip-addr:port) - * 2: Queue name - * 3: AMQP type - * 4: JSON Test parameters containing 3 maps: [testValueMap, testHeadersMap, testPropertiesMap] - */ - -int main(int argc, char** argv) { -/* - for (int i=0; i<argc; ++i) { - std::cout << "*** argv[" << i << "] : " << argv[i] << std::endl; - } -*/ - // TODO: improve arg management a little... - if (argc != 5) { - throw qpidit::ArgumentError("Incorrect number of arguments"); - } - - std::ostringstream oss; - oss << argv[1] << "/" << argv[2]; - - try { - Json::Value testParams; - Json::Reader jsonReader; - if (not jsonReader.parse(argv[4], testParams, false)) { - throw qpidit::JsonParserError(jsonReader); - } - - qpidit::shim::JmsSender sender(oss.str(), argv[3], testParams); - proton::default_container(sender).run(); - } catch (const std::exception& e) { - std::cout << "JmsSender error: " << e.what() << std::endl; - } -} http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/83b89fe4/shims/qpid-proton-cpp/src/qpidit/shim/JmsSender.hpp ---------------------------------------------------------------------- diff --git a/shims/qpid-proton-cpp/src/qpidit/shim/JmsSender.hpp b/shims/qpid-proton-cpp/src/qpidit/shim/JmsSender.hpp deleted file mode 100644 index ed3d57d..0000000 --- a/shims/qpid-proton-cpp/src/qpidit/shim/JmsSender.hpp +++ /dev/null @@ -1,120 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#ifndef SRC_QPIDIT_SHIM_JMSSENDER_HPP_ -#define SRC_QPIDIT_SHIM_JMSSENDER_HPP_ - -#include "json/value.h" -#include "proton/message.hpp" -#include "proton/messaging_handler.hpp" -#include "qpidit/QpidItErrors.hpp" -#include "qpidit/shim/JmsDefinitions.hpp" -#include <typeinfo> - -namespace proton { - class message; -} - -namespace qpidit -{ - namespace shim - { - - class JmsSender : public proton::messaging_handler - { - protected: - static proton::symbol s_jmsMessageTypeAnnotationKey; - static std::map<std::string, int8_t>s_jmsMessageTypeAnnotationValues; - - const std::string _brokerUrl; - const std::string _jmsMessageType; - const Json::Value _testValueMap; - const Json::Value _testHeadersMap; - const Json::Value _testPropertiesMap; - uint32_t _msgsSent; - uint32_t _msgsConfirmed; - uint32_t _totalMsgs; - public: - JmsSender(const std::string& brokerUrl, const std::string& jmsMessageType, const Json::Value& testParams); - virtual ~JmsSender(); - - void on_container_start(proton::container &c); - void on_sendable(proton::sender &s); - void on_tracker_accept(proton::tracker &t); - void on_transport_close(proton::transport &t); - - void on_connection_error(proton::connection &c); - void on_session_error(proton::session &s); - void on_sender_error(proton::sender& s); - void on_transport_error(proton::transport &t); - void on_error(const proton::error_condition &c); - protected: - void sendMessages(proton::sender &s, const std::string& subType, const Json::Value& testValueMap); - proton::message& setMessage(proton::message& msg, const std::string& subType, const std::string& testValueStr); - proton::message& setBytesMessage(proton::message& msg, const std::string& subType, const std::string& testValueStr); - proton::message& setMapMessage(proton::message& msg, const std::string& subType, const std::string& testValueStr, uint32_t valueNumber); - proton::message& setObjectMessage(proton::message& msg, const std::string& subType, const Json::Value& testValue); - proton::message& setStreamMessage(proton::message& msg, const std::string& subType, const std::string& testValue); - proton::message& setTextMessage(proton::message& msg, const Json::Value& testValue); - - proton::message& addMessageHeaders(proton::message& msg); - static proton::message& setJmsTypeHeader(proton::message& msg, const std::string& t); - static proton::message& setJmsCorrelationId(proton::message& msg, const std::string& cid); - static proton::message& setJmsCorrelationId(proton::message& msg, const proton::binary cid); - static proton::message& setJmsReplyTo(proton::message& msg, const std::string& dt, const std::string& d); - - proton::message& addMessageProperties(proton::message& msg); - template<typename T> proton::message& setMessageProperty(proton::message& msg, const std::string& propertyName, T val) { - msg.properties().put(propertyName, val); - return msg; - } - - static proton::binary getJavaObjectBinary(const std::string& javaClassName, const std::string& valAsString); - static uint32_t getTotalNumMessages(const Json::Value& testValueMap); - - static std::map<std::string, int8_t> initializeJmsMessageTypeAnnotationMap(); - - template<typename T> static T numToBinary(T n, proton::binary& b) { - for (int i=0; i<sizeof(n); ++i) { - b.push_back(* ((char*)&n + i)); - } - } - - // Set message body to floating type T through integral type U - // Used to convert a hex string representation of a float or double to a float or double - template<typename T, typename U> T getFloatValue(const std::string& testValueStr) { - try { - U ival(std::strtoul(testValueStr.data(), NULL, 16)); - return T(*reinterpret_cast<T*>(&ival)); - } catch (const std::exception& e) { throw qpidit::InvalidTestValueError(typeid(T).name(), testValueStr); } - } - - template<typename T> T getIntegralValue(const std::string& testValueStr) { - try { - return T(std::strtol(testValueStr.data(), NULL, 16)); - } catch (const std::exception& e) { throw qpidit::InvalidTestValueError(typeid(T).name(), testValueStr); } - } - }; - - } /* namespace shim */ -} /* namespace qpidit */ - -#endif /* SRC_QPIDIT_SHIM_JMSSENDER_HPP_ */ http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/83b89fe4/shims/qpid-proton-python/src/JmsReceiverShim.py ---------------------------------------------------------------------- diff --git a/shims/qpid-proton-python/src/JmsReceiverShim.py b/shims/qpid-proton-python/src/JmsReceiverShim.py deleted file mode 100755 index 9140db1..0000000 --- a/shims/qpid-proton-python/src/JmsReceiverShim.py +++ /dev/null @@ -1,358 +0,0 @@ -#!/usr/bin/env python - -""" -JMS receiver shim for qpid-interop-test -""" - -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -import sys -from interop_test_errors import InteropTestError -from json import dumps, loads -from proton import byte, symbol -from proton.handlers import MessagingHandler -from proton.reactor import Container -from struct import pack, unpack -from subprocess import check_output -from traceback import format_exc - -# These values must tie in with the Qpid-JMS client values found in -# org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport -QPID_JMS_TYPE_ANNOTATION_NAME = symbol(u'x-opt-jms-msg-type') - -class JmsReceiverShim(MessagingHandler): - """ - Receiver shim: This shim receives JMS messages sent by the Sender shim and prints the contents of the received - messages onto the terminal in JSON format for retrieval by the test harness. The JMS messages type and, where - applicable, body values, as well as the combinations of JMS headers and properties which may be attached to - the message are received on the command-line in JSON format when this program is launched. - """ - def __init__(self, url, jms_msg_type, test_parameters_list): - super(JmsReceiverShim, self).__init__() - self.url = url - self.jms_msg_type = jms_msg_type - self.expteced_msg_map = test_parameters_list[0] - self.flag_map = test_parameters_list[1] - self.subtype_itr = iter(sorted(self.expteced_msg_map.keys())) - self.expected = self._get_tot_num_messages() - self.received = 0 - self.received_value_map = {} - self.current_subtype = None - self.current_subtype_msg_list = None - self.jms_header_map = {} - self.jms_property_map = {} - - def get_received_value_map(self): - """"Return the collected message values received""" - return self.received_value_map - - def get_jms_header_map(self): - """Return the collected message headers received""" - return self.jms_header_map - - def get_jms_property_map(self): - """Return the collected message properties received""" - return self.jms_property_map - - def on_start(self, event): - """Event callback for when the client starts""" - event.container.create_receiver(self.url) - - def on_message(self, event): - """Event callback when a message is received by the client""" - if event.message.id and event.message.id < self.received: - return # ignore duplicate message - if self.expected == 0 or self.received < self.expected: - if self.current_subtype is None: - self.current_subtype = self.subtype_itr.next() - self.current_subtype_msg_list = [] - self.current_subtype_msg_list.append(self._handle_message(event.message)) - self._process_jms_headers(event.message) - self._process_jms_properties(event.message) - if len(self.current_subtype_msg_list) >= self.expteced_msg_map[self.current_subtype]: - self.received_value_map[self.current_subtype] = self.current_subtype_msg_list - self.current_subtype = None - self.current_subtype_msg_list = [] - self.received += 1 - if self.received == self.expected: - event.receiver.close() - event.connection.close() - - def on_connection_error(self, event): - print 'JmsReceiverShim.on_connection_error' - - def on_session_error(self, event): - print 'JmsReceiverShim.on_session_error' - - def on_link_error(self, event): - print 'JmsReceiverShim.on_link_error' - - def _handle_message(self, message): - """Handles the analysis of a received message""" - if self.jms_msg_type == 'JMS_MESSAGE_TYPE': - return self._receive_jms_message(message) - if self.jms_msg_type == 'JMS_BYTESMESSAGE_TYPE': - return self._receive_jms_bytesmessage(message) - if self.jms_msg_type == 'JMS_MAPMESSAGE_TYPE': - return self._recieve_jms_mapmessage(message) - if self.jms_msg_type == 'JMS_OBJECTMESSAGE_TYPE': - return self._recieve_jms_objectmessage(message) - if self.jms_msg_type == 'JMS_STREAMMESSAGE_TYPE': - return self._receive_jms_streammessage(message) - if self.jms_msg_type == 'JMS_TEXTMESSAGE_TYPE': - return self._receive_jms_textmessage(message) - print 'jms-receive: Unsupported JMS message type "%s"' % self.jms_msg_type - return None - - def _get_tot_num_messages(self): - """"Counts up the total number of messages which should be received from the expected message map""" - total = 0 - for key in self.expteced_msg_map: - total += int(self.expteced_msg_map[key]) - return total - - def _receive_jms_message(self, message): - """"Receives a JMS message (without a body)""" - assert self.jms_msg_type == 'JMS_MESSAGE_TYPE' - assert message.annotations[QPID_JMS_TYPE_ANNOTATION_NAME] == byte(0) - if message.body is not None: - raise InteropTestError('_receive_jms_message: Invalid body for type JMS_MESSAGE_TYPE: %s' % - str(message.body)) - return None - - def _receive_jms_bytesmessage(self, message): - """"Receives a JMS bytes message""" - assert self.jms_msg_type == 'JMS_BYTESMESSAGE_TYPE' - assert message.annotations[QPID_JMS_TYPE_ANNOTATION_NAME] == byte(3) - if self.current_subtype == 'boolean': - if message.body == b'\x00': - return 'False' - if message.body == b'\x01': - return 'True' - raise InteropTestError('_receive_jms_bytesmessage: Invalid encoding for subtype boolean: %s' % - str(message.body)) - if self.current_subtype == 'byte': - return hex(unpack('b', message.body)[0]) - if self.current_subtype == 'bytes': - return str(message.body) - if self.current_subtype == 'char': - if len(message.body) == 2: # format 'a' or '\xNN' - return str(message.body[1]) # strip leading '\x00' char - raise InteropTestError('Unexpected strring length for type char: %d' % len(message.body)) - if self.current_subtype == 'double': - return '0x%016x' % unpack('!Q', message.body)[0] - if self.current_subtype == 'float': - return '0x%08x' % unpack('!L', message.body)[0] - if self.current_subtype == 'int': - return hex(unpack('!i', message.body)[0]) - if self.current_subtype == 'long': - return hex(unpack('!q', message.body)[0]) - if self.current_subtype == 'short': - return hex(unpack('!h', message.body)[0]) - if self.current_subtype == 'string': - # NOTE: first 2 bytes are string length, must be present - if len(message.body) >= 2: - str_len = unpack('!H', message.body[:2])[0] - str_body = str(message.body[2:]) - if len(str_body) != str_len: - raise InteropTestError('String length mismatch: size=%d, but len(\'%s\')=%d' % - (str_len, str_body, len(str_body))) - return str_body - else: - raise InteropTestError('Malformed string binary: len(\'%s\')=%d' % - (repr(message.body), len(message.body))) - raise InteropTestError('JMS message type %s: Unknown or unsupported subtype \'%s\'' % - (self.jms_msg_type, self.current_subtype)) - - def _recieve_jms_mapmessage(self, message): - """"Receives a JMS map message""" - assert self.jms_msg_type == 'JMS_MAPMESSAGE_TYPE' - assert message.annotations[QPID_JMS_TYPE_ANNOTATION_NAME] == byte(2) - key, value = message.body.items()[0] - assert key[:-3] == self.current_subtype - if self.current_subtype == 'boolean': - return str(value) - if self.current_subtype == 'byte': - return hex(value) - if self.current_subtype == 'bytes': - return str(value) - if self.current_subtype == 'char': - return str(value) - if self.current_subtype == 'double': - return '0x%016x' % unpack('!Q', pack('!d', value))[0] - if self.current_subtype == 'float': - return '0x%08x' % unpack('!L', pack('!f', value))[0] - if self.current_subtype == 'int': - return hex(value) - if self.current_subtype == 'long': - return hex(int(value)) - if self.current_subtype == 'short': - return hex(value) - if self.current_subtype == 'string': - return str(value) - raise InteropTestError('JMS message type %s: Unknown or unsupported subtype \'%s\'' % - (self.jms_msg_type, self.current_subtype)) - - def _recieve_jms_objectmessage(self, message): - """"Receives a JMS Object message""" - assert self.jms_msg_type == 'JMS_OBJECTMESSAGE_TYPE' - assert message.annotations[QPID_JMS_TYPE_ANNOTATION_NAME] == byte(1) - return self._get_java_obj(message.body) - - def _get_java_obj(self, java_obj_bytes): - """ - Take bytes from serialized Java object and construct a Java object, then return its toString() value. The - work of 'translating' the bytes to a Java object and obtaining its class and value is done in a Java - utility org.apache.qpid.interop_test.obj_util.BytesToJavaObj located in jar JavaObjUtils.jar. - java_obj_bytes: hex string representation of bytes from Java object (eg 'aced00057372...') - returns: string containing Java class value as returned by the toString() method - """ - java_obj_bytes_str = ''.join(["%02x" % ord(x) for x in java_obj_bytes]).strip() - out_str = check_output(['java', - '-cp', - 'target/JavaObjUtils.jar', - 'org.apache.qpid.interop_test.obj_util.BytesToJavaObj', - java_obj_bytes_str]) - out_str_list = out_str.split('\n')[:-1] # remove trailing \n - if len(out_str_list) > 1: - raise InteropTestError('Unexpected return from JavaObjUtils: %s' % out_str) - colon_index = out_str_list[0].index(':') - if colon_index < 0: - raise InteropTestError('Unexpected format from JavaObjUtils: %s' % out_str) - java_class_name = out_str_list[0][:colon_index] - java_class_value_str = out_str_list[0][colon_index+1:] - if java_class_name != self.current_subtype: - raise InteropTestError('Unexpected class name from JavaObjUtils: expected %s, recieved %s' % - (self.current_subtype, java_class_name)) - return java_class_value_str - - def _receive_jms_streammessage(self, message): - """Receives a JMS stream message""" - assert self.jms_msg_type == 'JMS_STREAMMESSAGE_TYPE' - assert message.annotations[QPID_JMS_TYPE_ANNOTATION_NAME] == byte(4) - # Every message is a list with one item [value] - assert len(message.body) == 1 - value = message.body[0] - if self.current_subtype == 'boolean': - return str(value) - if self.current_subtype == 'byte': - return hex(value) - if self.current_subtype == 'bytes': - return str(value) - if self.current_subtype == 'char': - return str(value) - if self.current_subtype == 'double': - return '0x%016x' % unpack('!Q', pack('!d', value))[0] - if self.current_subtype == 'float': - return '0x%08x' % unpack('!L', pack('!f', value))[0] - if self.current_subtype == 'int': - return hex(value) - if self.current_subtype == 'long': - return hex(int(value)) - if self.current_subtype == 'short': - return hex(value) - if self.current_subtype == 'string': - return str(value) - raise InteropTestError('JmsRecieverShim._receive_jms_streammessage(): ' + - 'JMS message type %s: Unknown or unsupported subtype \'%s\'' % - (self.jms_msg_type, self.current_subtype)) - - def _receive_jms_textmessage(self, message): - """"Receives a JMS text message""" - assert self.jms_msg_type == 'JMS_TEXTMESSAGE_TYPE' - assert message.annotations[QPID_JMS_TYPE_ANNOTATION_NAME] == byte(5) - return message.body - - def _process_jms_headers(self, message): - """"Checks the supplied message for three JMS headers: message type, correlation-id and reply-to""" - # JMS message type header - message_type_header = message._get_subject() - if message_type_header is not None: - self.jms_header_map['JMS_TYPE_HEADER'] = {'string': message_type_header} - - # JMS correlation ID - correlation_id = message._get_correlation_id() - if correlation_id is not None: - if 'JMS_CORRELATIONID_AS_BYTES' in self.flag_map and self.flag_map['JMS_CORRELATIONID_AS_BYTES']: - self.jms_header_map['JMS_CORRELATIONID_HEADER'] = {'bytes': correlation_id} - else: - self.jms_header_map['JMS_CORRELATIONID_HEADER'] = {'string': correlation_id} - - # JMS reply-to - reply_to = message._get_reply_to() - if reply_to is not None: - if 'JMS_REPLYTO_AS_TOPIC' in self.flag_map and self.flag_map['JMS_REPLYTO_AS_TOPIC']: - # Some brokers prepend 'queue://' and 'topic://' to reply_to addresses, strip these when present - if len(reply_to) > 8 and reply_to[0:8] == 'topic://': - reply_to = reply_to[8:] - self.jms_header_map['JMS_REPLYTO_HEADER'] = {'topic': reply_to} - else: - if len(reply_to) > 8 and reply_to[0:8] == 'queue://': - reply_to = reply_to[8:] - self.jms_header_map['JMS_REPLYTO_HEADER'] = {'queue': reply_to} - - def _process_jms_properties(self, message): - """"Checks the supplied message for JMS message properties and decodes them""" - if message.properties is not None: - for jms_property_name in message.properties: - underscore_index = jms_property_name.find('_') - if underscore_index >= 0: # Ignore any other properties without '_' - jms_property_type = jms_property_name[0:underscore_index] - value = message.properties[jms_property_name] - if jms_property_type == 'boolean': - self.jms_property_map[jms_property_name] = {'boolean': str(value)} - elif jms_property_type == 'byte': - self.jms_property_map[jms_property_name] = {'byte': hex(value)} - elif jms_property_type == 'double': - self.jms_property_map[jms_property_name] = {'double': '0x%016x' % - unpack('!Q', pack('!d', value))[0]} - elif jms_property_type == 'float': - self.jms_property_map[jms_property_name] = {'float': '0x%08x' % - unpack('!L', pack('!f', value))[0]} - elif jms_property_type == 'int': - self.jms_property_map[jms_property_name] = {'int': hex(value)} - elif jms_property_type == 'long': - self.jms_property_map[jms_property_name] = {'long': hex(int(value))} - elif jms_property_type == 'short': - self.jms_property_map[jms_property_name] = {'short': hex(value)} - elif jms_property_type == 'string': - self.jms_property_map[jms_property_name] = {'string': str(value)} - else: - pass # Ignore any other properties, brokers can add them and we don't know what they may be - - -# --- main --- -# Args: 1: Broker address (ip-addr:port) -# 2: Queue name -# 3: JMS message type -# 4: JSON Test parameters containing 2 maps: [testValuesMap, flagMap] -#print '#### sys.argv=%s' % sys.argv -try: - RECEIVER = JmsReceiverShim('%s/%s' % (sys.argv[1], sys.argv[2]), sys.argv[3], loads(sys.argv[4])) - Container(RECEIVER).run() - print sys.argv[3] - print dumps(RECEIVER.get_received_value_map()) - print dumps(RECEIVER.get_jms_header_map()) - print dumps(RECEIVER.get_jms_property_map()) -except KeyboardInterrupt: - pass -except Exception as exc: - print 'jms-receiver-shim EXCEPTION:', exc - print format_exc() --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
