Repository: activemq-artemis Updated Branches: refs/heads/2.6.x 2d1714383 -> 80f12e7f6
ARTEMIS-2029 Fixing wire checks after reconnects (cherry picked from commit 87fdff51e1178c86f8c3a3bfe9166728fe41e16f) Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/80f12e7f Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/80f12e7f Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/80f12e7f Branch: refs/heads/2.6.x Commit: 80f12e7f68eed7bad46bb4c9653feb1f8ef5d318 Parents: 2d17143 Author: Clebert Suconic <[email protected]> Authored: Mon Aug 13 11:49:19 2018 -0400 Committer: Clebert Suconic <[email protected]> Committed: Mon Aug 13 18:24:25 2018 -0400 ---------------------------------------------------------------------- .../client/impl/ClientSessionFactoryImpl.java | 10 ++- .../main/resources/clients/artemisClient.groovy | 4 +- .../main/resources/clients/artemisFail.groovy | 41 +++++++++++ .../main/resources/meshTest/sendMessages.groovy | 74 +++++++++++++++++++- .../main/resources/metrics/queueMetrics.groovy | 4 +- 5 files changed, 124 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/80f12e7f/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java index 81b6c44..4723c88 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java @@ -237,7 +237,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C public void connect(final int initialConnectAttempts, final boolean failoverOnInitialConnection) throws ActiveMQException { // Get the connection - getConnectionWithRetry(initialConnectAttempts); + getConnectionWithRetry(initialConnectAttempts, null); if (connection == null) { StringBuilder msg = new StringBuilder("Unable to connect to server using configuration ").append(currentConnectorConfig); @@ -743,7 +743,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C session.preHandleFailover(connection); } - getConnectionWithRetry(reconnectAttempts); + getConnectionWithRetry(reconnectAttempts, oldConnection); if (connection == null) { if (!clientProtocolManager.isAlive()) @@ -774,7 +774,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C } } - private void getConnectionWithRetry(final int reconnectAttempts) { + private void getConnectionWithRetry(final int reconnectAttempts, RemotingConnection oldConnection) { if (!clientProtocolManager.isAlive()) return; if (logger.isTraceEnabled()) { @@ -795,6 +795,10 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C } if (getConnection() != null) { + if (oldConnection != null && oldConnection instanceof CoreRemotingConnection) { + // transferring old connection version into the new connection + ((CoreRemotingConnection)connection).setChannelVersion(((CoreRemotingConnection)oldConnection).getChannelVersion()); + } if (logger.isDebugEnabled()) { logger.debug("Reconnection successful"); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/80f12e7f/tests/compatibility-tests/src/main/resources/clients/artemisClient.groovy ---------------------------------------------------------------------- diff --git a/tests/compatibility-tests/src/main/resources/clients/artemisClient.groovy b/tests/compatibility-tests/src/main/resources/clients/artemisClient.groovy index 54cd10a..eb9137f 100644 --- a/tests/compatibility-tests/src/main/resources/clients/artemisClient.groovy +++ b/tests/compatibility-tests/src/main/resources/clients/artemisClient.groovy @@ -22,9 +22,9 @@ import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory import org.apache.activemq.artemis.tests.compatibility.GroovyRun; if (serverArg[0].startsWith("HORNETQ")) { - cf = new ActiveMQConnectionFactory("tcp://localhost:61616?protocolManagerFactoryStr=org.apache.activemq.artemis.core.protocol.hornetq.client.HornetQClientProtocolManagerFactory&confirmationWindowSize=1048576&blockOnDurableSend=false"); + cf = new ActiveMQConnectionFactory("tcp://localhost:61616?protocolManagerFactoryStr=org.apache.activemq.artemis.core.protocol.hornetq.client.HornetQClientProtocolManagerFactory&confirmationWindowSize=1048576&blockOnDurableSend=false&reconnectAttempts=-1&retryInterval=100"); } else { - cf = new ActiveMQConnectionFactory("tcp://localhost:61616?confirmationWindowSize=1048576&blockOnDurableSend=false"); + cf = new ActiveMQConnectionFactory("tcp://localhost:61616?confirmationWindowSize=1048576&blockOnDurableSend=false&ha=true&reconnectAttempts=-1&retryInterval=100"); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/80f12e7f/tests/compatibility-tests/src/main/resources/clients/artemisFail.groovy ---------------------------------------------------------------------- diff --git a/tests/compatibility-tests/src/main/resources/clients/artemisFail.groovy b/tests/compatibility-tests/src/main/resources/clients/artemisFail.groovy new file mode 100644 index 0000000..100a6e9 --- /dev/null +++ b/tests/compatibility-tests/src/main/resources/clients/artemisFail.groovy @@ -0,0 +1,41 @@ +package clients + +import org.apache.activemq.artemis.api.core.ActiveMQException +import org.apache.activemq.artemis.api.core.client.FailoverEventListener +import org.apache.activemq.artemis.api.core.client.FailoverEventType +import org.apache.activemq.artemis.jms.client.ActiveMQConnection + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// Create a client connection factory + +import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory +import org.apache.activemq.artemis.tests.compatibility.GroovyRun + +import java.util.concurrent.CountDownLatch +import java.util.concurrent.TimeUnit; + +CountDownLatch latch = new CountDownLatch(1); +((ActiveMQConnection)connectionToFail).setFailoverListener(new FailoverEventListener() { + @Override + void failoverEvent(FailoverEventType eventType) { + latch.countDown(); + } +}) +((ActiveMQConnection)connectionToFail).getSessionFactory().getConnection().fail(new ActiveMQException("fail")); +GroovyRun.assertTrue(latch.await(10, TimeUnit.SECONDS)); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/80f12e7f/tests/compatibility-tests/src/main/resources/meshTest/sendMessages.groovy ---------------------------------------------------------------------- diff --git a/tests/compatibility-tests/src/main/resources/meshTest/sendMessages.groovy b/tests/compatibility-tests/src/main/resources/meshTest/sendMessages.groovy index 6a5e370..87e8027 100644 --- a/tests/compatibility-tests/src/main/resources/meshTest/sendMessages.groovy +++ b/tests/compatibility-tests/src/main/resources/meshTest/sendMessages.groovy @@ -26,7 +26,6 @@ String serverType = arg[0]; String clientType = arg[1]; String operation = arg[2]; - try { legacyOption = legacy; } catch (Throwable e) { @@ -127,8 +126,60 @@ if (operation.equals("sendAckMessages") || operation.equals("sendTopic")) { plain.setStringProperty("plain", "doce"); plain.setIntProperty("order", 15) producer.send(plain); - session.commit(); + session.close(); + + Session newSession = connection.createSession(true, Session.SESSION_TRANSACTED); + connectionToFail = connection; + if (clientType.equals("ARTEMIS-SNAPSHOT")) { + // this is validating a bug that could only be fixed in snapshot + GroovyRun.evaluate("clients/artemisFail.groovy", "serverArg", serverType); + } + MessageProducer newProducer = newSession.createProducer(destination); + for (int i = 0 ; i < 10; i++) { + String bodyText = "This is message " + i; + TextMessage textMessage = newSession.createTextMessage(bodyText); + int size = 5 + i % 10; + StringBuffer variableSize = new StringBuffer(); + for (int s = 0; s < size; s++) { + variableSize.append(" " + i); + } + textMessage.setStringProperty("inMessageId", variableSize.toString()); + newProducer.send(textMessage); + newSession.commit(); + + newSession.close(); + newSession = connection.createSession(true, Session.SESSION_TRANSACTED); + newProducer = newSession.createProducer(destination); + if (i % 2 == 0) { + // failing half of the sessions for the snapshots + if (clientType.equals("ARTEMIS-SNAPSHOT")) { + // this is validating a bug that could only be fixed in snapshot + GroovyRun.evaluate("clients/artemisFail.groovy", "serverArg", serverType); + } + } + + } + + // even if topic, will send a few on queue + newProducer = newSession.createProducer(queue); + + for (int i = 0; i < 7; i++) { + String bodyText = "This is message " + i; + TextMessage textMessage = newSession.createTextMessage(bodyText); + int size = 5 + i % 10; + StringBuffer variableSize = new StringBuffer(); + for (int s = 0; s < size; s++) { + variableSize.append(" " + i); + } + textMessage.setStringProperty("inMessageId", variableSize.toString()); + newProducer.send(textMessage); + newSession.commit(); + } + + newSession.commit(); + newSession.close(); + connection.close(); } @@ -194,7 +245,26 @@ if (operation.equals("receiveMessages") || operation.equals("receiveNonDurableSu GroovyRun.assertNotNull(plain); GroovyRun.assertEquals("doce", plain.getStringProperty("plain")); + + for (int i = 0 ; i < 10; i++) { + TextMessage recMessage = consumer.receive(5000); + GroovyRun.assertNotNull(recMessage); + GroovyRun.assertEquals("This is message " + i, recMessage.getText()); + } + session.commit(); + + consumer.close(); + + // force a few on the queue even if the test is for topics + consumer = session.createConsumer(queue); + + for (int i = 0; i < 7; i++) { + TextMessage recMessage = consumer.receive(5000); + GroovyRun.assertNotNull(recMessage); + GroovyRun.assertEquals("This is message " + i, recMessage.getText()); + } + connection.close(); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/80f12e7f/tests/compatibility-tests/src/main/resources/metrics/queueMetrics.groovy ---------------------------------------------------------------------- diff --git a/tests/compatibility-tests/src/main/resources/metrics/queueMetrics.groovy b/tests/compatibility-tests/src/main/resources/metrics/queueMetrics.groovy index 4ef4425..9ac8985 100644 --- a/tests/compatibility-tests/src/main/resources/metrics/queueMetrics.groovy +++ b/tests/compatibility-tests/src/main/resources/metrics/queueMetrics.groovy @@ -32,6 +32,6 @@ for (Object o : queueControls) { QueueControl c = (QueueControl) o; GroovyRun.assertTrue(c.getPersistentSize() > 0); GroovyRun.assertTrue(c.getDurablePersistentSize() > 0); - GroovyRun.assertEquals(16l, c.getMessageCount()); - GroovyRun.assertEquals(16l, c.getDurableMessageCount()); + GroovyRun.assertEquals(33l, c.getMessageCount()); + GroovyRun.assertEquals(33l, c.getDurableMessageCount()); }
