This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new 1d1a943  ARTEMIS-2870: Transfer connection close/failure listeners one by one on reattachment
     new 3875533  This closes #3498
1d1a943 is described below

commit 1d1a9433bc3bde0d65159117312dae3ce053d25c
Author: Markus Meierhofer <[email protected]>
AuthorDate: Tue Mar 16 11:50:31 2021 +0100

    ARTEMIS-2870: Transfer connection close/failure listeners one by one on reattachment
    
    Previously, when a session was reattached, all the close/failure listeners
    were removed from the old connection and set onto the new connection.
    This only worked when at most one session of the old connection was
    transferred: when the second session was transferred, the old
    connection no longer contained any close/failure listeners, and
    therefore the new connection's list of close/failure listeners was
    overwritten with an empty list.
    
    Now, when a session is being transferred, it only transfers the
    close/failure listeners that belong to it, namely the session itself
    and its TempQueueCleanerUppers.
    
    Extended ReattachTest with a test that checks whether the sessions are
    removed as failure listeners of the old connection and are failure
    listeners of the new connection after reattachment.
---
 .../protocol/core/ServerSessionPacketHandler.java  |  6 ----
 .../core/server/impl/ServerSessionImpl.java        | 21 +++++++++++-
 .../integration/cluster/reattach/ReattachTest.java | 38 ++++++++++++++++++++++
 3 files changed, 58 insertions(+), 7 deletions(-)

diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
index d3b1909..5993c7a 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
@@ -1004,9 +1004,6 @@ public class ServerSessionPacketHandler implements 
ChannelHandler {
       // before we have transferred the connection, leaving it in a started 
state
       session.setTransferring(true);
 
-      List<CloseListener> closeListeners = 
remotingConnection.removeCloseListeners();
-      List<FailureListener> failureListeners = 
remotingConnection.removeFailureListeners();
-
       // Note. We do not destroy the replicating connection here. In the case 
the live server has really crashed
       // then the connection will get cleaned up anyway when the server ping 
timeout kicks in.
       // In the case the live server is really still up, i.e. a split brain 
situation (or in tests), then closing
@@ -1024,9 +1021,6 @@ public class ServerSessionPacketHandler implements 
ChannelHandler {
 
       remotingConnection = newConnection;
 
-      remotingConnection.setCloseListeners(closeListeners);
-      remotingConnection.setFailureListeners(failureListeners);
-
       int serverLastReceivedCommandID = channel.getLastConfirmedCommandID();
 
       channel.replayCommands(lastReceivedCommandID);
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index 05d9eb8..12a080e 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -1075,7 +1075,26 @@ public class ServerSessionImpl implements ServerSession, 
FailureListener {
 
    @Override
    public void transferConnection(RemotingConnection newConnection) {
-      remotingConnection = newConnection;
+      synchronized (this) {
+         // Remove failure listeners from old connection
+         remotingConnection.removeFailureListener(this);
+         tempQueueCleannerUppers.values()
+                 .forEach(cleanerUpper -> {
+                    remotingConnection.removeCloseListener(cleanerUpper);
+                    remotingConnection.removeFailureListener(cleanerUpper);
+                 });
+
+         // Set the new connection
+         remotingConnection = newConnection;
+
+         // Add failure listeners to new connection
+         newConnection.addFailureListener(this);
+         tempQueueCleannerUppers.values()
+                 .forEach(cleanerUpper -> {
+                    newConnection.addCloseListener(cleanerUpper);
+                    newConnection.addFailureListener(cleanerUpper);
+                 });
+      }
    }
 
    @Override
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/reattach/ReattachTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/reattach/ReattachTest.java
index 15bc091..b5a6236 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/reattach/ReattachTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/reattach/ReattachTest.java
@@ -17,6 +17,7 @@
 package org.apache.activemq.artemis.tests.integration.cluster.reattach;
 
 import java.util.Objects;
+import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.CountDownLatch;
@@ -148,6 +149,43 @@ public class ReattachTest extends ActiveMQTestBase {
       sf.close();
    }
 
+   @Test
+   public void testReattachTransferConnectionOnSession2() throws Exception {
+      final long retryInterval = 50;
+      final double retryMultiplier = 1d;
+      final int reconnectAttempts = 10;
+
+      
locator.setRetryInterval(retryInterval).setRetryIntervalMultiplier(retryMultiplier).setReconnectAttempts(reconnectAttempts).setConfirmationWindowSize(1024
 * 1024);
+      ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal) 
createSessionFactory(locator);
+      ClientSession session = sf.createSession(false, true, true);
+      ClientSession secondSession = sf.createSession(false, true, true);
+
+      // there's only one connection on the broker
+      Object originalConnectionID = ((ServerSession) 
server.getSessions().toArray()[0]).getConnectionID();
+      RemotingConnection oldConnection = ((ServerSession) 
server.getSessions().toArray()[0]).getRemotingConnection();
+
+      // ensure sessions are set as failure listeners on old connection
+      Set<ServerSession> originalServerSessions = server.getSessions();
+      
assertTrue(oldConnection.getFailureListeners().containsAll(originalServerSessions));
+
+      // trigger re-attach
+      ((ClientSessionInternal) session).getConnection().fail(new 
ActiveMQNotConnectedException());
+
+      session.start();
+      secondSession.start();
+
+      assertFalse(Objects.equals(((ServerSession) 
server.getSessions().toArray()[0]).getConnectionID(), originalConnectionID));
+
+      // ensure sessions were removed as failure listeners of old connection 
and are now failure listeners of new connection
+      
assertTrue(originalServerSessions.stream().noneMatch(oldConnection.getFailureListeners()::contains));
+      RemotingConnection newConnection = ((ServerSession) 
server.getSessions().toArray()[0]).getRemotingConnection();
+      
assertTrue(newConnection.getFailureListeners().containsAll(originalServerSessions));
+
+      session.close();
+      secondSession.close();
+      sf.close();
+   }
+
    /*
     * Test failure on connection, but server is still up so should immediately 
reconnect
     */

Reply via email to