Author: jgomes
Date: Tue Sep  2 13:52:24 2008
New Revision: 691378

URL: http://svn.apache.org/viewvc?rev=691378&view=rev
Log:
Integrated patch from Stefan Gmeiner.  Slight modifications for code clean up 
and support for .NET Compact Framework.
Fixes [AMQNET-109]. (See https://issues.apache.org/activemq/browse/AMQNET-109)

Modified:
    
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageProducer.cs
    
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/OpenWire/OpenWireFormat.cs
    
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs
    
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs

Modified: 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageProducer.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageProducer.cs?rev=691378&r1=691377&r2=691378&view=diff
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageProducer.cs
 (original)
+++ 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/MessageProducer.cs
 Tue Sep  2 13:52:24 2008
@@ -34,7 +34,6 @@
                private bool msgPersistent = NMSConstants.defaultPersistence;
                private TimeSpan requestTimeout;
                private TimeSpan msgTimeToLive = NMSConstants.defaultTimeToLive;
-               private readonly bool defaultSpecifiedTimeToLive = false;
                private byte msgPriority = NMSConstants.defaultPriority;
                private bool disableMessageID = false;
                private bool disableMessageTimestamp = false;
@@ -118,17 +117,17 @@
 
                public void Send(IMessage message)
                {
-                       Send(info.Destination, message);
+                       Send(info.Destination, message, this.msgPersistent, 
this.msgPriority, this.msgTimeToLive, false);
                }
 
                public void Send(IDestination destination, IMessage message)
                {
-                       Send(destination, message, Persistent, Priority, 
TimeToLive, defaultSpecifiedTimeToLive);
+                       Send(destination, message, this.msgPersistent, 
this.msgPriority, this.msgTimeToLive, false);
                }
 
                public void Send(IMessage message, bool persistent, byte 
priority, TimeSpan timeToLive)
                {
-                       Send(info.Destination, message, persistent, priority, 
timeToLive);
+                       Send(info.Destination, message, persistent, priority, 
timeToLive, true);
                }
 
                public void Send(IDestination destination, IMessage message, 
bool persistent, byte priority, TimeSpan timeToLive)

Modified: 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/OpenWire/OpenWireFormat.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/OpenWire/OpenWireFormat.cs?rev=691378&r1=691377&r2=691378&view=diff
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/OpenWire/OpenWireFormat.cs
 (original)
+++ 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/OpenWire/OpenWireFormat.cs
 Tue Sep  2 13:52:24 2008
@@ -21,6 +21,7 @@
 using System;
 using System.IO;
 using Apache.NMS;
+using Apache.NMS.ActiveMQ.Transport.Tcp;
 
 namespace Apache.NMS.ActiveMQ.OpenWire
 {
@@ -34,9 +35,10 @@
         private const byte NULL_TYPE = 0;
                
                private int version;
-               private bool stackTraceEnabled=false;
-               private bool tightEncodingEnabled=false;
-               private bool sizePrefixDisabled=false;
+               private bool stackTraceEnabled = false;
+               private bool tightEncodingEnabled = false;
+               private bool sizePrefixDisabled = false;
+               private bool tcpNoDelayEnabled = false;
         private int minimumVersion=1;
 
         private WireFormatInfo preferedWireFormatInfo = new WireFormatInfo();
@@ -46,7 +48,7 @@
         {
             PreferedWireFormatInfo.StackTraceEnabled = false;
             PreferedWireFormatInfo.TightEncodingEnabled = false;
-            PreferedWireFormatInfo.TcpNoDelayEnabled = false;
+            PreferedWireFormatInfo.TcpNoDelayEnabled = true;
             PreferedWireFormatInfo.CacheEnabled = false;
             PreferedWireFormatInfo.SizePrefixDisabled = false;
             PreferedWireFormatInfo.Version = 2;
@@ -55,18 +57,17 @@
             Version = 1;
         }
 
-        public ITransport Transport {
+        public ITransport Transport
+               {
                        get { return transport; }
                        set { transport = value; }
                }
                                
-        public bool StackTraceEnabled {
-            get { return stackTraceEnabled; }
-                       set { stackTraceEnabled = value; }
-        }
-        public int Version {
+        public int Version
+               {
             get { return version; }
-                       set {
+                       set
+                       {
 
                 Assembly dll = Assembly.GetExecutingAssembly();
                 Type type = 
dll.GetType("Apache.NMS.ActiveMQ.OpenWire.V"+value+".MarshallerFactory", false);
@@ -75,15 +76,31 @@
                            version = value;                    
                        }
         }
-        public bool SizePrefixDisabled {
+
+               public bool StackTraceEnabled
+               {
+                       get { return stackTraceEnabled; }
+                       set { stackTraceEnabled = value; }
+               }
+
+               public bool SizePrefixDisabled
+               {
             get { return sizePrefixDisabled; }
                        set { sizePrefixDisabled = value; }
         }
-        public bool TightEncodingEnabled {
+
+        public bool TightEncodingEnabled 
+               {
             get { return tightEncodingEnabled; }
                        set { tightEncodingEnabled = value; }
         }
 
+               public bool TcpNoDelayEnabled
+               {
+                       get { return tcpNoDelayEnabled; }
+                       set { tcpNoDelayEnabled = value; }
+               }
+
         public WireFormatInfo PreferedWireFormatInfo
         {
             get { return preferedWireFormatInfo; }
@@ -161,13 +178,14 @@
         public Object Unmarshal(BinaryReader dis)
         {
             // lets ignore the size of the packet
-                       if( !sizePrefixDisabled ) {
+                       if(!sizePrefixDisabled)
+                       {
                                dis.ReadInt32();
                        }
             
             // first byte is the type of the packet
             byte dataType = dis.ReadByte();
-            if (dataType != NULL_TYPE)
+            if(dataType != NULL_TYPE)
             {
                 BaseDataStreamMarshaller dsm = dataMarshallers[dataType & 
0xFF];
                 if (dsm == null)
@@ -175,12 +193,15 @@
                 Tracer.Debug("Parsing type: " + dataType + " with: " + dsm);
                 Object data = dsm.CreateObject();
                                
-                               if(tightEncodingEnabled) {
+                               if(tightEncodingEnabled)
+                               {
                                        BooleanStream bs = new BooleanStream();
                                        bs.Unmarshal(dis);
                                        dsm.TightUnmarshal(this, data, dis, bs);
                                        return data;
-                               } else {
+                               }
+                               else
+                               {
                                        dsm.LooseUnmarshal(this, data, dis);
                                        return data;
                                }
@@ -194,10 +215,12 @@
         public int TightMarshalNestedObject1(DataStructure o, BooleanStream bs)
         {
             bs.WriteBoolean(o != null);
-            if (o == null)
-                return 0;
+                       if(o == null)
+                       {
+                               return 0;
+                       }
             
-            if (o.IsMarshallAware())
+            if(o.IsMarshallAware())
             {
                 MarshallAware ma = (MarshallAware) o;
                 byte[] sequence = ma.GetMarshalledForm(this);
@@ -209,25 +232,31 @@
             }
             
             byte type = o.GetDataStructureType();
-            if (type == 0) {
+            if (type == 0)
+                       {
                 throw new IOException("No valid data structure type for: " + o 
+ " of type: " + o.GetType());
             }
+
             BaseDataStreamMarshaller dsm = (BaseDataStreamMarshaller) 
dataMarshallers[type & 0xFF];
-            if (dsm == null)
-                throw new IOException("Unknown data type: " + type);
+                       if(dsm == null)
+                       {
+                               throw new IOException("Unknown data type: " + 
type);
+                       }
             Tracer.Debug("Marshalling type: " + type + " with structure: " + 
o);
             return 1 + dsm.TightMarshal1(this, o, bs);
         }
         
         public void TightMarshalNestedObject2(DataStructure o, BinaryWriter 
ds, BooleanStream bs)
         {
-            if (!bs.ReadBoolean())
-                return ;
+                       if(!bs.ReadBoolean())
+                       {
+                               return;
+                       }
             
             byte type = o.GetDataStructureType();
             ds.Write(type);
             
-            if (o.IsMarshallAware() && bs.ReadBoolean())
+            if(o.IsMarshallAware() && bs.ReadBoolean())
             {
                 MarshallAware ma = (MarshallAware) o;
                 byte[] sequence = ma.GetMarshalledForm(this);
@@ -237,24 +266,27 @@
             {
                 
                 BaseDataStreamMarshaller dsm = (BaseDataStreamMarshaller) 
dataMarshallers[type & 0xFF];
-                if (dsm == null)
-                    throw new IOException("Unknown data type: " + type);
+                               if(dsm == null)
+                               {
+                                       throw new IOException("Unknown data 
type: " + type);
+                               }
                 dsm.TightMarshal2(this, o, ds, bs);
             }
         }
         
         public DataStructure TightUnmarshalNestedObject(BinaryReader dis, 
BooleanStream bs)
         {
-            if (bs.ReadBoolean())
+            if(bs.ReadBoolean())
             {
-                
                 byte dataType = dis.ReadByte();
                 BaseDataStreamMarshaller dsm = (BaseDataStreamMarshaller) 
dataMarshallers[dataType & 0xFF];
-                if (dsm == null)
-                    throw new IOException("Unknown data type: " + dataType);
+                               if(dsm == null)
+                               {
+                                       throw new IOException("Unknown data 
type: " + dataType);
+                               }
                 DataStructure data = dsm.CreateObject();
                 
-                if (data.IsMarshallAware() && bs.ReadBoolean())
+                if(data.IsMarshallAware() && bs.ReadBoolean())
                 {
                     dis.ReadInt32();
                     dis.ReadByte();
@@ -285,25 +317,29 @@
         public void LooseMarshalNestedObject(DataStructure o, BinaryWriter 
dataOut)
         {
                        dataOut.Write(o!=null);
-                       if( o!=null ) {
+                       if(o != null)
+                       {
                                byte type = o.GetDataStructureType();
                                dataOut.Write(type);
                 BaseDataStreamMarshaller dsm = (BaseDataStreamMarshaller) 
dataMarshallers[type & 0xFF];
-                               if( dsm == null )
-                                       throw new IOException("Unknown data 
type: "+type);
+                               if(dsm == null)
+                               {
+                                       throw new IOException("Unknown data 
type: " + type);
+                               }
                                dsm.LooseMarshal(this, o, dataOut);
                        }
         }
         
         public DataStructure LooseUnmarshalNestedObject(BinaryReader dis)
         {
-            if (dis.ReadBoolean())
+            if(dis.ReadBoolean())
             {
-                
                 byte dataType = dis.ReadByte();
                 BaseDataStreamMarshaller dsm = (BaseDataStreamMarshaller) 
dataMarshallers[dataType & 0xFF];
-                if (dsm == null)
-                    throw new IOException("Unknown data type: " + dataType);
+                               if(dsm == null)
+                               {
+                                       throw new IOException("Unknown data 
type: " + dataType);
+                               }
                 DataStructure data = dsm.CreateObject();
                                dsm.LooseUnmarshal(this, data, dis);
                 return data;
@@ -316,18 +352,25 @@
 
         public void renegotiateWireFormat(WireFormatInfo info)
         {
-            if (info.Version < minimumVersion)
+            if(info.Version < minimumVersion)
             {
-                throw new IOException("Remote wire format (" + info.Version 
+") is lower the minimum version required (" + minimumVersion + ")");
+                throw new IOException("Remote wire format (" + info.Version 
+") is lower than the minimum version required (" + minimumVersion + ")");
             }
 
             this.Version = Math.Min( PreferedWireFormatInfo.Version, 
info.Version);
             this.stackTraceEnabled = info.StackTraceEnabled && 
PreferedWireFormatInfo.StackTraceEnabled;
-//            this.tcpNoDelayEnabled = info.TcpNoDelayEnabled && 
PreferedWireFormatInfo.TcpNoDelayEnabled;
-//            this.cacheEnabled = info.CacheEnabled && 
PreferedWireFormatInfo.CacheEnabled;
-            this.tightEncodingEnabled = info.TightEncodingEnabled && 
PreferedWireFormatInfo.TightEncodingEnabled;
+                       this.tcpNoDelayEnabled = info.TcpNoDelayEnabled && 
PreferedWireFormatInfo.TcpNoDelayEnabled;
+                       this.tightEncodingEnabled = info.TightEncodingEnabled 
&& PreferedWireFormatInfo.TightEncodingEnabled;
             this.sizePrefixDisabled = info.SizePrefixDisabled && 
PreferedWireFormatInfo.SizePrefixDisabled;
-            
-        }
+
+                       TcpTransport tcpTransport = this.transport as 
TcpTransport;
+                       if(null != tcpTransport)
+                       {
+                               tcpTransport.TcpNoDelayEnabled = 
this.tcpNoDelayEnabled;
+                       }
+
+                       // The following options is not used client-side.
+                       // this.cacheEnabled = info.CacheEnabled && 
PreferedWireFormatInfo.CacheEnabled;
+               }
     }
 }

Modified: 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs?rev=691378&r1=691377&r2=691378&view=diff
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs
 (original)
+++ 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransport.cs
 Tue Sep  2 13:52:24 2008
@@ -43,9 +43,9 @@
                
                private CommandHandler commandHandler;
                private ExceptionHandler exceptionHandler;
-               private const int MAX_THREAD_WAIT = 30000;
+               private TimeSpan MAX_THREAD_WAIT = 
TimeSpan.FromMilliseconds(30000);
+
 
-               
                public TcpTransport(Socket socket, IWireFormat wireformat)
                {
                        this.socket = socket;
@@ -79,7 +79,7 @@
                                        // so lets use an instance for each of 
the 2 streams
                                        socketWriter = new 
OpenWireBinaryWriter(new NetworkStream(socket));
                                        socketReader = new 
OpenWireBinaryReader(new NetworkStream(socket));
-                                       
+
                                        // now lets create the background read 
thread
                                        readThread = new Thread(new 
ThreadStart(ReadLoop));
                                        readThread.IsBackground = true;
@@ -114,7 +114,7 @@
                                        }
 
                                        Wireformat.Marshal(command, 
socketWriter);
-                                       socketWriter.Flush();
+                                       //jdg socketWriter.Flush();
                                }
                                catch(Exception ex)
                                {
@@ -149,6 +149,17 @@
                        set { this.maxWait = value; }
                }
 
+               public bool TcpNoDelayEnabled
+               {
+#if !NETCF
+                       get { return this.socket.NoDelay; }
+                       set { this.socket.NoDelay = value; }
+#else
+                       get { return false; }
+                       set { }
+#endif
+               }
+
                public Response Request(Command command)
                {
                        throw new NotImplementedException("Use a 
ResponseCorrelator if you want to issue Request calls");
@@ -222,7 +233,18 @@
 #endif
                                                        )
                                                {
-                                                       
if(!readThread.Join(MAX_THREAD_WAIT))
+                                                       TimeSpan waitTime;
+
+                                                       if(maxWait < 
MAX_THREAD_WAIT)
+                                                       {
+                                                               waitTime = 
maxWait;
+                                                       }
+                                                       else
+                                                       {
+                                                               waitTime = 
MAX_THREAD_WAIT;
+                                                       }
+
+                                                       
if(!readThread.Join((int) waitTime.TotalMilliseconds))
                                                        {
                                                                
readThread.Abort();
                                                        }

Modified: 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs?rev=691378&r1=691377&r2=691378&view=diff
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs
 (original)
+++ 
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Transport/Tcp/TcpTransportFactory.cs
 Tue Sep  2 13:52:24 2008
@@ -42,6 +42,49 @@
                        set { useLogging = value; }
                }
 
+               /// <summary>
+               /// Size in bytes of the receive buffer.
+               /// </summary>
+               private int receiveBufferSize = 8192;
+               public int ReceiveBufferSize
+               {
+                       get { return receiveBufferSize; }
+                       set { receiveBufferSize = value; }
+               }
+
+               /// <summary>
+               /// Size in bytes of send buffer.
+               /// </summary>
+               private int sendBufferSize = 8192;
+               public int SendBufferSize
+               {
+                       get { return sendBufferSize; }
+                       set { sendBufferSize = value; }
+               }
+
+               /// <summary>
+               /// The time-out value, in milliseconds. The default value is 
0, which indicates
+               /// an infinite time-out period. Specifying -1 also indicates 
an infinite time-out period.
+               /// </summary>
+               private int receiveTimeout = 0;
+               public int ReceiveTimeout
+               {
+                       get { return receiveTimeout; }
+                       set { receiveTimeout = value; }
+               }
+
+               /// <summary>
+               /// The time-out value, in milliseconds. If you set the 
property with a value between 1 and 499,
+               /// the value will be changed to 500. The default value is 0, 
which indicates an infinite
+               /// time-out period. Specifying -1 also indicates an infinite 
time-out period.
+               /// </summary>
+               private int sendTimeout = 0;
+               public int SendTimeout
+               {
+                       get { return sendTimeout; }
+                       set { sendTimeout = value; }
+               }
+
                private string wireFormat = "OpenWire";
                public string WireFormat
                {
@@ -70,6 +113,14 @@
 
                        Tracer.Debug("Opening socket to: " + location.Host + " 
on port: " + location.Port);
                        Socket socket = Connect(location.Host, location.Port);
+
+#if !NETCF
+                       socket.ReceiveBufferSize = ReceiveBufferSize;
+                       socket.SendBufferSize = SendBufferSize;
+                       socket.ReceiveTimeout = ReceiveTimeout;
+                       socket.SendTimeout = SendTimeout;
+#endif
+
                        IWireFormat wireformat = CreateWireFormat(location, 
map);
                        ITransport transport = new TcpTransport(socket, 
wireformat);
 


Reply via email to