This is an automated email from the ASF dual-hosted git repository.
havret pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-nms-openwire.git
The following commit(s) were added to refs/heads/main by this push:
new fe3c59d AMQNET-838 ActiveMQ NMS client does not support nested
parameters for failover transport
fe3c59d is described below
commit fe3c59dc9b6ce4c4e73b5e23ccebad489fc0d42e
Author: vivanku <[email protected]>
AuthorDate: Wed Jan 31 03:02:17 2024 +0530
AMQNET-838 ActiveMQ NMS client does not support nested parameters for
failover transport
Refer to the "Configuring Nested URI Options" section in
https://activemq.apache.org/failover-transport-reference.html
This is supported by the JMS client but not by the NMS client.
Reference implementation in the JMS client:
https://github.com/apache/activemq/blob/a2d5d28c1f24d67f57d245003f1d7f96d696dd7c/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransportFactory.java#L70
https://github.com/apache/activemq/blob/a2d5d28c1f24d67f57d245003f1d7f96d696dd7c/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java#L1431
https://github.com/apache/activemq/blob/a2d5d28c1f24d67f57d245003f1d7f96d696dd7c/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java#L1019
https://github.com/apache/activemq/blob/a2d5d28c1f24d67f57d245003f1d7f96d696dd7c/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java#L1194
This PR adds support for nested parameters to the failover transport in the
NMS client.
Co-authored-by: Kumar, Vivek <[email protected]>
---
src/Transport/Failover/FailoverTransport.cs | 43 +++++++++++++++++++---
src/Transport/Failover/FailoverTransportFactory.cs | 10 ++++-
test/Transport/failover/FailoverTransportTest.cs | 30 ++++++++++++++-
3 files changed, 75 insertions(+), 8 deletions(-)
diff --git a/src/Transport/Failover/FailoverTransport.cs
b/src/Transport/Failover/FailoverTransport.cs
index f14a927..ce78cc1 100644
--- a/src/Transport/Failover/FailoverTransport.cs
+++ b/src/Transport/Failover/FailoverTransport.cs
@@ -94,9 +94,10 @@ namespace Apache.NMS.ActiveMQ.Transport.Failover
private List<Uri> priorityList = new List<Uri>();
private bool priorityBackupAvailable = false;
private String sslProtocol = null;
+ private string nestedExtraQueryOptions;
- // Not Sure how to work these back in with all the changes.
- //private int asyncTimeout = 45000;
+ // Not Sure how to work these back in with all the changes.
+ //private int asyncTimeout = 45000;
//private bool asyncConnect = false;
public FailoverTransport()
@@ -1177,7 +1178,7 @@ namespace Apache.NMS.ActiveMQ.Transport.Failover
// URI from the pool until next time around.
if (transport == null)
{
- uri = iter.Current;
+ uri =
AddExtraQueryOptions(iter.Current);
transport =
TransportFactory.CompositeConnect(uri);
}
@@ -1310,14 +1311,14 @@ namespace Apache.NMS.ActiveMQ.Transport.Failover
}
}
- foreach(Uri uri in connectList)
+ foreach(Uri u in connectList)
{
if (disposed)
{
break;
}
-
- if(ConnectedTransportURI != null &&
!ConnectedTransportURI.Equals(uri))
+ Uri uri = AddExtraQueryOptions(u);
+ if (ConnectedTransportURI != null &&
!ConnectedTransportURI.Equals(uri))
{
try
{
@@ -1712,5 +1713,35 @@ namespace Apache.NMS.ActiveMQ.Transport.Failover
return result;
}
+
+        /// <summary>
+        /// Stores the query-option string that <c>AddExtraQueryOptions</c> appends
+        /// to a URI before it is used.
+        /// </summary>
+        /// <param name="nestedExtraQueryOptions">
+        /// The query options to remember. A null or empty value means
+        /// <c>AddExtraQueryOptions</c> returns URIs unchanged.
+        /// </param>
+        public void SetNestedExtraQueryOptions(String nestedExtraQueryOptions)
+            => this.nestedExtraQueryOptions = nestedExtraQueryOptions;
+
+        /// <summary>
+        /// Returns the given URI with the stored nested query options added to
+        /// its query string.
+        /// </summary>
+        /// <param name="uri">The URI to extend.</param>
+        /// <returns>
+        /// <paramref name="uri"/> itself when no nested options are stored;
+        /// otherwise the URI produced by <c>URISupport.CreateUriWithQuery</c> for
+        /// the existing query (if any) followed by the nested options.
+        /// </returns>
+        /// <exception cref="UriFormatException">
+        /// Logged through <c>Tracer.Error</c> and rethrown if raised while the
+        /// new URI is being built.
+        /// </exception>
+        private Uri AddExtraQueryOptions(Uri uri)
+        {
+            if (string.IsNullOrEmpty(nestedExtraQueryOptions))
+            {
+                return uri;
+            }
+
+            try
+            {
+                // Uri.Query is an empty string, never null, when the URI has no
+                // query. Testing for emptiness keeps a stray "&" from being put
+                // in front of the first nested option.
+                string query = string.IsNullOrEmpty(uri.Query)
+                    ? nestedExtraQueryOptions
+                    : uri.Query + "&" + nestedExtraQueryOptions;
+
+                uri = URISupport.CreateUriWithQuery(uri, query);
+                Tracer.Info($"URI with nested parameter is {uri.ToString()}");
+            }
+            catch (UriFormatException e)
+            {
+                Tracer.Error(e.Message);
+                throw;
+            }
+
+            return uri;
+        }
}
}
diff --git a/src/Transport/Failover/FailoverTransportFactory.cs
b/src/Transport/Failover/FailoverTransportFactory.cs
index 504d07b..b5740f0 100644
--- a/src/Transport/Failover/FailoverTransportFactory.cs
+++ b/src/Transport/Failover/FailoverTransportFactory.cs
@@ -55,7 +55,15 @@ namespace Apache.NMS.ActiveMQ.Transport.Failover
StringDictionary options = compositData.Parameters;
FailoverTransport transport = CreateTransport(options);
transport.Add(false, compositData.Components);
- return transport;
+ try
+ {
+
transport.SetNestedExtraQueryOptions(URISupport.CreateQueryString(URISupport.GetProperties(options,
"nested.")));
+ }
+ catch (Exception e)
+ {
+ Tracer.Error($"Error in setting nested
parameters {e.Message}");
+ }
+ return transport;
}
protected FailoverTransport CreateTransport(StringDictionary
parameters)
diff --git a/test/Transport/failover/FailoverTransportTest.cs
b/test/Transport/failover/FailoverTransportTest.cs
index ec3cc67..4bdfb51 100644
--- a/test/Transport/failover/FailoverTransportTest.cs
+++ b/test/Transport/failover/FailoverTransportTest.cs
@@ -162,8 +162,36 @@ namespace Apache.NMS.ActiveMQ.Test
Assert.IsTrue(failover.IsConnected);
}
}
+ [Test]
+ public void FailoverTransportWithNestedParametersTest()
+ {
+ Uri uri = new
Uri("failover:(mock://localhost:61616)?transport.randomize=false&transport.backup=true&nested.transport.failOnSendMessage=true&nested.transport.numSentMessagesBeforeFail=20");
+ FailoverTransportFactory factory = new FailoverTransportFactory();
- [Test]
+ using (ITransport transport = factory.CreateTransport(uri))
+ {
+ Assert.IsNotNull(transport);
+ transport.CommandAsync = OnCommand;
+ transport.Exception = OnException;
+ transport.Resumed = OnResumed;
+ transport.Interrupted = OnInterrupted;
+
+ FailoverTransport failover =
transport.Narrow(typeof(FailoverTransport)) as FailoverTransport;
+ Assert.IsNotNull(failover);
+ Assert.IsFalse(failover.Randomize);
+ Assert.IsTrue(failover.Backup);
+
+ transport.Start();
+ Thread.Sleep(1000);
+ Assert.IsTrue(failover.IsConnected);
+
+ MockTransport mock = transport.Narrow(typeof(MockTransport))
as MockTransport;
+ Assert.IsNotNull(mock);
+ Assert.IsTrue(mock.FailOnSendMessage);
+ Assert.AreEqual(20,mock.NumSentMessagesBeforeFail);
+ }
+ }
+ [Test]
public void FailoverTransportCreateFailOnCreateTest()
{
Uri uri = new
Uri("failover:(mock://localhost:61616?transport.failOnCreate=true)?" +