This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new a68c661  NO-JIRA Fix test ReplicatedAsynchronousFailoverTest.testTransactional
     new 4af2fe4  This closes #2601
a68c661 is described below

commit a68c661944bccad4296db4b1bb85a39472e0320e
Author: Howard Gao <howard....@gmail.com>
AuthorDate: Tue Apr 2 12:58:32 2019 +0800

    NO-JIRA Fix test ReplicatedAsynchronousFailoverTest.testTransactional
    
    Fix random failures.
---
 .../cluster/failover/AsynchronousFailoverTest.java | 64 +++++++++++++++++++---
 .../cluster/util/SameProcessActiveMQServer.java    | 15 ++++-
 2 files changed, 69 insertions(+), 10 deletions(-)

diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/AsynchronousFailoverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/AsynchronousFailoverTest.java
index cc8a021..82b6286 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/AsynchronousFailoverTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/AsynchronousFailoverTest.java
@@ -20,8 +20,10 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
+import 
org.apache.activemq.artemis.api.core.ActiveMQConnectionTimedOutException;
 import org.apache.activemq.artemis.api.core.ActiveMQDuplicateIdException;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.ActiveMQObjectClosedException;
 import 
org.apache.activemq.artemis.api.core.ActiveMQTransactionOutcomeUnknownException;
 import 
org.apache.activemq.artemis.api.core.ActiveMQTransactionRolledBackException;
 import org.apache.activemq.artemis.api.core.ActiveMQUnBlockedException;
