Repository: activemq-artemis
Updated Branches:
  refs/heads/master 4dd116ee0 -> 22b62b5b0


ARTEMIS-2029 Fixing wire checks after reconnects


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/87fdff51
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/87fdff51
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/87fdff51

Branch: refs/heads/master
Commit: 87fdff51e1178c86f8c3a3bfe9166728fe41e16f
Parents: 4dd116e
Author: Clebert Suconic <[email protected]>
Authored: Mon Aug 13 11:49:19 2018 -0400
Committer: Clebert Suconic <[email protected]>
Committed: Mon Aug 13 18:24:09 2018 -0400

----------------------------------------------------------------------
 .../client/impl/ClientSessionFactoryImpl.java   | 10 ++-
 .../main/resources/clients/artemisClient.groovy |  4 +-
 .../main/resources/clients/artemisFail.groovy   | 41 +++++++++++
 .../main/resources/meshTest/sendMessages.groovy | 74 +++++++++++++++++++-
 .../main/resources/metrics/queueMetrics.groovy  |  4 +-
 5 files changed, 124 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/87fdff51/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
index 81b6c44..4723c88 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
@@ -237,7 +237,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
    public void connect(final int initialConnectAttempts,
                        final boolean failoverOnInitialConnection) throws 
ActiveMQException {
       // Get the connection
-      getConnectionWithRetry(initialConnectAttempts);
+      getConnectionWithRetry(initialConnectAttempts, null);
 
       if (connection == null) {
          StringBuilder msg = new StringBuilder("Unable to connect to server 
using configuration ").append(currentConnectorConfig);
@@ -743,7 +743,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
          session.preHandleFailover(connection);
       }
 
-      getConnectionWithRetry(reconnectAttempts);
+      getConnectionWithRetry(reconnectAttempts, oldConnection);
 
       if (connection == null) {
          if (!clientProtocolManager.isAlive())
@@ -774,7 +774,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
       }
    }
 
-   private void getConnectionWithRetry(final int reconnectAttempts) {
+   private void getConnectionWithRetry(final int reconnectAttempts, 
RemotingConnection oldConnection) {
       if (!clientProtocolManager.isAlive())
          return;
       if (logger.isTraceEnabled()) {
@@ -795,6 +795,10 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
          }
 
          if (getConnection() != null) {
+            if (oldConnection != null && oldConnection instanceof 
CoreRemotingConnection) {
+               // transferring old connection version into the new connection
+               
((CoreRemotingConnection)connection).setChannelVersion(((CoreRemotingConnection)oldConnection).getChannelVersion());
+            }
             if (logger.isDebugEnabled()) {
                logger.debug("Reconnection successful");
             }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/87fdff51/tests/compatibility-tests/src/main/resources/clients/artemisClient.groovy
----------------------------------------------------------------------
diff --git a/tests/compatibility-tests/src/main/resources/clients/artemisClient.groovy b/tests/compatibility-tests/src/main/resources/clients/artemisClient.groovy
index 54cd10a..eb9137f 100644
--- a/tests/compatibility-tests/src/main/resources/clients/artemisClient.groovy
+++ b/tests/compatibility-tests/src/main/resources/clients/artemisClient.groovy
@@ -22,9 +22,9 @@ import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory
 import org.apache.activemq.artemis.tests.compatibility.GroovyRun;
 
 if (serverArg[0].startsWith("HORNETQ")) {
-    cf = new 
ActiveMQConnectionFactory("tcp://localhost:61616?protocolManagerFactoryStr=org.apache.activemq.artemis.core.protocol.hornetq.client.HornetQClientProtocolManagerFactory&confirmationWindowSize=1048576&blockOnDurableSend=false");
+    cf = new 
ActiveMQConnectionFactory("tcp://localhost:61616?protocolManagerFactoryStr=org.apache.activemq.artemis.core.protocol.hornetq.client.HornetQClientProtocolManagerFactory&confirmationWindowSize=1048576&blockOnDurableSend=false&reconnectAttempts=-1&retryInterval=100");
 } else {
-    cf = new 
ActiveMQConnectionFactory("tcp://localhost:61616?confirmationWindowSize=1048576&blockOnDurableSend=false");
+    cf = new 
ActiveMQConnectionFactory("tcp://localhost:61616?confirmationWindowSize=1048576&blockOnDurableSend=false&ha=true&reconnectAttempts=-1&retryInterval=100");
 }
 
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/87fdff51/tests/compatibility-tests/src/main/resources/clients/artemisFail.groovy
----------------------------------------------------------------------
diff --git a/tests/compatibility-tests/src/main/resources/clients/artemisFail.groovy b/tests/compatibility-tests/src/main/resources/clients/artemisFail.groovy
new file mode 100644
index 0000000..100a6e9
--- /dev/null
+++ b/tests/compatibility-tests/src/main/resources/clients/artemisFail.groovy
@@ -0,0 +1,41 @@
+package clients
+
+import org.apache.activemq.artemis.api.core.ActiveMQException
+import org.apache.activemq.artemis.api.core.client.FailoverEventListener
+import org.apache.activemq.artemis.api.core.client.FailoverEventType
+import org.apache.activemq.artemis.jms.client.ActiveMQConnection
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Create a client connection factory
+
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory
+import org.apache.activemq.artemis.tests.compatibility.GroovyRun
+
+import java.util.concurrent.CountDownLatch
+import java.util.concurrent.TimeUnit;
+
+CountDownLatch latch = new CountDownLatch(1);
+((ActiveMQConnection)connectionToFail).setFailoverListener(new 
FailoverEventListener() {
+    @Override
+    void failoverEvent(FailoverEventType eventType) {
+        latch.countDown();
+    }
+})
+((ActiveMQConnection)connectionToFail).getSessionFactory().getConnection().fail(new
 ActiveMQException("fail"));
+GroovyRun.assertTrue(latch.await(10, TimeUnit.SECONDS));

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/87fdff51/tests/compatibility-tests/src/main/resources/meshTest/sendMessages.groovy
----------------------------------------------------------------------
diff --git a/tests/compatibility-tests/src/main/resources/meshTest/sendMessages.groovy b/tests/compatibility-tests/src/main/resources/meshTest/sendMessages.groovy
index 6a5e370..87e8027 100644
--- a/tests/compatibility-tests/src/main/resources/meshTest/sendMessages.groovy
+++ b/tests/compatibility-tests/src/main/resources/meshTest/sendMessages.groovy
@@ -26,7 +26,6 @@ String serverType = arg[0];
 String clientType = arg[1];
 String operation = arg[2];
 
-
 try {
     legacyOption = legacy;
 } catch (Throwable e) {
@@ -127,8 +126,60 @@ if (operation.equals("sendAckMessages") || operation.equals("sendTopic")) {
     plain.setStringProperty("plain", "doce");
     plain.setIntProperty("order", 15)
     producer.send(plain);
-
     session.commit();
+    session.close();
+
+    Session newSession = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+    connectionToFail = connection;
+    if (clientType.equals("ARTEMIS-SNAPSHOT")) {
+        // this is validating a bug that could only be fixed in snapshot
+        GroovyRun.evaluate("clients/artemisFail.groovy", "serverArg", 
serverType);
+    }
+    MessageProducer newProducer = newSession.createProducer(destination);
+    for (int i = 0 ; i < 10; i++) {
+        String bodyText = "This is message " + i;
+        TextMessage textMessage = newSession.createTextMessage(bodyText);
+        int size = 5 + i % 10;
+        StringBuffer variableSize = new StringBuffer();
+        for (int s = 0; s < size; s++) {
+            variableSize.append(" " + i);
+        }
+        textMessage.setStringProperty("inMessageId", variableSize.toString());
+        newProducer.send(textMessage);
+        newSession.commit();
+
+        newSession.close();
+        newSession = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+        newProducer = newSession.createProducer(destination);
+        if (i % 2 == 0) {
+            // failing half of the sessions for the snapshots
+            if (clientType.equals("ARTEMIS-SNAPSHOT")) {
+                // this is validating a bug that could only be fixed in 
snapshot
+                GroovyRun.evaluate("clients/artemisFail.groovy", "serverArg", 
serverType);
+            }
+        }
+
+    }
+
+    // even if topic, will send a few on queue
+    newProducer = newSession.createProducer(queue);
+
+    for (int i = 0; i < 7; i++) {
+        String bodyText = "This is message " + i;
+        TextMessage textMessage = newSession.createTextMessage(bodyText);
+        int size = 5 + i % 10;
+        StringBuffer variableSize = new StringBuffer();
+        for (int s = 0; s < size; s++) {
+            variableSize.append(" " + i);
+        }
+        textMessage.setStringProperty("inMessageId", variableSize.toString());
+        newProducer.send(textMessage);
+        newSession.commit();
+   }
+
+    newSession.commit();
+    newSession.close();
+
 
     connection.close();
 }
@@ -194,7 +245,26 @@ if (operation.equals("receiveMessages") || operation.equals("receiveNonDurableSu
     GroovyRun.assertNotNull(plain);
     GroovyRun.assertEquals("doce", plain.getStringProperty("plain"));
 
+
+    for (int i = 0 ; i < 10; i++) {
+        TextMessage recMessage = consumer.receive(5000);
+        GroovyRun.assertNotNull(recMessage);
+        GroovyRun.assertEquals("This is message " + i, recMessage.getText());
+    }
+
     session.commit();
+
+    consumer.close();
+
+    // force a few on the queue even if the test is for topics
+    consumer = session.createConsumer(queue);
+
+    for (int i = 0; i < 7; i++) {
+        TextMessage recMessage = consumer.receive(5000);
+        GroovyRun.assertNotNull(recMessage);
+        GroovyRun.assertEquals("This is message " + i, recMessage.getText());
+    }
+
     connection.close();
 }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/87fdff51/tests/compatibility-tests/src/main/resources/metrics/queueMetrics.groovy
----------------------------------------------------------------------
diff --git a/tests/compatibility-tests/src/main/resources/metrics/queueMetrics.groovy b/tests/compatibility-tests/src/main/resources/metrics/queueMetrics.groovy
index 4ef4425..9ac8985 100644
--- a/tests/compatibility-tests/src/main/resources/metrics/queueMetrics.groovy
+++ b/tests/compatibility-tests/src/main/resources/metrics/queueMetrics.groovy
@@ -32,6 +32,6 @@ for (Object o : queueControls) {
     QueueControl c = (QueueControl) o;
     GroovyRun.assertTrue(c.getPersistentSize() > 0);
     GroovyRun.assertTrue(c.getDurablePersistentSize() > 0);
-    GroovyRun.assertEquals(16l, c.getMessageCount());
-    GroovyRun.assertEquals(16l, c.getDurableMessageCount());
+    GroovyRun.assertEquals(33l, c.getMessageCount());
+    GroovyRun.assertEquals(33l, c.getDurableMessageCount());
  }

Reply via email to