Modified: qpid/trunk/qpid/dotnet/Qpid.Client/Client/Util/FlowControlQueue.cs
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Util/FlowControlQueue.cs?rev=886998&r1=886997&r2=886998&view=diff
==============================================================================
--- qpid/trunk/qpid/dotnet/Qpid.Client/Client/Util/FlowControlQueue.cs (original)
+++ qpid/trunk/qpid/dotnet/Qpid.Client/Client/Util/FlowControlQueue.cs Thu Dec 3 23:55:48 2009
@@ -1,98 +1,98 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-using System;
-using System.Collections;
-using System.Text;
-using System.Threading;
-using Apache.Qpid.Collections;
-using Apache.Qpid.Common;
-
-namespace Apache.Qpid.Client.Util
-{
- internal delegate void ThresholdMethod(int currentCount);
-
- /// <summary>
- /// Basic bounded queue used to implement prefetching.
- /// Notice we do the callbacks here asynchronously to
- /// avoid adding more complexity to the channel impl.
- /// </summary>
- internal class FlowControlQueue
- {
- private BlockingQueue _queue = new LinkedBlockingQueue();
- private int _itemCount;
- private int _lowerBound;
- private int _upperBound;
- private ThresholdMethod _underThreshold;
- private ThresholdMethod _overThreshold;
-
- public FlowControlQueue(
- int lowerBound,
- int upperBound,
- ThresholdMethod underThreshold,
- ThresholdMethod overThreshold
- )
- {
- _lowerBound = lowerBound;
- _upperBound = upperBound;
- _underThreshold = underThreshold;
- _overThreshold = overThreshold;
- }
-
- public void Enqueue(object item)
- {
- _queue.EnqueueBlocking(item);
- int count = Interlocked.Increment(ref _itemCount);
- if ( _overThreshold != null )
- {
- if ( count == _upperBound )
- {
- _overThreshold.BeginInvoke(
- count, new AsyncCallback(OnAsyncCallEnd),
- _overThreshold
- );
- }
- }
- }
-
- public object Dequeue()
- {
- object item = _queue.DequeueBlocking();
- int count = Interlocked.Decrement(ref _itemCount);
- if ( _underThreshold != null )
- {
- if ( count == _lowerBound )
- {
- _underThreshold.BeginInvoke(
- count, new AsyncCallback(OnAsyncCallEnd),
- _underThreshold
- );
- }
- }
- return item;
- }
-
- private void OnAsyncCallEnd(IAsyncResult res)
- {
- ThresholdMethod method = (ThresholdMethod)res.AsyncState;
- method.EndInvoke(res);
- }
- }
-}
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Collections;
+using System.Text;
+using System.Threading;
+using Apache.Qpid.Collections;
+using Apache.Qpid.Common;
+
+namespace Apache.Qpid.Client.Util
+{
+    /// <summary>
+    /// Callback invoked when the queue's item count hits one of its bounds.
+    /// </summary>
+    /// <param name="currentCount">Item count observed when the bound was hit.</param>
+    internal delegate void ThresholdMethod(int currentCount);
+
+    /// <summary>
+    /// Counting queue used to implement prefetching. A callback is fired
+    /// when the item count becomes exactly equal to the upper bound (after
+    /// an enqueue) or to the lower bound (after a dequeue). The callbacks
+    /// are started asynchronously through BeginInvoke so the channel
+    /// implementation does not have to deal with them inline.
+    /// </summary>
+    internal class FlowControlQueue
+    {
+        private BlockingQueue _queue = new LinkedBlockingQueue();
+        private int _itemCount;
+        private int _lowerBound;
+        private int _upperBound;
+        private ThresholdMethod _underThreshold;
+        private ThresholdMethod _overThreshold;
+
+        /// <summary>
+        /// Stores the two bounds and their callbacks; either callback may be null.
+        /// </summary>
+        /// <param name="lowerBound">Count that triggers <paramref name="underThreshold"/> on dequeue.</param>
+        /// <param name="upperBound">Count that triggers <paramref name="overThreshold"/> on enqueue.</param>
+        /// <param name="underThreshold">Callback for the lower bound.</param>
+        /// <param name="overThreshold">Callback for the upper bound.</param>
+        public FlowControlQueue(
+            int lowerBound, int upperBound,
+            ThresholdMethod underThreshold, ThresholdMethod overThreshold)
+        {
+            _lowerBound = lowerBound;
+            _upperBound = upperBound;
+            _underThreshold = underThreshold;
+            _overThreshold = overThreshold;
+        }
+
+        /// <summary>
+        /// Adds an item, bumps the counter and signals the over-threshold
+        /// callback when the new count equals the upper bound.
+        /// </summary>
+        public void Enqueue(object item)
+        {
+            _queue.EnqueueBlocking(item);
+            int newCount = Interlocked.Increment(ref _itemCount);
+            if ( newCount == _upperBound )
+            {
+                NotifyAsync(_overThreshold, newCount);
+            }
+        }
+
+        /// <summary>
+        /// Removes and returns an item, lowers the counter and signals the
+        /// under-threshold callback when the new count equals the lower bound.
+        /// </summary>
+        public object Dequeue()
+        {
+            object taken = _queue.DequeueBlocking();
+            int newCount = Interlocked.Decrement(ref _itemCount);
+            if ( newCount == _lowerBound )
+            {
+                NotifyAsync(_underThreshold, newCount);
+            }
+            return taken;
+        }
+
+        // Starts the callback asynchronously; a null callback is ignored.
+        private void NotifyAsync(ThresholdMethod callback, int count)
+        {
+            if ( callback == null )
+            {
+                return;
+            }
+            // The delegate itself travels as the async state so that
+            // OnAsyncCallEnd can complete the call with EndInvoke.
+            callback.BeginInvoke(count, new AsyncCallback(OnAsyncCallEnd), callback);
+        }
+
+        // Completes an asynchronous callback started by NotifyAsync.
+        private void OnAsyncCallEnd(IAsyncResult res)
+        {
+            ((ThresholdMethod)res.AsyncState).EndInvoke(res);
+        }
+    }
+}
Propchange: qpid/trunk/qpid/dotnet/Qpid.Client/Client/Util/FlowControlQueue.cs
------------------------------------------------------------------------------
svn:eol-style = native
Modified:
qpid/trunk/qpid/dotnet/Qpid.Common.Tests/Qpid/Collections/TestConsumerProducerQueue.cs
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/dotnet/Qpid.Common.Tests/Qpid/Collections/TestConsumerProducerQueue.cs?rev=886998&r1=886997&r2=886998&view=diff
==============================================================================
--- qpid/trunk/qpid/dotnet/Qpid.Common.Tests/Qpid/Collections/TestConsumerProducerQueue.cs (original)
+++ qpid/trunk/qpid/dotnet/Qpid.Common.Tests/Qpid/Collections/TestConsumerProducerQueue.cs Thu Dec 3 23:55:48 2009
@@ -1,85 +1,85 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-using System;
-using System.Collections;
-using System.Text;
-using System.Threading;
-using NUnit.Framework;
-using Apache.Qpid.Collections;
-
-namespace Apache.Qpid.Collections.Tests
-{
- [TestFixture]
- public class TestConsumerProducerQueue
- {
- private ConsumerProducerQueue _queue;
-
- [SetUp]
- public void SetUp()
- {
- _queue = new ConsumerProducerQueue();
- }
-
- [Test]
- public void CanDequeueWithInifiniteWait()
- {
- Thread producer = new Thread(new ThreadStart(ProduceFive));
- producer.Start();
- for ( int i = 0; i < 5; i++ )
- {
- object item = _queue.Dequeue();
- Assert.IsNotNull(item);
- }
- }
-
- [Test]
- public void ReturnsNullOnDequeueTimeout()
- {
- // queue is empty
- Assert.IsNull(_queue.Dequeue(500));
- }
-
- [Test]
- public void DequeueTillEmpty()
- {
- _queue.Enqueue(1);
- _queue.Enqueue(2);
- _queue.Enqueue(3);
- Assert.AreEqual(1, _queue.Dequeue());
- Assert.AreEqual(2, _queue.Dequeue());
- Assert.AreEqual(3, _queue.Dequeue());
- // no messages in queue, will timeout
- Assert.IsNull(_queue.Dequeue(500));
- }
-
-
- private void ProduceFive()
- {
- Thread.Sleep(1000);
- _queue.Enqueue("test item 1");
- _queue.Enqueue("test item 2");
- _queue.Enqueue("test item 3");
- Thread.Sleep(0);
- _queue.Enqueue("test item 4");
- _queue.Enqueue("test item 5");
- }
- }
-}
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Collections;
+using System.Text;
+using System.Threading;
+using NUnit.Framework;
+using Apache.Qpid.Collections;
+
+namespace Apache.Qpid.Collections.Tests
+{
+    /// <summary>
+    /// Tests for ConsumerProducerQueue: blocking dequeue, timed dequeue and ordering.
+    /// </summary>
+    [TestFixture]
+    public class TestConsumerProducerQueue
+    {
+        private ConsumerProducerQueue _queue;
+
+        // Every test runs against a freshly created queue.
+        [SetUp]
+        public void SetUp()
+        {
+            _queue = new ConsumerProducerQueue();
+        }
+
+        // Five untimed Dequeue() calls must each yield an item while a
+        // background thread produces them after an initial delay.
+        [Test]
+        public void CanDequeueWithInifiniteWait()
+        {
+            new Thread(new ThreadStart(ProduceFive)).Start();
+            for ( int remaining = 5; remaining > 0; remaining-- )
+            {
+                Assert.IsNotNull(_queue.Dequeue());
+            }
+        }
+
+        // A timed dequeue on an empty queue is expected to return null.
+        [Test]
+        public void ReturnsNullOnDequeueTimeout()
+        {
+            Assert.IsNull(_queue.Dequeue(500));
+        }
+
+        // Items come back in insertion order; once drained, a timed
+        // dequeue is expected to return null.
+        [Test]
+        public void DequeueTillEmpty()
+        {
+            for ( int n = 1; n <= 3; n++ )
+            {
+                _queue.Enqueue(n);
+            }
+            for ( int expected = 1; expected <= 3; expected++ )
+            {
+                Assert.AreEqual(expected, _queue.Dequeue());
+            }
+            // no messages in queue, will timeout
+            Assert.IsNull(_queue.Dequeue(500));
+        }
+
+
+        // Producer body: waits one second, enqueues three items, yields
+        // the thread, then enqueues two more.
+        private void ProduceFive()
+        {
+            string[] firstBatch = { "test item 1", "test item 2", "test item 3" };
+            string[] secondBatch = { "test item 4", "test item 5" };
+
+            Thread.Sleep(1000);
+            foreach ( string item in firstBatch )
+            {
+                _queue.Enqueue(item);
+            }
+            Thread.Sleep(0);
+            foreach ( string item in secondBatch )
+            {
+                _queue.Enqueue(item);
+            }
+        }
+    }
+}
Propchange:
qpid/trunk/qpid/dotnet/Qpid.Common.Tests/Qpid/Collections/TestConsumerProducerQueue.cs
------------------------------------------------------------------------------
svn:eol-style = native
Modified: qpid/trunk/qpid/dotnet/Qpid.Common.Tests/Qpid/Framing/TestAMQType.cs
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/dotnet/Qpid.Common.Tests/Qpid/Framing/TestAMQType.cs?rev=886998&r1=886997&r2=886998&view=diff
==============================================================================
--- qpid/trunk/qpid/dotnet/Qpid.Common.Tests/Qpid/Framing/TestAMQType.cs (original)
+++ qpid/trunk/qpid/dotnet/Qpid.Common.Tests/Qpid/Framing/TestAMQType.cs Thu Dec 3 23:55:48 2009
@@ -1,270 +1,270 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-using System;
-using NUnit.Framework;
-using Apache.Qpid.Buffer;
-using Apache.Qpid.Framing;
-
-namespace Apache.Qpid.Framing.Tests
-{
- [TestFixture]
- public class TestAMQType
- {
-
- #region LONG_STRING tests
- [Test]
- public void LONG_STRING_ReadWrite()
- {
- AMQType type = AMQType.LONG_STRING;
- ByteBuffer buffer = ByteBuffer.Allocate(0x1000);
- const string VALUE = "simple string 1";
-
- type.WriteToBuffer(VALUE, buffer);
- buffer.Flip();
- buffer.Rewind();
- AMQTypedValue value = AMQTypedValue.ReadFromBuffer(buffer);
- Assert.AreEqual(VALUE, value.Value);
- }
- #endregion // LONG_STRING tests
-
- #region UINT32 tests
- [Test]
- public void UINT32_CanGetEncodingSize()
- {
- AMQType type = AMQType.UINT32;
- Assert.AreEqual(4, type.GetEncodingSize(1234443));
- }
-
- [Test]
- public void UINT32_ToNativeValue()
- {
- AMQType type = AMQType.UINT32;
- Assert.AreEqual(1, type.ToNativeValue(1));
- Assert.AreEqual(1, type.ToNativeValue((short)1));
- Assert.AreEqual(1, type.ToNativeValue((byte)1));
- Assert.AreEqual(1, type.ToNativeValue("1"));
-
- try
- {
- Assert.AreEqual(1, type.ToNativeValue("adasdads"));
- Assert.Fail("Invalid format allowed");
- } catch ( FormatException )
- {
- }
- }
-
- [Test]
- public void UINT32_ReadWrite()
- {
- AMQType type = AMQType.UINT32;
- ByteBuffer buffer = ByteBuffer.Allocate(0x1000);
- const uint VALUE = 0xFFEEDDCC;
-
- type.WriteToBuffer(VALUE, buffer);
- buffer.Flip();
- buffer.Rewind();
- AMQTypedValue value = AMQTypedValue.ReadFromBuffer(buffer);
- Assert.AreEqual(VALUE, value.Value);
- }
- #endregion // UINT32 Tests
-
- #region VOID Tests
- [Test]
- public void VOID_CanGetEncodingSize()
- {
- AMQType type = AMQType.VOID;
- Assert.AreEqual(0, type.GetEncodingSize(null));
- }
-
- [Test]
- public void VOID_ToNativeValue()
- {
- AMQType type = AMQType.VOID;
- Assert.IsNull(type.ToNativeValue(null));
-
- try
- {
- type.ToNativeValue("asdasd");
- Assert.Fail("converted invalid value");
- } catch (FormatException)
- {
- }
- }
-
- [Test]
- public void VOID_ReadWrite()
- {
- AMQType type = AMQType.VOID;
- ByteBuffer buffer = ByteBuffer.Allocate(0x1000);
-
- type.WriteToBuffer(null, buffer);
- buffer.Flip();
- buffer.Rewind();
- AMQTypedValue value = AMQTypedValue.ReadFromBuffer(buffer);
- Assert.AreEqual(null, value.Value);
- }
-
- #endregion // VOID Tests
-
- #region BOOLEAN Tests
- [Test]
- public void BOOLEAN_CanGetEncodingSize()
- {
- AMQType type = AMQType.BOOLEAN;
- Assert.AreEqual(1, type.GetEncodingSize(true));
- }
-
- [Test]
- public void BOOLEAN_ToNativeValue()
- {
- AMQType type = AMQType.BOOLEAN;
- Assert.AreEqual(true, type.ToNativeValue(true));
- Assert.AreEqual(false, type.ToNativeValue("false"));
-
- try
- {
- type.ToNativeValue("asdasd");
- Assert.Fail("converted invalid value");
- } catch ( FormatException )
- {
- }
- }
-
- [Test]
- public void BOOLEAN_ReadWrite()
- {
- AMQType type = AMQType.BOOLEAN;
- ByteBuffer buffer = ByteBuffer.Allocate(0x1000);
-
- type.WriteToBuffer(true, buffer);
- buffer.Flip();
- buffer.Rewind();
- AMQTypedValue value = AMQTypedValue.ReadFromBuffer(buffer);
- Assert.AreEqual(true, value.Value);
- }
- #endregion // BOOLEAN Tests
-
- #region INT16 tests
- [Test]
- public void INT16_ReadWrite()
- {
- AMQType type = AMQType.INT16;
- ByteBuffer buffer = ByteBuffer.Allocate(0x1000);
- const short VALUE = -32765;
-
- type.WriteToBuffer(VALUE, buffer);
- buffer.Flip();
- buffer.Rewind();
- AMQTypedValue value = AMQTypedValue.ReadFromBuffer(buffer);
- Assert.AreEqual(VALUE, value.Value);
- }
- //public void UINT16_ReadWrite()
- //{
- // AMQType type = AMQType.UINT16;
- // ByteBuffer buffer = ByteBuffer.Allocate(0x1000);
- // const ushort VALUE = 64321;
-
- // type.WriteToBuffer(VALUE, buffer);
- // buffer.Flip();
- // buffer.Rewind();
- // AMQTypedValue value = AMQTypedValue.ReadFromBuffer(buffer);
- // Assert.AreEqual(VALUE, value.Value);
- //}
- #endregion // INT16 Tests
-
- #region INT32 tests
- [Test]
- public void INT32_ReadWrite()
- {
- AMQType type = AMQType.INT32;
- ByteBuffer buffer = ByteBuffer.Allocate(0x1000);
- const int VALUE = -39273563;
-
- type.WriteToBuffer(VALUE, buffer);
- buffer.Flip();
- buffer.Rewind();
- AMQTypedValue value = AMQTypedValue.ReadFromBuffer(buffer);
- Assert.AreEqual(VALUE, value.Value);
- }
- #endregion // INT32 Tests
-
- #region INT64 tests
- [Test]
- public void INT64_ReadWrite()
- {
- AMQType type = AMQType.INT64;
- ByteBuffer buffer = ByteBuffer.Allocate(0x1000);
- const long VALUE = -(2^43+1233123);
-
- type.WriteToBuffer(VALUE, buffer);
- buffer.Flip();
- buffer.Rewind();
- AMQTypedValue value = AMQTypedValue.ReadFromBuffer(buffer);
- Assert.AreEqual(VALUE, value.Value);
- }
- [Test]
- public void UINT64_ReadWrite()
- {
- AMQType type = AMQType.UINT64;
- ByteBuffer buffer = ByteBuffer.Allocate(0x1000);
- const ulong VALUE = (2 ^ 61 + 1233123);
-
- type.WriteToBuffer(VALUE, buffer);
- buffer.Flip();
- buffer.Rewind();
- AMQTypedValue value = AMQTypedValue.ReadFromBuffer(buffer);
- Assert.AreEqual(VALUE, value.Value);
- }
- #endregion // INT64 Tests
-
- #region FLOAT tests
- [Test]
- public void FLOAT_ReadWrite()
- {
- AMQType type = AMQType.FLOAT;
- ByteBuffer buffer = ByteBuffer.Allocate(0x1000);
- const float VALUE = 1.2345000E-035f;
-
- type.WriteToBuffer(VALUE, buffer);
- buffer.Flip();
- buffer.Rewind();
- AMQTypedValue value = AMQTypedValue.ReadFromBuffer(buffer);
- Assert.AreEqual(VALUE, value.Value);
- }
- #endregion // FLOAT Tests
-
- #region DOUBLE tests
- [Test]
- public void DOUBLE_ReadWrite()
- {
- AMQType type = AMQType.DOUBLE;
- ByteBuffer buffer = ByteBuffer.Allocate(0x1000);
- const double VALUE = 1.2345000E-045;
-
- type.WriteToBuffer(VALUE, buffer);
- buffer.Flip();
- buffer.Rewind();
- AMQTypedValue value = AMQTypedValue.ReadFromBuffer(buffer);
- Assert.AreEqual(VALUE, value.Value);
- }
- #endregion // FLOAT Tests
- }
-}
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using NUnit.Framework;
+using Apache.Qpid.Buffer;
+using Apache.Qpid.Framing;
+
+namespace Apache.Qpid.Framing.Tests
+{
+ [TestFixture]
+ public class TestAMQType
+ {
+
+ #region LONG_STRING tests
+        // Round trip: a string written through AMQType.LONG_STRING must be
+        // read back as an equal value by AMQTypedValue.ReadFromBuffer.
+        [Test]
+        public void LONG_STRING_ReadWrite()
+        {
+            const string VALUE = "simple string 1";
+            AMQType longString = AMQType.LONG_STRING;
+            ByteBuffer encoded = ByteBuffer.Allocate(0x1000);
+
+            longString.WriteToBuffer(VALUE, encoded);
+            encoded.Flip();
+            encoded.Rewind();
+
+            Assert.AreEqual(VALUE, AMQTypedValue.ReadFromBuffer(encoded).Value);
+        }
+ #endregion // LONG_STRING tests
+
+ #region UINT32 tests
+        // The UINT32 type must report an encoding size of 4.
+        [Test]
+        public void UINT32_CanGetEncodingSize()
+        {
+            Assert.AreEqual(4, AMQType.UINT32.GetEncodingSize(1234443));
+        }
+
+        // int, short, byte and numeric-string inputs must all convert to 1;
+        // a non-numeric string must be rejected with a FormatException.
+        [Test]
+        public void UINT32_ToNativeValue()
+        {
+            AMQType uint32 = AMQType.UINT32;
+            Assert.AreEqual(1, uint32.ToNativeValue(1));
+            Assert.AreEqual(1, uint32.ToNativeValue((short)1));
+            Assert.AreEqual(1, uint32.ToNativeValue((byte)1));
+            Assert.AreEqual(1, uint32.ToNativeValue("1"));
+
+            bool rejected = false;
+            try
+            {
+                Assert.AreEqual(1, uint32.ToNativeValue("adasdads"));
+            }
+            catch ( FormatException )
+            {
+                rejected = true;
+            }
+            if ( !rejected )
+            {
+                Assert.Fail("Invalid format allowed");
+            }
+        }
+
+        // Round trip of a uint with the top bit set through AMQType.UINT32.
+        [Test]
+        public void UINT32_ReadWrite()
+        {
+            const uint VALUE = 0xFFEEDDCC;
+            AMQType uint32 = AMQType.UINT32;
+            ByteBuffer encoded = ByteBuffer.Allocate(0x1000);
+
+            uint32.WriteToBuffer(VALUE, encoded);
+            encoded.Flip();
+            encoded.Rewind();
+
+            Assert.AreEqual(VALUE, AMQTypedValue.ReadFromBuffer(encoded).Value);
+        }
+ #endregion // UINT32 Tests
+
+ #region VOID Tests
+        // The VOID type must report an encoding size of 0 for null.
+        [Test]
+        public void VOID_CanGetEncodingSize()
+        {
+            Assert.AreEqual(0, AMQType.VOID.GetEncodingSize(null));
+        }
+
+        // null converts to null; any other input must be rejected with a
+        // FormatException.
+        [Test]
+        public void VOID_ToNativeValue()
+        {
+            AMQType voidType = AMQType.VOID;
+            Assert.IsNull(voidType.ToNativeValue(null));
+
+            bool rejected = false;
+            try
+            {
+                voidType.ToNativeValue("asdasd");
+            }
+            catch ( FormatException )
+            {
+                rejected = true;
+            }
+            if ( !rejected )
+            {
+                Assert.Fail("converted invalid value");
+            }
+        }
+
+        // Round trip of a null value through AMQType.VOID.
+        [Test]
+        public void VOID_ReadWrite()
+        {
+            AMQType voidType = AMQType.VOID;
+            ByteBuffer encoded = ByteBuffer.Allocate(0x1000);
+
+            voidType.WriteToBuffer(null, encoded);
+            encoded.Flip();
+            encoded.Rewind();
+
+            Assert.AreEqual(null, AMQTypedValue.ReadFromBuffer(encoded).Value);
+        }
+
+ #endregion // VOID Tests
+
+ #region BOOLEAN Tests
+        // The BOOLEAN type must report an encoding size of 1.
+        [Test]
+        public void BOOLEAN_CanGetEncodingSize()
+        {
+            Assert.AreEqual(1, AMQType.BOOLEAN.GetEncodingSize(true));
+        }
+
+        // A bool passes through, "false" parses to false, and an
+        // unparsable string must be rejected with a FormatException.
+        [Test]
+        public void BOOLEAN_ToNativeValue()
+        {
+            AMQType boolType = AMQType.BOOLEAN;
+            Assert.AreEqual(true, boolType.ToNativeValue(true));
+            Assert.AreEqual(false, boolType.ToNativeValue("false"));
+
+            bool rejected = false;
+            try
+            {
+                boolType.ToNativeValue("asdasd");
+            }
+            catch ( FormatException )
+            {
+                rejected = true;
+            }
+            if ( !rejected )
+            {
+                Assert.Fail("converted invalid value");
+            }
+        }
+
+        // Round trip of the value true through AMQType.BOOLEAN.
+        [Test]
+        public void BOOLEAN_ReadWrite()
+        {
+            AMQType boolType = AMQType.BOOLEAN;
+            ByteBuffer encoded = ByteBuffer.Allocate(0x1000);
+
+            boolType.WriteToBuffer(true, encoded);
+            encoded.Flip();
+            encoded.Rewind();
+
+            Assert.AreEqual(true, AMQTypedValue.ReadFromBuffer(encoded).Value);
+        }
+ #endregion // BOOLEAN Tests
+
+ #region INT16 tests
+        // Round trip of a negative short through AMQType.INT16.
+        [Test]
+        public void INT16_ReadWrite()
+        {
+            const short VALUE = -32765;
+            AMQType int16 = AMQType.INT16;
+            ByteBuffer encoded = ByteBuffer.Allocate(0x1000);
+
+            int16.WriteToBuffer(VALUE, encoded);
+            encoded.Flip();
+            encoded.Rewind();
+
+            Assert.AreEqual(VALUE, AMQTypedValue.ReadFromBuffer(encoded).Value);
+        }
+ //public void UINT16_ReadWrite()
+ //{
+ // AMQType type = AMQType.UINT16;
+ // ByteBuffer buffer = ByteBuffer.Allocate(0x1000);
+ // const ushort VALUE = 64321;
+
+ // type.WriteToBuffer(VALUE, buffer);
+ // buffer.Flip();
+ // buffer.Rewind();
+ // AMQTypedValue value = AMQTypedValue.ReadFromBuffer(buffer);
+ // Assert.AreEqual(VALUE, value.Value);
+ //}
+ #endregion // INT16 Tests
+
+ #region INT32 tests
+        // Round trip of a negative int through AMQType.INT32.
+        [Test]
+        public void INT32_ReadWrite()
+        {
+            const int VALUE = -39273563;
+            AMQType int32 = AMQType.INT32;
+            ByteBuffer encoded = ByteBuffer.Allocate(0x1000);
+
+            int32.WriteToBuffer(VALUE, encoded);
+            encoded.Flip();
+            encoded.Rewind();
+
+            Assert.AreEqual(VALUE, AMQTypedValue.ReadFromBuffer(encoded).Value);
+        }
+ #endregion // INT32 Tests
+
+ #region INT64 tests
+        // Round trip of a negative long that does not fit in 32 bits
+        // through AMQType.INT64.
+        [Test]
+        public void INT64_ReadWrite()
+        {
+            AMQType type = AMQType.INT64;
+            ByteBuffer buffer = ByteBuffer.Allocate(0x1000);
+            // 2 to the 43rd is spelled as a shift: '^' is XOR in C# (and binds
+            // looser than '+'), so the former "2^43+1233123" was only -1233164
+            // after negation and never exercised more than 32 bits.
+            const long VALUE = -((1L << 43) + 1233123L);
+
+            type.WriteToBuffer(VALUE, buffer);
+            buffer.Flip();
+            buffer.Rewind();
+            AMQTypedValue value = AMQTypedValue.ReadFromBuffer(buffer);
+            Assert.AreEqual(VALUE, value.Value);
+        }
+        // Round trip of a ulong that does not fit in 32 bits through
+        // AMQType.UINT64.
+        [Test]
+        public void UINT64_ReadWrite()
+        {
+            AMQType type = AMQType.UINT64;
+            ByteBuffer buffer = ByteBuffer.Allocate(0x1000);
+            // 2 to the 61st is spelled as a shift: '^' is XOR in C# (and binds
+            // looser than '+'), so the former "2 ^ 61 + 1233123" was only
+            // 1233186 and never exercised more than 32 bits.
+            const ulong VALUE = (1UL << 61) + 1233123UL;
+
+            type.WriteToBuffer(VALUE, buffer);
+            buffer.Flip();
+            buffer.Rewind();
+            AMQTypedValue value = AMQTypedValue.ReadFromBuffer(buffer);
+            Assert.AreEqual(VALUE, value.Value);
+        }
+ #endregion // INT64 Tests
+
+ #region FLOAT tests
+        // Round trip of a small positive float through AMQType.FLOAT.
+        [Test]
+        public void FLOAT_ReadWrite()
+        {
+            const float VALUE = 1.2345000E-035f;
+            AMQType floatType = AMQType.FLOAT;
+            ByteBuffer encoded = ByteBuffer.Allocate(0x1000);
+
+            floatType.WriteToBuffer(VALUE, encoded);
+            encoded.Flip();
+            encoded.Rewind();
+
+            Assert.AreEqual(VALUE, AMQTypedValue.ReadFromBuffer(encoded).Value);
+        }
+ #endregion // FLOAT Tests
+
+ #region DOUBLE tests
+        // Round trip of a small positive double through AMQType.DOUBLE.
+        [Test]
+        public void DOUBLE_ReadWrite()
+        {
+            const double VALUE = 1.2345000E-045;
+            AMQType doubleType = AMQType.DOUBLE;
+            ByteBuffer encoded = ByteBuffer.Allocate(0x1000);
+
+            doubleType.WriteToBuffer(VALUE, encoded);
+            encoded.Flip();
+            encoded.Rewind();
+
+            Assert.AreEqual(VALUE, AMQTypedValue.ReadFromBuffer(encoded).Value);
+        }
+ #endregion // DOUBLE Tests
+ }
+}
Propchange: qpid/trunk/qpid/dotnet/Qpid.Common.Tests/Qpid/Framing/TestAMQType.cs
------------------------------------------------------------------------------
svn:eol-style = native
Modified:
qpid/trunk/qpid/dotnet/Qpid.Common.Tests/Qpid/Framing/TestEncodingUtils.cs
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/dotnet/Qpid.Common.Tests/Qpid/Framing/TestEncodingUtils.cs?rev=886998&r1=886997&r2=886998&view=diff
==============================================================================
--- qpid/trunk/qpid/dotnet/Qpid.Common.Tests/Qpid/Framing/TestEncodingUtils.cs (original)
+++ qpid/trunk/qpid/dotnet/Qpid.Common.Tests/Qpid/Framing/TestEncodingUtils.cs Thu Dec 3 23:55:48 2009
@@ -1,60 +1,60 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-using System;
-using NUnit.Framework;
-using Apache.Qpid.Buffer;
-using Apache.Qpid.Framing;
-
-namespace Apache.Qpid.Framing.Tests
-{
- [TestFixture]
- public class TestEncodingUtils
- {
- [Test]
- public void CanReadLongAsShortString()
- {
- ByteBuffer buffer = ByteBuffer.Allocate(0x1000);
- EncodingUtils.WriteShortStringBytes(buffer, "98878122");
- buffer.Flip();
- long value = EncodingUtils.ReadLongAsShortString(buffer);
- Assert.AreEqual(98878122, value);
- }
- [Test]
- public void CanReadLongAsShortStringNegative()
- {
- ByteBuffer buffer = ByteBuffer.Allocate(0x1000);
- EncodingUtils.WriteShortStringBytes(buffer, "-98878122");
- buffer.Flip();
- long value = EncodingUtils.ReadLongAsShortString(buffer);
- Assert.AreEqual(-98878122, value);
- }
- [Test]
- public void CanReadLongAsShortStringEmpty()
- {
- ByteBuffer buffer = ByteBuffer.Allocate(0x1000);
- EncodingUtils.WriteShortStringBytes(buffer, "");
- buffer.Flip();
- long value = EncodingUtils.ReadLongAsShortString(buffer);
- Assert.AreEqual(0, value);
- }
-
- }
-}
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using NUnit.Framework;
+using Apache.Qpid.Buffer;
+using Apache.Qpid.Framing;
+
+namespace Apache.Qpid.Framing.Tests
+{
+    /// <summary>
+    /// Tests for EncodingUtils.ReadLongAsShortString: each case writes a
+    /// short string to a buffer and checks the long that is read back.
+    /// </summary>
+    [TestFixture]
+    public class TestEncodingUtils
+    {
+        // A string of decimal digits is read back as the matching long.
+        [Test]
+        public void CanReadLongAsShortString()
+        {
+            Assert.AreEqual(98878122, WriteThenReadLong("98878122"));
+        }
+
+        // A leading minus sign yields a negative long.
+        [Test]
+        public void CanReadLongAsShortStringNegative()
+        {
+            Assert.AreEqual(-98878122, WriteThenReadLong("-98878122"));
+        }
+
+        // An empty short string is expected to read back as 0.
+        [Test]
+        public void CanReadLongAsShortStringEmpty()
+        {
+            Assert.AreEqual(0, WriteThenReadLong(""));
+        }
+
+        // Writes the text as a short string into a fresh buffer, flips the
+        // buffer and returns what ReadLongAsShortString decodes from it.
+        private static long WriteThenReadLong(string text)
+        {
+            ByteBuffer buffer = ByteBuffer.Allocate(0x1000);
+            EncodingUtils.WriteShortStringBytes(buffer, text);
+            buffer.Flip();
+            long value = EncodingUtils.ReadLongAsShortString(buffer);
+            return value;
+        }
+
+    }
+}
Propchange:
qpid/trunk/qpid/dotnet/Qpid.Common.Tests/Qpid/Framing/TestEncodingUtils.cs
------------------------------------------------------------------------------
svn:eol-style = native
Modified: qpid/trunk/qpid/dotnet/Qpid.Common/AMQInvalidArgumentException.cs
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/dotnet/Qpid.Common/AMQInvalidArgumentException.cs?rev=886998&r1=886997&r2=886998&view=diff
==============================================================================
--- qpid/trunk/qpid/dotnet/Qpid.Common/AMQInvalidArgumentException.cs (original)
+++ qpid/trunk/qpid/dotnet/Qpid.Common/AMQInvalidArgumentException.cs Thu Dec 3 23:55:48 2009
@@ -1,46 +1,46 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-using System;
-using System.Runtime.Serialization;
-
-using Apache.Qpid.Protocol;
-
-namespace Apache.Qpid
-{
- /// <summary>
- /// Thrown when an invalid argument was supplied to the broker
- /// </summary>
- [Serializable]
- public class AMQInvalidArgumentException : AMQException
- {
- public AMQInvalidArgumentException(string message)
- : base(AMQConstant.INVALID_ARGUMENT.Code, message, null)
- {
- }
-
- protected AMQInvalidArgumentException(SerializationInfo info, StreamingContext ctxt)
- : base(info, ctxt)
- {
- }
-
- }
-}
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+using System;
+using System.Runtime.Serialization;
+
+using Apache.Qpid.Protocol;
+
+namespace Apache.Qpid
+{
+ /// <summary>
+ /// Thrown when an invalid argument was supplied to the broker
+ /// </summary>
+ [Serializable]
+ public class AMQInvalidArgumentException : AMQException
+ {
+ /// <summary>
+ /// Creates the exception with the given message. The base constructor
+ /// receives AMQConstant.INVALID_ARGUMENT.Code as its first argument and
+ /// null as its third.
+ /// </summary>
+ /// <param name="message">Text describing the invalid argument.</param>
+ public AMQInvalidArgumentException(string message)
+ : base(AMQConstant.INVALID_ARGUMENT.Code, message, null)
+ {
+ }
+
+ /// <summary>
+ /// Serialization constructor; forwards both arguments unchanged to
+ /// the base class.
+ /// </summary>
+ protected AMQInvalidArgumentException(SerializationInfo info, StreamingContext ctxt)
+ : base(info, ctxt)
+ {
+ }
+
+ }
+}
Propchange: qpid/trunk/qpid/dotnet/Qpid.Common/AMQInvalidArgumentException.cs
------------------------------------------------------------------------------
svn:eol-style = native
Modified: qpid/trunk/qpid/dotnet/Qpid.Common/AMQInvalidRoutingKeyException.cs
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/dotnet/Qpid.Common/AMQInvalidRoutingKeyException.cs?rev=886998&r1=886997&r2=886998&view=diff
==============================================================================
--- qpid/trunk/qpid/dotnet/Qpid.Common/AMQInvalidRoutingKeyException.cs (original)
+++ qpid/trunk/qpid/dotnet/Qpid.Common/AMQInvalidRoutingKeyException.cs Thu Dec 3 23:55:48 2009
@@ -1,46 +1,46 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-using System;
-using System.Runtime.Serialization;
-
-using Apache.Qpid.Protocol;
-
-namespace Apache.Qpid
-{
- /// <summary>
- /// Thrown when an invalid routing key was sent to the broker
- /// </summary>
- [Serializable]
- public class AMQInvalidRoutingKeyException : AMQException
- {
- public AMQInvalidRoutingKeyException(string message)
- : base(AMQConstant.INVALID_ROUTING_KEY.Code, message, null)
- {
- }
-
- protected AMQInvalidRoutingKeyException(SerializationInfo info,
StreamingContext ctxt)
- : base(info, ctxt)
- {
- }
-
- }
-}
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+using System;
+using System.Runtime.Serialization;
+
+using Apache.Qpid.Protocol;
+
+namespace Apache.Qpid
+{
+ /// <summary>
+ /// Thrown when an invalid routing key was sent to the broker
+ /// </summary>
+ [Serializable]
+ public class AMQInvalidRoutingKeyException : AMQException
+ {
+ /// <summary>
+ /// Creates the exception from a message. The error code passed to the
+ /// base class is always <c>AMQConstant.INVALID_ROUTING_KEY.Code</c>.
+ /// </summary>
+ /// <param name="message">Text forwarded unchanged to the base class</param>
+ /// <remarks>
+ /// The third base-constructor argument is passed as <c>null</c>;
+ /// presumably that slot is the inner exception - TODO confirm against AMQException.
+ /// </remarks>
+ public AMQInvalidRoutingKeyException(string message)
+ : base(AMQConstant.INVALID_ROUTING_KEY.Code, message, null)
+ {
+ }
+
+ /// <summary>
+ /// Constructor with the (SerializationInfo, StreamingContext) signature;
+ /// it adds no state of its own and forwards both arguments to the base class.
+ /// </summary>
+ /// <param name="info">Serialization data, passed through to the base class</param>
+ /// <param name="ctxt">Streaming context, passed through to the base class</param>
+ protected AMQInvalidRoutingKeyException(SerializationInfo info,
StreamingContext ctxt)
+ : base(info, ctxt)
+ {
+ }
+
+ }
+}
Propchange: qpid/trunk/qpid/dotnet/Qpid.Common/AMQInvalidRoutingKeyException.cs
------------------------------------------------------------------------------
svn:eol-style = native
Modified:
qpid/trunk/qpid/dotnet/Qpid.Common/Collections/ConsumerProducerQueue.cs
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/dotnet/Qpid.Common/Collections/ConsumerProducerQueue.cs?rev=886998&r1=886997&r2=886998&view=diff
==============================================================================
--- qpid/trunk/qpid/dotnet/Qpid.Common/Collections/ConsumerProducerQueue.cs
(original)
+++ qpid/trunk/qpid/dotnet/Qpid.Common/Collections/ConsumerProducerQueue.cs Thu
Dec 3 23:55:48 2009
@@ -1,113 +1,113 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-using System;
-using System.Collections;
-using System.Threading;
-
-
-namespace Apache.Qpid.Collections
-{
- /// <summary>
- /// Simple FIFO queue to support multi-threaded consumer
- /// and producers. It supports timeouts in dequeue operations.
- /// </summary>
- public sealed class ConsumerProducerQueue
- {
- private Queue _queue = new Queue();
- private WaitSemaphore _semaphore = new WaitSemaphore();
-
- /// <summary>
- /// Put an item into the tail of the queue
- /// </summary>
- /// <param name="item"></param>
- public void Enqueue(object item)
- {
- lock ( _queue.SyncRoot )
- {
- _queue.Enqueue(item);
- _semaphore.Increment();
- }
- }
-
- /// <summary>
- /// Wait indefinitely for an item to be available
- /// on the queue.
- /// </summary>
- /// <returns>The object at the head of the queue</returns>
- public object Dequeue()
- {
- return Dequeue(Timeout.Infinite);
- }
-
- /// <summary>
- /// Wait up to the number of milliseconds specified
- /// for an item to be available on the queue
- /// </summary>
- /// <param name="timeout">Number of milliseconds to wait</param>
- /// <returns>The object at the head of the queue, or null
- /// if the timeout expires</returns>
- public object Dequeue(long timeout)
- {
- if ( _semaphore.Decrement(timeout) )
- {
- lock ( _queue.SyncRoot )
- {
- return _queue.Dequeue();
- }
- }
- return null;
- }
-
- #region Simple Semaphore
- //
- // Simple Semaphore
- //
-
- class WaitSemaphore
- {
- private int _count;
- private AutoResetEvent _event = new AutoResetEvent(false);
-
- public void Increment()
- {
- Interlocked.Increment(ref _count);
- _event.Set();
- }
-
- public bool Decrement(long timeout)
- {
- if ( timeout > int.MaxValue )
- throw new ArgumentOutOfRangeException("timeout", timeout, "Must
be <= Int32.MaxValue");
-
- int millis = (int) (timeout & 0x7FFFFFFF);
- if ( Interlocked.Decrement(ref _count) > 0 )
- {
- // there are messages in queue, so no need to wait
- return true;
- } else
- {
- return _event.WaitOne(millis, false);
- }
- }
- }
- #endregion // Simple Semaphore
- }
-}
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+using System;
+using System.Collections;
+using System.Threading;
+
+
+namespace Apache.Qpid.Collections
+{
+ /// <summary>
+ /// Simple FIFO queue to support multi-threaded consumers
+ /// and producers. It supports timeouts in dequeue operations.
+ /// </summary>
+ /// <remarks>
+ /// Blocking and wake-up are done with <see cref="Monitor"/> on the inner
+ /// queue's SyncRoot. The "is there an item" test and the removal happen
+ /// under the same lock, so a dequeue that times out leaves no bookkeeping
+ /// behind and cannot strand items that are enqueued later.
+ /// </remarks>
+ public sealed class ConsumerProducerQueue
+ {
+     private Queue _queue = new Queue();
+
+     /// <summary>
+     /// Put an item into the tail of the queue
+     /// </summary>
+     /// <param name="item">Object to append; it is stored as-is</param>
+     public void Enqueue(object item)
+     {
+         lock ( _queue.SyncRoot )
+         {
+             _queue.Enqueue(item);
+             // One new item can satisfy exactly one consumer, so wake one.
+             // If nobody is waiting the pulse is a no-op; the next consumer
+             // sees Count > 0 before it would ever wait.
+             Monitor.Pulse(_queue.SyncRoot);
+         }
+     }
+
+     /// <summary>
+     /// Wait indefinitely for an item to be available
+     /// on the queue.
+     /// </summary>
+     /// <returns>The object at the head of the queue</returns>
+     public object Dequeue()
+     {
+         return Dequeue(Timeout.Infinite);
+     }
+
+     /// <summary>
+     /// Wait up to the number of milliseconds specified
+     /// for an item to be available on the queue
+     /// </summary>
+     /// <param name="timeout">Number of milliseconds to wait. Zero polls
+     /// without blocking; a negative value (such as
+     /// <c>Timeout.Infinite</c>) waits with no time limit.</param>
+     /// <returns>The object at the head of the queue, or null
+     /// if the timeout expires</returns>
+     /// <exception cref="ArgumentOutOfRangeException">
+     /// <paramref name="timeout"/> is greater than Int32.MaxValue
+     /// </exception>
+     public object Dequeue(long timeout)
+     {
+         if ( timeout > int.MaxValue )
+         {
+             throw new ArgumentOutOfRangeException("timeout", timeout, "Must be <= Int32.MaxValue");
+         }
+
+         bool waitForever = timeout < 0;
+         int budget = waitForever ? Timeout.Infinite : (int) timeout;
+         int startedAt = Environment.TickCount;
+
+         lock ( _queue.SyncRoot )
+         {
+             // Re-check after every wake-up: another consumer may have
+             // taken the item that caused the pulse.
+             while ( _queue.Count == 0 )
+             {
+                 int remaining = budget;
+                 if ( !waitForever )
+                 {
+                     // unchecked: TickCount wraps around, the difference stays valid
+                     int elapsed = unchecked(Environment.TickCount - startedAt);
+                     remaining = budget - elapsed;
+                     if ( remaining <= 0 )
+                     {
+                         return null;
+                     }
+                 }
+                 // Releases the lock while blocked and re-acquires it before returning.
+                 Monitor.Wait(_queue.SyncRoot, remaining);
+             }
+             return _queue.Dequeue();
+         }
+     }
+ }
+}
Propchange:
qpid/trunk/qpid/dotnet/Qpid.Common/Collections/ConsumerProducerQueue.cs
------------------------------------------------------------------------------
svn:eol-style = native
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]