Repository: activemq-artemis Updated Branches: refs/heads/master b3e1bec7e -> ca41de32e
ARTEMIS-1768: Update to fix handling of internalSession for OpenWireConnection Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/faec4353 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/faec4353 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/faec4353 Branch: refs/heads/master Commit: faec43530dace6623a49179e7e0ed2f02a2f432a Parents: b3e1bec Author: Benjamin Graf <[email protected]> Authored: Fri May 4 22:41:39 2018 +0200 Committer: Benjamin Graf <[email protected]> Committed: Sat May 5 16:35:39 2018 +0200 ---------------------------------------------------------------------- .../protocol/openwire/OpenWireConnection.java | 21 ++++++++++++++------ .../openwire/OpenWireProtocolManager.java | 6 +++--- 2 files changed, 18 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/faec4353/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java index e4d0a19..6a10de7 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java @@ -82,6 +82,7 @@ import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes; import org.apache.activemq.artemis.spi.core.protocol.AbstractRemotingConnection; import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry; import org.apache.activemq.artemis.spi.core.remoting.Connection; +import org.apache.activemq.artemis.utils.UUIDGenerator; import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQMessage; @@ -177,11 +178,17 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se private final ActiveMQServer server; /** - * This is to be used with connection operations that don't have a session. + * This is to be used with connection operations that don't have a session. * Such as TM operations. */ private ServerSession internalSession; + /** + * Used for proper closing of internal sessions like OpenWire advisory + * session at disconnect. + */ + private final Set<SessionId> internalSessionIds = new ConcurrentHashSet<>(); + private final OperationContext operationContext; private static final AtomicLongFieldUpdater<OpenWireConnection> LAST_SENT_UPDATER = AtomicLongFieldUpdater.newUpdater(OpenWireConnection.class, "lastSent"); @@ -609,6 +616,9 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se state.shutdown(); try { + for (SessionId sessionId : internalSessionIds) { + sessions.get(sessionId).close(); + } internalSession.close(false); } catch (Exception e) { ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e); @@ -724,15 +734,13 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se info.setClientIp(getRemoteAddress()); } - createInternalSession(); + createInternalSession(info); return context; } - private void createInternalSession() throws Exception { - SessionInfo sessionInfo = getState().getSessionStates().iterator().next().getInfo(); - AMQSession session = addSession(sessionInfo, true); - internalSession = session.getCoreSession(); + private void createInternalSession(ConnectionInfo info) throws Exception { + internalSession = server.createSession(UUIDGenerator.getInstance().generateStringUUID(), context.getUserName(), info.getPassword(), -1, this, true, false, false, false, null, null, true, operationContext, protocolManager.getPrefixes()); } //raise the refCount of context @@ -983,6 +991,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se public void addSessions(Set<SessionId> sessionSet) { for (SessionId sid : sessionSet) { addSession(getState().getSessionState(sid).getInfo(), true); + internalSessionIds.add(sid); } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/faec4353/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java index e954937..95a400e 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java @@ -320,9 +320,6 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl oldConnection.disconnect(true); connections.remove(oldConnection); connection.reconnect(context, info); - - // init the conn after reconnect - context.getConnection().addSessions(context.getConnectionState().getSessionIds()); } else { throw new InvalidClientIDException("Broker: " + getBrokerName() + " - Client: " + clientId + " already connected from " + context.getConnection().getRemoteAddress()); } @@ -339,6 +336,9 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl ConnectionInfo copy = info.copy(); copy.setPassword(""); fireAdvisory(context, topic, copy); + + // init the conn + context.getConnection().addSessions(context.getConnectionState().getSessionIds()); } }
