Repository: qpid-interop-test Updated Branches: refs/heads/master f920bdf76 -> 8bf7bfe4c
QPIDIT-91: Add AmqpNetLite large_content shim. This commit requires late (v2.0.0 as yet unreleased) libraries: * a version of AMQP.Net Lite after 2017-06-15 to support RecieverLink.Receive TimeSpan interface. * a version of Amqp.Net Lite after 2017-07-21 in order to work with Qpid Dispatch Router (max-frame-size issue). Project: http://git-wip-us.apache.org/repos/asf/qpid-interop-test/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-interop-test/commit/63243b69 Tree: http://git-wip-us.apache.org/repos/asf/qpid-interop-test/tree/63243b69 Diff: http://git-wip-us.apache.org/repos/asf/qpid-interop-test/diff/63243b69 Branch: refs/heads/master Commit: 63243b69c4f99707e1475823bfc092eeb15629cb Parents: f920bdf Author: Chuck Rolke <[email protected]> Authored: Thu Aug 31 16:23:54 2017 -0400 Committer: Chuck Rolke <[email protected]> Committed: Thu Aug 31 16:23:54 2017 -0400 ---------------------------------------------------------------------- shims/amqpnetlite/src/CMakeLists.txt | 92 ++- .../amqp_large_content_test/Receiver/App.config | 24 + .../Receiver/Properties/AssemblyInfo.cs | 57 ++ .../Receiver/Receiver.cs | 586 ++++++++++++++ .../Receiver/Receiver.csproj.in | 77 ++ .../Receiver/packages.config | 21 + .../amqp_large_content_test/Sender/App.config | 24 + .../Sender/Properties/AssemblyInfo.cs | 57 ++ .../amqp_large_content_test/Sender/Sender.cs | 782 +++++++++++++++++++ .../Sender/Sender.csproj.in | 76 ++ .../Sender/packages.config | 21 + .../amqp_large_content_test.py | 9 + 12 files changed, 1785 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/63243b69/shims/amqpnetlite/src/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/shims/amqpnetlite/src/CMakeLists.txt b/shims/amqpnetlite/src/CMakeLists.txt index d096158..26bc2d5 100644 --- a/shims/amqpnetlite/src/CMakeLists.txt +++ b/shims/amqpnetlite/src/CMakeLists.txt @@ -76,77 +76,87 @@ option(BUILD_AMQPNETLITE "Build AMQP.Net Lite shim under mono" ${lite_default}) message(STATUS "BUILD_AMQPNETLITE = ${BUILD_AMQPNETLITE}") -# Stage the build -if (BUILD_AMQPNETLITE) - # Drop lite dll into build/packages - file(GLOB LITE_LIBS "${AMQPNETLITE_LIB_DIR}/Amqp.Net.*") - file(COPY ${LITE_LIBS} DESTINATION ${CMAKE_BINARY_DIR}/shims/amqpnetlite/packages/amqpnetlite/lib/net45/) - +# Configure/build/install a single test +MACRO (define_lite_test testname) # Configure the csproj files - configure_file( amqp_types_test/Receiver/Receiver.csproj.in - ${CMAKE_CURRENT_BINARY_DIR}/amqp_types_test/Receiver/Receiver.csproj + configure_file( ${testname}/Receiver/Receiver.csproj.in + ${CMAKE_CURRENT_BINARY_DIR}/${testname}/Receiver/Receiver.csproj @ONLY) - configure_file( amqp_types_test/Sender/Sender.csproj.in - ${CMAKE_CURRENT_BINARY_DIR}/amqp_types_test/Sender/Sender.csproj + configure_file( ${testname}/Sender/Sender.csproj.in + ${CMAKE_CURRENT_BINARY_DIR}/${testname}/Sender/Sender.csproj @ONLY) # Emit the custom build commands add_custom_target( - amqpnetlite_amqp_types_test_Sender + amqpnetlite_${testname}_Sender ALL COMMAND xbuild Sender.csproj > xbuild.log DEPENDS - ${CMAKE_CURRENT_BINARY_DIR}/amqp_types_test/Sender/Sender.csproj - ${CMAKE_CURRENT_SOURCE_DIR}/amqp_types_test/Sender/Sender.cs - ${CMAKE_CURRENT_SOURCE_DIR}/amqp_types_test/Sender/Properties/AssemblyInfo.cs - ${CMAKE_CURRENT_SOURCE_DIR}/amqp_types_test/Sender/App.config - ${CMAKE_CURRENT_SOURCE_DIR}/amqp_types_test/Sender/packages.config - WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/amqp_types_test/Sender - COMMENT Cross compiling amqpnetlite amqp_types_test/Sender + ${CMAKE_CURRENT_BINARY_DIR}/${testname}/Sender/Sender.csproj + ${CMAKE_CURRENT_SOURCE_DIR}/${testname}/Sender/Sender.cs + ${CMAKE_CURRENT_SOURCE_DIR}/${testname}/Sender/Properties/AssemblyInfo.cs + ${CMAKE_CURRENT_SOURCE_DIR}/${testname}/Sender/App.config + ${CMAKE_CURRENT_SOURCE_DIR}/${testname}/Sender/packages.config + WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/${testname}/Sender + COMMENT Cross compiling amqpnetlite ${testname}/Sender ) add_custom_target( - amqpnetlite_amqp_types_test_Receiver + amqpnetlite_${testname}_Receiver ALL COMMAND xbuild Receiver.csproj > xbuild.log DEPENDS - ${CMAKE_CURRENT_BINARY_DIR}/amqp_types_test/Receiver/Receiver.csproj - ${CMAKE_CURRENT_SOURCE_DIR}/amqp_types_test/Receiver/Receiver.cs - ${CMAKE_CURRENT_SOURCE_DIR}/amqp_types_test/Receiver/Properties/AssemblyInfo.cs - ${CMAKE_CURRENT_SOURCE_DIR}/amqp_types_test/Receiver/App.config - ${CMAKE_CURRENT_SOURCE_DIR}/amqp_types_test/Receiver/packages.config - WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/amqp_types_test/Receiver - COMMENT Cross compiling amqpnetlite amqp_types_test/Receiver + ${CMAKE_CURRENT_BINARY_DIR}/${testname}/Receiver/Receiver.csproj + ${CMAKE_CURRENT_SOURCE_DIR}/${testname}/Receiver/Receiver.cs + ${CMAKE_CURRENT_SOURCE_DIR}/${testname}/Receiver/Properties/AssemblyInfo.cs + ${CMAKE_CURRENT_SOURCE_DIR}/${testname}/Receiver/App.config + ${CMAKE_CURRENT_SOURCE_DIR}/${testname}/Receiver/packages.config + WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/${testname}/Receiver + COMMENT Cross compiling amqpnetlite ${testname}/Receiver ) # Install - set(LITE_INSTALL_ROOT ${CMAKE_INSTALL_PREFIX}/libexec/qpid_interop_test/shims/amqpnetlite) install( - FILES ${CMAKE_CURRENT_BINARY_DIR}/amqp_types_test/Receiver/bin/Amqp.Net.dll - DESTINATION ${LITE_INSTALL_ROOT}/amqp_types_test/) + FILES ${CMAKE_CURRENT_BINARY_DIR}/${testname}/Receiver/bin/Amqp.Net.dll + DESTINATION ${LITE_INSTALL_ROOT}/${testname}/) install( - FILES ${CMAKE_CURRENT_BINARY_DIR}/amqp_types_test/Receiver/bin/Receiver.exe - DESTINATION ${LITE_INSTALL_ROOT}/amqp_types_test/) + FILES ${CMAKE_CURRENT_BINARY_DIR}/${testname}/Receiver/bin/Receiver.exe + DESTINATION ${LITE_INSTALL_ROOT}/${testname}/) install( - FILES ${CMAKE_CURRENT_BINARY_DIR}/amqp_types_test/Receiver/bin/Receiver.exe.config - DESTINATION ${LITE_INSTALL_ROOT}/amqp_types_test/) + FILES ${CMAKE_CURRENT_BINARY_DIR}/${testname}/Receiver/bin/Receiver.exe.config + DESTINATION ${LITE_INSTALL_ROOT}/${testname}/) install( - FILES ${CMAKE_CURRENT_BINARY_DIR}/amqp_types_test/Receiver/bin/Receiver.exe.mdb - DESTINATION ${LITE_INSTALL_ROOT}/amqp_types_test/) + FILES ${CMAKE_CURRENT_BINARY_DIR}/${testname}/Receiver/bin/Receiver.exe.mdb + DESTINATION ${LITE_INSTALL_ROOT}/${testname}/) install( - FILES ${CMAKE_CURRENT_BINARY_DIR}/amqp_types_test/Sender/bin/Sender.exe - DESTINATION ${LITE_INSTALL_ROOT}/amqp_types_test/) + FILES ${CMAKE_CURRENT_BINARY_DIR}/${testname}/Sender/bin/Sender.exe + DESTINATION ${LITE_INSTALL_ROOT}/${testname}/) install( - FILES ${CMAKE_CURRENT_BINARY_DIR}/amqp_types_test/Sender/bin/Sender.exe.config - DESTINATION ${LITE_INSTALL_ROOT}/amqp_types_test/) + FILES ${CMAKE_CURRENT_BINARY_DIR}/${testname}/Sender/bin/Sender.exe.config + DESTINATION ${LITE_INSTALL_ROOT}/${testname}/) install( - FILES ${CMAKE_CURRENT_BINARY_DIR}/amqp_types_test/Sender/bin/Sender.exe.mdb - DESTINATION ${LITE_INSTALL_ROOT}/amqp_types_test/) + FILES ${CMAKE_CURRENT_BINARY_DIR}/${testname}/Sender/bin/Sender.exe.mdb + DESTINATION ${LITE_INSTALL_ROOT}/${testname}/) +ENDMACRO (define_lite_test) + +# Stage the build +if (BUILD_AMQPNETLITE) + # Drop lite dll into build/packages + file(GLOB LITE_LIBS "${AMQPNETLITE_LIB_DIR}/Amqp.Net.*") + file(COPY ${LITE_LIBS} DESTINATION ${CMAKE_BINARY_DIR}/shims/amqpnetlite/packages/amqpnetlite/lib/net45/) + + # define lite install root + set(LITE_INSTALL_ROOT ${CMAKE_INSTALL_PREFIX}/libexec/qpid_interop_test/shims/amqpnetlite) + + # define tests + define_lite_test (amqp_types_test) + define_lite_test (amqp_large_content_test) + endif () http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/63243b69/shims/amqpnetlite/src/amqp_large_content_test/Receiver/App.config ---------------------------------------------------------------------- diff --git a/shims/amqpnetlite/src/amqp_large_content_test/Receiver/App.config b/shims/amqpnetlite/src/amqp_large_content_test/Receiver/App.config new file mode 100644 index 0000000..c60d743 --- /dev/null +++ b/shims/amqpnetlite/src/amqp_large_content_test/Receiver/App.config @@ -0,0 +1,24 @@ +<?xml version="1.0" encoding="utf-8" ?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<configuration> + <startup> + <supportedRuntime version="v4.0" sku=".NETFramework,Version=v4.5" /> + </startup> +</configuration> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/63243b69/shims/amqpnetlite/src/amqp_large_content_test/Receiver/Properties/AssemblyInfo.cs ---------------------------------------------------------------------- diff --git a/shims/amqpnetlite/src/amqp_large_content_test/Receiver/Properties/AssemblyInfo.cs b/shims/amqpnetlite/src/amqp_large_content_test/Receiver/Properties/AssemblyInfo.cs new file mode 100644 index 0000000..aa2ca21 --- /dev/null +++ b/shims/amqpnetlite/src/amqp_large_content_test/Receiver/Properties/AssemblyInfo.cs @@ -0,0 +1,57 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + + using System.Reflection; +using System.Runtime.CompilerServices; +using System.Runtime.InteropServices; + +// General Information about an assembly is controlled through the following +// set of attributes. Change these attribute values to modify the information +// associated with an assembly. +[assembly: AssemblyTitle("Receiver")] +[assembly: AssemblyDescription("")] +[assembly: AssemblyConfiguration("")] +[assembly: AssemblyCompany("")] +[assembly: AssemblyProduct("Receiver")] +[assembly: AssemblyCopyright("Copyright © 2016")] +[assembly: AssemblyTrademark("")] +[assembly: AssemblyCulture("")] + +// Setting ComVisible to false makes the types in this assembly not visible +// to COM components. If you need to access a type in this assembly from +// COM, set the ComVisible attribute to true on that type. +[assembly: ComVisible(false)] + +// The following GUID is for the ID of the typelib if this project is exposed to COM +[assembly: Guid("57860fb2-59c1-4302-b48c-982a82585116")] + +// Version information for an assembly consists of the following four values: +// +// Major Version +// Minor Version +// Build Number +// Revision +// +// You can specify all the values or you can default the Build and Revision Numbers +// by using the '*' as shown below: +// [assembly: AssemblyVersion("1.0.*")] +[assembly: AssemblyVersion("1.0.0.0")] +[assembly: AssemblyFileVersion("1.0.0.0")] http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/63243b69/shims/amqpnetlite/src/amqp_large_content_test/Receiver/Receiver.cs ---------------------------------------------------------------------- diff --git a/shims/amqpnetlite/src/amqp_large_content_test/Receiver/Receiver.cs b/shims/amqpnetlite/src/amqp_large_content_test/Receiver/Receiver.cs new file mode 100644 index 0000000..eda4e9c --- /dev/null +++ b/shims/amqpnetlite/src/amqp_large_content_test/Receiver/Receiver.cs @@ -0,0 +1,586 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +using System; +using System.Collections.Generic; +using System.IO; +using System.Text; +using System.Threading; +using Amqp; +using Amqp.Framing; +using Amqp.Types; +using System.Web.Script.Serialization; +//using Newtonsoft.Json; + +namespace Qpidit +{ + + /// <summary> + /// MessageValue holds a typed AMQP.Net Lite message body AmqpValue, + /// decodes it, and returns the QpidIT string that represents the message. + /// Complex values List and Map are constructed recursively. + /// Remaining singleton values like int or long are held directly as strings. + /// </summary> + class MessageValue + { + // Original message body object + private object messageValue; + + // Computed object type names + private string liteType; + private string qpiditType; + + // Singleton decoded value in QpidIt format + private string valueString; + + // Storage for List + private List<MessageValue> valueList; + + // Storage for Map + // Kept as lists to avoid dictionary reordering complications. + private List<MessageValue> valueMapKeys; + private List<MessageValue> valueMapValues; + + // Computed large_content object size + private int payloadSize; + private int payloadChunks; + + /// <summary> + /// Constructor. + /// User must call Decode to parse values. + /// </summary> + /// <param name="messageValue">Lite AmqpValue</param> + public MessageValue(object messageValue) + { + this.messageValue = messageValue; + liteType = ""; + qpiditType = ""; + valueString = null; + valueList = null; + valueMapKeys = null; + valueMapValues = null; + payloadSize = 0; // total bytes received + payloadChunks = 0; // count of elements in list or map + } + + /// <summary> + /// Lite type name property + /// </summary> + public string LiteTypeName { get { return liteType; } } + + /// <summary> + /// QpidIt type name property + /// </summary> + public string QpiditTypeName { get { return qpiditType; } } + + /// <summary> + /// Large_content test object size + /// </summary> + public int PayloadSize { get { return payloadSize; } } + + /// <summary> + /// Large_content test object number of parts + /// </summary> + public int PayloadChunks { get { return payloadChunks; } } + + /// <summary> + /// Decode the value and contained sub-values. + /// </summary> + /// <returns></returns> + public override string ToString() + { + if (valueString != null) + { + return "\"" + valueString + "\""; + } + else if (valueList != null) + { + StringBuilder sb = new StringBuilder(); + bool leading = false; + sb.Append("["); + foreach (var item in valueList) + { + if (leading) sb.Append(", "); + leading = true; + sb.Append(item.ToString()); + } + sb.Append("]"); + return sb.ToString(); + } + else if (valueMapKeys != null) + { + StringBuilder msb = new StringBuilder(); + msb.Append("{"); + for (int i = 0; i < valueMapKeys.Count; i++) + { + if (i > 0) msb.Append(", "); + msb.Append(valueMapKeys[i].ToString()); + msb.Append(":"); + msb.Append(valueMapValues[i].ToString()); + } + msb.Append("}"); + return msb.ToString(); + } + else + { + throw new ApplicationException("printing undecoded MessageValue"); + } + } + + /// <summary> + /// Compute contents of byte array in reverse order. + /// </summary> + /// <param name="input">The byte array.</param> + /// <param name="suppressLeading0s">Flag controls suppression of leading zeros.</param> + /// <returns>Hexadecimal string</returns> + public string BytesReversedToString(byte[] input, bool suppressLeading0s = false) + { + if (!BitConverter.IsLittleEndian) + { + throw new ApplicationException("This code requires a little endian host."); + } + string result = ""; + for (int i = input.Length - 1; i >= 0; i--) + { + result += String.Format("{0:x2}", input[i]); + } + if (suppressLeading0s) + { + result = result.TrimStart('0'); + } + if (String.IsNullOrEmpty(result)) + { + result = "0"; + } + return result; + } + + + /// <summary> + /// Json-encode an array of bytes assumed to be plain ascii text. + /// </summary> + /// <param name="theBytes">The byte array.</param> + /// <returns>Serialized string</returns> + public string SerializeBytes(byte[] theBytes) + { + StringBuilder builder = new StringBuilder(); + byte asciiSlash = Encoding.Default.GetBytes("\\")[0]; + byte asciiDoubleQuote = Encoding.Default.GetBytes("\"")[0]; + foreach (byte b in theBytes) + { + if (b == asciiSlash) { + // backslash needs backslash escape + builder.Append((char)asciiSlash); + builder.Append((char)asciiSlash); + } + else if (b == asciiDoubleQuote) { + // doublequote needs backslash escape + builder.Append((char)asciiSlash); + builder.Append((char)asciiDoubleQuote); + } + else if (b >= 32 && b <= 127) + // printable ascii passed through + builder.Append((char)b); + else + // non-printable ascii as escaped hex bytes + builder.Append(String.Format("\\x{0:x2}", b)); + } + return builder.ToString(); + } + + + /// <summary> + /// Json-encode an object whose string value is assumed to be plain ascii text. + /// </summary> + /// <param name="theObject">The object.</param> + /// <returns>Serialized string</returns> + public string SerializeString(object theObject) + { + byte[] bytes = Encoding.ASCII.GetBytes(theObject.ToString()); + return SerializeBytes(bytes); + } + + + /// <summary> + /// Decode message body's object type names and QpidIt display details. + /// Recursively process maps and lists. + /// </summary> + public void Decode() + { + if (messageValue == null) + { + liteType = "null"; + qpiditType = "null"; + valueString = "None"; + return; + } + + liteType = messageValue.GetType().Name; + if (liteType == "List") + { + qpiditType = "list"; + valueList = new List<MessageValue>(); + List list = (List)messageValue; + foreach (var item in list) + { + MessageValue itemValue = new Qpidit.MessageValue(item); + itemValue.Decode(); + valueList.Add(itemValue); + if (!itemValue.QpiditTypeName.Equals("string")) + { + throw new ApplicationException(String.Format( + "List content expects strings but received: {0}", itemValue.GetType().Name)); + } + payloadSize += itemValue.PayloadSize; + payloadChunks += 1; + } + } + else if (liteType == "Map") + { + qpiditType = "map"; + valueMapKeys = new List<MessageValue>(); + valueMapValues = new List<MessageValue>(); + Map map = (Map)messageValue; + foreach (var key in map.Keys) + { + MessageValue valueKey = new Qpidit.MessageValue(key); + valueKey.Decode(); + MessageValue valueValue = new Qpidit.MessageValue(map[key]); + valueValue.Decode(); + valueMapKeys.Add(valueKey); + valueMapValues.Add(valueValue); + payloadSize += valueValue.PayloadSize; + payloadChunks += 1; + } + } + else + { + //Console.WriteLine("MessageValue singleton type : {0}", liteType); + switch (liteType) + { + case "Boolean": + qpiditType = "boolean"; + valueString = (Boolean)messageValue ? "True" : "False"; + break; + case "Byte": + qpiditType = "ubyte"; + valueString = String.Format("0x{0:x}", (Byte)messageValue); + break; + case "UInt16": + qpiditType = "ushort"; + valueString = String.Format("0x{0:x}", (UInt16)messageValue); + break; + case "UInt32": + qpiditType = "uint"; + valueString = String.Format("0x{0:x}", (UInt32)messageValue); + break; + case "UInt64": + qpiditType = "ulong"; + valueString = String.Format("0x{0:x}", (UInt64)messageValue); + break; + case "SByte": + qpiditType = "byte"; + SByte mySbyte = (SByte)messageValue; + if (mySbyte >= 0) + { + valueString = String.Format("0x{0:x}", mySbyte); + } + else + { + mySbyte = (SByte) (-mySbyte); + valueString = String.Format("-0x{0:x}", mySbyte); + } + break; + case "Int16": + qpiditType = "short"; + Int16 myInt16 = (Int16)messageValue; + if (myInt16 >= 0) + { + valueString = String.Format("0x{0:x}", myInt16); + } + else + { + myInt16 = (Int16)(-myInt16); + valueString = String.Format("-0x{0:x}", myInt16); + } + break; + case "Int32": + qpiditType = "int"; + Int32 myInt32 = (Int32)messageValue; + if (myInt32 >= 0) + { + valueString = String.Format("0x{0:x}", myInt32); + } + else + { + myInt32 = (Int32)(-myInt32); + valueString = String.Format("-0x{0:x}", myInt32); + } + break; + case "Int64": + qpiditType = "long"; + Int64 myInt64 = (Int64)messageValue; + if (myInt64 >= 0) + { + valueString = String.Format("0x{0:x}", myInt64); + } + else + { + myInt64 = (Int64)(-myInt64); + valueString = String.Format("-0x{0:x}", myInt64); + } + break; + case "Single": + byte[] sbytes = BitConverter.GetBytes((Single)messageValue); + qpiditType = "float"; + valueString = "0x" + BytesReversedToString(sbytes); + break; + case "Double": + byte[] dbytes = BitConverter.GetBytes((Double)messageValue); + qpiditType = "double"; + valueString = "0x" + BytesReversedToString(dbytes); + break; + case "DateTime": + // epochTicks is the number of 100uSec ticks between 01/01/0001 + // and 01/01/1970. Used to adjust between DateTime and unix epoch. + const long epochTicks = 621355968000000000; + byte[] dtbytes = BitConverter.GetBytes( + (((DateTime)messageValue).Ticks - epochTicks) / TimeSpan.TicksPerMillisecond); + qpiditType = "timestamp"; + valueString = "0x" + BytesReversedToString(dtbytes, true); + break; + case "Guid": + qpiditType = "uuid"; + valueString = messageValue.ToString(); + break; + case "Byte[]": + qpiditType = "binary"; + byte[] binstr = (byte[])messageValue; + valueString = SerializeBytes(binstr); + payloadSize = binstr.Length; + break; + case "String": + qpiditType = "string"; + valueString = SerializeString(messageValue); + payloadSize = valueString.Length; + break; + case "Symbol": + qpiditType = "symbol"; + valueString = SerializeString(messageValue); + payloadSize = valueString.Length; + break; + default: + qpiditType = "unknown"; + valueString = String.Format("Unknown AMQP type: {0}", liteType); + throw new ApplicationException(valueString); + } + } + } + } + + /// <summary> + /// Receive and categorize some number of messages. + /// </summary> + class Receiver + { + private string brokerUrl; + private string queueName; + private string amqpType; + private Int32 nExpected; + private Int32 nReceived; + private List<Int32> receivedMByteSize; + private List<Int32> receivedChunks; + private Int32 mbFactor; + + public Receiver(string brokerUrl_, string queueName_, string amqpType_, Int32 expected_, Int32 mbFactor_) + { + brokerUrl = brokerUrl_; + queueName = queueName_; + amqpType = amqpType_; + nExpected = expected_; + nReceived = 0; + receivedMByteSize = new List<Int32>(); + receivedChunks = new List<Int32>(); + mbFactor = mbFactor_; + } + + ~Receiver() + { } + + public string ReceivedValueListSimple + { + get + { + StringBuilder sb = new StringBuilder(); + sb.Append("["); + for (int i = 0; i < receivedMByteSize.Count; i++) + { + if (i > 0) sb.Append(", "); + sb.Append(receivedMByteSize[i].ToString()); + } + sb.Append("]"); + return sb.ToString(); + } + } + + + public string ReceivedValueListComplex + { + get + { + StringBuilder sb = new StringBuilder(); + sb.Append("["); // start whole list + + int curMb = 0; + bool subsequentChunk = false; + for (int i = 0; i < receivedMByteSize.Count; i++) + { + // start new MB series? + if (receivedMByteSize[i] != curMb) + { + // start new series. Close old series? + if (curMb != 0) + { + sb.Append("]],"); + } + curMb = receivedMByteSize[i]; + sb.Append(String.Format("[{0},[", curMb.ToString())); + subsequentChunk = false; + } + // leading chunk gets no comma, subsequent chunks do + if (subsequentChunk) + { + sb.Append(","); + } + subsequentChunk = true; + // the chunk size + sb.Append(String.Format("{0}", receivedChunks[i].ToString())); + } + + sb.Append("]]]"); // end current chunk list, mb size list, and whole list + return sb.ToString(); + } + } + + + public string ReceivedValueList + { + get + { + return (String.Equals(amqpType, "list", StringComparison.OrdinalIgnoreCase) || + String.Equals(amqpType, "map", StringComparison.OrdinalIgnoreCase)) + ? ReceivedValueListComplex : ReceivedValueListSimple; + } + } + + + public void run() + { + ManualResetEvent receiverAttached = new ManualResetEvent(false); + OnAttached onReceiverAttached = (l, a) => { receiverAttached.Set(); }; + Address address = new Address(string.Format("amqp://{0}", brokerUrl)); + Connection connection = new Connection(address); + Session session = new Session(connection); + ReceiverLink receiverlink = new ReceiverLink(session, + "Lite-amqp-types-test-receiver", + new Source() { Address = queueName }, + onReceiverAttached); + if (receiverAttached.WaitOne(10000)) + { + while (nReceived < nExpected) + { + Message message = receiverlink.Receive(System.TimeSpan.FromSeconds(800)); + if (message != null) + { + nReceived += 1; + receiverlink.Accept(message); + MessageValue mv = new MessageValue(message.Body); + mv.Decode(); + + if (mv.QpiditTypeName != amqpType) + { + throw new ApplicationException(string.Format + ("Incorrect AMQP type found in message body: expected: {0}; found: {1}", + amqpType, mv.QpiditTypeName)); + } + receivedMByteSize.Add(mv.PayloadSize / mbFactor); + receivedChunks.Add(mv.PayloadChunks); + } + else + { + throw new ApplicationException(string.Format( + "Time out receiving message {0} of {1}", nReceived+1, nExpected)); + } + } + } + else + { + throw new ApplicationException(string.Format( + "Time out attaching to test broker {0} queue {1}", brokerUrl, queueName)); + } + + receiverlink.Close(); + session.Close(); + connection.Close(); + } + } + + class MainProgram + { + static int Main(string[] args) + { + /* + * --- main --- + * Args: 1: Broker address (ip-addr:port) + * 2: Queue name + * 3: QPIDIT AMQP type name of expected message body values + * 4: Expected number of test values to receive + */ + int exitCode = 0; + const Int32 mbFactor = 1024 * 1024; // command line specifies small(ish) numbers of megabytes. Adjust size of a megabyte here. + try + { + if (args.Length != 4) + { + throw new ApplicationException( + "program requires four arguments: brokerAddr queueName amqpType nValues"); + } + //Trace.TraceLevel = TraceLevel.Frame | TraceLevel.Verbose; + //Trace.TraceListener = (f, a) => Console.WriteLine(DateTime.Now.ToString("[hh:mm:ss.fff]") + " " + string.Format(f, a)); + + Receiver receiver = new Qpidit.Receiver( + args[0], args[1], args[2], Int32.Parse(args[3]), mbFactor); + receiver.run(); + + Console.WriteLine(args[2]); + Console.WriteLine("{0}", receiver.ReceivedValueList); + } + catch (Exception e) + { + string firstline = new StringReader(e.ToString()).ReadLine(); + Console.Error.WriteLine("AmqpReceiver error: {0}.", firstline); + exitCode = 1; + } + + return exitCode; + } + } +} http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/63243b69/shims/amqpnetlite/src/amqp_large_content_test/Receiver/Receiver.csproj.in ---------------------------------------------------------------------- diff --git a/shims/amqpnetlite/src/amqp_large_content_test/Receiver/Receiver.csproj.in b/shims/amqpnetlite/src/amqp_large_content_test/Receiver/Receiver.csproj.in new file mode 100644 index 0000000..94b60a4 --- /dev/null +++ b/shims/amqpnetlite/src/amqp_large_content_test/Receiver/Receiver.csproj.in @@ -0,0 +1,77 @@ +<?xml version="1.0" encoding="utf-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<Project ToolsVersion="14.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003"> + <Import Project="$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props" Condition="Exists('$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props')" /> + <PropertyGroup> + <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration> + <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform> + <ProjectGuid>{57860FB2-59C1-4302-B48C-982A82585116}</ProjectGuid> + <OutputType>Exe</OutputType> + <AppDesignerFolder>Properties</AppDesignerFolder> + <RootNamespace>Receiver</RootNamespace> + <AssemblyName>Receiver</AssemblyName> + <TargetFrameworkVersion>v4.5</TargetFrameworkVersion> + <FileAlignment>512</FileAlignment> + </PropertyGroup> + <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' "> + <PlatformTarget>AnyCPU</PlatformTarget> + <DebugSymbols>true</DebugSymbols> + <DebugType>full</DebugType> + <Optimize>false</Optimize> + <OutputPath>bin\</OutputPath> + <DefineConstants>DEBUG;TRACE</DefineConstants> + <ErrorReport>prompt</ErrorReport> + <WarningLevel>4</WarningLevel> + </PropertyGroup> + <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' "> + <PlatformTarget>AnyCPU</PlatformTarget> + <DebugType>pdbonly</DebugType> + <Optimize>true</Optimize> + <OutputPath>bin\</OutputPath> + <DefineConstants>TRACE</DefineConstants> + <ErrorReport>prompt</ErrorReport> + <WarningLevel>4</WarningLevel> + </PropertyGroup> + <ItemGroup> + <Reference Include="Amqp.Net, Culture=neutral, processorArchitecture=MSIL"> + <HintPath>..\..\..\packages\amqpnetlite\lib\net45\Amqp.Net.dll</HintPath> + </Reference> + <Reference Include="System" /> + <Reference Include="System.Core" /> + <Reference Include="System.Runtime.Serialization" /> + <Reference Include="System.Web" /> + <Reference Include="System.Web.Extensions" /> + <Reference Include="System.Xml.Linq" /> + <Reference Include="System.Data.DataSetExtensions" /> + <Reference Include="Microsoft.CSharp" /> + <Reference Include="System.Data" /> + <Reference Include="System.Net.Http" /> + <Reference Include="System.Xml" /> + </ItemGroup> + <ItemGroup> + <Compile Include="@CMAKE_CURRENT_SOURCE_DIR@/amqp_large_content_test/Receiver/Receiver.cs" /> + <Compile Include="@CMAKE_CURRENT_SOURCE_DIR@/amqp_large_content_test/Receiver/Properties\AssemblyInfo.cs" /> + </ItemGroup> + <ItemGroup> + <None Include="@CMAKE_CURRENT_SOURCE_DIR@/amqp_large_content_test/Receiver/App.config" /> + <None Include="@CMAKE_CURRENT_SOURCE_DIR@/amqp_large_content_test/Receiver/packages.config" /> + </ItemGroup> + <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" /> +</Project> http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/63243b69/shims/amqpnetlite/src/amqp_large_content_test/Receiver/packages.config ---------------------------------------------------------------------- diff --git a/shims/amqpnetlite/src/amqp_large_content_test/Receiver/packages.config b/shims/amqpnetlite/src/amqp_large_content_test/Receiver/packages.config new file mode 100644 index 0000000..7496c87 --- /dev/null +++ b/shims/amqpnetlite/src/amqp_large_content_test/Receiver/packages.config @@ -0,0 +1,21 @@ +<?xml version="1.0" encoding="utf-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<packages> +</packages> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/63243b69/shims/amqpnetlite/src/amqp_large_content_test/Sender/App.config ---------------------------------------------------------------------- diff --git a/shims/amqpnetlite/src/amqp_large_content_test/Sender/App.config b/shims/amqpnetlite/src/amqp_large_content_test/Sender/App.config new file mode 100644 index 0000000..c60d743 --- /dev/null +++ b/shims/amqpnetlite/src/amqp_large_content_test/Sender/App.config @@ -0,0 +1,24 @@ +<?xml version="1.0" encoding="utf-8" ?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<configuration> + <startup> + <supportedRuntime version="v4.0" sku=".NETFramework,Version=v4.5" /> + </startup> +</configuration> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/63243b69/shims/amqpnetlite/src/amqp_large_content_test/Sender/Properties/AssemblyInfo.cs ---------------------------------------------------------------------- diff --git a/shims/amqpnetlite/src/amqp_large_content_test/Sender/Properties/AssemblyInfo.cs b/shims/amqpnetlite/src/amqp_large_content_test/Sender/Properties/AssemblyInfo.cs new file mode 100644 index 0000000..bc5c247 --- /dev/null +++ b/shims/amqpnetlite/src/amqp_large_content_test/Sender/Properties/AssemblyInfo.cs @@ -0,0 +1,57 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +using System.Reflection; +using System.Runtime.CompilerServices; +using System.Runtime.InteropServices; + +// General Information about an assembly is controlled through the following +// set of attributes. Change these attribute values to modify the information +// associated with an assembly. +[assembly: AssemblyTitle("Sender")] +[assembly: AssemblyDescription("")] +[assembly: AssemblyConfiguration("")] +[assembly: AssemblyCompany("")] +[assembly: AssemblyProduct("Sender")] +[assembly: AssemblyCopyright("Copyright © 2016")] +[assembly: AssemblyTrademark("")] +[assembly: AssemblyCulture("")] + +// Setting ComVisible to false makes the types in this assembly not visible +// to COM components. If you need to access a type in this assembly from +// COM, set the ComVisible attribute to true on that type. +[assembly: ComVisible(false)] + +// The following GUID is for the ID of the typelib if this project is exposed to COM +[assembly: Guid("ab0b793a-c64c-4370-9f3e-75f80cdbdbd6")] + +// Version information for an assembly consists of the following four values: +// +// Major Version +// Minor Version +// Build Number +// Revision +// +// You can specify all the values or you can default the Build and Revision Numbers +// by using the '*' as shown below: +// [assembly: AssemblyVersion("1.0.*")] +[assembly: AssemblyVersion("1.0.0.0")] +[assembly: AssemblyFileVersion("1.0.0.0")] http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/63243b69/shims/amqpnetlite/src/amqp_large_content_test/Sender/Sender.cs ---------------------------------------------------------------------- diff --git a/shims/amqpnetlite/src/amqp_large_content_test/Sender/Sender.cs b/shims/amqpnetlite/src/amqp_large_content_test/Sender/Sender.cs new file mode 100644 index 0000000..8d0cbc7 --- /dev/null +++ b/shims/amqpnetlite/src/amqp_large_content_test/Sender/Sender.cs @@ -0,0 +1,782 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +using System; +using System.Collections; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Text; +using System.Threading; +using System.Web.Script.Serialization; +using Amqp; +using Amqp.Framing; +using Amqp.Types; + +// Large_content_test is driven by a list like this: +// +// TYPE_MAP = { +// # List of sizes in Mb +// 'binary': [1, 10, 100], +// 'string': [1, 10, 100], +// 'symbol': [1, 10, 100], +// # Tuple of two elements: (tot size of list/map in MB, List of no elements in list) +// # The num elements lists are powers of 2 so that they divide evenly into the size in MB (1024 * 1024 bytes) +// 'list': [[1, [1, 16, 256, 4096]], [10, [1, 16, 256, 4096]], [100, [1, 16, 256, 4096]]], +// 'map': [[1, [1, 16, 256, 4096]], [10, [1, 16, 256, 4096]], [100, [1, 16, 256, 4096]]], +// #'array': [[1, [1, 16, 256, 4096]], [10, [1, 16, 256, 4096]], [100, [1, 16, 256, 4096]]] +// } +// +// Sender receives command line args from this map: +// Sender host queuename binary '[1, 10, 100]' +// Sender host queuename list '[[1, [1, 16, 256, 4096]], [10, [1, 16, 256, 4096]], [100, [1, 16, 256, 4096]]]' +// +// Sender +// 1) generates actual .NET objects as specified +// 2) uses the AMQP Lite client to generate messages holding the object +// 3) sends the message(s) to the host/queuename. +// +namespace Qpidit +{ + /// <summary> + /// MessageValue holds a QpidIT type name and a json object created from the + /// CLI string argument, encodes it, and returns the object to be used as the + /// constructor for a message to be sent. + /// Complex values List and Map are constructed recursively. + /// Remaining singleton values like int or long are held directly as objects. + /// </summary> + class MessageValue + { + // Original type and json object + private string baseType; + private object baseValue; + + // has Encode been called? + private Boolean encoded; + + // Simple objects completely encoded + // List and Map defined recursively + private object valueDirect; + + /// <summary> + /// Constructor + /// </summary> + /// <param name="type">qpidit type name</param> + /// <param name="value">json encoded object</param> + public MessageValue(string type, object value) + { + baseType = type; + baseValue = value; + encoded = false; + valueDirect = null; + } + + + /// <summary> + /// Create a MessageValue without knowing beforehand what + /// type of system object is being handled. + /// * Objects created in the list from the command line have a + /// known type and are created via the constructor. + /// * Objects created inside List and Map have only string + /// type externally but need an actual type to be figured out. + /// </summary> + /// <param name="obj"></param> + /// <returns></returns> + public static MessageValue CreateAutoType(object obj) + { + return new Qpidit.MessageValue(QpiditTypeOf(obj), obj); + } + + + /// <summary> + /// Return the native object that represents the encoded + /// Amqp value. + /// </summary> + /// <returns></returns> + public Object ToObject() + { + if (!encoded) + Encode(); + + return valueDirect; + } + + + /// <summary> + /// Return the Amqp Message that holds this value. + /// </summary> + /// <returns></returns> + public Message ToMessage() + { + return new Message(this.ToObject()); + } + + + /// <summary> + /// Get an object's QPIDIT type + /// </summary> + /// <param name="obj">a .NET object</param> + /// <returns>QpidIT type name</returns> + public static string QpiditTypeOf(object obj) + { + string typename = obj.GetType().Name; + string qpiditType = null; + if (obj == null) + { + qpiditType = "null"; + } + else + { + switch (typename) + { + case "Boolean": + qpiditType = "boolean"; + break; + case "Byte": + qpiditType = "ubyte"; + break; + case "UInt16": + qpiditType = "ushort"; + break; + case "UInt32": + qpiditType = "uint"; + break; + case "UInt64": + qpiditType = "ulong"; + break; + case "SByte": + qpiditType = "byte"; + break; + case "Int16": + qpiditType = "short"; + break; + case "Int32": + qpiditType = "int"; + break; + case "Int64": + qpiditType = "long"; + break; + case "Single": + qpiditType = "float"; + break; + case "Double": + qpiditType = "double"; + break; + case "DateTime": + qpiditType = "timestamp"; + break; + case "Guid": + qpiditType = "uuid"; + break; + case "Byte[]": + qpiditType = "binary"; + break; + case "String": + qpiditType = "string"; + break; + case "Symbol": + qpiditType = "symbol"; + break; + case "Array": + qpiditType = "list"; + break; + case "ArrayList": + qpiditType = "list"; + break; + case "Dictionary": + qpiditType = "map"; + break; + case "Dictionary`2": + qpiditType = "map"; + break; + default: + throw new ApplicationException(String.Format( + "Can not translate system type {0} to a QpidIT type", typename)); + } + } + return qpiditType; + } + + + public string StripLeading0x(string value) + { + if (!value.StartsWith("0x")) + throw new ApplicationException(String.Format( + "EncodeUInt string does not start with '0x' : {0}", value)); + return value.Substring(2); + } + + public UInt64 EncodeUInt(string value) + { + UInt64 result = 0; + value = StripLeading0x(value); + result = UInt64.Parse(value, System.Globalization.NumberStyles.HexNumber); + return result; + } + + + public Int64 EncodeInt(string value) + { + Int64 result = 0; + bool isNegated = value.StartsWith("-"); + if (isNegated) + value = value.Substring(1); + value = StripLeading0x(value); + result = Int64.Parse(value, System.Globalization.NumberStyles.HexNumber); + if (isNegated) + result = -result; + return result; + } + + + /// <summary> + /// Generate the object used to create a message + /// </summary> + public void Encode() + { + if (encoded) + return; + + if (baseValue is Array) + { + // List + if (! String.Equals(baseType, "list", StringComparison.OrdinalIgnoreCase)) + { + throw new ApplicationException(String.Format( + "Sender asked to encode a {0} but received a list: {1}", baseType, baseValue.ToString())); + } + valueDirect = new Amqp.Types.List(); + foreach (object item in (Array)baseValue) + { + MessageValue itemValue = MessageValue.CreateAutoType(item); + itemValue.Encode(); + ((Amqp.Types.List)valueDirect).Add(itemValue.ToObject()); + } + } + else if (baseValue is ArrayList) + { + // List + if (! String.Equals(baseType, "list", StringComparison.OrdinalIgnoreCase)) + { + throw new ApplicationException(String.Format( + "Sender asked to encode a {0} but received a list: {1}", baseType, baseValue.ToString())); + } + valueDirect = new Amqp.Types.List(); + foreach (object item in (ArrayList)baseValue) + { + MessageValue itemValue = MessageValue.CreateAutoType(item); + itemValue.Encode(); + ((Amqp.Types.List)valueDirect).Add(itemValue.ToObject()); + } + } + else if (baseValue is Dictionary<string, object>) + { + // Map + if (!String.Equals(baseType, "map", StringComparison.OrdinalIgnoreCase)) + { + throw new ApplicationException(String.Format( + "Sender asked to encode a {0} but received a map: {1}", baseType, baseValue.ToString())); + } + valueDirect = new Amqp.Types.Map(); + Dictionary<string, object> myDict = new Dictionary<string, object>(); + myDict = (Dictionary<string, object>)baseValue; + foreach (var key in myDict.Keys) + { + MessageValue itemValue = MessageValue.CreateAutoType(myDict[key]); + ((Amqp.Types.Map)valueDirect)[key] = itemValue.ToObject(); + } + } + else if (baseValue is String) + { + string value; + UInt64 valueUInt64; + Int64 valueInt64; + + switch (baseType) + { + case "null": + valueDirect = null; + break; + case "boolean": + value = (string)baseValue; + bool mybool = String.Equals(value, "true", StringComparison.OrdinalIgnoreCase); + valueDirect = mybool; + break; + case "ubyte": + value = (string)baseValue; + valueUInt64 = EncodeUInt(value); + Byte mybyte = (Byte)(valueUInt64 & 0xff); + valueDirect = mybyte; + break; + case "ushort": + value = (string)baseValue; + valueUInt64 = EncodeUInt(value); + UInt16 myuint16 = (UInt16)(valueUInt64 & 0xffff); + valueDirect = myuint16; + break; + case "uint": + value = (string)baseValue; + valueUInt64 = EncodeUInt(value); + UInt32 myuint32 = (UInt32)(valueUInt64 & 0xffffffff); + valueDirect = myuint32; + break; + case "ulong": + value = (string)baseValue; + valueUInt64 = EncodeUInt(value); + valueDirect = valueUInt64; + break; + case "byte": + value = (string)baseValue; + valueInt64 = EncodeInt(value); + SByte mysbyte = (SByte)(valueInt64 & 0xff); + valueDirect = mysbyte; + break; + case "short": + value = (string)baseValue; + valueInt64 = EncodeInt(value); + Int16 myint16 = (Int16)(valueInt64 & 0xffff); + valueDirect = myint16; + break; + case "int": + value = (string)baseValue; + valueInt64 = EncodeInt(value); + Int32 myint32 = (Int32)(valueInt64 & 0xffffffff); + valueDirect = myint32; + break; + case "long": + value = (string)baseValue; + valueInt64 = EncodeInt(value); + valueDirect = valueInt64; + break; + case "float": + value = StripLeading0x((string)baseValue); + UInt32 num32 = UInt32.Parse(value, System.Globalization.NumberStyles.AllowHexSpecifier); + byte[] floatVals = BitConverter.GetBytes(num32); + float flt = BitConverter.ToSingle(floatVals, 0); + valueDirect = flt; + break; + case "double": + value = StripLeading0x((string)baseValue); + UInt64 num64 = UInt64.Parse(value, System.Globalization.NumberStyles.AllowHexSpecifier); + byte[] doubleVals = BitConverter.GetBytes(num64); + double dbl = BitConverter.ToDouble(doubleVals, 0); + valueDirect = dbl; + break; + case "timestamp": + // epochTicks is the number of 100uSec ticks between 01/01/0001 + // and 01/01/1970. Used to adjust between DateTime and unix epoch. + const long epochTicks = 621355968000000000; + value = StripLeading0x((string)baseValue); + Int64 dtticks = Int64.Parse(value, System.Globalization.NumberStyles.AllowHexSpecifier); + dtticks *= TimeSpan.TicksPerMillisecond; + dtticks += epochTicks; + DateTime dt = new DateTime(dtticks, DateTimeKind.Utc); + valueDirect = dt; + break; + case "uuid": + value = (string)baseValue; + Guid guid = new Guid(value); + valueDirect = guid; + break; + case "binary": + // TODO: fix this + value = (string)baseValue; + byte[] binval = Encoding.ASCII.GetBytes(value); + valueDirect = binval; + break; + case "string": + valueDirect = (string)baseValue; + break; + case "symbol": + Symbol sym = new Symbol((string)baseValue); + valueDirect = sym; + break; + case "list": + throw new ApplicationException(String.Format( + "Sender asked to encode a list but received a string: {0}", baseValue)); + case "map": + throw new ApplicationException(String.Format( + "Sender asked to encode a map but received a string: {0}", baseValue)); + case "decimal32": + case "decimal64": + case "decimal128": + throw new ApplicationException(String.Format( + "AMQP.Net Lite does not support AMQP decimal type: {0}", baseType)); + default: + throw new ApplicationException(String.Format( + "Sender can not encode base type: {0}", baseType)); + } + } + else + { + throw new ApplicationException(String.Format( + "Sender can not encode object type {0}", baseValue.GetType().Name)); + } + encoded = true; + } + } + + + /// <summary> + /// Classes to parse JSON size arg for map and list types + /// </summary> + class MbBlock + { + public Int32 mBytes { get; set; } + public List<Int32> nChunks { get; set; } + } + class MbSpec + { + public List<MbBlock> MbBlocks { get; set; } + + enum ParseState + { + START, + NEW_MBBLOCK, + M_BYTES, + NEW_NCHUNKS, + NCHUNK, + END_NCHUNK, + END_MBBLOCK, + END + } + + public MbSpec(string sizeSpec) + { + MbBlock curMbBlock = new MbBlock(); + Int32 curInt = 0; ; + + ParseState ps = ParseState.START; + + int pos = -1; + foreach (char ch in sizeSpec) + { + pos += 1; + //Console.WriteLine("ch = '{0}', position = {1}", ch, pos); + string err = ""; + switch (ps) + { + case ParseState.START: + if (ch == '[') + { + MbBlocks = new List<MbBlock>(); + ps = ParseState.NEW_MBBLOCK; + } + else + err = "no leading '['"; + break; + case ParseState.NEW_MBBLOCK: + if (ch == ']') + { + MbBlocks.Add(curMbBlock); + ps = ParseState.END; + } + else if (ch == '[') + { + curMbBlock = new MbBlock(); + ps = ParseState.M_BYTES; + curInt = 0; + } + else if (Char.IsWhiteSpace(ch)) + { + // ignore whitespace + } + else + err = "NEW_MBBLOCK expects '[' or ']'"; + break; + case ParseState.M_BYTES: + if (Char.IsDigit(ch)) + { + curInt *= 10; + curInt += (int)(ch - '0'); + } + else if (ch == ',') + { + curMbBlock.mBytes = curInt; + curInt = 0; + ps = ParseState.NEW_NCHUNKS; + } + else if (Char.IsWhiteSpace(ch)) + { + // ignore whitespace + } + else + err = "M_BYTES expects digit or ','"; + break; + case ParseState.NEW_NCHUNKS: + if (ch == '[') + { + curMbBlock.nChunks = new List<Int32>(); + ps = ParseState.NCHUNK; + } + else if (Char.IsWhiteSpace(ch)) + { + // ignore whitespace + } + else + err = "NEW_NCHUNKS expects digit or '['"; + break; + case ParseState.NCHUNK: + if (Char.IsDigit(ch)) + { + curInt *= 10; + curInt += (int)(ch - '0'); + } + else if (ch == ',') + { + curMbBlock.nChunks.Add(curInt); + curInt = 0; + } + else if (Char.IsWhiteSpace(ch)) + { + // ignore whitespace + } + else if (ch == ']') + { + curMbBlock.nChunks.Add(curInt); + curInt = 0; + ps = ParseState.END_NCHUNK; + } + else + err = "NCHUNK expects digit, ',', or ']'"; + break; + case ParseState.END_NCHUNK: + if (Char.IsWhiteSpace(ch)) + { + // ignore whitespace + } + else if (ch == ']') + { + MbBlocks.Add(curMbBlock); + curMbBlock = new MbBlock(); + ps = ParseState.END_MBBLOCK; + } + else + err = "END_MBBLOCK expects ']'"; + break; + case ParseState.END_MBBLOCK: + if (Char.IsWhiteSpace(ch)) + { + // ignore whitespace + } + else if (ch == ',') + { + ps = ParseState.NEW_MBBLOCK; + } + else if (ch == ']') + { + ps = ParseState.END; + } + else + err = "END_MBBLOCK expects ',', ']', or '['"; + break; + case ParseState.END: + err = "illegal characters after JSON end of object"; + break; + } + + if (err != "") + { + throw new ApplicationException(String.Format("Size spec parse error at position {0}, state {1}: {2}", pos, ps, err)); + } + } + } + } + + + + class Sender + { + private string brokerUrl; + private string queueName; + private string amqpType; + private string countSpec; + private Int32 mbFactor; + private MbSpec mbSpec; + + public Sender(string brokerUrl_, string queueName_, string amqpType_, string countSpec_, Int32 mbFactor_) + { + brokerUrl = brokerUrl_; + queueName = queueName_; + amqpType = amqpType_; + countSpec = countSpec_; + mbFactor = mbFactor_; + } + + ~Sender() + { } + + + public IEnumerable<Message> AllMessages() + { + if (String.Equals(amqpType, "binary", StringComparison.OrdinalIgnoreCase) || + String.Equals(amqpType, "string", StringComparison.OrdinalIgnoreCase) || + String.Equals(amqpType, "symbol", StringComparison.OrdinalIgnoreCase)) + { + // Deserialize the count spec list + JavaScriptSerializer csl = new JavaScriptSerializer(); + var itMsgs = csl.Deserialize<dynamic>(countSpec); + //if (!(itMsgs is Array)) + // throw new ApplicationException(String.Format( + // "Messages are not formatted as a json list: {0}, but as type: {1}", countSpec, itMsgs.GetType().Name)); + + // Generate messages + if (String.Equals(amqpType, "binary", StringComparison.OrdinalIgnoreCase)) + { + foreach (Int32 mbSize in itMsgs) + { + string binStr = new string(Convert.ToChar(0), mbSize * mbFactor); + MessageValue mv = new MessageValue(amqpType, binStr); + yield return mv.ToMessage(); + } + } + else if (String.Equals(amqpType, "string", StringComparison.OrdinalIgnoreCase)) + { + foreach (Int32 mbSize in itMsgs) + { + string binStr = new string('s', mbSize * mbFactor); + MessageValue mv = new MessageValue(amqpType, binStr); + yield return mv.ToMessage(); + } + } + else if (String.Equals(amqpType, "symbol", StringComparison.OrdinalIgnoreCase)) + { + foreach (Int32 mbSize in itMsgs) + { + string binStr = new string('b', mbSize * mbFactor); + MessageValue mv = new MessageValue(amqpType, binStr); + yield return mv.ToMessage(); + } + } + } + else if (String.Equals(amqpType, "list", StringComparison.OrdinalIgnoreCase)) + { + mbSpec = new MbSpec(countSpec); + foreach (MbBlock mbBlock in mbSpec.MbBlocks) + { + foreach (Int32 eCount in mbBlock.nChunks) + { + Int32 sizePerEltBytes = (mbBlock.mBytes * mbFactor) / eCount; + string[] testList = new string[eCount]; + for (int i = 0; i < eCount; i++) + { + testList[i] = new string('L', sizePerEltBytes); + } + MessageValue mv = new MessageValue(amqpType, testList); + yield return mv.ToMessage(); + } + } + } + else if (String.Equals(amqpType, "map", StringComparison.OrdinalIgnoreCase)) + { + mbSpec = new MbSpec(countSpec); + foreach (MbBlock mbBlock in mbSpec.MbBlocks) + { + foreach (Int32 eCount in mbBlock.nChunks) + { + Int32 sizePerEltBytes = (mbBlock.mBytes * mbFactor) / eCount; + Dictionary<string, object> testMap = new Dictionary<string, object>(); + for (int i = 0; i < eCount; i++) + { + testMap[i.ToString("000000")] = new string('M', sizePerEltBytes); + } + MessageValue mv = new MessageValue(amqpType, testMap); + yield return mv.ToMessage(); + } + } + } + // else if (String.Equals(amqpType, "array", StringComparison.OrdinalIgnoreCase)) + //{ /* TODO */ } + else + { + throw new ApplicationException(String.Format( + "unsupported amqp type: {0}", amqpType)); + } + } + + public void run() + { + // Send the messages + ManualResetEvent senderAttached = new ManualResetEvent(false); + OnAttached onSenderAttached = (l, a) => { senderAttached.Set(); }; + Address address = new Address(string.Format("amqp://{0}", brokerUrl)); + Connection connection = new Connection(address); + Session session = new Session(connection); + SenderLink sender = new SenderLink(session, + "Lite-amqp-large-content-test-sender", + new Target() { Address = queueName }, + onSenderAttached); + if (senderAttached.WaitOne(10000)) + { + foreach (Message message in AllMessages()) + { + sender.Send(message); + } + } + else + { + throw new ApplicationException(string.Format( + "Time out attaching to test broker {0} queue {1}", brokerUrl, queueName)); + } + + sender.Close(); + session.Close(); + connection.Close(); + } + } + + class MainProgram + { + static int Main(string[] args) + { + /* + * --- main --- + * Args: 1: Broker address (ip-addr:port) + * 2: Queue name + * 3: AMQP type + * 4: Test value(s) as JSON string + */ + int exitCode = 0; + const Int32 mbFactor = 1024 * 1024; // command line specifies small(ish) numbers of megabytes. Adjust size of a megabyte here. + try + { + if (args.Length != 4) + { + throw new ApplicationException( + "program requires four arguments: brokerAddr queueName amqpType jsonValuesToSend"); + } + //Trace.TraceLevel = TraceLevel.Frame | TraceLevel.Verbose; + //Trace.TraceListener = (f, a) => Console.WriteLine(DateTime.Now.ToString("[hh:mm:ss.fff]") + " " + string.Format(f, a)); + + Sender sender = new Qpidit.Sender(args[0], args[1], args[2], args[3], mbFactor); + sender.run(); + } + catch (Exception e) + { + string firstline = new StringReader(e.ToString()).ReadLine(); + Console.Error.WriteLine("AmqpSender error: {0}.", firstline); + exitCode = 1; + } + + return exitCode; + } + } +} http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/63243b69/shims/amqpnetlite/src/amqp_large_content_test/Sender/Sender.csproj.in ---------------------------------------------------------------------- diff --git a/shims/amqpnetlite/src/amqp_large_content_test/Sender/Sender.csproj.in b/shims/amqpnetlite/src/amqp_large_content_test/Sender/Sender.csproj.in new file mode 100644 index 0000000..ca59361 --- /dev/null +++ b/shims/amqpnetlite/src/amqp_large_content_test/Sender/Sender.csproj.in @@ -0,0 +1,76 @@ +<?xml version="1.0" encoding="utf-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<Project ToolsVersion="14.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003"> + <Import Project="$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props" Condition="Exists('$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props')" /> + <PropertyGroup> + <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration> + <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform> + <ProjectGuid>{AB0B793A-C64C-4370-9F3E-75F80CDBDBD6}</ProjectGuid> + <OutputType>Exe</OutputType> + <AppDesignerFolder>Properties</AppDesignerFolder> + <RootNamespace>Sender</RootNamespace> + <AssemblyName>Sender</AssemblyName> + <TargetFrameworkVersion>v4.5</TargetFrameworkVersion> + <FileAlignment>512</FileAlignment> + </PropertyGroup> + <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' "> + <PlatformTarget>AnyCPU</PlatformTarget> + <DebugSymbols>true</DebugSymbols> + <DebugType>full</DebugType> + <Optimize>false</Optimize> + <OutputPath>bin\</OutputPath> + <DefineConstants>DEBUG;TRACE</DefineConstants> + <ErrorReport>prompt</ErrorReport> + <WarningLevel>4</WarningLevel> + </PropertyGroup> + <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' "> + <PlatformTarget>AnyCPU</PlatformTarget> + <DebugType>pdbonly</DebugType> + <Optimize>true</Optimize> + <OutputPath>bin\</OutputPath> + <DefineConstants>TRACE</DefineConstants> + <ErrorReport>prompt</ErrorReport> + <WarningLevel>4</WarningLevel> + </PropertyGroup> + <ItemGroup> + <Reference Include="Amqp.Net, Culture=neutral, processorArchitecture=MSIL"> + <HintPath>..\..\..\packages\amqpnetlite\lib\net45\Amqp.Net.dll</HintPath> + </Reference> + <Reference Include="System" /> + <Reference Include="System.Core" /> + <Reference Include="System.Runtime.Serialization" /> + <Reference Include="System.Web.Extensions" /> + <Reference Include="System.Xml.Linq" /> + <Reference Include="System.Data.DataSetExtensions" /> + <Reference Include="Microsoft.CSharp" /> + <Reference Include="System.Data" /> + <Reference Include="System.Net.Http" /> + <Reference Include="System.Xml" /> + </ItemGroup> + <ItemGroup> + <Compile Include="@CMAKE_CURRENT_SOURCE_DIR@/amqp_large_content_test/Sender/Sender.cs" /> + <Compile Include="@CMAKE_CURRENT_SOURCE_DIR@/amqp_large_content_test/Sender/Properties/AssemblyInfo.cs" /> + </ItemGroup> + <ItemGroup> + <None Include="@CMAKE_CURRENT_SOURCE_DIR@/amqp_large_content_test/Sender/App.config" /> + <None Include="@CMAKE_CURRENT_SOURCE_DIR@/amqp_large_content_test/Sender/packages.config" /> + </ItemGroup> + <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" /> +</Project> http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/63243b69/shims/amqpnetlite/src/amqp_large_content_test/Sender/packages.config ---------------------------------------------------------------------- diff --git a/shims/amqpnetlite/src/amqp_large_content_test/Sender/packages.config b/shims/amqpnetlite/src/amqp_large_content_test/Sender/packages.config new file mode 100644 index 0000000..7496c87 --- /dev/null +++ b/shims/amqpnetlite/src/amqp_large_content_test/Sender/packages.config @@ -0,0 +1,21 @@ +<?xml version="1.0" encoding="utf-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<packages> +</packages> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/63243b69/src/python/qpid_interop_test/amqp_large_content_test.py ---------------------------------------------------------------------- diff --git a/src/python/qpid_interop_test/amqp_large_content_test.py b/src/python/qpid_interop_test/amqp_large_content_test.py index 7bc97cf..3b0688f 100755 --- a/src/python/qpid_interop_test/amqp_large_content_test.py +++ b/src/python/qpid_interop_test/amqp_large_content_test.py @@ -218,12 +218,21 @@ if __name__ == '__main__': 'Receiver.py') PROTON_PYTHON_SENDER_SHIM = path.join(QIT_TEST_SHIM_HOME, 'qpid-proton-python', 'amqp_large_content_test', 'Sender.py') + AMQPNETLITE_RECEIVER_SHIM = path.join(QIT_TEST_SHIM_HOME, 'amqpnetlite', 'amqp_large_content_test', 'Receiver.exe') + AMQPNETLITE_SENDER_SHIM = path.join(QIT_TEST_SHIM_HOME, 'amqpnetlite', 'amqp_large_content_test', 'Sender.exe') SHIM_MAP = {qpid_interop_test.shims.ProtonCppShim.NAME: \ qpid_interop_test.shims.ProtonCppShim(PROTON_CPP_SENDER_SHIM, PROTON_CPP_RECEIVER_SHIM), qpid_interop_test.shims.ProtonPythonShim.NAME: \ qpid_interop_test.shims.ProtonPythonShim(PROTON_PYTHON_SENDER_SHIM, PROTON_PYTHON_RECEIVER_SHIM), } + # Add shims that need detection during installation only if the necessary bits are present + # AMQP DotNetLite client + if path.isfile(AMQPNETLITE_RECEIVER_SHIM) and path.isfile(AMQPNETLITE_SENDER_SHIM): + SHIM_MAP[qpid_interop_test.shims.AmqpNetLiteShim.NAME] = \ + qpid_interop_test.shims.AmqpNetLiteShim(AMQPNETLITE_SENDER_SHIM, AMQPNETLITE_RECEIVER_SHIM) + else: + print 'WARNING: AMQP DotNetLite shims not installed' ARGS = TestOptions(SHIM_MAP).args #print 'ARGS:', ARGS # debug --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
