Author: shuston
Date: Thu Dec 17 16:43:44 2009
New Revision: 891795

URL: http://svn.apache.org/viewvc?rev=891795&view=rev
Log:
Apply patch QPID-2230.patch; resolves QPID-2230.

Added:
    qpid/trunk/qpid/wcf/test/Apache/Qpid/Test/Channel/WcfPerftest/
    
qpid/trunk/qpid/wcf/test/Apache/Qpid/Test/Channel/WcfPerftest/RawBodyUtility.cs
    qpid/trunk/qpid/wcf/test/Apache/Qpid/Test/Channel/WcfPerftest/WcfPerftest.cs
    
qpid/trunk/qpid/wcf/test/Apache/Qpid/Test/Channel/WcfPerftest/WcfPerftest.csproj
Modified:
    qpid/trunk/qpid/wcf/QpidWcf.sln

Modified: qpid/trunk/qpid/wcf/QpidWcf.sln
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/wcf/QpidWcf.sln?rev=891795&r1=891794&r2=891795&view=diff
==============================================================================
--- qpid/trunk/qpid/wcf/QpidWcf.sln (original)
+++ qpid/trunk/qpid/wcf/QpidWcf.sln Thu Dec 17 16:43:44 2009
@@ -32,6 +32,8 @@
 EndProject
 Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Channel", 
"src\Apache\Qpid\Channel\Channel.csproj", 
"{8AABAB30-7D1E-4539-B7D1-05450262BAD2}"
 EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "WcfPerftest", 
"test\Apache\Qpid\Test\Channel\WcfPerftest\WcfPerftest.csproj", 
"{D0F8FDE4-7AC6-4CFF-986A-50D06F7FD733}"
+EndProject
 Global
        GlobalSection(SolutionConfigurationPlatforms) = preSolution
                Debug|Any CPU = Debug|Any CPU
@@ -107,6 +109,20 @@
                {8AABAB30-7D1E-4539-B7D1-05450262BAD2}.Release|Win32.ActiveCfg 
= Release|Any CPU
                {8AABAB30-7D1E-4539-B7D1-05450262BAD2}.Release|x64.ActiveCfg = 
Release|Any CPU
                {8AABAB30-7D1E-4539-B7D1-05450262BAD2}.Release|x86.ActiveCfg = 
Release|Any CPU
+               {D0F8FDE4-7AC6-4CFF-986A-50D06F7FD733}.Debug|Any CPU.ActiveCfg 
= Debug|Any CPU
+               {D0F8FDE4-7AC6-4CFF-986A-50D06F7FD733}.Debug|Any CPU.Build.0 = 
Debug|Any CPU
+               {D0F8FDE4-7AC6-4CFF-986A-50D06F7FD733}.Debug|Mixed 
Platforms.ActiveCfg = Debug|Any CPU
+               {D0F8FDE4-7AC6-4CFF-986A-50D06F7FD733}.Debug|Mixed 
Platforms.Build.0 = Debug|Any CPU
+               {D0F8FDE4-7AC6-4CFF-986A-50D06F7FD733}.Debug|Win32.ActiveCfg = 
Debug|Any CPU
+               {D0F8FDE4-7AC6-4CFF-986A-50D06F7FD733}.Debug|x64.ActiveCfg = 
Debug|Any CPU
+               {D0F8FDE4-7AC6-4CFF-986A-50D06F7FD733}.Debug|x86.ActiveCfg = 
Debug|Any CPU
+               {D0F8FDE4-7AC6-4CFF-986A-50D06F7FD733}.Release|Any 
CPU.ActiveCfg = Release|Any CPU
+               {D0F8FDE4-7AC6-4CFF-986A-50D06F7FD733}.Release|Any CPU.Build.0 
= Release|Any CPU
+               {D0F8FDE4-7AC6-4CFF-986A-50D06F7FD733}.Release|Mixed 
Platforms.ActiveCfg = Release|Any CPU
+               {D0F8FDE4-7AC6-4CFF-986A-50D06F7FD733}.Release|Mixed 
Platforms.Build.0 = Release|Any CPU
+               {D0F8FDE4-7AC6-4CFF-986A-50D06F7FD733}.Release|Win32.ActiveCfg 
= Release|Any CPU
+               {D0F8FDE4-7AC6-4CFF-986A-50D06F7FD733}.Release|x64.ActiveCfg = 
Release|Any CPU
+               {D0F8FDE4-7AC6-4CFF-986A-50D06F7FD733}.Release|x86.ActiveCfg = 
Release|Any CPU
        EndGlobalSection
        GlobalSection(SolutionProperties) = preSolution
                HideSolutionNode = FALSE

