Repository: activemq-artemis
Updated Branches:
  refs/heads/master b3e1bec7e -> ca41de32e


ARTEMIS-1768: Update to fix handling of internalSession for OpenWireConnection


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/faec4353
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/faec4353
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/faec4353

Branch: refs/heads/master
Commit: faec43530dace6623a49179e7e0ed2f02a2f432a
Parents: b3e1bec
Author: Benjamin Graf <[email protected]>
Authored: Fri May 4 22:41:39 2018 +0200
Committer: Benjamin Graf <[email protected]>
Committed: Sat May 5 16:35:39 2018 +0200

----------------------------------------------------------------------
 .../protocol/openwire/OpenWireConnection.java   | 21 ++++++++++++++------
 .../openwire/OpenWireProtocolManager.java       |  6 +++---
 2 files changed, 18 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/faec4353/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index e4d0a19..6a10de7 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -82,6 +82,7 @@ import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes;
 import org.apache.activemq.artemis.spi.core.protocol.AbstractRemotingConnection;
 import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
 import org.apache.activemq.artemis.spi.core.remoting.Connection;
+import org.apache.activemq.artemis.utils.UUIDGenerator;
 import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQMessage;
@@ -177,11 +178,17 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
    private final ActiveMQServer server;
 
    /**
-    * This is to be used with connection operations that don't have  a session.
+    * This is to be used with connection operations that don't have a session.
     * Such as TM operations.
     */
    private ServerSession internalSession;
 
+   /**
+    * Used for proper closing of internal sessions like OpenWire advisory
+    * session at disconnect.
+    */
+   private final Set<SessionId> internalSessionIds = new ConcurrentHashSet<>();
+
    private final OperationContext operationContext;
 
    private static final AtomicLongFieldUpdater<OpenWireConnection> LAST_SENT_UPDATER = AtomicLongFieldUpdater.newUpdater(OpenWireConnection.class, "lastSent");
@@ -609,6 +616,9 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
       state.shutdown();
 
       try {
+         for (SessionId sessionId : internalSessionIds) {
+            sessions.get(sessionId).close();
+         }
          internalSession.close(false);
       } catch (Exception e) {
          ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
@@ -724,15 +734,13 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
          info.setClientIp(getRemoteAddress());
       }
 
-      createInternalSession();
+      createInternalSession(info);
 
       return context;
    }
 
-   private void createInternalSession() throws Exception {
-      SessionInfo sessionInfo = getState().getSessionStates().iterator().next().getInfo();
-      AMQSession session = addSession(sessionInfo, true);
-      internalSession = session.getCoreSession();
+   private void createInternalSession(ConnectionInfo info) throws Exception {
+      internalSession = server.createSession(UUIDGenerator.getInstance().generateStringUUID(), context.getUserName(), info.getPassword(), -1, this, true, false, false, false, null, null, true, operationContext, protocolManager.getPrefixes());
    }
 
    //raise the refCount of context
@@ -983,6 +991,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
    public void addSessions(Set<SessionId> sessionSet) {
       for (SessionId sid : sessionSet) {
          addSession(getState().getSessionState(sid).getInfo(), true);
+         internalSessionIds.add(sid);
       }
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/faec4353/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
index e954937..95a400e 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
@@ -320,9 +320,6 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
                oldConnection.disconnect(true);
                connections.remove(oldConnection);
                connection.reconnect(context, info);
-
-               // init the conn after reconnect
-               context.getConnection().addSessions(context.getConnectionState().getSessionIds());
             } else {
                throw new InvalidClientIDException("Broker: " + getBrokerName() + " - Client: " + clientId + " already connected from " + context.getConnection().getRemoteAddress());
             }
@@ -339,6 +336,9 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
          ConnectionInfo copy = info.copy();
          copy.setPassword("");
          fireAdvisory(context, topic, copy);
+
+         // init the conn
+         context.getConnection().addSessions(context.getConnectionState().getSessionIds());
       }
    }
 

Reply via email to