Author: jgomes
Date: Tue Oct 28 17:08:54 2008
New Revision: 708738
URL: http://svn.apache.org/viewvc?rev=708738&view=rev
Log:
Fix error introduced by turning on CacheEnabled wire format option by default.
Turned it off.
Made change to not start the transport connection until after processing URI
parameters first.
Added mutex for marshalling/unmarshalling to protect against renegotiating wire
format in the middle of marshalling/unmarshalling.
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/OpenWire/OpenWireFormat.cs
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs?rev=708738&r1=708737&r2=708738&view=diff
==============================================================================
---
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
(original)
+++
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/Connection.cs
Tue Oct 28 17:08:54 2008
@@ -60,7 +60,6 @@
this.transport = transport;
this.transport.Command = new CommandHandler(OnCommand);
this.transport.Exception = new
ExceptionHandler(OnException);
- this.transport.Start();
}
~Connection()
Modified:
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/OpenWire/OpenWireFormat.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/OpenWire/OpenWireFormat.cs?rev=708738&r1=708737&r2=708738&view=diff
==============================================================================
---
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/OpenWire/OpenWireFormat.cs
(original)
+++
activemq/activemq-dotnet/Apache.NMS.ActiveMQ/trunk/src/main/csharp/OpenWire/OpenWireFormat.cs
Tue Oct 28 17:08:54 2008
@@ -30,7 +30,7 @@
/// </summary>
public class OpenWireFormat : IWireFormat
{
-
+ private readonly object marshalLock = new object();
private BaseDataStreamMarshaller[] dataMarshallers;
private const byte NULL_TYPE = 0;
@@ -43,22 +43,23 @@
private long maxInactivityDuration = 0;
private long maxInactivityDurationInitialDelay = 0;
private int cacheSize = 0;
- private int minimumVersion=1;
+ private int minimumVersion = 1;
private WireFormatInfo preferedWireFormatInfo = new
WireFormatInfo();
private ITransport transport;
public OpenWireFormat()
{
- // See the following link for defaults:
http://activemq.apache.org/configuring-wire-formats.html
- PreferedWireFormatInfo.CacheEnabled = true;
+ // See the following link for defaults:
http://activemq.apache.org/configuring-wire-formats.html
+ // See also the following link for OpenWire format
info: http://activemq.apache.org/openwire-version-2-specification.html
+ PreferedWireFormatInfo.CacheEnabled = false;
PreferedWireFormatInfo.StackTraceEnabled = true;
- PreferedWireFormatInfo.TcpNoDelayEnabled = true; //
Deviate from defaults for platform speed increase.
+ PreferedWireFormatInfo.TcpNoDelayEnabled = true;
PreferedWireFormatInfo.SizePrefixDisabled = false;
- PreferedWireFormatInfo.TightEncodingEnabled = true;
+ PreferedWireFormatInfo.TightEncodingEnabled = false;
PreferedWireFormatInfo.MaxInactivityDuration = 30000;
PreferedWireFormatInfo.MaxInactivityDurationInitialDelay = 0;
- PreferedWireFormatInfo.CacheSize = 1024;
+ PreferedWireFormatInfo.CacheSize = 0;
PreferedWireFormatInfo.Version = 2;
dataMarshallers = new BaseDataStreamMarshaller[256];
@@ -77,7 +78,7 @@
set
{
Assembly dll = Assembly.GetExecutingAssembly();
- Type type =
dll.GetType("Apache.NMS.ActiveMQ.OpenWire.V"+value+".MarshallerFactory", false);
+ Type type =
dll.GetType("Apache.NMS.ActiveMQ.OpenWire.V" + value + ".MarshallerFactory",
false);
IMarshallerFactory factory =
(IMarshallerFactory) Activator.CreateInstance(type);
factory.configure(this);
version = value;
@@ -140,16 +141,22 @@
public void clearMarshallers()
{
- for (int i=0; i < dataMarshallers.Length; i++ )
+ lock(this.marshalLock)
{
- dataMarshallers[i] = null;
+ for(int i = 0; i < dataMarshallers.Length; i++)
+ {
+ dataMarshallers[i] = null;
+ }
}
}
public void addMarshaller(BaseDataStreamMarshaller marshaller)
{
byte type = marshaller.GetDataStructureType();
- dataMarshallers[type & 0xFF] = marshaller;
+ lock(this.marshalLock)
+ {
+ dataMarshallers[type & 0xFF] = marshaller;
+ }
}
public void Marshal(Object o, BinaryWriter ds)
@@ -157,50 +164,54 @@
int size = 1;
if(o != null)
{
- DataStructure c = (DataStructure) o;
- byte type = c.GetDataStructureType();
- BaseDataStreamMarshaller dsm =
dataMarshallers[type & 0xFF];
- if(dsm == null)
+ lock(this.marshalLock)
{
- throw new IOException("Unknown data
type: " + type);
- }
-
- if(tightEncodingEnabled)
- {
- BooleanStream bs = new BooleanStream();
- size += dsm.TightMarshal1(this, c, bs);
- size += bs.MarshalledSize();
-
- if(!sizePrefixDisabled)
+ DataStructure c = (DataStructure) o;
+ byte type = c.GetDataStructureType();
+ BaseDataStreamMarshaller dsm =
dataMarshallers[type & 0xFF];
+ if(null == dsm)
{
- ds.Write(size);
+ throw new IOException("Unknown
data type: " + type);
}
- ds.Write(type);
- bs.Marshal(ds);
- dsm.TightMarshal2(this, c, ds, bs);
- }
- else
- {
- BinaryWriter looseOut = ds;
- MemoryStream ms = null;
- // If we are prefixing then we need to
first write it to memory,
- // otherwise we can write direct to the
stream.
- if(!sizePrefixDisabled)
+ if(tightEncodingEnabled)
{
- ms = new MemoryStream();
- looseOut = new
OpenWireBinaryWriter(ms);
- looseOut.Write(size);
+ BooleanStream bs = new
BooleanStream();
+ size += dsm.TightMarshal1(this,
c, bs);
+ size += bs.MarshalledSize();
+
+ if(!sizePrefixDisabled)
+ {
+ ds.Write(size);
+ }
+
+ ds.Write(type);
+ bs.Marshal(ds);
+ dsm.TightMarshal2(this, c, ds,
bs);
}
-
- looseOut.Write(type);
- dsm.LooseMarshal(this, c, looseOut);
-
- if(!sizePrefixDisabled)
+ else
{
- ms.Position = 0;
- looseOut.Write((int) ms.Length
- 4);
- ds.Write(ms.GetBuffer(), 0,
(int) ms.Length);
+ BinaryWriter looseOut = ds;
+ MemoryStream ms = null;
+
+ // If we are prefixing then we
need to first write it to memory,
+ // otherwise we can write
direct to the stream.
+ if(!sizePrefixDisabled)
+ {
+ ms = new MemoryStream();
+ looseOut = new
OpenWireBinaryWriter(ms);
+ looseOut.Write(size);
+ }
+
+ looseOut.Write(type);
+ dsm.LooseMarshal(this, c,
looseOut);
+
+ if(!sizePrefixDisabled)
+ {
+ ms.Position = 0;
+ looseOut.Write((int)
ms.Length - 4);
+
ds.Write(ms.GetBuffer(), 0, (int) ms.Length);
+ }
}
}
}
@@ -213,48 +224,51 @@
public Object Unmarshal(BinaryReader dis)
{
- // lets ignore the size of the packet
- if(!sizePrefixDisabled)
+ lock(this.marshalLock)
{
- dis.ReadInt32();
- }
-
- // first byte is the type of the packet
- byte dataType = dis.ReadByte();
- if(dataType != NULL_TYPE)
- {
- BaseDataStreamMarshaller dsm =
dataMarshallers[dataType & 0xFF];
- if(dsm == null)
+ // lets ignore the size of the packet
+ if(!sizePrefixDisabled)
{
- throw new IOException("Unknown data
type: " + dataType);
+ dis.ReadInt32();
}
- Tracer.Debug("Parsing type: " + dataType + "
with: " + dsm);
- Object data = dsm.CreateObject();
-
- if(tightEncodingEnabled)
+ // first byte is the type of the packet
+ byte dataType = dis.ReadByte();
+ if(dataType != NULL_TYPE)
{
- BooleanStream bs = new BooleanStream();
- bs.Unmarshal(dis);
- dsm.TightUnmarshal(this, data, dis, bs);
- return data;
+ BaseDataStreamMarshaller dsm =
dataMarshallers[dataType & 0xFF];
+ if(null == dsm)
+ {
+ throw new IOException("Unknown
data type: " + dataType);
+ }
+
+ Tracer.Debug("Parsing type: " +
dataType + " with: " + dsm);
+ Object data = dsm.CreateObject();
+
+ if(tightEncodingEnabled)
+ {
+ BooleanStream bs = new
BooleanStream();
+ bs.Unmarshal(dis);
+ dsm.TightUnmarshal(this, data,
dis, bs);
+ return data;
+ }
+ else
+ {
+ dsm.LooseUnmarshal(this, data,
dis);
+ return data;
+ }
}
else
{
- dsm.LooseUnmarshal(this, data, dis);
- return data;
+ return null;
}
}
- else
- {
- return null;
- }
}
public int TightMarshalNestedObject1(DataStructure o,
BooleanStream bs)
{
bs.WriteBoolean(o != null);
- if(o == null)
+ if(null == o)
{
return 0;
}
@@ -264,25 +278,30 @@
MarshallAware ma = (MarshallAware) o;
byte[] sequence = ma.GetMarshalledForm(this);
bs.WriteBoolean(sequence != null);
- if (sequence != null)
+ if(sequence != null)
{
return 1 + sequence.Length;
}
}
byte type = o.GetDataStructureType();
- if (type == 0)
+ if(type == 0)
{
throw new IOException("No valid data structure
type for: " + o + " of type: " + o.GetType());
}
- BaseDataStreamMarshaller dsm =
(BaseDataStreamMarshaller) dataMarshallers[type & 0xFF];
- if(dsm == null)
+ lock(this.marshalLock)
{
- throw new IOException("Unknown data type: " +
type);
+ BaseDataStreamMarshaller dsm =
(BaseDataStreamMarshaller) dataMarshallers[type & 0xFF];
+
+ if(null == dsm)
+ {
+ throw new IOException("Unknown data
type: " + type);
+ }
+
+ Tracer.Debug("Marshalling type: " + type + "
with structure: " + o);
+ return 1 + dsm.TightMarshal1(this, o, bs);
}
- Tracer.Debug("Marshalling type: " + type + " with
structure: " + o);
- return 1 + dsm.TightMarshal1(this, o, bs);
}
public void TightMarshalNestedObject2(DataStructure o,
BinaryWriter ds, BooleanStream bs)
@@ -303,13 +322,17 @@
}
else
{
-
- BaseDataStreamMarshaller dsm =
(BaseDataStreamMarshaller) dataMarshallers[type & 0xFF];
- if(dsm == null)
+ lock(this.marshalLock)
{
- throw new IOException("Unknown data
type: " + type);
+ BaseDataStreamMarshaller dsm =
(BaseDataStreamMarshaller) dataMarshallers[type & 0xFF];
+
+ if(null == dsm)
+ {
+ throw new IOException("Unknown
data type: " + type);
+ }
+
+ dsm.TightMarshal2(this, o, ds, bs);
}
- dsm.TightMarshal2(this, o, ds, bs);
}
}
@@ -317,30 +340,37 @@
{
if(bs.ReadBoolean())
{
- byte dataType = dis.ReadByte();
- BaseDataStreamMarshaller dsm =
(BaseDataStreamMarshaller) dataMarshallers[dataType & 0xFF];
- if(dsm == null)
- {
- throw new IOException("Unknown data
type: " + dataType);
- }
- DataStructure data = dsm.CreateObject();
+ DataStructure data;
- if(data.IsMarshallAware() && bs.ReadBoolean())
+ lock(this.marshalLock)
{
- dis.ReadInt32();
- dis.ReadByte();
+ byte dataType = dis.ReadByte();
+ BaseDataStreamMarshaller dsm =
(BaseDataStreamMarshaller) dataMarshallers[dataType & 0xFF];
- BooleanStream bs2 = new BooleanStream();
- bs2.Unmarshal(dis);
- dsm.TightUnmarshal(this, data, dis,
bs2);
-
- // TODO: extract the sequence from the
dis and associate it.
- // MarshallAware ma =
(MarshallAware)data
- //
ma.setCachedMarshalledForm(this, sequence);
- }
- else
- {
- dsm.TightUnmarshal(this, data, dis, bs);
+ if(null == dsm)
+ {
+ throw new IOException("Unknown
data type: " + dataType);
+ }
+
+ data = dsm.CreateObject();
+
+ if(data.IsMarshallAware() &&
bs.ReadBoolean())
+ {
+ dis.ReadInt32();
+ dis.ReadByte();
+
+ BooleanStream bs2 = new
BooleanStream();
+ bs2.Unmarshal(dis);
+ dsm.TightUnmarshal(this, data,
dis, bs2);
+
+ // TODO: extract the sequence
from the dis and associate it.
+ // MarshallAware
ma = (MarshallAware)data
+ //
ma.setCachedMarshalledForm(this, sequence);
+ }
+ else
+ {
+ dsm.TightUnmarshal(this, data,
dis, bs);
+ }
}
return data;
@@ -351,21 +381,25 @@
}
}
-
-
public void LooseMarshalNestedObject(DataStructure o,
BinaryWriter dataOut)
{
- dataOut.Write(o!=null);
+ dataOut.Write(o != null);
if(o != null)
{
byte type = o.GetDataStructureType();
dataOut.Write(type);
- BaseDataStreamMarshaller dsm =
(BaseDataStreamMarshaller) dataMarshallers[type & 0xFF];
- if(dsm == null)
+
+ lock(this.marshalLock)
{
- throw new IOException("Unknown data
type: " + type);
+ BaseDataStreamMarshaller dsm =
(BaseDataStreamMarshaller) dataMarshallers[type & 0xFF];
+
+ if(null == dsm)
+ {
+ throw new IOException("Unknown
data type: " + type);
+ }
+
+ dsm.LooseMarshal(this, o, dataOut);
}
- dsm.LooseMarshal(this, o, dataOut);
}
}
@@ -374,13 +408,21 @@
if(dis.ReadBoolean())
{
byte dataType = dis.ReadByte();
- BaseDataStreamMarshaller dsm =
(BaseDataStreamMarshaller) dataMarshallers[dataType & 0xFF];
- if(dsm == null)
+ DataStructure data;
+
+ lock(this.marshalLock)
{
- throw new IOException("Unknown data
type: " + dataType);
+ BaseDataStreamMarshaller dsm =
(BaseDataStreamMarshaller) dataMarshallers[dataType & 0xFF];
+
+ if(null == dsm)
+ {
+ throw new IOException("Unknown
data type: " + dataType);
+ }
+
+ data = dsm.CreateObject();
+ dsm.LooseUnmarshal(this, data, dis);
}
- DataStructure data = dsm.CreateObject();
- dsm.LooseUnmarshal(this, data, dis);
+
return data;
}
else
@@ -391,25 +433,28 @@
public void renegotiateWireFormat(WireFormatInfo info)
{
- if(info.Version < minimumVersion)
+ lock(this.marshalLock)
{
- throw new IOException("Remote wire format (" +
info.Version +") is lower than the minimum version required (" + minimumVersion
+ ")");
- }
+ if(info.Version < minimumVersion)
+ {
+ throw new IOException("Remote wire
format (" + info.Version + ") is lower than the minimum version required (" +
minimumVersion + ")");
+ }
- this.Version = Math.Min(PreferedWireFormatInfo.Version,
info.Version);
- this.cacheEnabled = info.CacheEnabled &&
PreferedWireFormatInfo.CacheEnabled;
- this.stackTraceEnabled = info.StackTraceEnabled &&
PreferedWireFormatInfo.StackTraceEnabled;
- this.tcpNoDelayEnabled = info.TcpNoDelayEnabled &&
PreferedWireFormatInfo.TcpNoDelayEnabled;
- this.sizePrefixDisabled = info.SizePrefixDisabled &&
PreferedWireFormatInfo.SizePrefixDisabled;
- this.tightEncodingEnabled = info.TightEncodingEnabled
&& PreferedWireFormatInfo.TightEncodingEnabled;
- this.maxInactivityDuration = info.MaxInactivityDuration;
- this.maxInactivityDurationInitialDelay =
info.MaxInactivityDurationInitialDelay;
- this.cacheSize = info.CacheSize;
+ this.Version =
Math.Min(PreferedWireFormatInfo.Version, info.Version);
+ this.cacheEnabled = info.CacheEnabled &&
PreferedWireFormatInfo.CacheEnabled;
+ this.stackTraceEnabled = info.StackTraceEnabled
&& PreferedWireFormatInfo.StackTraceEnabled;
+ this.tcpNoDelayEnabled = info.TcpNoDelayEnabled
&& PreferedWireFormatInfo.TcpNoDelayEnabled;
+ this.sizePrefixDisabled =
info.SizePrefixDisabled && PreferedWireFormatInfo.SizePrefixDisabled;
+ this.tightEncodingEnabled =
info.TightEncodingEnabled && PreferedWireFormatInfo.TightEncodingEnabled;
+ this.maxInactivityDuration =
info.MaxInactivityDuration;
+ this.maxInactivityDurationInitialDelay =
info.MaxInactivityDurationInitialDelay;
+ this.cacheSize = info.CacheSize;
- TcpTransport tcpTransport = this.transport as
TcpTransport;
- if(null != tcpTransport)
- {
- tcpTransport.TcpNoDelayEnabled =
this.tcpNoDelayEnabled;
+ TcpTransport tcpTransport = this.transport as
TcpTransport;
+ if(null != tcpTransport)
+ {
+ tcpTransport.TcpNoDelayEnabled =
this.tcpNoDelayEnabled;
+ }
}
}
}