This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push:
new 1d1a943 ARTEMIS-2870: Transfer connection close/failure listeners one
by one on reattachment
new 3875533 This closes #3498
1d1a943 is described below
commit 1d1a9433bc3bde0d65159117312dae3ce053d25c
Author: Markus Meierhofer <[email protected]>
AuthorDate: Tue Mar 16 11:50:31 2021 +0100
ARTEMIS-2870: Transfer connection close/failure listeners one by one on
reattachment
Previously, when a session was reattached, all the close/failure listeners
were removed from the old connection and set onto the new connection.
This only worked when at most 1 session of the old connection was
transferred: When the second session was transferred, the old
connection already didn't contain any close/failure listeners anymore,
and therefore the list of close/failure listeners was overwritten by
an empty list for the new connection.
Now, when a session is being transferred, it only transfers the
close/failure listeners that belong to it, which are the session itself
+ the TempQueueCleanerUppers.
Modified a test to check whether the sessions are failure listeners of
the new connection after reattachment.
---
.../protocol/core/ServerSessionPacketHandler.java | 6 ----
.../core/server/impl/ServerSessionImpl.java | 21 +++++++++++-
.../integration/cluster/reattach/ReattachTest.java | 38 ++++++++++++++++++++++
3 files changed, 58 insertions(+), 7 deletions(-)
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
index d3b1909..5993c7a 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
@@ -1004,9 +1004,6 @@ public class ServerSessionPacketHandler implements
ChannelHandler {
// before we have transferred the connection, leaving it in a started
state
session.setTransferring(true);
- List<CloseListener> closeListeners =
remotingConnection.removeCloseListeners();
- List<FailureListener> failureListeners =
remotingConnection.removeFailureListeners();
-
// Note. We do not destroy the replicating connection here. In the case
the live server has really crashed
// then the connection will get cleaned up anyway when the server ping
timeout kicks in.
// In the case the live server is really still up, i.e. a split brain
situation (or in tests), then closing
@@ -1024,9 +1021,6 @@ public class ServerSessionPacketHandler implements
ChannelHandler {
remotingConnection = newConnection;
- remotingConnection.setCloseListeners(closeListeners);
- remotingConnection.setFailureListeners(failureListeners);
-
int serverLastReceivedCommandID = channel.getLastConfirmedCommandID();
channel.replayCommands(lastReceivedCommandID);
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index 05d9eb8..12a080e 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -1075,7 +1075,26 @@ public class ServerSessionImpl implements ServerSession,
FailureListener {
@Override
public void transferConnection(RemotingConnection newConnection) {
- remotingConnection = newConnection;
+ synchronized (this) {
+ // Remove failure listeners from old connection
+ remotingConnection.removeFailureListener(this);
+ tempQueueCleannerUppers.values()
+ .forEach(cleanerUpper -> {
+ remotingConnection.removeCloseListener(cleanerUpper);
+ remotingConnection.removeFailureListener(cleanerUpper);
+ });
+
+ // Set the new connection
+ remotingConnection = newConnection;
+
+ // Add failure listeners to new connection
+ newConnection.addFailureListener(this);
+ tempQueueCleannerUppers.values()
+ .forEach(cleanerUpper -> {
+ newConnection.addCloseListener(cleanerUpper);
+ newConnection.addFailureListener(cleanerUpper);
+ });
+ }
}
@Override
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/reattach/ReattachTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/reattach/ReattachTest.java
index 15bc091..b5a6236 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/reattach/ReattachTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/reattach/ReattachTest.java
@@ -17,6 +17,7 @@
package org.apache.activemq.artemis.tests.integration.cluster.reattach;
import java.util.Objects;
+import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CountDownLatch;
@@ -148,6 +149,43 @@ public class ReattachTest extends ActiveMQTestBase {
sf.close();
}
+ @Test
+ public void testReattachTransferConnectionOnSession2() throws Exception {
+ final long retryInterval = 50;
+ final double retryMultiplier = 1d;
+ final int reconnectAttempts = 10;
+
+
locator.setRetryInterval(retryInterval).setRetryIntervalMultiplier(retryMultiplier).setReconnectAttempts(reconnectAttempts).setConfirmationWindowSize(1024
* 1024);
+ ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal)
createSessionFactory(locator);
+ ClientSession session = sf.createSession(false, true, true);
+ ClientSession secondSession = sf.createSession(false, true, true);
+
+ // there's only one connection on the broker
+ Object originalConnectionID = ((ServerSession)
server.getSessions().toArray()[0]).getConnectionID();
+ RemotingConnection oldConnection = ((ServerSession)
server.getSessions().toArray()[0]).getRemotingConnection();
+
+ // ensure sessions are set as failure listeners on old connection
+ Set<ServerSession> originalServerSessions = server.getSessions();
+
assertTrue(oldConnection.getFailureListeners().containsAll(originalServerSessions));
+
+ // trigger re-attach
+ ((ClientSessionInternal) session).getConnection().fail(new
ActiveMQNotConnectedException());
+
+ session.start();
+ secondSession.start();
+
+ assertFalse(Objects.equals(((ServerSession)
server.getSessions().toArray()[0]).getConnectionID(), originalConnectionID));
+
+ // ensure sessions were removed as failure listeners of old connection
and are now failure listeners of new connection
+
assertTrue(originalServerSessions.stream().noneMatch(oldConnection.getFailureListeners()::contains));
+ RemotingConnection newConnection = ((ServerSession)
server.getSessions().toArray()[0]).getRemotingConnection();
+
assertTrue(newConnection.getFailureListeners().containsAll(originalServerSessions));
+
+ session.close();
+ secondSession.close();
+ sf.close();
+ }
+
/*
* Test failure on connection, but server is still up so should immediately
reconnect
*/