This is an automated email from the ASF dual-hosted git repository.

havret pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-nms-openwire.git


The following commit(s) were added to refs/heads/main by this push:
     new 338290f  AMQNET-818 Added ConfigureAwait and test
     new d383666  Merge pull request #26 from lukeabsent/AMQNET-818
338290f is described below

commit 338290f5156433eeede4d33fddd9b76022d57f0d
Author: lukeabsent <[email protected]>
AuthorDate: Fri Dec 16 19:58:02 2022 +0100

    AMQNET-818 Added ConfigureAwait and test
---
 src/Transport/ResponseCorrelator.cs                |   4 +-
 test/ConnectionFactoryTest.cs                      |  24 ++++-
 ...SingleThreadSimpleTestSynchronizationContext.cs | 109 +++++++++++++++++++++
 3 files changed, 134 insertions(+), 3 deletions(-)

diff --git a/src/Transport/ResponseCorrelator.cs 
b/src/Transport/ResponseCorrelator.cs
index acf7aa4..599b10d 100644
--- a/src/Transport/ResponseCorrelator.cs
+++ b/src/Transport/ResponseCorrelator.cs
@@ -102,13 +102,13 @@ namespace Apache.NMS.ActiveMQ.Transport
                                               }
                                       }, false))
                                {
-                                       return await future.Task;
+                                       return await future.Task.Await();
                                }
                        }
                }
                else
                {
-                       return await future.Task;
+                       return await future.Task.Await();
                }
         }
 
diff --git a/test/ConnectionFactoryTest.cs b/test/ConnectionFactoryTest.cs
index c0de1c4..7d48ecd 100644
--- a/test/ConnectionFactoryTest.cs
+++ b/test/ConnectionFactoryTest.cs
@@ -16,7 +16,7 @@
  */
 
 using System;
-
+using System.Threading;
 using Apache.NMS.Test;
 using Apache.NMS.Util;
 using Apache.NMS.ActiveMQ;
@@ -192,6 +192,28 @@ namespace Apache.NMS.ActiveMQ.Test
                                Assert.AreEqual(dispatchAsync, 
connection.DispatchAsync);
                        }
                }
+
+               [Timeout(10_000)]
+               [Test]
+               public void 
TestConnectionStartupDontDeadlockOnSingleThreadedSynchContext()
+               {
+                       var singleContext = new 
SingleThreadSimpleTestSynchronizationContext();
+                       ManualResetEvent readyEvent = new 
ManualResetEvent(false);
+                       singleContext.Post((state) =>
+                       {
+                               Uri uri = 
URISupport.CreateCompatibleUri(ReplaceEnvVar("tcp://${activemqhost}:61616"));
+                               ConnectionFactory factory = new 
ConnectionFactory(uri);
+                               Assert.IsNotNull(factory);
+                               using (IConnection connection = 
factory.CreateConnection("", ""))
+                               {
+                                       connection.Start();
+                               }
+
+                               readyEvent.Set();
+                       }, null);
+
+                       Assert.AreEqual(true, 
readyEvent.WaitOne(TimeSpan.FromSeconds(8)));
+               }
        }
 }
 
diff --git a/test/Util/SingleThreadSimpleTestSynchronizationContext.cs 
b/test/Util/SingleThreadSimpleTestSynchronizationContext.cs
new file mode 100644
index 0000000..e6d5c61
--- /dev/null
+++ b/test/Util/SingleThreadSimpleTestSynchronizationContext.cs
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Generic;
+using System.Threading;
+
+namespace Apache.NMS.ActiveMQ.Test
+{
+    /// <summary>
+    /// Very simple, basic and incomplete, just trying to recreate single 
threaded synchronization context for tests
+    /// </summary>
+    internal class SingleThreadSimpleTestSynchronizationContext : 
SynchronizationContext
+    {
+        private Queue<Work> queue = new Queue<Work>();
+
+        public SingleThreadSimpleTestSynchronizationContext()
+        {
+            var th = new Thread(Run);
+            th.IsBackground = true;
+            th.Name = nameof(SingleThreadSimpleTestSynchronizationContext);
+            th.Start();
+        }
+
+        public SingleThreadSimpleTestSynchronizationContext(Queue<Work> queue)
+        {
+            this.queue = queue;
+        }
+
+        internal class Work
+        {
+            public SendOrPostCallback Callback { get; set; }
+            public object State { get; set; }
+        }
+
+       
+
+        public override void Post(SendOrPostCallback d, object? state)
+        {
+            queue.Enqueue(new Work() {Callback = d, State = state});
+        }
+
+        public override void Send(SendOrPostCallback d, object? state)
+        {
+            if (SynchronizationContext.Current == this)
+            {
+                d(state);
+            }
+            else
+            {
+                queue.Enqueue(new Work() {Callback = d, State = state});
+            }
+        }
+
+        private void Run()
+        {
+            SynchronizationContext.SetSynchronizationContext(this);
+            while (true)
+            {
+                if (queue.Count > 0)
+                {
+                    var work = queue.Dequeue();
+                    Console.WriteLine(SynchronizationContext.Current);
+                    work.Callback.Invoke(work.State);
+                }
+                else
+                {
+                    Thread.Sleep(1);
+                }
+            }
+        }
+        
+        public override SynchronizationContext CreateCopy()
+        {
+            return new 
SingleThreadSimpleTestSynchronizationContext(this.queue);
+        }
+
+        public override void OperationCompleted()
+        {
+            throw new NotSupportedException();
+        }
+
+        public override void OperationStarted()
+        {
+            throw new NotSupportedException();
+        }
+        
+        
+
+        public override int Wait(IntPtr[] waitHandles, bool waitAll, int 
millisecondsTimeout)
+        {
+            throw new NotSupportedException();
+        }
+    }
+}
\ No newline at end of file

Reply via email to