Added: 
qpid/trunk/qpid/wcf/test/Apache/Qpid/Test/Channel/WcfPerftest/RawBodyUtility.cs
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/wcf/test/Apache/Qpid/Test/Channel/WcfPerftest/RawBodyUtility.cs?rev=891795&view=auto
==============================================================================
--- 
qpid/trunk/qpid/wcf/test/Apache/Qpid/Test/Channel/WcfPerftest/RawBodyUtility.cs 
(added)
+++ 
qpid/trunk/qpid/wcf/test/Apache/Qpid/Test/Channel/WcfPerftest/RawBodyUtility.cs 
Thu Dec 17 16:43:44 2009
@@ -0,0 +1,161 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+namespace Apache.Qpid.Test.Channel.WcfPerftest
+{
+    using System;
+    using System.Collections;
+    using System.IO;
+    using System.ServiceModel;
+    using System.ServiceModel.Channels;
+    using System.ServiceModel.Description;
+    using System.Threading;
+    using System.Text;
+    using System.Xml;
+    using Apache.Qpid.Channel;
+
+
+    /// <summary>
+    /// A sample interface for populating and extracting message body content.
+    /// Just enough methods to handle basic Interop text and raw byte messages.
+    /// </summary>
+ 
+    
+    public interface IRawBodyUtility
+    {
+        Message CreateMessage(byte[] body, int offset, int len);
+        Message CreateMessage(byte[] body);
+        byte[] GetBytes(Message m, byte[] recyclableBuffer);
+
+        Message CreateMessage(string body);
+        string GetText(Message m);
+    }
+
+    // an implementation of IRawBodyUtility that expects a RawMessageEncoder 
based channel
+
+    public class RawEncoderUtility : IRawBodyUtility
+    {
+        public Message CreateMessage(byte[] body, int offset, int count)
+        {
+            return Message.CreateMessage(MessageVersion.None, "", new 
RawEncoderBodyWriter(body, offset, count));
+        }
+
+        public Message CreateMessage(byte[] body)
+        {
+            return CreateMessage(body, 0, body.Length);
+        }
+
+        public byte[] GetBytes(Message message, byte[] recyclableBuffer)
+        {
+            XmlDictionaryReader reader = message.GetReaderAtBodyContents();
+            int length;
+
+            while (!reader.HasValue)
+            {
+                reader.Read();
+                if (reader.EOF)
+                {
+                    throw new InvalidDataException("empty 
XmlDictionaryReader");
+                }
+            }
+
+            if (reader.TryGetBase64ContentLength(out length))
+            {
+                byte[] bytes = null;
+                if (recyclableBuffer != null)
+                {
+                    if (recyclableBuffer.Length == length)
+                    {
+                        // reuse
+                        bytes = recyclableBuffer;
+                    }
+                }
+
+                if (bytes == null)
+                {
+                    bytes = new byte[length];
+                }
+
+                // this is the single copy mechanism from native to managed 
space with no intervening
+                // buffers.  One could also write a method GetBytes(msg, 
myBuf, offset)...
+                reader.ReadContentAsBase64(bytes, 0, length);
+                reader.Close();
+                return bytes;
+            }
+            else
+            {
+                // uses whatever default buffering mechanism is used by the 
base XmlDictionaryReader class
+                return reader.ReadContentAsBase64();
+            }
+        }
+
+        public Message CreateMessage(string body)
+        {
+            return Message.CreateMessage(MessageVersion.None, "", new 
RawEncoderBodyWriter(body));
+        }
+
+        public string GetText(Message message)
+        {
+            byte[] rawBuffer = GetBytes(message, null);
+            return Encoding.UTF8.GetString(rawBuffer, 0, rawBuffer.Length);
+        }
+
+        internal class RawEncoderBodyWriter : BodyWriter
+        {
+            // works only with the Raw Encoder; the "body" is either a single 
string or byte[] segment
+            String bodyAsString;
+            byte[] bodyAsBytes;
+            int offset;
+            int count;
+
+            public RawEncoderBodyWriter(string body)
+                : base(false)               // isBuffered
+            {
+                this.bodyAsString = body;
+            }
+
+            public RawEncoderBodyWriter(byte[] body, int offset, int count)
+                : base(false)               // isBuffered
+            {
+                this.bodyAsBytes = body;
+                this.offset = offset;
+                this.count = count;
+            }
+
+            protected override void 
OnWriteBodyContents(System.Xml.XmlDictionaryWriter writer)
+            {
+                // TODO:  RawMessageEncoder.StreamElementName should be public.
+                writer.WriteStartElement("Binary");  // the expected Raw 
encoder "<Binary>" virtual xml tag
+
+                if (bodyAsString != null)
+                {
+                    byte[] buf = Encoding.UTF8.GetBytes(bodyAsString);
+                    writer.WriteBase64(buf, 0, buf.Length);
+                }
+                else
+                {
+                    writer.WriteBase64(this.bodyAsBytes, this.offset, 
this.count);
+                }
+
+                writer.WriteEndElement();
+            }
+        }
+    }
+
+}

