This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit d81c0f44eb3fcc9d6783598d60df293902351e26
Author: Clebert Suconic <[email protected]>
AuthorDate: Fri Apr 17 22:31:10 2020 -0400

    NO-JIRA Fixing intermittent failures
---
 .../core/postoffice/impl/DuplicateIDCacheImpl.java |  4 +-
 .../core/remoting/impl/netty/NettyAcceptor.java    |  8 ++-
 .../integration/client/ConsumerWindowSizeTest.java |  6 +-
 .../cluster/distribution/ClusterTestBase.java      | 74 ++++++++++++----------
 .../tests/integration/jms/RedeployTest.java        | 13 +---
 .../tests/integration/remoting/ReconnectTest.java  | 11 ++--
 .../resources/reload-queue-routingtype-updated.xml |  1 +
 .../test/resources/reload-queue-routingtype.xml    |  1 +
 8 files changed, 64 insertions(+), 54 deletions(-)

diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCacheImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCacheImpl.java
index 4446178..7f16045 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCacheImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCacheImpl.java
@@ -304,7 +304,9 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache {
          if (ids.size() > 0 && persist) {
             long tx = storageManager.generateID();
             for (Pair<ByteArrayHolder, Long> id : ids) {
-               storageManager.deleteDuplicateIDTransactional(tx, id.getB());
+               if (id != null) {
+                  storageManager.deleteDuplicateIDTransactional(tx, id.getB());
+               }
             }
             storageManager.commit(tx);
          }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java
index 924f335..d207fdf 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java
@@ -506,8 +506,14 @@ public class NettyAcceptor extends AbstractAcceptor {
 
    @Override
    public void reload() {
-      serverChannelGroup.disconnect();
+      ChannelGroupFuture future = serverChannelGroup.disconnect();
+      try {
+         future.awaitUninterruptibly();
+      } catch (Exception ignored) {
+      }
+
       serverChannelGroup.clear();
+
       startServerChannels();
    }
 
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerWindowSizeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerWindowSizeTest.java
index 843aee1..9778ff1 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerWindowSizeTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerWindowSizeTest.java
@@ -332,15 +332,15 @@ public class ConsumerWindowSizeTest extends ActiveMQTestBase {
 
                   while (true) {
 
-                     ClientMessage msg = consumer.receiveImmediate();
-                     if (msg == null) {
+                     if (received.incrementAndGet() > NUMBER_OF_MESSAGES) {
+                        received.decrementAndGet();
                         break;
                      }
+                     ClientMessage msg = consumer.receive(1000);
                      msg.acknowledge();
 
                      session.commit();
 
-                     received.incrementAndGet();
 
                   }
 
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java
index 02adf2b..8205ee0 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java
@@ -303,52 +303,56 @@ public abstract class ClusterTestBase extends ActiveMQTestBase {
    }
 
    private void logTopologyDiagram() {
-      StringBuffer topologyDiagram = new StringBuffer();
-      for (ActiveMQServer activeMQServer : servers) {
-         if (activeMQServer != null) {
-            topologyDiagram.append("\n").append(activeMQServer.getIdentity()).append("\n");
-            if (activeMQServer.isStarted()) {
-               Set<ClusterConnection> ccs = activeMQServer.getClusterManager().getClusterConnections();
-
-               if (ccs.size() >= 1) {
-                  ClusterConnectionImpl clusterConnection = (ClusterConnectionImpl) ccs.iterator().next();
-                  Collection<TopologyMemberImpl> members = clusterConnection.getTopology().getMembers();
-                  for (TopologyMemberImpl member : members) {
-                     String nodeId = member.getNodeId();
-                     String liveServer = null;
-                     String backupServer = null;
-                     for (ActiveMQServer server : servers) {
-                        if (server != null && server.getNodeID() != null && server.isActive() && server.getNodeID().toString().equals(nodeId)) {
-                           if (server.isActive()) {
-                              liveServer = server.getIdentity();
-                              if (member.getLive() != null) {
-                                 liveServer += "(notified)";
-                              } else {
-                                 liveServer += "(not notified)";
-                              }
-                           } else {
-                              backupServer = server.getIdentity();
-                              if (member.getBackup() != null) {
-                                 liveServer += "(notified)";
+      try {
+         StringBuffer topologyDiagram = new StringBuffer();
+         for (ActiveMQServer activeMQServer : servers) {
+            if (activeMQServer != null) {
+               topologyDiagram.append("\n").append(activeMQServer.getIdentity()).append("\n");
+               if (activeMQServer.isStarted()) {
+                  Set<ClusterConnection> ccs = activeMQServer.getClusterManager().getClusterConnections();
+
+                  if (ccs.size() >= 1) {
+                     ClusterConnectionImpl clusterConnection = (ClusterConnectionImpl) ccs.iterator().next();
+                     Collection<TopologyMemberImpl> members = clusterConnection.getTopology().getMembers();
+                     for (TopologyMemberImpl member : members) {
+                        String nodeId = member.getNodeId();
+                        String liveServer = null;
+                        String backupServer = null;
+                        for (ActiveMQServer server : servers) {
+                           if (server != null && server.getNodeID() != null && server.isActive() && server.getNodeID().toString().equals(nodeId)) {
+                              if (server.isActive()) {
+                                 liveServer = server.getIdentity();
+                                 if (member.getLive() != null) {
+                                    liveServer += "(notified)";
+                                 } else {
+                                    liveServer += "(not notified)";
+                                 }
                               } else {
-                                 liveServer += "(not notified)";
+                                 backupServer = server.getIdentity();
+                                 if (member.getBackup() != null) {
+                                    liveServer += "(notified)";
+                                 } else {
+                                    liveServer += "(not notified)";
+                                 }
                               }
                            }
                         }
-                     }
 
-                     topologyDiagram.append("\t").append("|\n").append("\t->").append(liveServer).append("/").append(backupServer).append("\n");
+                        topologyDiagram.append("\t").append("|\n").append("\t->").append(liveServer).append("/").append(backupServer).append("\n");
+                     }
+                  } else {
+                     topologyDiagram.append("-> no cluster connections\n");
                   }
                } else {
-                  topologyDiagram.append("-> no cluster connections\n");
+                  topologyDiagram.append("-> stopped\n");
                }
-            } else {
-               topologyDiagram.append("-> stopped\n");
             }
          }
+         topologyDiagram.append("\n");
+         log.info(topologyDiagram.toString());
+      } catch (Throwable e) {
+         log.warn("error printing the topology::" + e.getMessage(), e);
       }
-      topologyDiagram.append("\n");
-      log.info(topologyDiagram.toString());
    }
 
    protected void waitForMessages(final int node, final String address, final int count) throws Exception {
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/RedeployTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/RedeployTest.java
index cb9dca4..f0f430c 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/RedeployTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/RedeployTest.java
@@ -492,14 +492,7 @@ public class RedeployTest extends ActiveMQTestBase {
 
       final ReusableLatch latch = new ReusableLatch(1);
 
-      Runnable tick = new Runnable() {
-         @Override
-         public void run() {
-            latch.countDown();
-         }
-      };
-
-      embeddedActiveMQ.getActiveMQServer().getReloadManager().setTick(tick);
+      embeddedActiveMQ.getActiveMQServer().getReloadManager().setTick(latch::countDown);
 
       try {
          ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://0.0.0.0:61616");
@@ -514,8 +507,8 @@ public class RedeployTest extends ActiveMQTestBase {
          Files.copy(url2.openStream(), brokerXML, StandardCopyOption.REPLACE_EXISTING);
          brokerXML.toFile().setLastModified(System.currentTimeMillis() + 1000);
          latch.setCount(1);
-         embeddedActiveMQ.getActiveMQServer().getReloadManager().setTick(tick);
-         latch.await(10, TimeUnit.SECONDS);
+         embeddedActiveMQ.getActiveMQServer().getReloadManager().setTick(latch::countDown);
+         Assert.assertTrue(latch.await(10, TimeUnit.SECONDS));
 
          Assert.assertNotNull(getAddressInfo(embeddedActiveMQ, "myAddress"));
          Assert.assertEquals(RoutingType.MULTICAST, getQueue(embeddedActiveMQ, "myQueue").getRoutingType());
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/remoting/ReconnectTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/remoting/ReconnectTest.java
index 13ddd11..a3a3cbe 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/remoting/ReconnectTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/remoting/ReconnectTest.java
@@ -22,6 +22,7 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.activemq.artemis.api.core.ActiveMQException;
@@ -383,14 +384,14 @@ public class ReconnectTest extends ActiveMQTestBase {
       ActiveMQServer server = createServer(true, true);
       server.start();
 
+      final AtomicBoolean consumerClosed = new AtomicBoolean(false);
       // imitate consumer close timeout
       Interceptor reattachInterceptor = new Interceptor() {
-         boolean consumerClosed;
 
          @Override
          public boolean intercept(Packet packet, RemotingConnection connection) throws ActiveMQException {
-            if (!consumerClosed && packet.getType() == PacketImpl.SESS_CONSUMER_CLOSE) {
-               consumerClosed = true;
+            if (!consumerClosed.get() && packet.getType() == PacketImpl.SESS_CONSUMER_CLOSE) {
+               consumerClosed.set(true);
                return false;
             } else {
                return true;
@@ -403,7 +404,7 @@ public class ReconnectTest extends ActiveMQTestBase {
       final long retryInterval = 500;
       final double retryMultiplier = 1d;
       final int reconnectAttempts = 10;
-      ServerLocator locator = createFactory(true).setCallTimeout(2000).setRetryInterval(retryInterval).setRetryIntervalMultiplier(retryMultiplier).setReconnectAttempts(reconnectAttempts).setConfirmationWindowSize(-1);
+      ServerLocator locator = createFactory(true).setCallTimeout(200).setRetryInterval(retryInterval).setRetryIntervalMultiplier(retryMultiplier).setReconnectAttempts(reconnectAttempts).setConfirmationWindowSize(-1);
       ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal) createSessionFactory(locator);
 
       ClientSessionInternal session = (ClientSessionInternal)sf.createSession(false, true, true);
@@ -416,6 +417,8 @@ public class ReconnectTest extends ActiveMQTestBase {
       ClientConsumer clientConsumer2 = session.createConsumer(queueName1);
       clientConsumer1.close();
 
+      Wait.assertTrue(consumerClosed::get);
+
       Wait.assertEquals(1, () -> getConsumerCount(server, session));
 
       Set<ServerConsumer> serverConsumers = server.getSessionByID(session.getName()).getServerConsumers();
diff --git a/tests/integration-tests/src/test/resources/reload-queue-routingtype-updated.xml b/tests/integration-tests/src/test/resources/reload-queue-routingtype-updated.xml
index be72361..4239b6e 100644
--- a/tests/integration-tests/src/test/resources/reload-queue-routingtype-updated.xml
+++ b/tests/integration-tests/src/test/resources/reload-queue-routingtype-updated.xml
@@ -25,6 +25,7 @@ under the License.
    <core xmlns="urn:activemq:core">
       <security-enabled>false</security-enabled>
       <persistence-enabled>false</persistence-enabled>
+      <configuration-file-refresh-period>100</configuration-file-refresh-period>
 
       <acceptors>
          <!-- Default ActiveMQ Artemis Acceptor.  Multi-protocol adapter.  Currently supports ActiveMQ Artemis Core, OpenWire, STOMP, AMQP, MQTT, and HornetQ Core. -->
diff --git a/tests/integration-tests/src/test/resources/reload-queue-routingtype.xml b/tests/integration-tests/src/test/resources/reload-queue-routingtype.xml
index 78338ce..047b78e 100644
--- a/tests/integration-tests/src/test/resources/reload-queue-routingtype.xml
+++ b/tests/integration-tests/src/test/resources/reload-queue-routingtype.xml
@@ -25,6 +25,7 @@ under the License.
    <core xmlns="urn:activemq:core">
       <security-enabled>false</security-enabled>
       <persistence-enabled>false</persistence-enabled>
+      <configuration-file-refresh-period>100</configuration-file-refresh-period>
 
       <acceptors>
          <!-- Default ActiveMQ Artemis Acceptor.  Multi-protocol adapter.  Currently supports ActiveMQ Artemis Core, OpenWire, STOMP, AMQP, MQTT, and HornetQ Core. -->

Reply via email to