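Fix for AMQNET-345 (https://issues.apache.org/jira/browse/AMQNET-345): escape backslash, newline and colon in STOMP header values when the negotiated protocol version is above 1.0, and unescape them when frames are read.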
Project: http://git-wip-us.apache.org/repos/asf/activemq-nms-stomp/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-nms-stomp/commit/5ccb7111 Tree: http://git-wip-us.apache.org/repos/asf/activemq-nms-stomp/tree/5ccb7111 Diff: http://git-wip-us.apache.org/repos/asf/activemq-nms-stomp/diff/5ccb7111 Branch: refs/heads/1.5.x Commit: 5ccb7111e907e75ec0d3b88b063851dd0782f24e Parents: b7a52e8 Author: Timothy A. Bish <[email protected]> Authored: Thu Nov 3 23:46:52 2011 +0000 Committer: Timothy A. Bish <[email protected]> Committed: Thu Nov 3 23:46:52 2011 +0000 ---------------------------------------------------------------------- src/main/csharp/Protocol/StompFrame.cs | 113 +++++++++++++++++- src/main/csharp/Protocol/StompFrameStream.cs | 124 -------------------- src/main/csharp/Protocol/StompWireFormat.cs | 25 ++-- src/main/csharp/Transport/InactivityMonitor.cs | 3 +- 4 files changed, 126 insertions(+), 139 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-nms-stomp/blob/5ccb7111/src/main/csharp/Protocol/StompFrame.cs ---------------------------------------------------------------------- diff --git a/src/main/csharp/Protocol/StompFrame.cs b/src/main/csharp/Protocol/StompFrame.cs index 52ef99b..527f298 100644 --- a/src/main/csharp/Protocol/StompFrame.cs +++ b/src/main/csharp/Protocol/StompFrame.cs @@ -33,10 +33,18 @@ namespace Apache.NMS.Stomp.Protocol public const byte FRAME_TERMINUS = (byte) 0; /// Used to denote a Special KeepAlive command that consists of a single newline. 
public const String KEEPALIVE = "KEEPALIVE"; - + + public const byte BREAK = (byte)('\n'); + public const byte COLON = (byte)(':'); + public const byte ESCAPE = (byte)('\\'); + public readonly byte[] ESCAPE_ESCAPE_SEQ = new byte[2]{ 92, 92 }; + public readonly byte[] COLON_ESCAPE_SEQ = new byte[2]{ 92, 99 }; + public readonly byte[] NEWLINE_ESCAPE_SEQ = new byte[2]{ 92, 110 }; + private string command; private IDictionary properties = new Hashtable(); private byte[] content; + private bool encodingEnabled; private readonly Encoding encoding = new UTF8Encoding(); @@ -44,10 +52,27 @@ namespace Apache.NMS.Stomp.Protocol { } + public StompFrame(bool encodingEnabled) + { + this.encodingEnabled = encodingEnabled; + } + public StompFrame(string command) { this.command = command; } + + public StompFrame(string command, bool encodingEnabled) + { + this.command = command; + this.encodingEnabled = encodingEnabled; + } + + public bool EncodingEnabled + { + get { return this.encodingEnabled; } + set { this.encodingEnabled = value; } + } public byte[] Content { @@ -152,7 +177,7 @@ namespace Apache.NMS.Stomp.Protocol { builder.Append(key); builder.Append(SEPARATOR); - builder.Append(this.Properties[key] as string); + builder.Append(EncodeHeader(this.Properties[key] as string)); builder.Append(NEWLINE); } @@ -205,7 +230,7 @@ namespace Apache.NMS.Stomp.Protocol // to store them all but for now we just throw the rest out. 
if(!this.properties.Contains(key)) { - this.properties[key] = value; + this.properties[key] = DecodeHeader(value); } } else @@ -267,6 +292,86 @@ namespace Apache.NMS.Stomp.Protocol byte[] data = ms.ToArray(); return encoding.GetString(data, 0, data.Length); - } + } + + private String EncodeHeader(String header) + { + String result = header; + if(this.encodingEnabled) + { + byte[] utf8buf = this.encoding.GetBytes(header); + MemoryStream stream = new MemoryStream(utf8buf.Length); + foreach(byte val in utf8buf) + { + switch(val) + { + case ESCAPE: + stream.Write(ESCAPE_ESCAPE_SEQ, 0, ESCAPE_ESCAPE_SEQ.Length); + break; + case BREAK: + stream.Write(NEWLINE_ESCAPE_SEQ, 0, NEWLINE_ESCAPE_SEQ.Length); + break; + case COLON: + stream.Write(COLON_ESCAPE_SEQ, 0, COLON_ESCAPE_SEQ.Length); + break; + default: + stream.WriteByte(val); + break; + } + } + + byte[] data = stream.ToArray(); + result = encoding.GetString(data, 0, data.Length); + } + + return result; + } + + private String DecodeHeader(String header) + { + MemoryStream decoded = new MemoryStream(); + + int value = -1; + byte[] utf8buf = this.encoding.GetBytes(header); + MemoryStream stream = new MemoryStream(utf8buf); + + while((value = stream.ReadByte()) != -1) + { + if(value == 92) + { + int next = stream.ReadByte(); + if (next != -1) + { + switch(next) { + case 110: + decoded.WriteByte(BREAK); + break; + case 99: + decoded.WriteByte(COLON); + break; + case 92: + decoded.WriteByte(ESCAPE); + break; + default: + stream.Seek(-1, SeekOrigin.Current); + decoded.WriteByte((byte)value); + break; + } + } + else + { + decoded.WriteByte((byte)value); + } + + } + else + { + decoded.WriteByte((byte)value); + } + } + + byte[] data = decoded.ToArray(); + return encoding.GetString(data, 0, data.Length); + } } } http://git-wip-us.apache.org/repos/asf/activemq-nms-stomp/blob/5ccb7111/src/main/csharp/Protocol/StompFrameStream.cs ---------------------------------------------------------------------- diff --git 
a/src/main/csharp/Protocol/StompFrameStream.cs b/src/main/csharp/Protocol/StompFrameStream.cs deleted file mode 100644 index fcd8fb7..0000000 --- a/src/main/csharp/Protocol/StompFrameStream.cs +++ /dev/null @@ -1,124 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -using System; -using System.IO; -using System.Text; - -using Apache.NMS.Stomp.Commands; - -namespace Apache.NMS.Stomp.Protocol -{ - /// <summary> - /// A Stream for writing a <a href="http://stomp.codehaus.org/">STOMP</a> Frame - /// </summary> - public class StompFrameStream - { - /// Used to terminate a header line or end of a headers section of the Frame. - public const String NEWLINE = "\n"; - /// Used to seperate the Key / Value pairing in Frame Headers - public const String SEPARATOR = ":"; - /// Used to mark the End of the Frame. 
- public const byte FRAME_TERMINUS = (byte) 0; - - private readonly StringBuilder builder = new StringBuilder(); - private readonly BinaryWriter ds; - private byte[] content; - private int contentLength = -1; - private readonly Encoding encoding; - - public StompFrameStream(BinaryWriter ds, Encoding encoding) - { - this.ds = ds; - this.encoding = encoding; - } - - public byte[] Content - { - get { return content; } - set { content = value; } - } - - public int ContentLength - { - get { return contentLength; } - set - { - contentLength = value; - WriteHeader("content-length", contentLength); - } - } - - public void WriteCommand(Command command, String name) - { - WriteCommand(command, name, false); - } - - public void WriteCommand(Command command, String name, bool ignoreErrors) - { - builder.Append(name); - builder.Append(NEWLINE); - if(command.ResponseRequired) - { - if(ignoreErrors) - { - WriteHeader("receipt", "ignore:" + command.CommandId); - } - else - { - WriteHeader("receipt", command.CommandId); - } - } - } - - public void WriteHeader(String name, Object value) - { - if(value != null) - { - builder.Append(name); - builder.Append(SEPARATOR); - builder.Append(value); - builder.Append(NEWLINE); - } - } - - public void WriteHeader(String name, bool value) - { - if(value) - { - builder.Append(name); - builder.Append(SEPARATOR); - builder.Append("true"); - builder.Append(NEWLINE); - } - } - - public void Flush() - { - builder.Append(NEWLINE); - ds.Write(encoding.GetBytes(builder.ToString())); - - if (content != null) - { - ds.Write(content); - } - - // Always write a terminating NULL byte to end the content frame. 
- ds.Write(FRAME_TERMINUS); - } - } -} http://git-wip-us.apache.org/repos/asf/activemq-nms-stomp/blob/5ccb7111/src/main/csharp/Protocol/StompWireFormat.cs ---------------------------------------------------------------------- diff --git a/src/main/csharp/Protocol/StompWireFormat.cs b/src/main/csharp/Protocol/StompWireFormat.cs index 6b864f7..bf6e278 100644 --- a/src/main/csharp/Protocol/StompWireFormat.cs +++ b/src/main/csharp/Protocol/StompWireFormat.cs @@ -33,6 +33,7 @@ namespace Apache.NMS.Stomp.Protocol private ITransport transport; private WireFormatInfo remoteWireFormatInfo; private int connectedResponseId = -1; + private bool encodeHeaders = false; public StompWireFormat() { @@ -116,7 +117,7 @@ namespace Apache.NMS.Stomp.Protocol public Object Unmarshal(BinaryReader dataIn) { - StompFrame frame = new StompFrame(); + StompFrame frame = new StompFrame(this.encodeHeaders); frame.FromStream(dataIn); Object answer = CreateCommand(frame); @@ -197,6 +198,11 @@ namespace Apache.NMS.Stomp.Protocol { remoteWireFormatInfo.Version = Single.Parse(frame.RemoveProperty("version")); + if(remoteWireFormatInfo.Version > 1.0f) + { + this.encodeHeaders = true; + } + if(frame.HasProperty("session")) { remoteWireFormatInfo.Session = frame.RemoveProperty("session"); @@ -333,7 +339,7 @@ namespace Apache.NMS.Stomp.Protocol protected virtual void WriteMessage(Message command, BinaryWriter dataOut) { - StompFrame frame = new StompFrame("SEND"); + StompFrame frame = new StompFrame("SEND", encodeHeaders); if(command.ResponseRequired) { frame.SetProperty("receipt", command.CommandId); @@ -418,7 +424,7 @@ namespace Apache.NMS.Stomp.Protocol protected virtual void WriteMessageAck(MessageAck command, BinaryWriter dataOut) { - StompFrame frame = new StompFrame("ACK"); + StompFrame frame = new StompFrame("ACK", encodeHeaders); if(command.ResponseRequired) { frame.SetProperty("receipt", "ignore:" + command.CommandId); @@ -443,8 +449,7 @@ namespace Apache.NMS.Stomp.Protocol protected virtual 
void WriteConnectionInfo(ConnectionInfo command, BinaryWriter dataOut) { // lets force a receipt for the Connect Frame. - - StompFrame frame = new StompFrame("CONNECT"); + StompFrame frame = new StompFrame("CONNECT", encodeHeaders); frame.SetProperty("client-id", command.ClientId); if(!String.IsNullOrEmpty(command.UserName)) @@ -477,7 +482,7 @@ namespace Apache.NMS.Stomp.Protocol { System.Diagnostics.Debug.Assert(!command.ResponseRequired); - StompFrame frame = new StompFrame("DISCONNECT"); + StompFrame frame = new StompFrame("DISCONNECT", encodeHeaders); if(Tracer.IsDebugEnabled) { @@ -489,7 +494,7 @@ namespace Apache.NMS.Stomp.Protocol protected virtual void WriteConsumerInfo(ConsumerInfo command, BinaryWriter dataOut) { - StompFrame frame = new StompFrame("SUBSCRIBE"); + StompFrame frame = new StompFrame("SUBSCRIBE", encodeHeaders); if(command.ResponseRequired) { @@ -552,7 +557,7 @@ namespace Apache.NMS.Stomp.Protocol protected virtual void WriteKeepAliveInfo(KeepAliveInfo command, BinaryWriter dataOut) { - StompFrame frame = new StompFrame(StompFrame.KEEPALIVE); + StompFrame frame = new StompFrame(StompFrame.KEEPALIVE, encodeHeaders); if(Tracer.IsDebugEnabled) { @@ -564,7 +569,7 @@ namespace Apache.NMS.Stomp.Protocol protected virtual void WriteRemoveInfo(RemoveInfo command, BinaryWriter dataOut) { - StompFrame frame = new StompFrame("UNSUBSCRIBE"); + StompFrame frame = new StompFrame("UNSUBSCRIBE", encodeHeaders); object id = command.ObjectId; if(id is ConsumerId) @@ -604,7 +609,7 @@ namespace Apache.NMS.Stomp.Protocol Tracer.Debug("StompWireFormat - For transaction type: " + transactionType + " we are using command type: " + type); - StompFrame frame = new StompFrame(type); + StompFrame frame = new StompFrame(type, encodeHeaders); if(command.ResponseRequired) { frame.SetProperty("receipt", command.CommandId); http://git-wip-us.apache.org/repos/asf/activemq-nms-stomp/blob/5ccb7111/src/main/csharp/Transport/InactivityMonitor.cs 
---------------------------------------------------------------------- diff --git a/src/main/csharp/Transport/InactivityMonitor.cs b/src/main/csharp/Transport/InactivityMonitor.cs index 1614394..9146693 100644 --- a/src/main/csharp/Transport/InactivityMonitor.cs +++ b/src/main/csharp/Transport/InactivityMonitor.cs @@ -335,7 +335,8 @@ namespace Apache.NMS.Stomp.Transport this.asyncWriteTask = new AsyncWriteTask(this); } - initialDelayTime = localWireFormatInfo.MaxInactivityDurationInitialDelay; + initialDelayTime = localWireFormatInfo.MaxInactivityDurationInitialDelay > 0 ? + localWireFormatInfo.MaxInactivityDurationInitialDelay : writeCheckTime; Tracer.DebugFormat("InactivityMonitor[{0}]: Read Check time interval: {1}", instanceId, readCheckTime );
