ARTEMIS-611 refactor STOMP cxn TTL + heart-beat Adds 3 new URI properties for STOMP acceptors to allow finer grained configuration of heart-beat / connection-TTL behavior.
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/89e0c461 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/89e0c461 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/89e0c461 Branch: refs/heads/master Commit: 89e0c461e5128768513943e60f6e3992f066f529 Parents: dc76e2a Author: jbertram <[email protected]> Authored: Fri Jul 15 08:36:24 2016 -0500 Committer: jbertram <[email protected]> Committed: Mon Jul 18 17:10:05 2016 -0500 ---------------------------------------------------------------------- .../remoting/impl/netty/TransportConstants.java | 9 + .../core/protocol/stomp/StompConnection.java | 6 +- .../protocol/stomp/StompProtocolManager.java | 2 +- .../stomp/v11/StompFrameHandlerV11.java | 130 +++++------ .../core/remoting/server/RemotingService.java | 6 + .../server/impl/RemotingServiceImpl.java | 130 +++++++---- .../en/protocols-interoperability.md | 65 ++++-- .../integration/stomp/StompOverHttpTest.java | 4 +- .../stomp/StompOverWebsocketTest.java | 4 +- .../tests/integration/stomp/StompTest.java | 18 ++ .../tests/integration/stomp/StompTestBase.java | 88 ++++++-- .../integration/stomp/v11/StompV11Test.java | 214 +++++++++++++++++++ .../integration/stomp/v11/StompV11TestBase.java | 2 +- 13 files changed, 504 insertions(+), 174 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/89e0c461/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java index f373639..53dc204 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java @@ -187,6 +187,12 @@ public class TransportConstants { public static final String CONNECTION_TTL = "connectionTtl"; + public static final String CONNECTION_TTL_MAX = "connectionTtlMax"; + + public static final String CONNECTION_TTL_MIN = "connectionTtlMin"; + + public static final String HEART_BEAT_TO_CONNECTION_TTL_MODIFIER = "heartBeatToConnectionTtlModifier"; + public static final String STOMP_ENABLE_MESSAGE_ID = "stomp-enable-message-id"; public static final String STOMP_MIN_LARGE_MESSAGE_SIZE = "stomp-min-large-message-size"; @@ -230,6 +236,9 @@ public class TransportConstants { allowableAcceptorKeys.add(TransportConstants.STOMP_CONSUMERS_CREDIT); allowableAcceptorKeys.add(TransportConstants.STOMP_MIN_LARGE_MESSAGE_SIZE); allowableAcceptorKeys.add(TransportConstants.CONNECTION_TTL); + allowableAcceptorKeys.add(TransportConstants.CONNECTION_TTL_MAX); + allowableAcceptorKeys.add(TransportConstants.CONNECTION_TTL_MIN); + allowableAcceptorKeys.add(TransportConstants.HEART_BEAT_TO_CONNECTION_TTL_MODIFIER); allowableAcceptorKeys.add(TransportConstants.STOMP_ENABLE_MESSAGE_ID); allowableAcceptorKeys.add(TransportConstants.CONNECTIONS_ALLOWED); allowableAcceptorKeys.add(ActiveMQDefaultConfiguration.getPropMaskPassword()); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/89e0c461/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java index 1cfd0a5..5396c5b 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java @@ -269,7 +269,7 @@ public final class StompConnection implements RemotingConnection { } } - Acceptor getAcceptorUsed() { + public Acceptor getAcceptorUsed() { return acceptorUsed; } @@ -720,4 +720,8 @@ public final class StompConnection implements RemotingConnection { return minLargeMessageSize; } + public StompProtocolManager getManager() { + return manager; + } + } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/89e0c461/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java index 7642e69..2c8751c 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java @@ -55,7 +55,7 @@ import static org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompProto /** * StompProtocolManager */ -class StompProtocolManager extends AbstractProtocolManager<StompFrame,StompFrameInterceptor,StompConnection> { +public class StompProtocolManager extends AbstractProtocolManager<StompFrame,StompFrameInterceptor,StompConnection> { // Constants ----------------------------------------------------- // Attributes ---------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/89e0c461/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v11/StompFrameHandlerV11.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v11/StompFrameHandlerV11.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v11/StompFrameHandlerV11.java index 7f284dd..0eb1951 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v11/StompFrameHandlerV11.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v11/StompFrameHandlerV11.java @@ -29,7 +29,10 @@ import org.apache.activemq.artemis.core.protocol.stomp.StompDecoder; import org.apache.activemq.artemis.core.protocol.stomp.StompFrame; import org.apache.activemq.artemis.core.protocol.stomp.VersionedStompFrameHandler; import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection; +import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; +import org.apache.activemq.artemis.core.remoting.server.impl.RemotingServiceImpl; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; +import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry; import org.apache.activemq.artemis.utils.CertificateUtil; import static org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompProtocolMessageBundle.BUNDLE; @@ -92,7 +95,7 @@ public class StompFrameHandlerV11 extends VersionedStompFrameHandler implements response.addHeader(Stomp.Headers.Connected.HEART_BEAT, "0,0"); } else { - response.addHeader(Stomp.Headers.Connected.HEART_BEAT, heartBeater.getServerHeartBeatValue()); + response.addHeader(Stomp.Headers.Connected.HEART_BEAT, Long.toString(heartBeater.serverPingPeriod) + "," + Long.toString(heartBeater.clientPingResponse)); } } } @@ -231,7 +234,7 @@ public class StompFrameHandlerV11 extends VersionedStompFrameHandler implements } private void startHeartBeat() { - if (heartBeater != null) { + if (heartBeater != null && heartBeater.serverPingPeriod != 0) { heartBeater.start(); } } @@ -242,31 +245,50 @@ public class StompFrameHandlerV11 extends VersionedStompFrameHandler implements return frame; } - //server heart beat - //algorithm: - //(a) server ping: if server hasn't sent any frame within serverPing - //interval, send a ping. - //(b) accept ping: if server hasn't received any frame within - // 2*serverAcceptPing, disconnect! + /* + * HeartBeater functions: + * (a) server ping: if server hasn't sent any frame within serverPingPeriod interval, send a ping + * (b) configure connection ttl so that org.apache.activemq.artemis.core.remoting.server.impl.RemotingServiceImpl.FailureCheckAndFlushThread + * can deal with closing connections which go stale + */ private class HeartBeater extends Thread { private static final int MIN_SERVER_PING = 500; - private static final int MIN_CLIENT_PING = 500; - long serverPing = 0; - long serverAcceptPing = 0; + long serverPingPeriod = 0; + long clientPingResponse; volatile boolean shutdown = false; - AtomicLong lastPingTime = new AtomicLong(0); - AtomicLong lastAccepted = new AtomicLong(0); - StompFrame pingFrame; + AtomicLong lastPingTimestamp = new AtomicLong(0); + ConnectionEntry connectionEntry; - private HeartBeater(long clientPing, long clientAcceptPing) { - if (clientPing != 0) { - serverAcceptPing = clientPing > MIN_CLIENT_PING ? clientPing : MIN_CLIENT_PING; + private HeartBeater(final long clientPing, final long clientAcceptPing) { + connectionEntry = ((RemotingServiceImpl)connection.getManager().getServer().getRemotingService()).getConnectionEntry(connection.getID()); + clientPingResponse = clientPing; + + String ttlMaxStr = (String) connection.getAcceptorUsed().getConfiguration().get(TransportConstants.CONNECTION_TTL_MAX); + long ttlMax = ttlMaxStr == null ? Long.MAX_VALUE : Long.valueOf(ttlMaxStr); + + String ttlMinStr = (String) connection.getAcceptorUsed().getConfiguration().get(TransportConstants.CONNECTION_TTL_MIN); + long ttlMin = ttlMinStr == null ? 500 : Long.valueOf(ttlMinStr); + + String heartBeatToTtlModifierStr = (String) connection.getAcceptorUsed().getConfiguration().get(TransportConstants.HEART_BEAT_TO_CONNECTION_TTL_MODIFIER); + double heartBeatToTtlModifier = heartBeatToTtlModifierStr == null ? 2 : Double.valueOf(heartBeatToTtlModifierStr); + + // The connection's TTL should be clientPing * 2, MIN_CLIENT_PING, or ttlMax set on the acceptor + long connectionTtl = (long) (clientPing * heartBeatToTtlModifier); + if (connectionTtl < ttlMin) { + connectionTtl = ttlMin; + clientPingResponse = (long) (ttlMin / heartBeatToTtlModifier); } + else if (connectionTtl > ttlMax) { + connectionTtl = ttlMax; + clientPingResponse = (long) (ttlMax / heartBeatToTtlModifier); + } + ActiveMQServerLogger.LOGGER.info("Setting TTL to: " + connectionTtl); + connectionEntry.ttl = connectionTtl; if (clientAcceptPing != 0) { - serverPing = clientAcceptPing > MIN_SERVER_PING ? clientAcceptPing : MIN_SERVER_PING; + serverPingPeriod = clientAcceptPing > MIN_SERVER_PING ? clientAcceptPing : MIN_SERVER_PING; } } @@ -275,85 +297,32 @@ public class StompFrameHandlerV11 extends VersionedStompFrameHandler implements this.notify(); } - public String getServerHeartBeatValue() { - return String.valueOf(serverPing) + "," + String.valueOf(serverAcceptPing); - } - public void pinged() { - lastPingTime.set(System.currentTimeMillis()); + lastPingTimestamp.set(System.currentTimeMillis()); } @Override public void run() { - lastAccepted.set(System.currentTimeMillis()); - pingFrame = createPingFrame(); - synchronized (this) { while (!shutdown) { - long dur1 = 0; - long dur2 = 0; - - if (serverPing != 0) { - dur1 = System.currentTimeMillis() - lastPingTime.get(); - if (dur1 >= serverPing) { - lastPingTime.set(System.currentTimeMillis()); - connection.ping(pingFrame); - dur1 = 0; - } + long lastPingPeriod = System.currentTimeMillis() - lastPingTimestamp.get(); + if (lastPingPeriod >= serverPingPeriod) { + lastPingTimestamp.set(System.currentTimeMillis()); + connection.ping(createPingFrame()); + lastPingPeriod = 0; } - - if (serverAcceptPing != 0) { - dur2 = System.currentTimeMillis() - lastAccepted.get(); - - if (dur2 > (2 * serverAcceptPing)) { - connection.disconnect(false); - shutdown = true; - break; - } - } - - long waitTime1 = 0; - long waitTime2 = 0; - - if (serverPing > 0) { - waitTime1 = serverPing - dur1; - } - - if (serverAcceptPing > 0) { - waitTime2 = serverAcceptPing * 2 - dur2; - } - - long waitTime = 10L; - - if ((waitTime1 > 0) && (waitTime2 > 0)) { - waitTime = Math.min(waitTime1, waitTime2); - } - else if (waitTime1 > 0) { - waitTime = waitTime1; - } - else if (waitTime2 > 0) { - waitTime = waitTime2; - } - try { - this.wait(waitTime); + this.wait(serverPingPeriod - lastPingPeriod); } catch (InterruptedException e) { } } } } - - public void pingAccepted() { - this.lastAccepted.set(System.currentTimeMillis()); - } } @Override public void requestAccepted(StompFrame request) { - if (heartBeater != null) { - heartBeater.pingAccepted(); - } } @Override @@ -403,10 +372,7 @@ public class StompFrameHandlerV11 extends VersionedStompFrameHandler implements // either "[\r]\n"s or "\n"s) while (true) { if (workingBuffer[offset] == NEW_LINE) { - if (heartBeater != null) { - //client ping - heartBeater.pingAccepted(); - } + //client ping nextChar = false; } else if (workingBuffer[offset] == CR) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/89e0c461/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/RemotingService.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/RemotingService.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/RemotingService.java index 061e5a6..ea3107c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/RemotingService.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/RemotingService.java @@ -20,6 +20,7 @@ import java.util.List; import java.util.Set; import org.apache.activemq.artemis.api.core.BaseInterceptor; +import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection; import org.apache.activemq.artemis.core.security.ActiveMQPrincipal; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; @@ -99,4 +100,9 @@ public interface RemotingService { */ Acceptor getAcceptor(String name); + Acceptor createAcceptor(String name, String uri) throws Exception; + + Acceptor createAcceptor(TransportConfiguration transportConfiguration); + + void destroyAcceptor(String name) throws Exception; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/89e0c461/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java index 3a073e9..1a8e32b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java @@ -67,6 +67,7 @@ import org.apache.activemq.artemis.spi.core.remoting.AcceptorFactory; import org.apache.activemq.artemis.spi.core.remoting.BufferHandler; import org.apache.activemq.artemis.spi.core.remoting.Connection; import org.apache.activemq.artemis.spi.core.remoting.ServerConnectionLifeCycleListener; +import org.apache.activemq.artemis.uri.AcceptorTransportConfigurationParser; import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; import org.apache.activemq.artemis.utils.ConfigurationHelper; import org.apache.activemq.artemis.utils.ReusableLatch; @@ -77,8 +78,6 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif private static final Logger logger = Logger.getLogger(RemotingServiceImpl.class); - public static final long CONNECTION_TTL_CHECK_INTERVAL = 2000; - // Attributes ---------------------------------------------------- private volatile boolean started = false; @@ -119,6 +118,8 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif private AtomicLong totalConnectionCount = new AtomicLong(0); + private long connectionTtlCheckInterval; + // Static -------------------------------------------------------- // Constructors -------------------------------------------------- @@ -163,6 +164,8 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif if (protocolManagerFactories != null) { loadProtocolManagerFactories(protocolManagerFactories); } + + this.connectionTtlCheckInterval = config.getConnectionTtlCheckInterval(); } private void setInterceptors(Configuration configuration) { @@ -198,67 +201,94 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif threadPool = Executors.newCachedThreadPool(tFactory); for (TransportConfiguration info : acceptorsConfig) { - try { - AcceptorFactory factory = server.getServiceRegistry().getAcceptorFactory(info.getName(), info.getFactoryClassName()); + createAcceptor(info); + } - Map<String, ProtocolManagerFactory> selectedProtocolFactories = new ConcurrentHashMap<>(); + /** + * Don't start the acceptors here. Only start the acceptors at the every end of the start-up process to avoid + * race conditions. See {@link #startAcceptors()}. + */ - @SuppressWarnings("deprecation") - String protocol = ConfigurationHelper.getStringProperty(TransportConstants.PROTOCOL_PROP_NAME, null, info.getParams()); - if (protocol != null) { - ActiveMQServerLogger.LOGGER.warnDeprecatedProtocol(); - locateProtocols(protocol, info, selectedProtocolFactories); - } + // This thread checks connections that need to be closed, and also flushes confirmations + failureCheckAndFlushThread = new FailureCheckAndFlushThread(connectionTtlCheckInterval); - String protocols = ConfigurationHelper.getStringProperty(TransportConstants.PROTOCOLS_PROP_NAME, null, info.getParams()); + failureCheckAndFlushThread.start(); - if (protocols != null) { - locateProtocols(protocols, info, selectedProtocolFactories); - } + started = true; + } - ClusterConnection clusterConnection = lookupClusterConnection(info); + @Override + public Acceptor createAcceptor(String name, String uri) throws Exception { + AcceptorTransportConfigurationParser parser = new AcceptorTransportConfigurationParser(); - // If empty: we get the default list - if (selectedProtocolFactories.isEmpty()) { - selectedProtocolFactories = protocolMap; - } + List<TransportConfiguration> configurations = parser.newObject(parser.expandURI(uri), name); - Map<String, ProtocolManager> selectedProtocols = new ConcurrentHashMap<>(); - for (Map.Entry<String, ProtocolManagerFactory> entry: selectedProtocolFactories.entrySet()) { - selectedProtocols.put(entry.getKey(), entry.getValue().createProtocolManager(server, info.getExtraParams(), incomingInterceptors, outgoingInterceptors)); - } + return createAcceptor(configurations.get(0)); + } + @Override + public Acceptor createAcceptor(TransportConfiguration info) { + Acceptor acceptor = null; - Acceptor acceptor = factory.createAcceptor(info.getName(), clusterConnection, info.getParams(), new DelegatingBufferHandler(), this, threadPool, scheduledThreadPool, selectedProtocols); + try { + AcceptorFactory factory = server.getServiceRegistry().getAcceptorFactory(info.getName(), info.getFactoryClassName()); - if (defaultInvmSecurityPrincipal != null && acceptor.isUnsecurable()) { - acceptor.setDefaultActiveMQPrincipal(defaultInvmSecurityPrincipal); - } + Map<String, ProtocolManagerFactory> selectedProtocolFactories = new ConcurrentHashMap<>(); - acceptors.put(info.getName(), acceptor); + @SuppressWarnings("deprecation") + String protocol = ConfigurationHelper.getStringProperty(TransportConstants.PROTOCOL_PROP_NAME, null, info.getParams()); + if (protocol != null) { + ActiveMQServerLogger.LOGGER.warnDeprecatedProtocol(); + locateProtocols(protocol, info, selectedProtocolFactories); + } - if (managementService != null) { - acceptor.setNotificationService(managementService); + String protocols = ConfigurationHelper.getStringProperty(TransportConstants.PROTOCOLS_PROP_NAME, null, info.getParams()); - managementService.registerAcceptor(acceptor, info); - } + if (protocols != null) { + locateProtocols(protocols, info, selectedProtocolFactories); } - catch (Exception e) { - ActiveMQServerLogger.LOGGER.errorCreatingAcceptor(e, info.getFactoryClassName()); + + ClusterConnection clusterConnection = lookupClusterConnection(info); + + // If empty: we get the default list + if (selectedProtocolFactories.isEmpty()) { + selectedProtocolFactories = protocolMap; } - } - /** - * Don't start the acceptors here. Only start the acceptors at the every end of the start-up process to avoid - * race conditions. See {@link #startAcceptors()}. - */ + Map<String, ProtocolManager> selectedProtocols = new ConcurrentHashMap<>(); + for (Entry<String, ProtocolManagerFactory> entry: selectedProtocolFactories.entrySet()) { + selectedProtocols.put(entry.getKey(), entry.getValue().createProtocolManager(server, info.getExtraParams(), incomingInterceptors, outgoingInterceptors)); + } - // This thread checks connections that need to be closed, and also flushes confirmations - failureCheckAndFlushThread = new FailureCheckAndFlushThread(RemotingServiceImpl.CONNECTION_TTL_CHECK_INTERVAL); - failureCheckAndFlushThread.start(); + acceptor = factory.createAcceptor(info.getName(), clusterConnection, info.getParams(), new DelegatingBufferHandler(), this, threadPool, scheduledThreadPool, selectedProtocols); - started = true; + if (defaultInvmSecurityPrincipal != null && acceptor.isUnsecurable()) { + acceptor.setDefaultActiveMQPrincipal(defaultInvmSecurityPrincipal); + } + + acceptors.put(info.getName(), acceptor); + + if (managementService != null) { + acceptor.setNotificationService(managementService); + + managementService.registerAcceptor(acceptor, info); + } + } + catch (Exception e) { + ActiveMQServerLogger.LOGGER.errorCreatingAcceptor(e, info.getFactoryClassName()); + } + + return acceptor; + } + + @Override + public void destroyAcceptor(String name) throws Exception { + Acceptor acceptor = acceptors.get(name); + if (acceptor != null) { + acceptor.stop(); + acceptors.remove(name); + } } @Override @@ -423,6 +453,17 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif } } + public ConnectionEntry getConnectionEntry(final Object remotingConnectionID) { + ConnectionEntry entry = connections.get(remotingConnectionID); + + if (entry != null) { + return entry; + } + else { + return null; + } + } + @Override public RemotingConnection removeConnection(final Object remotingConnectionID) { ConnectionEntry entry = connections.remove(remotingConnectionID); @@ -647,6 +688,7 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif @Override public void run() { while (!closed) { + ActiveMQServerLogger.LOGGER.info("Checking..."); try { long now = System.currentTimeMillis(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/89e0c461/docs/user-manual/en/protocols-interoperability.md ---------------------------------------------------------------------- diff --git a/docs/user-manual/en/protocols-interoperability.md b/docs/user-manual/en/protocols-interoperability.md index 5c19311..5b486d0 100644 --- a/docs/user-manual/en/protocols-interoperability.md +++ b/docs/user-manual/en/protocols-interoperability.md @@ -256,15 +256,6 @@ set). Apache ActiveMQ Artemis currently doesn't support virtual hosting, which means the 'host' header in CONNECT fram will be ignored. -#### Heart-beating - -Apache ActiveMQ Artemis specifies a minimum value for both client and server heart-beat -intervals. The minimum interval for both client and server heartbeats is -500 milliseconds. That means if a client sends a CONNECT frame with -heartbeat values lower than 500, the server will defaults the value to -500 milliseconds regardless the values of the 'heart-beat' header in the -frame. - ### Mapping Stomp destinations to Apache ActiveMQ Artemis addresses and queues Stomp clients deals with *destinations* when sending messages and @@ -278,7 +269,14 @@ specified destination is mapped to an address. When a Stomp client subscribes (or unsubscribes) for a destination (using a `SUBSCRIBE` or `UNSUBSCRIBE` frame), the destination is mapped to an Apache ActiveMQ Artemis queue. -### STOMP and connection-ttl +### STOMP heart-beating and connection-ttl + +Apache ActiveMQ Artemis specifies a minimum value for both client and server heart-beat +intervals. The minimum interval for both client and server heartbeats is +500 milliseconds. That means if a client sends a CONNECT frame with +heartbeat values lower than 500, the server will defaults the value to +500 milliseconds regardless the values of the 'heart-beat' header in the +frame. Well behaved STOMP clients will always send a DISCONNECT frame before closing their connections. In this case the server will clear up any @@ -288,19 +286,50 @@ they crash the server will have no way of knowing immediately whether the client is still alive or not. STOMP connections therefore default to a connection-ttl value of 1 minute (see chapter on [connection-ttl](#connection-ttl) for more information. This value can -be overridden using connection-ttl-override. - -If you need a specific connectionTtl for your stomp connections without -affecting the connectionTtlOverride setting, you can configure your -stomp acceptor with the "connectionTtl" property, which is used to set -the ttl for connections that are created from that acceptor. For -example: +be overridden using the `connection-ttl-override` property or if you +need a specific connectionTtl for your stomp connections without +affecting the broker-wide `connection-ttl-override` setting, you can +configure your stomp acceptor with the "connectionTtl" property, which +is used to set the ttl for connections that are created from that acceptor. +For example: <acceptor name="stomp-acceptor">tcp://localhost:61613?protocols=STOMP;connectionTtl=20000</acceptor> The above configuration will make sure that any stomp connection that is created from that acceptor will have its connection-ttl set to 20 -seconds. +seconds. The `connectionTtl` set on an acceptor will take precedence over +`connection-ttl-override`. + +Since Stomp 1.0 doesn't support heart-beating then all connections from +Stomp 1.0 clients will have a connection TTL imposed upon them by the broker +based on the aforementioned configuration options. Likewise, any Stomp 1.1 +or 1.2 clients that don't specify a heart-beat or disable heart-beating +(e.g. by sending `0,0` in the `heart-beat` header) will have a connection +TTL imposed upon them by the broker. + +For Stomp 1.1 and 1.2 clients which send a valid `heart-beat` header then +their connection TTL will be set accordingly. However, the broker will not +set the connection TTL to the same value as the specified in the `heart-beat` +since even small network delays could then cause spurious disconnects. Instead, +the value in the heart-beat will be multiplied by the `heartBeatConnectionTtlModifer` +specified on the acceptor. The `heartBeatConnectionTtlModifer` is a decimal +value that defaults to 2.0 so for example, if a client sends a `heart-beat` +frame of `1000,0` the the connection TTL will be set to `2000` so that the +ping frames sent every 1000 milliseconds will have a sufficient cushion so as +not to be considered late and trigger a disconnect. + +The minimum and maximum connection TTL allowed can also be specified on the +acceptor via the `connectionTtlMin` and `connectionTtlMax` properties respectively. +The default `connectionTtlMin` is 500 and the default `connectionTtlMax` is Java's +`Long.MAX_VALUE` meaning there essentially is no max connection TTL by default. +Keep in mind that the `heartBeatConnectionTtlModifer` is relevant here. For +example, if a client sends a `heart-beat` header of `20000,0` and the acceptor +is using a `connectionTtlMax` of `30000` and a default `heartBeatConnectionTtlModifer` +of `2.0` then the connection TTL would be `40000` (i.e. `20000` * `2.0`) which would +exceed the `connectionTtlMax`. In this case the server would respond to the client +with a `heart-beat` header of `0,15000` (i.e. `30000` / `2.0`). As described +previously, this is to make sure there is a sufficient cushion for the client +heart-beats. The same kind of calculation is done for `connectionTtlMin`. > **Note** > http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/89e0c461/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompOverHttpTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompOverHttpTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompOverHttpTest.java index 8e588f9..783f35f 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompOverHttpTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompOverHttpTest.java @@ -37,13 +37,13 @@ import io.netty.handler.codec.string.StringEncoder; public class StompOverHttpTest extends StompTest { @Override - protected void addChannelHandlers(SocketChannel ch) { + protected void addChannelHandlers(int index, SocketChannel ch) { ch.pipeline().addLast(new HttpRequestEncoder()); ch.pipeline().addLast(new HttpResponseDecoder()); ch.pipeline().addLast(new HttpHandler()); ch.pipeline().addLast("decoder", new StringDecoder(StandardCharsets.UTF_8)); ch.pipeline().addLast("encoder", new StringEncoder(StandardCharsets.UTF_8)); - ch.pipeline().addLast(new StompClientHandler()); + ch.pipeline().addLast(new StompClientHandler(index)); } @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/89e0c461/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompOverWebsocketTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompOverWebsocketTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompOverWebsocketTest.java index 39cd4f7..fa8c048 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompOverWebsocketTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompOverWebsocketTest.java @@ -63,12 +63,12 @@ public class StompOverWebsocketTest extends StompTest { } @Override - protected void addChannelHandlers(SocketChannel ch) throws URISyntaxException { + protected void addChannelHandlers(int index, SocketChannel ch) throws URISyntaxException { ch.pipeline().addLast("http-codec", new HttpClientCodec()); ch.pipeline().addLast("aggregator", new HttpObjectAggregator(8192)); ch.pipeline().addLast(new WebsocketHandler(WebSocketClientHandshakerFactory.newHandshaker(new URI("ws://localhost:8080/websocket"), WebSocketVersion.V13, null, false, null))); ch.pipeline().addLast("decoder", new StringDecoder(StandardCharsets.UTF_8)); - ch.pipeline().addLast(new StompClientHandler()); + ch.pipeline().addLast(new StompClientHandler(index)); } @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/89e0c461/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java index 8155898..f28f5b2 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java @@ -52,6 +52,24 @@ public class StompTest extends StompTestBase { private static final transient IntegrationTestLogger log = IntegrationTestLogger.LOGGER; @Test + public void testConnectionTTL() throws Exception { + int index = 1; + int port = 61614; + + server.getActiveMQServer().getRemotingService().createAcceptor("test", "tcp://127.0.0.1:" + port + "?connectionTtl=1000").start(); + createBootstrap(index, port); + String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL; + sendFrame(index, frame); + frame = receiveFrame(index, 10000); + + Assert.assertTrue(frame.startsWith("CONNECTED")); + + Thread.sleep(5000); + + assertChannelClosed(index); + } + + @Test public void testSendManyMessages() throws Exception { MessageConsumer consumer = session.createConsumer(queue); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/89e0c461/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java index 9baf123..7f73e48 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java @@ -29,8 +29,10 @@ import java.io.IOException; import java.net.Socket; import java.net.URISyntaxException; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; @@ -97,13 +99,15 @@ public abstract class StompTestBase extends ActiveMQTestBase { protected boolean autoCreateServer = true; - private Bootstrap bootstrap; + private List<Bootstrap> bootstraps = new ArrayList<>(); - private Channel channel; +// private Channel channel; - private BlockingQueue<String> priorityQueue; + private List<BlockingQueue<String>> priorityQueues = new ArrayList<>(); - private EventLoopGroup group; + private List<EventLoopGroup> groups = new ArrayList<>(); + + private List<Channel> channels = new ArrayList<>(); // Implementation methods // ------------------------------------------------------------------------- @@ -111,7 +115,6 @@ public abstract class StompTestBase extends ActiveMQTestBase { @Before public void setUp() throws Exception { super.setUp(); - priorityQueue = new ArrayBlockingQueue<>(1000); if (autoCreateServer) { server = createServer(); addServer(server.getActiveMQServer()); @@ -133,18 +136,27 @@ public abstract class StompTestBase extends ActiveMQTestBase { } private void createBootstrap() { - group = new NioEventLoopGroup(); - bootstrap = new Bootstrap(); - bootstrap.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true).handler(new ChannelInitializer<SocketChannel>() { + createBootstrap(0, port); + } + + protected void createBootstrap(int port) { + createBootstrap(0, port); + } + + protected void createBootstrap(final int index, int port) { + priorityQueues.add(index, new ArrayBlockingQueue<String>(1000)); + groups.add(index, new NioEventLoopGroup()); + bootstraps.add(index, new Bootstrap()); + bootstraps.get(index).group(groups.get(index)).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true).handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { - addChannelHandlers(ch); + addChannelHandlers(index, ch); } }); // Start the client. try { - channel = bootstrap.connect("localhost", port).sync().channel(); + channels.add(index, bootstraps.get(index).connect("localhost", port).sync().channel()); handshake(); } catch (InterruptedException e) { @@ -156,10 +168,10 @@ public abstract class StompTestBase extends ActiveMQTestBase { protected void handshake() throws InterruptedException { } - protected void addChannelHandlers(SocketChannel ch) throws URISyntaxException { + protected void addChannelHandlers(int index, SocketChannel ch) throws URISyntaxException { ch.pipeline().addLast("decoder", new StringDecoder(StandardCharsets.UTF_8)); ch.pipeline().addLast("encoder", new StringEncoder(StandardCharsets.UTF_8)); - ch.pipeline().addLast(new StompClientHandler()); + ch.pipeline().addLast(new StompClientHandler(index)); } protected void setUpAfterServer() throws Exception { @@ -224,9 +236,13 @@ public abstract class StompTestBase extends ActiveMQTestBase { if (autoCreateServer) { connection.close(); - if (group != null) { - channel.close(); - group.shutdownGracefully(0, 5000, TimeUnit.MILLISECONDS); + for (EventLoopGroup group : groups) { + if (group != null) { + for (Channel channel : channels) { + channel.close(); + } + group.shutdownGracefully(0, 5000, TimeUnit.MILLISECONDS); + } } } super.tearDown(); @@ -234,8 +250,8 @@ public abstract class StompTestBase extends ActiveMQTestBase { protected void cleanUp() throws Exception { connection.close(); - if (group != null) { - group.shutdown(); + if (groups.get(0) != null) { + groups.get(0).shutdown(); } } @@ -244,7 +260,7 @@ public abstract class StompTestBase extends ActiveMQTestBase { } protected void reconnect(long sleep) throws Exception { - group.shutdown(); + groups.get(0).shutdown(); if (sleep > 0) { Thread.sleep(sleep); @@ -278,22 +294,38 @@ public abstract class StompTestBase extends ActiveMQTestBase { } protected void assertChannelClosed() throws InterruptedException { - boolean closed = channel.closeFuture().await(5000); + assertChannelClosed(0); + } + + protected void assertChannelClosed(int index) throws InterruptedException { + boolean closed = channels.get(index).closeFuture().await(5000); assertTrue("channel not closed", closed); } public void sendFrame(String data) throws Exception { - channel.writeAndFlush(data); + sendFrame(0, data); + } + + public void sendFrame(int index, String data) throws Exception { + channels.get(index).writeAndFlush(data); } public void sendFrame(byte[] data) throws Exception { + sendFrame(0, data); + } + + public void sendFrame(int index, byte[] data) throws Exception { ByteBuf buffer = Unpooled.buffer(data.length); buffer.writeBytes(data); - channel.writeAndFlush(buffer); + channels.get(index).writeAndFlush(buffer); } public String receiveFrame(long timeOut) throws Exception { - String msg = priorityQueue.poll(timeOut, TimeUnit.MILLISECONDS); + return receiveFrame(0, timeOut); + } + + public String receiveFrame(int index, long timeOut) throws Exception { + String msg = priorityQueues.get(index).poll(timeOut, TimeUnit.MILLISECONDS); return msg; } @@ -344,6 +376,11 @@ public abstract class StompTestBase extends ActiveMQTestBase { } class StompClientHandler extends SimpleChannelInboundHandler<String> { + int index = 0; + + StompClientHandler(int index) { + this.index = index; + } StringBuffer currentMessage = new StringBuffer(""); @@ -356,7 +393,12 @@ public abstract class StompTestBase extends ActiveMQTestBase { String actualMessage = fullMessage.substring(0, messageEnd); fullMessage = fullMessage.substring(messageEnd + 2); currentMessage = new StringBuffer(""); - priorityQueue.add(actualMessage); + BlockingQueue queue = priorityQueues.get(index); + if (queue == null) { + queue = new ArrayBlockingQueue(1000); + priorityQueues.add(index, queue); + } + queue.add(actualMessage); if (fullMessage.length() > 0) { channelRead(ctx, fullMessage); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/89e0c461/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11Test.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11Test.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11Test.java index 8e6fbb4..dfcd1b9 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11Test.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11Test.java @@ -741,6 +741,220 @@ public class StompV11Test extends StompV11TestBase { } @Test + public void testHeartBeatToTTL() throws Exception { + ClientStompFrame frame; + ClientStompFrame reply; + int port = 61614; + + server.getActiveMQServer().getRemotingService().createAcceptor("test", "tcp://127.0.0.1:" + port + "?connectionTtl=1000&connectionTtlMin=5000&connectionTtlMax=10000").start(); + StompClientConnection connection = StompClientConnectionFactory.createClientConnection("1.1", "localhost", port); + + //no heart beat at all if heat-beat absent + frame = connection.createFrame("CONNECT"); + frame.addHeader("host", "127.0.0.1"); + frame.addHeader("login", this.defUser); + frame.addHeader("passcode", this.defPass); + + reply = connection.sendFrame(frame); + + assertEquals("CONNECTED", reply.getCommand()); + + Thread.sleep(3000); + + assertEquals(0, connection.getFrameQueueSize()); + + try { + connection.disconnect(); + fail("Channel should be closed here already due to TTL"); + } + catch (Exception e) { + // ignore + } + + //no heart beat for (0,0) + connection = StompClientConnectionFactory.createClientConnection("1.1", "localhost", port); + frame = connection.createFrame("CONNECT"); + frame.addHeader("host", "127.0.0.1"); + frame.addHeader("login", this.defUser); + frame.addHeader("passcode", this.defPass); + frame.addHeader("heart-beat", "0,0"); + frame.addHeader("accept-version", "1.0,1.1"); + + reply = connection.sendFrame(frame); + + IntegrationTestLogger.LOGGER.info("Reply: " + reply); + + assertEquals("CONNECTED", reply.getCommand()); + + assertEquals("0,0", reply.getHeader("heart-beat")); + + Thread.sleep(3000); + + assertEquals(0, connection.getFrameQueueSize()); + + try { + connection.disconnect(); + fail("Channel should be closed here already due to TTL"); + } + catch (Exception e) { + // ignore + } + + //heart-beat (1,0), should receive a min client ping accepted by server + connection = StompClientConnectionFactory.createClientConnection("1.1", "localhost", port); + frame = connection.createFrame("CONNECT"); + frame.addHeader("host", "127.0.0.1"); + frame.addHeader("login", this.defUser); + frame.addHeader("passcode", this.defPass); + frame.addHeader("heart-beat", "1,0"); + frame.addHeader("accept-version", "1.0,1.1"); + + reply = connection.sendFrame(frame); + + assertEquals("CONNECTED", reply.getCommand()); + + assertEquals("0,2500", reply.getHeader("heart-beat")); + + Thread.sleep(7000); + + //now server side should be disconnected because we didn't send ping for 2 sec + frame = connection.createFrame("SEND"); + frame.addHeader("destination", getQueuePrefix() + getQueueName()); + frame.addHeader("content-type", "text/plain"); + frame.setBody("Hello World"); + + //send will fail + try { + connection.sendFrame(frame); + fail("connection should have been destroyed by now"); + } + catch (IOException e) { + //ignore + } + + //heart-beat (1,0), start a ping, then send a message, should be ok. + connection = StompClientConnectionFactory.createClientConnection("1.1", "localhost", port); + frame = connection.createFrame("CONNECT"); + frame.addHeader("host", "127.0.0.1"); + frame.addHeader("login", this.defUser); + frame.addHeader("passcode", this.defPass); + frame.addHeader("heart-beat", "1,0"); + frame.addHeader("accept-version", "1.0,1.1"); + + reply = connection.sendFrame(frame); + + assertEquals("CONNECTED", reply.getCommand()); + + assertEquals("0,2500", reply.getHeader("heart-beat")); + + System.out.println("========== start pinger!"); + + connection.startPinger(2500); + + Thread.sleep(7000); + + //now server side should be disconnected because we didn't send ping for 2 sec + frame = connection.createFrame("SEND"); + frame.addHeader("destination", getQueuePrefix() + getQueueName()); + frame.addHeader("content-type", "text/plain"); + frame.setBody("Hello World"); + + //send will be ok + connection.sendFrame(frame); + + connection.stopPinger(); + + connection.disconnect(); + + //heart-beat (20000,0), should receive a max client ping accepted by server + connection = StompClientConnectionFactory.createClientConnection("1.1", "localhost", port); + frame = connection.createFrame("CONNECT"); + frame.addHeader("host", "127.0.0.1"); + frame.addHeader("login", this.defUser); + frame.addHeader("passcode", this.defPass); + frame.addHeader("heart-beat", "20000,0"); + frame.addHeader("accept-version", "1.0,1.1"); + + reply = connection.sendFrame(frame); + + assertEquals("CONNECTED", reply.getCommand()); + + assertEquals("0,5000", reply.getHeader("heart-beat")); + + Thread.sleep(12000); + + //now server side should be disconnected because we didn't send ping for 2 sec + frame = connection.createFrame("SEND"); + frame.addHeader("destination", getQueuePrefix() + getQueueName()); + frame.addHeader("content-type", "text/plain"); + frame.setBody("Hello World"); + + //send will fail + try { + connection.sendFrame(frame); + fail("connection should have been destroyed by now"); + } + catch (IOException e) { + //ignore + } + } + + @Test + public void testHeartBeatToConnectionTTLModifier() throws Exception { + ClientStompFrame frame; + ClientStompFrame reply; + StompClientConnection connection; + int port = 61614; + + server.getActiveMQServer().getRemotingService().createAcceptor("test", "tcp://127.0.0.1:" + port + "?heartBeatToConnectionTtlModifier=1").start(); + + connection = StompClientConnectionFactory.createClientConnection("1.1", "localhost", port); + frame = connection.createFrame("CONNECT"); + frame.addHeader("host", "127.0.0.1"); + frame.addHeader("login", this.defUser); + frame.addHeader("passcode", this.defPass); + frame.addHeader("heart-beat", "5000,0"); + frame.addHeader("accept-version", "1.0,1.1"); + + reply = connection.sendFrame(frame); + + assertEquals("CONNECTED", reply.getCommand()); + + assertEquals("0,5000", reply.getHeader("heart-beat")); + + Thread.sleep(6000); + + try { + connection.disconnect(); + fail("Connection should be closed here already due to TTL"); + } + catch (Exception e) { + // ignore + } + + server.getActiveMQServer().getRemotingService().destroyAcceptor("test"); + server.getActiveMQServer().getRemotingService().createAcceptor("test", "tcp://127.0.0.1:" + port + "?heartBeatToConnectionTtlModifier=1.5").start(); + + connection = StompClientConnectionFactory.createClientConnection("1.1", "localhost", port); + frame = connection.createFrame("CONNECT"); + frame.addHeader("host", "127.0.0.1"); + frame.addHeader("login", this.defUser); + frame.addHeader("passcode", this.defPass); + frame.addHeader("heart-beat", "5000,0"); + frame.addHeader("accept-version", "1.0,1.1"); + + reply = connection.sendFrame(frame); + + assertEquals("CONNECTED", reply.getCommand()); + + assertEquals("0,5000", reply.getHeader("heart-beat")); + + Thread.sleep(6000); + + connection.disconnect(); + } + + @Test public void testNack() throws Exception { connV11.connect(defUser, defPass); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/89e0c461/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11TestBase.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11TestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11TestBase.java index d37d90b..f737455 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11TestBase.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11TestBase.java @@ -103,7 +103,7 @@ public abstract class StompV11TestBase extends ActiveMQTestBase { params.put(TransportConstants.STOMP_CONSUMERS_CREDIT, "-1"); TransportConfiguration stompTransport = new TransportConfiguration(NettyAcceptorFactory.class.getName(), params); - Configuration config = createBasicConfig().setPersistenceEnabled(persistenceEnabled).addAcceptorConfiguration(stompTransport).addAcceptorConfiguration(new TransportConfiguration(InVMAcceptorFactory.class.getName())); + Configuration config = createBasicConfig().setPersistenceEnabled(persistenceEnabled).addAcceptorConfiguration(stompTransport).addAcceptorConfiguration(new TransportConfiguration(InVMAcceptorFactory.class.getName())).setConnectionTtlCheckInterval(500); ActiveMQServer activeMQServer = addServer(ActiveMQServers.newActiveMQServer(config, defUser, defPass));