@@ -137,7 +139,9 @@ public class AsynchronousFailoverTest extends 
FailoverTestBase {
       try {
          for (int i = 0; i < numIts; i++) {
             AsynchronousFailoverTest.log.info("Iteration " + i);
-            ServerLocator locator = 
getServerLocator().setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setReconnectAttempts(15).setConfirmationWindowSize(10
 * 1024 * 1024);
+            //set block timeout to 10 sec to reduce test time.
+            ServerLocator locator = 
getServerLocator().setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setReconnectAttempts(30).setRetryInterval(100).setConfirmationWindowSize(10
 * 1024 * 1024).setCallTimeout(10000).setCallFailoverTimeout(10000);
+
             sf = createSessionFactoryAndWaitForTopology(locator, 2);
             try {
 
@@ -180,7 +184,7 @@ public class AsynchronousFailoverTest extends 
FailoverTestBase {
 
                AsynchronousFailoverTest.log.info("Fail complete");
 
-               t.join(TimeUnit.SECONDS.toMillis(60));
+               t.join(TimeUnit.SECONDS.toMillis(120));
                if (t.isAlive()) {
                   System.out.println(threadDump("Thread still running from the 
test"));
                   t.interrupt();
@@ -331,12 +335,29 @@ public class AsynchronousFailoverTest extends 
FailoverTestBase {
 
             final int numMessages = 1000;
 
-            session = sf.createSession(false, false);
+            int retryCreateSession = 4;
+            //session creation may fail in the middle of failover
+            while (session == null) {
+               try {
+                  //if autoCommitSends is false, send will be non-blocking
+                  session = sf.createSession(true, false);
+               } catch (ActiveMQException e) {
+                  if (retryCreateSession == 0) {
+                     throw e;
+                  }
+                  retryCreateSession--;
+                  Thread.sleep(2000);
+               }
+            }
 
             listener = new CountDownSessionFailureListener(session);
             session.addFailureListener(listener);
 
             do {
+               if (runner.isFailed()) {
+                  //test ends, return
+                  return;
+               }
                try {
                   ClientProducer producer = 
session.createProducer(FailoverTestBase.ADDRESS);
 
@@ -379,6 +400,16 @@ public class AsynchronousFailoverTest extends 
FailoverTestBase {
                   log.info("#test transaction rollback retrying on sending");
                   // OK
                   retry = true;
+               } catch (ActiveMQObjectClosedException closedException) {
+                  log.info("#test producer closed, retrying on sending...");
+                  Thread.sleep(2000);
+                  // OK
+                  retry = true;
+               } catch (ActiveMQConnectionTimedOutException timedoutEx) {
+                  //commit timedout because of server crash. retry
+                  //will be ok after failover
+                  Thread.sleep(2000);
+                  retry = true;
                } catch (ActiveMQException e) {
                   log.info("#test Exception " + e, e);
                   throw e;
@@ -394,18 +425,32 @@ public class AsynchronousFailoverTest extends 
FailoverTestBase {
 
             ClientConsumer consumer = null;
             do {
+               if (runner.isFailed()) {
+                  //test ends, return
+                  return;
+               }
                ArrayList<Integer> msgs = new ArrayList<>();
                try {
-                  if (consumer == null) {
-                     consumer = 
session.createConsumer(FailoverTestBase.ADDRESS);
-                     session.start();
+                  int retryCreate = 4;
+                  while (consumer == null) {
+                     try {
+                        consumer = 
session.createConsumer(FailoverTestBase.ADDRESS);
+                     } catch (ActiveMQObjectClosedException closedEx) {
+                        //the session may just crashed and failover not done 
yet
+                        if (retryCreate == 0) {
+                           throw closedEx;
+                        }
+                        Thread.sleep(2000);
+                        retryCreate--;
+                     }
                   }
+                  session.start();
 
                   for (int i = 0; i < numMessages; i++) {
                      if (log.isDebugEnabled()) {
                         log.debug("Consumer receiving message " + i);
                      }
-                     ClientMessage message = consumer.receive(10000);
+                     ClientMessage message = consumer.receive(60000);
                      if (message == null) {
                         break;
                      }
@@ -429,13 +474,14 @@ public class AsynchronousFailoverTest extends 
FailoverTestBase {
                   try {
                      session.commit();
                   } catch (ActiveMQTransactionRolledBackException trbe) {
-                     //we know the tx has been rolled back so we just consume 
again
+                        //we know the tx has been rolled back so we just 
consume again
                      retry = true;
                      continue;
                   } catch (ActiveMQException e) {
                      // This could eventually happen
                      // We will get rid of this when we implement 2 phase 
commit on failover
-                     log.warn("exception during commit, it will be ignored for 
now" + e.getMessage(), e);
+                     log.warn("exception during commit, continue " + 
e.getMessage(), e);
+                     continue;
                   }
 
                   try {
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/SameProcessActiveMQServer.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/SameProcessActiveMQServer.java
index 06305b4..56406c9 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/SameProcessActiveMQServer.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/SameProcessActiveMQServer.java
@@ -83,7 +83,17 @@ public class SameProcessActiveMQServer implements 
TestableServer {
    public CountDownLatch crash(boolean failover, boolean waitFailure, 
ClientSession... sessions) throws Exception {
       CountDownLatch latch = new CountDownLatch(sessions.length);
       CountDownSessionFailureListener[] listeners = new 
CountDownSessionFailureListener[sessions.length];
+      long callTimeout = 0;
+      long failoverCallTimeout = 0;
       for (int i = 0; i < sessions.length; i++) {
+         long timeout = 
sessions[i].getSessionFactory().getServerLocator().getCallTimeout();
+         long failoverTimeout = 
sessions[i].getSessionFactory().getServerLocator().getCallFailoverTimeout();
+         if (callTimeout < timeout) {
+            callTimeout = timeout;
+         }
+         if (failoverCallTimeout < failoverTimeout) {
+            failoverCallTimeout = failoverTimeout;
+         }
          listeners[i] = new CountDownSessionFailureListener(latch, 
sessions[i]);
          sessions[i].addFailureListener(listeners[i]);
       }
@@ -96,7 +106,10 @@ public class SameProcessActiveMQServer implements 
TestableServer {
 
       if (waitFailure) {
          // Wait to be informed of failure
-         boolean ok = latch.await(10000, TimeUnit.MILLISECONDS);
+         // In case of a failover, remote call will block and it may also wait 
for failover
+         // so in order to give enough time we double the wait time.
+         boolean ok = latch.await(callTimeout * 2 + failoverCallTimeout * 2, 
TimeUnit.MILLISECONDS);
+
          Assert.assertTrue("Failed to stop the server! Latch count is " + 
latch.getCount() + " out of " +
                               sessions.length, ok);
       }

Reply via email to