This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit d81c0f44eb3fcc9d6783598d60df293902351e26 Author: Clebert Suconic <[email protected]> AuthorDate: Fri Apr 17 22:31:10 2020 -0400 NO-JIRA Fixing intermittent failures --- .../core/postoffice/impl/DuplicateIDCacheImpl.java | 4 +- .../core/remoting/impl/netty/NettyAcceptor.java | 8 ++- .../integration/client/ConsumerWindowSizeTest.java | 6 +- .../cluster/distribution/ClusterTestBase.java | 74 ++++++++++++---------- .../tests/integration/jms/RedeployTest.java | 13 +--- .../tests/integration/remoting/ReconnectTest.java | 11 ++-- .../resources/reload-queue-routingtype-updated.xml | 1 + .../test/resources/reload-queue-routingtype.xml | 1 + 8 files changed, 64 insertions(+), 54 deletions(-) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCacheImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCacheImpl.java index 4446178..7f16045 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCacheImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/DuplicateIDCacheImpl.java @@ -304,7 +304,9 @@ public class DuplicateIDCacheImpl implements DuplicateIDCache { if (ids.size() > 0 && persist) { long tx = storageManager.generateID(); for (Pair<ByteArrayHolder, Long> id : ids) { - storageManager.deleteDuplicateIDTransactional(tx, id.getB()); + if (id != null) { + storageManager.deleteDuplicateIDTransactional(tx, id.getB()); + } } storageManager.commit(tx); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java index 924f335..d207fdf 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java +++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java @@ -506,8 +506,14 @@ public class NettyAcceptor extends AbstractAcceptor { @Override public void reload() { - serverChannelGroup.disconnect(); + ChannelGroupFuture future = serverChannelGroup.disconnect(); + try { + future.awaitUninterruptibly(); + } catch (Exception ignored) { + } + serverChannelGroup.clear(); + startServerChannels(); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerWindowSizeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerWindowSizeTest.java index 843aee1..9778ff1 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerWindowSizeTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerWindowSizeTest.java @@ -332,15 +332,15 @@ public class ConsumerWindowSizeTest extends ActiveMQTestBase { while (true) { - ClientMessage msg = consumer.receiveImmediate(); - if (msg == null) { + if (received.incrementAndGet() > NUMBER_OF_MESSAGES) { + received.decrementAndGet(); break; } + ClientMessage msg = consumer.receive(1000); msg.acknowledge(); session.commit(); - received.incrementAndGet(); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java index 02adf2b..8205ee0 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java @@ -303,52 +303,56 @@ public abstract class ClusterTestBase extends 
ActiveMQTestBase { } private void logTopologyDiagram() { - StringBuffer topologyDiagram = new StringBuffer(); - for (ActiveMQServer activeMQServer : servers) { - if (activeMQServer != null) { - topologyDiagram.append("\n").append(activeMQServer.getIdentity()).append("\n"); - if (activeMQServer.isStarted()) { - Set<ClusterConnection> ccs = activeMQServer.getClusterManager().getClusterConnections(); - - if (ccs.size() >= 1) { - ClusterConnectionImpl clusterConnection = (ClusterConnectionImpl) ccs.iterator().next(); - Collection<TopologyMemberImpl> members = clusterConnection.getTopology().getMembers(); - for (TopologyMemberImpl member : members) { - String nodeId = member.getNodeId(); - String liveServer = null; - String backupServer = null; - for (ActiveMQServer server : servers) { - if (server != null && server.getNodeID() != null && server.isActive() && server.getNodeID().toString().equals(nodeId)) { - if (server.isActive()) { - liveServer = server.getIdentity(); - if (member.getLive() != null) { - liveServer += "(notified)"; - } else { - liveServer += "(not notified)"; - } - } else { - backupServer = server.getIdentity(); - if (member.getBackup() != null) { - liveServer += "(notified)"; + try { + StringBuffer topologyDiagram = new StringBuffer(); + for (ActiveMQServer activeMQServer : servers) { + if (activeMQServer != null) { + topologyDiagram.append("\n").append(activeMQServer.getIdentity()).append("\n"); + if (activeMQServer.isStarted()) { + Set<ClusterConnection> ccs = activeMQServer.getClusterManager().getClusterConnections(); + + if (ccs.size() >= 1) { + ClusterConnectionImpl clusterConnection = (ClusterConnectionImpl) ccs.iterator().next(); + Collection<TopologyMemberImpl> members = clusterConnection.getTopology().getMembers(); + for (TopologyMemberImpl member : members) { + String nodeId = member.getNodeId(); + String liveServer = null; + String backupServer = null; + for (ActiveMQServer server : servers) { + if (server != null && server.getNodeID() != 
null && server.isActive() && server.getNodeID().toString().equals(nodeId)) { + if (server.isActive()) { + liveServer = server.getIdentity(); + if (member.getLive() != null) { + liveServer += "(notified)"; + } else { + liveServer += "(not notified)"; + } } else { - liveServer += "(not notified)"; + backupServer = server.getIdentity(); + if (member.getBackup() != null) { + liveServer += "(notified)"; + } else { + liveServer += "(not notified)"; + } } } } - } - topologyDiagram.append("\t").append("|\n").append("\t->").append(liveServer).append("/").append(backupServer).append("\n"); + topologyDiagram.append("\t").append("|\n").append("\t->").append(liveServer).append("/").append(backupServer).append("\n"); + } + } else { + topologyDiagram.append("-> no cluster connections\n"); } } else { - topologyDiagram.append("-> no cluster connections\n"); + topologyDiagram.append("-> stopped\n"); } - } else { - topologyDiagram.append("-> stopped\n"); } } + topologyDiagram.append("\n"); + log.info(topologyDiagram.toString()); + } catch (Throwable e) { + log.warn("error printing the topology::" + e.getMessage(), e); } - topologyDiagram.append("\n"); - log.info(topologyDiagram.toString()); } protected void waitForMessages(final int node, final String address, final int count) throws Exception { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/RedeployTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/RedeployTest.java index cb9dca4..f0f430c 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/RedeployTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/RedeployTest.java @@ -492,14 +492,7 @@ public class RedeployTest extends ActiveMQTestBase { final ReusableLatch latch = new ReusableLatch(1); - Runnable tick = new Runnable() { - @Override - public void run() { - latch.countDown(); - } - 
}; - - embeddedActiveMQ.getActiveMQServer().getReloadManager().setTick(tick); + embeddedActiveMQ.getActiveMQServer().getReloadManager().setTick(latch::countDown); try { ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://0.0.0.0:61616"); @@ -514,8 +507,8 @@ public class RedeployTest extends ActiveMQTestBase { Files.copy(url2.openStream(), brokerXML, StandardCopyOption.REPLACE_EXISTING); brokerXML.toFile().setLastModified(System.currentTimeMillis() + 1000); latch.setCount(1); - embeddedActiveMQ.getActiveMQServer().getReloadManager().setTick(tick); - latch.await(10, TimeUnit.SECONDS); + embeddedActiveMQ.getActiveMQServer().getReloadManager().setTick(latch::countDown); + Assert.assertTrue(latch.await(10, TimeUnit.SECONDS)); Assert.assertNotNull(getAddressInfo(embeddedActiveMQ, "myAddress")); Assert.assertEquals(RoutingType.MULTICAST, getQueue(embeddedActiveMQ, "myQueue").getRoutingType()); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/remoting/ReconnectTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/remoting/ReconnectTest.java index 13ddd11..a3a3cbe 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/remoting/ReconnectTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/remoting/ReconnectTest.java @@ -22,6 +22,7 @@ import java.util.List; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import org.apache.activemq.artemis.api.core.ActiveMQException; @@ -383,14 +384,14 @@ public class ReconnectTest extends ActiveMQTestBase { ActiveMQServer server = createServer(true, true); server.start(); + final AtomicBoolean consumerClosed = new AtomicBoolean(false); // imitate consumer close timeout Interceptor 
reattachInterceptor = new Interceptor() { - boolean consumerClosed; @Override public boolean intercept(Packet packet, RemotingConnection connection) throws ActiveMQException { - if (!consumerClosed && packet.getType() == PacketImpl.SESS_CONSUMER_CLOSE) { - consumerClosed = true; + if (!consumerClosed.get() && packet.getType() == PacketImpl.SESS_CONSUMER_CLOSE) { + consumerClosed.set(true); return false; } else { return true; @@ -403,7 +404,7 @@ public class ReconnectTest extends ActiveMQTestBase { final long retryInterval = 500; final double retryMultiplier = 1d; final int reconnectAttempts = 10; - ServerLocator locator = createFactory(true).setCallTimeout(2000).setRetryInterval(retryInterval).setRetryIntervalMultiplier(retryMultiplier).setReconnectAttempts(reconnectAttempts).setConfirmationWindowSize(-1); + ServerLocator locator = createFactory(true).setCallTimeout(200).setRetryInterval(retryInterval).setRetryIntervalMultiplier(retryMultiplier).setReconnectAttempts(reconnectAttempts).setConfirmationWindowSize(-1); ClientSessionFactoryInternal sf = (ClientSessionFactoryInternal) createSessionFactory(locator); ClientSessionInternal session = (ClientSessionInternal)sf.createSession(false, true, true); @@ -416,6 +417,8 @@ public class ReconnectTest extends ActiveMQTestBase { ClientConsumer clientConsumer2 = session.createConsumer(queueName1); clientConsumer1.close(); + Wait.assertTrue(consumerClosed::get); + Wait.assertEquals(1, () -> getConsumerCount(server, session)); Set<ServerConsumer> serverConsumers = server.getSessionByID(session.getName()).getServerConsumers(); diff --git a/tests/integration-tests/src/test/resources/reload-queue-routingtype-updated.xml b/tests/integration-tests/src/test/resources/reload-queue-routingtype-updated.xml index be72361..4239b6e 100644 --- a/tests/integration-tests/src/test/resources/reload-queue-routingtype-updated.xml +++ b/tests/integration-tests/src/test/resources/reload-queue-routingtype-updated.xml @@ -25,6 +25,7 @@ under the 
License. <core xmlns="urn:activemq:core"> <security-enabled>false</security-enabled> <persistence-enabled>false</persistence-enabled> + <configuration-file-refresh-period>100</configuration-file-refresh-period> <acceptors> <!-- Default ActiveMQ Artemis Acceptor. Multi-protocol adapter. Currently supports ActiveMQ Artemis Core, OpenWire, STOMP, AMQP, MQTT, and HornetQ Core. --> diff --git a/tests/integration-tests/src/test/resources/reload-queue-routingtype.xml b/tests/integration-tests/src/test/resources/reload-queue-routingtype.xml index 78338ce..047b78e 100644 --- a/tests/integration-tests/src/test/resources/reload-queue-routingtype.xml +++ b/tests/integration-tests/src/test/resources/reload-queue-routingtype.xml @@ -25,6 +25,7 @@ under the License. <core xmlns="urn:activemq:core"> <security-enabled>false</security-enabled> <persistence-enabled>false</persistence-enabled> + <configuration-file-refresh-period>100</configuration-file-refresh-period> <acceptors> <!-- Default ActiveMQ Artemis Acceptor. Multi-protocol adapter. Currently supports ActiveMQ Artemis Core, OpenWire, STOMP, AMQP, MQTT, and HornetQ Core. -->