Added: 
qpid/trunk/qpid/wcf/test/Apache/Qpid/Test/Channel/WcfPerftest/WcfPerftest.cs
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/wcf/test/Apache/Qpid/Test/Channel/WcfPerftest/WcfPerftest.cs?rev=891795&view=auto
==============================================================================
--- 
qpid/trunk/qpid/wcf/test/Apache/Qpid/Test/Channel/WcfPerftest/WcfPerftest.cs 
(added)
+++ 
qpid/trunk/qpid/wcf/test/Apache/Qpid/Test/Channel/WcfPerftest/WcfPerftest.cs 
Thu Dec 17 16:43:44 2009
@@ -0,0 +1,661 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+namespace Apache.Qpid.Test.Channel.WcfPerftest
+{
+    using System;
+    using System.Collections;
+    using System.Collections.Generic;
+    using System.ComponentModel;
+    using System.Configuration;
+    using System.Diagnostics;
+    using System.IO;
+    using System.ServiceModel;
+    using System.ServiceModel.Channels;
+    using System.ServiceModel.Description;
+    using System.Threading;
+    using System.Transactions;
+    using System.Text;
+    using System.Xml;
+    using Apache.Qpid.AmqpTypes;
+    using Apache.Qpid.Channel;
+
+    // this program implements a subset of the functionality in 
qpid\cpp\src\tests\perftest.cpp
+
+    // for a given broker, create reader and writer channels to 
queues/exchanges
+    // lazilly creates binding and channel factories
+
+    public class QueueChannelFactory
+    {
+        private static AmqpBinding brokerBinding;
+        private static IChannelFactory<IInputChannel> readerFactory;
+        private static IChannelFactory<IOutputChannel> writerFactory;
+        private static string brokerAddr = "127.0.0.1";
+        private static int brokerPort = 5672;
+
+        public static void SetBroker(string addr, int port)
+        {
+            brokerAddr = addr;
+            brokerPort = port;
+        }
+
+        private static void InitializeBinding()
+        {
+            AmqpBinaryBinding binding = new AmqpBinaryBinding();
+            binding.BrokerHost = brokerAddr;
+            binding.BrokerPort = brokerPort;
+            binding.TransferMode = TransferMode.Streamed;
+            binding.PrefetchLimit = 5000;
+            binding.Shared = true;
+            brokerBinding = binding;
+        }
+
+        public static IInputChannel CreateReaderChannel(string queueName)
+        {
+            lock (typeof(QueueChannelFactory))
+            {
+                if (brokerBinding == null)
+                {
+                    InitializeBinding();
+                }
+
+                if (readerFactory == null)
+                {
+                    readerFactory = 
brokerBinding.BuildChannelFactory<IInputChannel>();
+                    readerFactory.Open();
+                }
+
+                IInputChannel channel = readerFactory.CreateChannel(new 
EndpointAddress(
+                    new Uri("amqp:" + queueName)));
+                channel.Open();
+
+                return channel;
+            }
+        }
+
+        public static IOutputChannel CreateWriterChannel(string exchangeName, 
string routingKey)
+        {
+            lock (typeof(QueueChannelFactory))
+            {
+                if (brokerBinding == null)
+                {
+                    InitializeBinding();
+                }
+
+                if (writerFactory == null)
+                {
+                    writerFactory = 
brokerBinding.BuildChannelFactory<IOutputChannel>();
+                    writerFactory.Open();
+                }
+
+                IOutputChannel channel = writerFactory.CreateChannel(new 
EndpointAddress(
+                    "amqp:" + exchangeName +
+                    "?routingkey=" + routingKey));
+                channel.Open();
+
+                return channel;
+            }
+        }
+    }
+
+    public enum ClientType
+    {
+        Publisher,
+        Subscriber,
+        InteropDemo
+    }
+
+    public class Options
+    {
+        public string broker;
+        public int port;
+        public UInt64 messageCount;
+        public int messageSize;
+        public ClientType type;
+        public string baseName;
+        public int subTxSize;
+        public int pubTxSize;
+        public bool durable;
+
+        public Options()
+        {
+            this.broker = "127.0.0.1";
+            this.port = 5672;
+            this.messageCount = 500000;
+            this.messageSize = 1024;
+            this.type = ClientType.InteropDemo;  // default: once as pub and 
once as sub
+            this.baseName = "perftest";
+            this.pubTxSize = 0;
+            this.subTxSize = 0;
+            this.durable = false;
+        }
+
+        public void Parse(string[] args)
+        {
+            int argCount = args.Length;
+            int current = 0;
+            bool typeSelected = false;
+
+            while (current < argCount)
+            {
+                string arg = args[current];
+                if (arg == "--publish")
+                {
+                    if (typeSelected)
+                        throw new ArgumentException("too many roles");
+
+                    this.type = ClientType.Publisher;
+                    typeSelected = true;
+                }
+                else if (arg == "--subscribe")
+                {
+                    if (typeSelected)
+                        throw new ArgumentException("too many roles");
+
+                    this.type = ClientType.Subscriber;
+                    typeSelected = true;
+                }
+                else if (arg == "--size")
+                {
+                    arg = args[++current];
+                    int i = int.Parse(arg);
+                    if (i > 0)
+                    {
+                        this.messageSize = i;
+                    }
+                }
+                else if (arg == "--count")
+                {
+                    arg = args[++current];
+                    UInt64 i = UInt64.Parse(arg);
+                    if (i > 0)
+                    {
+                        this.messageCount = i;
+                    }
+                }
+                else if (arg == "--broker")
+                {
+                    this.broker = args[++current];
+                }
+                else if (arg == "--port")
+                {
+                    arg = args[++current];
+                    int i = int.Parse(arg);
+                    if (i > 0)
+                    {
+                        this.port = i;
+                    }
+                }
+                else if (arg == "--base-name")
+                {
+                    this.baseName = args[++current];
+                }
+
+                else if (arg == "--tx")
+                {
+                    arg = args[++current];
+                    int i = int.Parse(arg);
+                    if (i > 0)
+                    {
+                        this.subTxSize = i;
+                        this.pubTxSize = i;
+                    }
+                }
+
+                else if (arg == "--durable")
+                {
+                    arg = args[++current];
+                    if (arg.Equals("yes"))
+                    {
+                        this.durable = true;
+                    }
+                }
+                
+                current++;
+            }
+        }
+    }
+
+
+    public class Client
+    {
+        protected Options opts;
+
+        public static void Expect(string actual, string expect)
+        {
+            if (expect != actual)
+            {
+                throw new Exception("Expecting " + expect + " but received " + 
actual);
+            }
+        }
+
+        public static void Close(IChannel channel)
+        {
+            if (channel == null)
+            {
+                return;
+            }
+
+            try
+            {
+                channel.Close();
+            }
+            catch (Exception e)
+            {
+                Console.WriteLine("channel close exception {0}", e);
+            }
+        }
+
+        public string Fqn(string name)
+        {
+            return opts.baseName + '_' + name;
+        }
+    }
+
+
+    public class WcfPerftest
+    {
+        static void WarmUpTransactionSubsystem(Options opts)
+        {
+            // see if any use of transactions is expected
+            if ((opts.type == ClientType.Publisher) && (opts.pubTxSize == 0))
+                return;
+
+            if ((opts.type == ClientType.Subscriber) && (opts.subTxSize == 0))
+                return;
+
+            if (opts.type == ClientType.InteropDemo)
+            {
+                if ((opts.subTxSize == 0) && (opts.pubTxSize == 0))
+                    return;
+            }
+
+            Console.WriteLine("Initializing transactions");
+            IRawBodyUtility bodyUtil = new RawEncoderUtility();
+
+            // send a transacted message to nowhere to force the initial 
registration with MSDTC
+            IOutputChannel channel = 
QueueChannelFactory.CreateWriterChannel("", Guid.NewGuid().ToString());
+            Message msg = bodyUtil.CreateMessage("sacrificial transacted 
message from WcfPerftest");
+            using (TransactionScope ts = new TransactionScope())
+            {
+                channel.Send(msg);
+                // abort/rollback
+                ts.Dispose();
+            }
+            channel.Close();
+            Console.WriteLine("transaction resource manager ready");
+        }
+
+        static void InteropDemo(Options opts)
+        {
+            string perftest_cpp_exe = "perftest.exe";
+            string commonArgs = String.Format(" --count {0} --size {1}", 
opts.messageCount, opts.messageSize);
+
+            if (opts.durable)
+            {
+                commonArgs += " --durable yes";
+            }
+
+            Console.WriteLine("===== WCF Subscriber and C++ Publisher =====");
+
+            Process setup = new Process();
+            setup.StartInfo.FileName = perftest_cpp_exe;
+            setup.StartInfo.UseShellExecute = false;
+            setup.StartInfo.Arguments = "--setup" + commonArgs;
+            try
+            {
+                setup.Start();
+            }
+            catch (Win32Exception win32e)
+            {
+                Console.WriteLine("Cannot execute {0}: PATH not set?", 
perftest_cpp_exe);
+                Console.WriteLine("   Error: {0}", win32e.Message);
+                return;
+            }
+            setup.WaitForExit();
+
+            Process control = new Process();
+            control.StartInfo.FileName = perftest_cpp_exe;
+            control.StartInfo.UseShellExecute = false;
+            control.StartInfo.Arguments = "--control" + commonArgs;
+            control.Start();
+
+            Process publish = new Process();
+            publish.StartInfo.FileName = perftest_cpp_exe;
+            publish.StartInfo.UseShellExecute = false;
+            publish.StartInfo.Arguments = "--publish" + commonArgs;
+            publish.Start();
+
+            SubscribeThread subscribeWcf = new SubscribeThread(opts.baseName + 
"0", opts);
+            Thread subThread = new Thread(subscribeWcf.Run);
+            subThread.Start();
+
+            subThread.Join();
+            publish.WaitForExit();
+            control.WaitForExit();
+
+            Console.WriteLine();
+            Console.WriteLine("=====  WCF Publisher and C++ Subscriber =====");
+
+            setup = new Process();
+            setup.StartInfo.FileName = perftest_cpp_exe;
+            setup.StartInfo.UseShellExecute = false;
+            setup.StartInfo.Arguments = "--setup" + commonArgs;
+            setup.Start();
+            setup.WaitForExit();
+
+            control = new Process();
+            control.StartInfo.FileName = perftest_cpp_exe;
+            control.StartInfo.UseShellExecute = false;
+            control.StartInfo.Arguments = "--control" + commonArgs;
+            control.Start();
+
+            PublishThread pub = new PublishThread(opts.baseName + "0", "", 
opts);
+            Thread pubThread = new Thread(pub.Run);
+            pubThread.Start();
+
+            Process subscribeCpp = new Process();
+            subscribeCpp.StartInfo.FileName = perftest_cpp_exe;
+            subscribeCpp.StartInfo.UseShellExecute = false;
+            subscribeCpp.StartInfo.Arguments = "--subscribe" + commonArgs;
+            subscribeCpp.Start();
+
+            subscribeCpp.WaitForExit();
+            pubThread.Join();
+            control.WaitForExit();
+        }
+
+        static void Main(string[] mainArgs)
+        {
+            Options opts = new Options();
+            opts.Parse(mainArgs);
+            QueueChannelFactory.SetBroker(opts.broker, opts.port);
+
+            WarmUpTransactionSubsystem(opts);
+
+            if (opts.type == ClientType.Publisher)
+            {
+                PublishThread pub = new PublishThread(opts.baseName + "0", "", 
opts);
+                Thread pubThread = new Thread(pub.Run);
+                pubThread.Start();
+                pubThread.Join();
+            }
+            else if (opts.type == ClientType.Subscriber)
+            {
+                SubscribeThread sub = new SubscribeThread(opts.baseName + "0", 
opts);
+                Thread subThread = new Thread(sub.Run);
+                subThread.Start();
+                subThread.Join();
+            }
+            else
+            {
+                InteropDemo(opts);   
+            }
+
+            if (System.Diagnostics.Debugger.IsAttached)
+            {
+                Console.WriteLine("Hit return to continue...");
+                Console.ReadLine();
+            }
+        }
+    }
+
+    public class PublishThread : Client
+    {
+        string destination; // exchange/queue
+        string routingKey;
+        int msgSize;
+        UInt64 msgCount;
+        IOutputChannel publishQueue;
+
+        public PublishThread(string key, string q, Options opts)
+        {
+            this.routingKey = key;
+            this.destination = q;
+            this.msgSize = opts.messageSize;
+            this.msgCount = opts.messageCount;
+            this.opts = opts;
+        }
+
+        static void StampSequenceNo(byte[] data, UInt64 n)
+        {
+            int wordLen = IntPtr.Size;  // mimic size_t in C++
+
+            if (data.Length < wordLen)
+                throw new ArgumentException("message size");
+            for (int i = 0; i < wordLen; i++)
+            {
+                data[i] = (byte) (n & 0xff);
+                n >>= 8;
+            }
+        }
+
+        public void Run()
+        {
+            IRawBodyUtility bodyUtil = new RawEncoderUtility();
+
+            IInputChannel startQueue = null;
+            IOutputChannel doneQueue = null;
+            UInt64 batchSize = (UInt64)opts.pubTxSize;
+            bool txPending = false;
+            AmqpProperties amqpProperties = null;
+
+            if (opts.durable)
+            {
+                amqpProperties = new AmqpProperties();
+                amqpProperties.Durable = true;
+            }
+
+            try
+            {
+                publishQueue = 
QueueChannelFactory.CreateWriterChannel(this.destination, this.routingKey);
+                doneQueue = QueueChannelFactory.CreateWriterChannel("", 
this.Fqn("pub_done"));
+                startQueue = 
QueueChannelFactory.CreateReaderChannel(this.Fqn("pub_start"));
+
+                // wait for our start signal
+                Message msg;
+                msg = startQueue.Receive(TimeSpan.MaxValue);
+                Expect(bodyUtil.GetText(msg), "start");
+                msg.Close();
+
+                Stopwatch stopwatch = new Stopwatch();
+                AsyncCallback sendCallback = new 
AsyncCallback(this.AsyncSendCB);
+
+                byte[] data = new byte[this.msgSize];
+                IAsyncResult sendResult = null;
+
+                Console.WriteLine("sending {0}", this.msgCount);
+                stopwatch.Start();
+
+                if (batchSize > 0)
+                {
+                    Transaction.Current = new CommittableTransaction();
+                }
+
+                for (UInt64 i = 0; i < this.msgCount; i++)
+                {
+                    StampSequenceNo(data, i);
+                    msg = bodyUtil.CreateMessage(data);
+                    if (amqpProperties != null)
+                    {
+                        msg.Properties.Add("AmqpProperties", amqpProperties);
+                    }
+
+                    sendResult = publishQueue.BeginSend(msg, 
TimeSpan.MaxValue, sendCallback, msg);
+
+                    if (batchSize > 0)
+                    {
+                        txPending = true;
+                        if (((i + 1) % batchSize) == 0)
+                        {
+                            
((CommittableTransaction)Transaction.Current).Commit();
+                            txPending = false;
+                            Transaction.Current = new CommittableTransaction();
+                        }
+                    }
+                }
+
+                if (txPending)
+                {
+                    ((CommittableTransaction)Transaction.Current).Commit();
+                }
+
+                Transaction.Current = null;
+
+                sendResult.AsyncWaitHandle.WaitOne();
+                stopwatch.Stop();
+
+                double mps = (msgCount / stopwatch.Elapsed.TotalSeconds);
+
+                msg = bodyUtil.CreateMessage(String.Format("{0:0.##}", mps));
+                doneQueue.Send(msg, TimeSpan.MaxValue);
+                msg.Close();
+            }
+            finally
+            {
+                Close((IChannel)doneQueue);
+                Close((IChannel)publishQueue);
+                Close(startQueue);
+            }
+        }
+
+        void AsyncSendCB(IAsyncResult result)
+        {
+            publishQueue.EndSend(result);
+            ((Message)result.AsyncState).Close();
+        }
+    }
+
+    public class SubscribeThread : Client
+    {
+        string queue;
+        int msgSize;
+        UInt64 msgCount;
+        IInputChannel subscribeQueue;
+
+        public SubscribeThread(string q, Options opts)
+        {
+            this.queue = q;
+            this.msgSize = opts.messageSize;
+            this.msgCount = opts.messageCount;
+            this.opts = opts;
+        }
+
+        static UInt64 GetSequenceNumber(byte[] data)
+        {
+            int wordLen = IntPtr.Size;  // mimic size_t in C++
+
+            if (data.Length < wordLen)
+                throw new ArgumentException("message size");
+            UInt64 n = 0;
+            for (int i = (wordLen - 1); i >= 0; i--)
+            {
+                n = (256 * n) + data[i];
+            }
+            return n;
+        }
+
+        public void Run()
+        {
+            IRawBodyUtility bodyUtil = new RawEncoderUtility();
+
+            IOutputChannel readyQueue = null;
+            IOutputChannel doneQueue = null;
+            UInt64 batchSize = (UInt64)opts.subTxSize;
+            bool txPending = false;
+            byte[] data = null;
+
+            try
+            {
+                this.subscribeQueue = 
QueueChannelFactory.CreateReaderChannel(this.queue);
+                readyQueue = QueueChannelFactory.CreateWriterChannel("", 
this.Fqn("sub_ready"));
+                doneQueue = QueueChannelFactory.CreateWriterChannel("", 
this.Fqn("sub_done"));
+
+                Message msg = bodyUtil.CreateMessage("ready");
+                readyQueue.Send(msg, TimeSpan.MaxValue);
+                msg.Close();
+
+
+                Stopwatch stopwatch = new Stopwatch();
+                stopwatch.Start();
+
+                Console.WriteLine("receiving {0}", this.msgCount);
+                UInt64 expect = 0;
+
+                if (batchSize > 0)
+                {
+                    Transaction.Current = new CommittableTransaction();
+                }
+
+                for (UInt64 i = 0; i < this.msgCount; i++)
+                {
+                    msg = subscribeQueue.Receive(TimeSpan.MaxValue);
+
+                    data = bodyUtil.GetBytes(msg, data);
+                    msg.Close();
+                    if (data.Length != this.msgSize)
+                    {
+                        throw new Exception("subscribe message size mismatch");
+                    }
+
+                    UInt64 n = GetSequenceNumber(data);
+                    if (n != expect)
+                    {
+                        throw new Exception(String.Format("message sequence 
error. expected {0} got {1}", expect, n));
+                    }
+                    expect = n + 1;
+
+                    if (batchSize > 0)
+                    {
+                        txPending = true;
+                        if (((i + 1) % batchSize) == 0)
+                        {
+                            
((CommittableTransaction)Transaction.Current).Commit();
+                            txPending = false;
+                            Transaction.Current = new CommittableTransaction();
+                        }
+                    }
+                }
+
+                if (txPending)
+                {
+                    ((CommittableTransaction)Transaction.Current).Commit();
+                }
+
+                Transaction.Current = null;
+
+                stopwatch.Stop();
+
+                double mps = (msgCount / stopwatch.Elapsed.TotalSeconds);
+
+                msg = bodyUtil.CreateMessage(String.Format("{0:0.##}", mps));
+                doneQueue.Send(msg, TimeSpan.MaxValue);
+                msg.Close();
+
+                subscribeQueue.Close();
+            }
+            finally
+            {
+                Close((IChannel)doneQueue);
+                Close((IChannel)this.subscribeQueue);
+                Close(readyQueue);
+            }
+        }
+    }
+}

Added: 
qpid/trunk/qpid/wcf/test/Apache/Qpid/Test/Channel/WcfPerftest/WcfPerftest.csproj
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/wcf/test/Apache/Qpid/Test/Channel/WcfPerftest/WcfPerftest.csproj?rev=891795&view=auto
==============================================================================
--- 
qpid/trunk/qpid/wcf/test/Apache/Qpid/Test/Channel/WcfPerftest/WcfPerftest.csproj
 (added)
+++ 
qpid/trunk/qpid/wcf/test/Apache/Qpid/Test/Channel/WcfPerftest/WcfPerftest.csproj
 Thu Dec 17 16:43:44 2009
@@ -0,0 +1,83 @@
+<?xml version="1.0" encoding="utf-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<Project ToolsVersion="3.5" DefaultTargets="Build" 
xmlns="http://schemas.microsoft.com/developer/msbuild/2003";>
+  <PropertyGroup>
+    <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
+    <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
+    <ProductVersion>9.0.21022</ProductVersion>
+    <SchemaVersion>2.0</SchemaVersion>
+    <ProjectGuid>{D0F8FDE4-7AC6-4CFF-986A-50D06F7FD733}</ProjectGuid>
+    <OutputType>Exe</OutputType>
+    <RootNamespace>Apache.Qpid.Test.Channel.WcfPerftest</RootNamespace>
+    <AssemblyName>WcfPerftest</AssemblyName>
+    <TargetFrameworkVersion>v3.5</TargetFrameworkVersion>
+    <FileAlignment>512</FileAlignment>
+  </PropertyGroup>
+  <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' 
">
+    <DebugSymbols>true</DebugSymbols>
+    <DebugType>full</DebugType>
+    <Optimize>false</Optimize>
+    <OutputPath>bin\Debug\</OutputPath>
+    <DefineConstants>DEBUG;TRACE</DefineConstants>
+    <ErrorReport>prompt</ErrorReport>
+    <WarningLevel>4</WarningLevel>
+  </PropertyGroup>
+  <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 
'Release|AnyCPU' ">
+    <DebugType>pdbonly</DebugType>
+    <Optimize>true</Optimize>
+    <OutputPath>bin\Release\</OutputPath>
+    <DefineConstants>TRACE</DefineConstants>
+    <ErrorReport>prompt</ErrorReport>
+    <WarningLevel>4</WarningLevel>
+  </PropertyGroup>
+  <ItemGroup>
+    <Reference Include="System" />
+    <Reference Include="System.Runtime.Serialization">
+      <RequiredTargetFramework>3.0</RequiredTargetFramework>
+    </Reference>
+    <Reference Include="System.ServiceModel">
+      <RequiredTargetFramework>3.0</RequiredTargetFramework>
+    </Reference>
+    <Reference Include="System.Transactions" />
+    <Reference Include="System.XML" />
+  </ItemGroup>
+  <ItemGroup>
+    <Compile Include="WcfPerftest.cs" />
+    <Compile Include="RawBodyUtility.cs" />
+  </ItemGroup>
+  <ItemGroup>
+    <ProjectReference 
Include="..\..\..\..\..\..\src\Apache\Qpid\Channel\Channel.csproj">
+      <Project>{8AABAB30-7D1E-4539-B7D1-05450262BAD2}</Project>
+      <Name>Channel</Name>
+    </ProjectReference>
+    <ProjectReference 
Include="..\..\..\..\..\..\src\Apache\Qpid\Interop\Interop.vcproj">
+      <Project>{C9B6AC75-6332-47A4-B82B-0C20E0AF2D34}</Project>
+      <Name>Interop</Name>
+    </ProjectReference>
+  </ItemGroup>
+  <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
+  <!-- To modify your build process, add your task inside one of the targets 
below and uncomment it. 
+       Other similar extension points exist, see Microsoft.Common.targets.
+  <Target Name="BeforeBuild">
+  </Target>
+  <Target Name="AfterBuild">
+  </Target>
+  -->
+</Project>



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to