This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push:
new c1dcd4b ARTEMIS-2259 Client session not exist if reattach on new
connection timeout
new 1c637c1 This closes #2561
c1dcd4b is described below
commit c1dcd4bec923fdbe72767de9e1575c59627dff64
Author: yang wei <[email protected]>
AuthorDate: Fri Feb 22 20:25:19 2019 +0800
ARTEMIS-2259 Client session not exist if reattach on new connection timeout
---
.../core/client/impl/ClientSessionFactoryImpl.java | 6 ++-
.../core/client/impl/ClientSessionImpl.java | 9 +++-
.../core/client/impl/ClientSessionInternal.java | 2 +-
.../tests/integration/remoting/ReconnectTest.java | 48 ++++++++++++++++++++++
4 files changed, 61 insertions(+), 4 deletions(-)
diff --git
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
index b135677..47237b5 100644
---
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
+++
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
@@ -771,7 +771,11 @@ public class ClientSessionFactoryImpl implements
ClientSessionFactoryInternal, C
((CoreRemotingConnection)
connection).syncIDGeneratorSequence(((CoreRemotingConnection)
oldConnection).getIDGeneratorSequence());
for (ClientSessionInternal session : sessionsToFailover) {
- session.handleFailover(connection, cause);
+ if (!session.handleFailover(connection, cause)) {
+ connection.destroy();
+ this.connection = null;
+ return;
+ }
}
}
diff --git
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
index 55054c4..766ca91 100644
---
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
+++
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java
@@ -1350,10 +1350,12 @@ public final class ClientSessionImpl implements
ClientSessionInternal, FailureLi
// Needs to be synchronized to prevent issues with occurring concurrently
with close()
@Override
- public void handleFailover(final RemotingConnection backupConnection,
ActiveMQException cause) {
+ public boolean handleFailover(final RemotingConnection backupConnection,
ActiveMQException cause) {
+ boolean suc = true;
+
synchronized (this) {
if (closed) {
- return;
+ return true;
}
boolean resetCreditManager = false;
@@ -1426,6 +1428,7 @@ public final class ClientSessionImpl implements
ClientSessionInternal, FailureLi
}
} catch (Throwable t) {
ActiveMQClientLogger.LOGGER.failedToHandleFailover(t);
+ suc = false;
} finally {
sessionContext.releaseCommunications();
}
@@ -1448,6 +1451,8 @@ public final class ClientSessionImpl implements
ClientSessionInternal, FailureLi
sessionContext.resetMetadata(metaDataToSend);
+ return suc;
+
}
@Override
diff --git
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionInternal.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionInternal.java
index 3c6829a..a3700b2 100644
---
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionInternal.java
+++
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionInternal.java
@@ -66,7 +66,7 @@ public interface ClientSessionInternal extends ClientSession {
void preHandleFailover(RemotingConnection connection);
- void handleFailover(RemotingConnection backupConnection, ActiveMQException
cause);
+ boolean handleFailover(RemotingConnection backupConnection,
ActiveMQException cause);
RemotingConnection getConnection();
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/remoting/ReconnectTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/remoting/ReconnectTest.java
index c2c9f61..6eadf4f 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/remoting/ReconnectTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/remoting/ReconnectTest.java
@@ -24,6 +24,9 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.ActiveMQNotConnectedException;
+import org.apache.activemq.artemis.api.core.Interceptor;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.FailoverEventListener;
import org.apache.activemq.artemis.api.core.client.FailoverEventType;
@@ -31,8 +34,11 @@ import
org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.api.core.client.SessionFailureListener;
import
org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal;
+import org.apache.activemq.artemis.core.protocol.core.Packet;
+import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ServerSession;
+import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.Assert;
import org.junit.Test;
@@ -318,6 +324,48 @@ public class ReconnectTest extends ActiveMQTestBase {
}
+ @Test
+ public void testReattachTimeout() throws Exception {
+ ActiveMQServer server = createServer(true, true);
+ server.start();
+ // imitate session reattach timeout
+ Interceptor reattachInterceptor = new Interceptor() {
+ @Override
+ public boolean intercept(Packet packet, RemotingConnection
connection) throws ActiveMQException {
+ if (packet.getType() == PacketImpl.REATTACH_SESSION) {
+ return false;
+ } else {
+ return true;
+ }
+
+ }
+ };
+ server.getRemotingService().addIncomingInterceptor(reattachInterceptor);
+
+ final long retryInterval = 50;
+ final double retryMultiplier = 1d;
+ final int reconnectAttempts = 10;
+ ServerLocator locator =
createFactory(true).setCallTimeout(2000).setRetryInterval(retryInterval).setRetryIntervalMultiplier(retryMultiplier).setReconnectAttempts(reconnectAttempts).setConfirmationWindowSize(-1);
+ ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal)
createSessionFactory(locator);
+ final CountDownLatch latch = new CountDownLatch(1);
+ sf.addFailoverListener(eventType -> {
+ if (eventType == FailoverEventType.FAILOVER_FAILED) {
+ latch.countDown();
+ }
+ });
+
+ ClientSession session = sf.createSession(false, true, true);
+ RemotingConnection conn = ((ClientSessionInternal)
session).getConnection();
+ conn.fail(new ActiveMQNotConnectedException());
+
+ assertTrue(latch.await(1000, TimeUnit.MILLISECONDS));
+ assertTrue(session.isClosed());
+
+ session.close();
+ sf.close();
+ server.stop();
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------