This is an automated email from the ASF dual-hosted git repository.
havret pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-nms-openwire.git
The following commit(s) were added to refs/heads/main by this push:
new 338290f AMQNET-818 Added ConfigureAwait and test
new d383666 Merge pull request #26 from lukeabsent/AMQNET-818
338290f is described below
commit 338290f5156433eeede4d33fddd9b76022d57f0d
Author: lukeabsent <[email protected]>
AuthorDate: Fri Dec 16 19:58:02 2022 +0100
AMQNET-818 Added ConfigureAwait and test
---
src/Transport/ResponseCorrelator.cs | 4 +-
test/ConnectionFactoryTest.cs | 24 ++++-
...SingleThreadSimpleTestSynchronizationContext.cs | 109 +++++++++++++++++++++
3 files changed, 134 insertions(+), 3 deletions(-)
diff --git a/src/Transport/ResponseCorrelator.cs b/src/Transport/ResponseCorrelator.cs
index acf7aa4..599b10d 100644
--- a/src/Transport/ResponseCorrelator.cs
+++ b/src/Transport/ResponseCorrelator.cs
@@ -102,13 +102,13 @@ namespace Apache.NMS.ActiveMQ.Transport
}
}, false))
{
- return await future.Task;
+ return await future.Task.Await();
}
}
}
else
{
- return await future.Task;
+ return await future.Task.Await();
}
}
diff --git a/test/ConnectionFactoryTest.cs b/test/ConnectionFactoryTest.cs
index c0de1c4..7d48ecd 100644
--- a/test/ConnectionFactoryTest.cs
+++ b/test/ConnectionFactoryTest.cs
@@ -16,7 +16,7 @@
*/
using System;
-
+using System.Threading;
using Apache.NMS.Test;
using Apache.NMS.Util;
using Apache.NMS.ActiveMQ;
@@ -192,6 +192,28 @@ namespace Apache.NMS.ActiveMQ.Test
Assert.AreEqual(dispatchAsync, connection.DispatchAsync);
}
}
+
+ [Timeout(10_000)]
+ [Test]
+ public void TestConnectionStartupDontDeadlockOnSingleThreadedSynchContext()
+ {
+ var singleContext = new SingleThreadSimpleTestSynchronizationContext();
+ ManualResetEvent readyEvent = new ManualResetEvent(false);
+ singleContext.Post((state) =>
+ {
+ Uri uri = URISupport.CreateCompatibleUri(ReplaceEnvVar("tcp://${activemqhost}:61616"));
+ ConnectionFactory factory = new ConnectionFactory(uri);
+ Assert.IsNotNull(factory);
+ using (IConnection connection = factory.CreateConnection("", ""))
+ {
+ connection.Start();
+ }
+
+ readyEvent.Set();
+ }, null);
+
+ Assert.AreEqual(true, readyEvent.WaitOne(TimeSpan.FromSeconds(8)));
+ }
}
}
diff --git a/test/Util/SingleThreadSimpleTestSynchronizationContext.cs b/test/Util/SingleThreadSimpleTestSynchronizationContext.cs
new file mode 100644
index 0000000..e6d5c61
--- /dev/null
+++ b/test/Util/SingleThreadSimpleTestSynchronizationContext.cs
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Threading;
+
+namespace Apache.NMS.ActiveMQ.Test
+{
+ /// <summary>
+ /// Very simple, basic and incomplete, just trying to recreate single threaded synchronization context for tests
+ /// </summary>
+ internal class SingleThreadSimpleTestSynchronizationContext : SynchronizationContext
+ {
+ private Queue<Work> queue = new Queue<Work>();
+
+ public SingleThreadSimpleTestSynchronizationContext()
+ {
+ var th = new Thread(Run);
+ th.IsBackground = true;
+ th.Name = nameof(SingleThreadSimpleTestSynchronizationContext);
+ th.Start();
+ }
+
+ public SingleThreadSimpleTestSynchronizationContext(Queue<Work> queue)
+ {
+ this.queue = queue;
+ }
+
+ internal class Work
+ {
+ public SendOrPostCallback Callback { get; set; }
+ public object State { get; set; }
+ }
+
+
+
+ public override void Post(SendOrPostCallback d, object? state)
+ {
+ queue.Enqueue(new Work() {Callback = d, State = state});
+ }
+
+ public override void Send(SendOrPostCallback d, object? state)
+ {
+ if (SynchronizationContext.Current == this)
+ {
+ d(state);
+ }
+ else
+ {
+ queue.Enqueue(new Work() {Callback = d, State = state});
+ }
+ }
+
+ private void Run()
+ {
+ SynchronizationContext.SetSynchronizationContext(this);
+ while (true)
+ {
+ if (queue.Count > 0)
+ {
+ var work = queue.Dequeue();
+ Console.WriteLine(SynchronizationContext.Current);
+ work.Callback.Invoke(work.State);
+ }
+ else
+ {
+ Thread.Sleep(1);
+ }
+ }
+ }
+
+ public override SynchronizationContext CreateCopy()
+ {
+ return new SingleThreadSimpleTestSynchronizationContext(this.queue);
+ }
+
+ public override void OperationCompleted()
+ {
+ throw new NotSupportedException();
+ }
+
+ public override void OperationStarted()
+ {
+ throw new NotSupportedException();
+ }
+
+
+
+ public override int Wait(IntPtr[] waitHandles, bool waitAll, int millisecondsTimeout)
+ {
+ throw new NotSupportedException();
+ }
+ }
+}
\ No newline at end of file