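Fix for https://issues.apache.org/jira/browse/AMQ-4719: add isAllowLinkStealing() to TransportServer (delegated by TransportServerFilter) and a configurable allowLinkStealing flag on TcpTransportServer and UdpTransportServer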
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/5e406e67 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/5e406e67 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/5e406e67 Branch: refs/heads/activemq-5.9 Commit: 5e406e67fe2f3d305ff322b9d2c68712ef24bd81 Parents: caaf839 Author: Rob Davies <[email protected]> Authored: Tue Nov 12 08:07:33 2013 +0000 Committer: Hadrian Zbarcea <[email protected]> Committed: Wed Mar 12 10:24:28 2014 -0400 ---------------------------------------------------------------------- .../org/apache/activemq/transport/TransportServer.java | 7 +++++++ .../activemq/transport/TransportServerFilter.java | 4 ++++ .../activemq/transport/tcp/TcpTransportServer.java | 13 ++++++++++++- .../activemq/transport/udp/UdpTransportServer.java | 10 ++++++++++ 4 files changed, 33 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/5e406e67/activemq-client/src/main/java/org/apache/activemq/transport/TransportServer.java ---------------------------------------------------------------------- diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/TransportServer.java b/activemq-client/src/main/java/org/apache/activemq/transport/TransportServer.java index 27b8572..fb25f4f 100755 --- a/activemq-client/src/main/java/org/apache/activemq/transport/TransportServer.java +++ b/activemq-client/src/main/java/org/apache/activemq/transport/TransportServer.java @@ -65,4 +65,11 @@ public interface TransportServer extends Service { * connections. */ boolean isSslServer(); + + /** + * Some protocols allow link stealing by default (if 2 connections have the same clientID - the youngest wins). + * This is the default for AMQP and MQTT. 
However, JMS 1.1 spec requires the opposite + * @return + */ + boolean isAllowLinkStealing(); } http://git-wip-us.apache.org/repos/asf/activemq/blob/5e406e67/activemq-client/src/main/java/org/apache/activemq/transport/TransportServerFilter.java ---------------------------------------------------------------------- diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/TransportServerFilter.java b/activemq-client/src/main/java/org/apache/activemq/transport/TransportServerFilter.java index 2a06a57..e308774 100755 --- a/activemq-client/src/main/java/org/apache/activemq/transport/TransportServerFilter.java +++ b/activemq-client/src/main/java/org/apache/activemq/transport/TransportServerFilter.java @@ -59,4 +59,8 @@ public class TransportServerFilter implements TransportServer { public boolean isSslServer() { return next.isSslServer(); } + + public boolean isAllowLinkStealing() { + return next.isAllowLinkStealing(); + } } http://git-wip-us.apache.org/repos/asf/activemq/blob/5e406e67/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java ---------------------------------------------------------------------- diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java b/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java index 310e9eb..5e1426a 100755 --- a/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java +++ b/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java @@ -72,6 +72,8 @@ public class TcpTransportServer extends TransportServerThreadSupport implements protected long maxInactivityDurationInitalDelay = 10000; protected int minmumWireFormatVersion; protected boolean useQueueForAccept = true; + protected boolean allowLinkStealing; + /** * trace=true -> the Transport stack where this TcpTransport object will be, will have a TransportLogger layer @@ -343,7 +345,7 @@ public 
class TcpTransportServer extends TransportServerThreadSupport implements /** * @param socket - * @param inetAddress + * @param bindAddress * @return real hostName * @throws UnknownHostException */ @@ -511,4 +513,13 @@ public class TcpTransportServer extends TransportServerThreadSupport implements public boolean isSslServer() { return false; } + + @Override + public boolean isAllowLinkStealing() { + return allowLinkStealing; + } + + public void setAllowLinkStealing(boolean allowLinkStealing) { + this.allowLinkStealing = allowLinkStealing; + } } http://git-wip-us.apache.org/repos/asf/activemq/blob/5e406e67/activemq-client/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java ---------------------------------------------------------------------- diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java b/activemq-client/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java index 79b140a..ccf7abe 100755 --- a/activemq-client/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java +++ b/activemq-client/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java @@ -52,6 +52,7 @@ public class UdpTransportServer extends TransportServerSupport { private final Transport configuredTransport; private boolean usingWireFormatNegotiation; private final Map<DatagramEndpoint, Transport> transports = new HashMap<DatagramEndpoint, Transport>(); + private boolean allowLinkStealing; public UdpTransportServer(URI connectURI, UdpTransport serverTransport, Transport configuredTransport, ReplayStrategy replayStrategy) { super(connectURI); @@ -189,4 +190,13 @@ public class UdpTransportServer extends TransportServerSupport { public boolean isSslServer() { return false; } + + @Override + public boolean isAllowLinkStealing() { + return allowLinkStealing; + } + + public void setAllowLinkStealing(boolean allowLinkStealing) { + this.allowLinkStealing = allowLinkStealing; + } }
