This is an automated email from the ASF dual-hosted git repository. clebertsuconic pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
commit e4c5ea719fb531d23a8eccbf8aa01f55975e69c0 Author: Francesco Nigro <[email protected]> AuthorDate: Wed Apr 8 16:56:14 2020 +0200 ARTEMIS-2713 Added ReplicatedMultipleFailbackTest --- .../org/apache/activemq/artemis/utils/Wait.java | 3 + .../cluster/qourum/SharedNothingBackupQuorum.java | 148 +++++----- .../server/impl/SharedNothingBackupActivation.java | 2 +- tests/smoke-tests/pom.xml | 82 ++++++ .../servers/replicated-failback-master1/broker.xml | 137 ++++++++++ .../replicated-failback-master1/management.xml | 20 ++ .../servers/replicated-failback-master2/broker.xml | 138 ++++++++++ .../replicated-failback-master2/management.xml | 20 ++ .../servers/replicated-failback-master3/broker.xml | 138 ++++++++++ .../replicated-failback-master3/management.xml | 20 ++ .../servers/replicated-failback-slave1/broker.xml | 137 ++++++++++ .../replicated-failback-slave1/management.xml | 20 ++ .../artemis/tests/smoke/common/SmokeTestBase.java | 19 +- .../ReplicatedMultipleFailbackTest.java | 300 +++++++++++++++++++++ 14 files changed, 1115 insertions(+), 69 deletions(-) diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/Wait.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/Wait.java index 6cb011b..ab5826b 100644 --- a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/Wait.java +++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/Wait.java @@ -101,6 +101,9 @@ public class Wait { assertTrue(DEFAULT_FAILURE_MESSAGE, () -> !condition.isSatisfied(), duration, sleep); } + public static void assertTrue(Condition condition, final long duration) { + assertTrue(DEFAULT_FAILURE_MESSAGE, condition, duration, SLEEP_MILLIS); + } public static void assertTrue(String failureMessage, Condition condition) { assertTrue(failureMessage, condition, MAX_WAIT_MILLIS); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/SharedNothingBackupQuorum.java 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/SharedNothingBackupQuorum.java index 5e498af..9da89d0 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/SharedNothingBackupQuorum.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/qourum/SharedNothingBackupQuorum.java @@ -18,23 +18,23 @@ package org.apache.activemq.artemis.core.server.cluster.qourum; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import org.apache.activemq.artemis.api.core.ActiveMQException; -import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.api.core.client.SessionFailureListener; import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal; import org.apache.activemq.artemis.core.client.impl.Topology; -import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationLiveIsStoppingMessage; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.NetworkHealthCheck; import org.apache.activemq.artemis.core.server.NodeManager; +import org.jboss.logging.Logger; public class SharedNothingBackupQuorum implements Quorum, SessionFailureListener { - private TransportConfiguration liveTransportConfiguration; + private static final Logger LOGGER = Logger.getLogger(SharedNothingBackupQuorum.class); public enum BACKUP_ACTIVATION { FAIL_OVER, FAILURE_REPLICATING, ALREADY_REPLICATING, STOP; @@ -46,7 +46,6 @@ public class SharedNothingBackupQuorum implements Quorum, SessionFailureListener private final NodeManager nodeManager; - private final StorageManager storageManager; 
private final ScheduledExecutorService scheduledPool; private final int quorumSize; @@ -56,8 +55,6 @@ public class SharedNothingBackupQuorum implements Quorum, SessionFailureListener private final Object voteGuard = new Object(); - private CountDownLatch latch; - private ClientSessionFactoryInternal sessionFactory; private CoreRemotingConnection connection; @@ -67,6 +64,14 @@ public class SharedNothingBackupQuorum implements Quorum, SessionFailureListener private volatile boolean stopped = false; private final int quorumVoteWait; + + private volatile BACKUP_ACTIVATION signal; + + private ScheduledFuture<?> decisionGuard; + + private CountDownLatch latch; + + private final Object onConnectionFailureGuard = new Object(); /** * This is a safety net in case the live sends the first {@link ReplicationLiveIsStoppingMessage} * with code {@link org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationLiveIsStoppingMessage.LiveStopping#STOP_CALLED} and crashes before sending the second with @@ -76,15 +81,13 @@ public class SharedNothingBackupQuorum implements Quorum, SessionFailureListener */ public static final int WAIT_TIME_AFTER_FIRST_LIVE_STOPPING_MSG = 60; - public SharedNothingBackupQuorum(StorageManager storageManager, - NodeManager nodeManager, + public SharedNothingBackupQuorum(NodeManager nodeManager, ScheduledExecutorService scheduledPool, NetworkHealthCheck networkHealthCheck, int quorumSize, int voteRetries, long voteRetryWait, int quorumVoteWait) { - this.storageManager = storageManager; this.scheduledPool = scheduledPool; this.quorumSize = quorumSize; this.latch = new CountDownLatch(1); @@ -95,29 +98,24 @@ public class SharedNothingBackupQuorum implements Quorum, SessionFailureListener this.quorumVoteWait = quorumVoteWait; } - private volatile BACKUP_ACTIVATION signal; - - /** - * safety parameter to make _sure_ we get out of await() - */ - private static final int LATCH_TIMEOUT = 30; - - private final Object decisionGuard = new Object(); - 
@Override public String getName() { return "SharedNothingBackupQuorum"; } - public void decideOnAction(Topology topology) { - //we may get called via multiple paths so need to guard - synchronized (decisionGuard) { + private void onConnectionFailure() { + //we may get called as sessionFactory or connection listener + synchronized (onConnectionFailureGuard) { if (signal == BACKUP_ACTIVATION.FAIL_OVER) { + LOGGER.debug("Replication connection failure with signal == FAIL_OVER: no need to take any action"); if (networkHealthCheck != null && !networkHealthCheck.check()) { signal = BACKUP_ACTIVATION.FAILURE_REPLICATING; } return; } + //given that we're going to latch.countDown(), there is no need to await any + //scheduled task to complete + stopForcedFailoverAfterDelay(); if (!isLiveDown()) { //lost connection but don't know if live is down so restart as backup as we can't replicate any more ActiveMQServerLogger.LOGGER.restartingAsBackupBasedOnQuorumVoteResults(); @@ -140,16 +138,13 @@ public class SharedNothingBackupQuorum implements Quorum, SessionFailureListener signal = BACKUP_ACTIVATION.FAILURE_REPLICATING; } } + latch.countDown(); } - latch.countDown(); } public void liveIDSet(String liveID) { targetServerID = liveID; nodeManager.setNodeID(liveID); - liveTransportConfiguration = quorumManager.getLiveTransportConfiguration(targetServerID); - //now we are replicating we can start waiting for disconnect notifications so we can fail over - // sessionFactory.addFailureListener(this); } @Override @@ -166,14 +161,12 @@ public class SharedNothingBackupQuorum implements Quorum, SessionFailureListener */ @Override public void nodeDown(Topology topology, long eventUID, String nodeID) { - if (targetServerID.equals(nodeID)) { - decideOnAction(topology); - } + //noop: we are NOT interested on topology info coming from connections != this.connection } @Override public void nodeUp(Topology topology) { - //noop + //noop: we are NOT interested on topology info coming from 
connections != this.connection } /** @@ -181,7 +174,7 @@ public class SharedNothingBackupQuorum implements Quorum, SessionFailureListener */ @Override public void connectionFailed(ActiveMQException exception, boolean failedOver) { - decideOnAction(sessionFactory.getServerLocator().getTopology()); + onConnectionFailure(); } @Override @@ -204,10 +197,10 @@ public class SharedNothingBackupQuorum implements Quorum, SessionFailureListener */ public void setSessionFactory(final ClientSessionFactoryInternal sessionFactory) { this.sessionFactory = sessionFactory; - this.connection = (CoreRemotingConnection) sessionFactory.getConnection(); - connection.addFailureListener(this); //belts and braces, there are circumstances where the connection listener doesn't get called but the session does. - sessionFactory.addFailureListener(this); + this.sessionFactory.addFailureListener(this); + connection = (CoreRemotingConnection) sessionFactory.getConnection(); + connection.addFailureListener(this); } /** @@ -217,20 +210,19 @@ public class SharedNothingBackupQuorum implements Quorum, SessionFailureListener * backup that it should fail-over. 
*/ public synchronized void failOver(ReplicationLiveIsStoppingMessage.LiveStopping finalMessage) { - removeListener(); + removeListeners(); signal = BACKUP_ACTIVATION.FAIL_OVER; - if (finalMessage == ReplicationLiveIsStoppingMessage.LiveStopping.FAIL_OVER) { - latch.countDown(); - } - if (finalMessage == ReplicationLiveIsStoppingMessage.LiveStopping.STOP_CALLED) { - final CountDownLatch localLatch = latch; - scheduledPool.schedule(new Runnable() { - @Override - public void run() { - localLatch.countDown(); - } - - }, WAIT_TIME_AFTER_FIRST_LIVE_STOPPING_MSG, TimeUnit.SECONDS); + switch (finalMessage) { + + case STOP_CALLED: + scheduleForcedFailoverAfterDelay(latch); + break; + case FAIL_OVER: + stopForcedFailoverAfterDelay(); + latch.countDown(); + break; + default: + LOGGER.errorf("unsupported LiveStopping type: %s", finalMessage); } } @@ -244,10 +236,13 @@ public class SharedNothingBackupQuorum implements Quorum, SessionFailureListener latch.countDown(); } - private void removeListener() { + private void removeListeners() { if (connection != null) { connection.removeFailureListener(this); } + if (sessionFactory != null) { + sessionFactory.removeFailureListener(this); + } } /** @@ -271,13 +266,38 @@ public class SharedNothingBackupQuorum implements Quorum, SessionFailureListener * @param explicitSignal the state we want to set the quorum manager to return */ public synchronized void causeExit(BACKUP_ACTIVATION explicitSignal) { + stopForcedFailoverAfterDelay(); stopped = true; - removeListener(); + removeListeners(); this.signal = explicitSignal; latch.countDown(); } + private synchronized void scheduleForcedFailoverAfterDelay(CountDownLatch signalChanged) { + if (decisionGuard != null) { + if (decisionGuard.isDone()) { + LOGGER.warn("A completed force failover task wasn't cleaned-up: a new one will be scheduled"); + } else if (!decisionGuard.cancel(false)) { + LOGGER.warn("Failed to cancel an existing uncompleted force failover task: a new one will be scheduled 
anyway"); + } else { + LOGGER.warn("Cancelled an existing uncompleted force failover task: a new one will be scheduled in its place"); + } + } + decisionGuard = scheduledPool.schedule(signalChanged::countDown, + WAIT_TIME_AFTER_FIRST_LIVE_STOPPING_MSG, TimeUnit.SECONDS); + } + + private synchronized boolean stopForcedFailoverAfterDelay() { + if (decisionGuard == null) { + return false; + } + final boolean stopped = decisionGuard.cancel(false); + decisionGuard = null; + return stopped; + } + public synchronized void reset() { + stopForcedFailoverAfterDelay(); latch = new CountDownLatch(1); } @@ -287,13 +307,21 @@ public class SharedNothingBackupQuorum implements Quorum, SessionFailureListener * @return the voting decision */ private boolean isLiveDown() { - //lets assume live is not down - Boolean decision = false; - int voteAttempts = 0; - int size = quorumSize == -1 ? quorumManager.getMaxClusterSize() : quorumSize; + // lets assume live is not down + if (stopped) { + return false; + } + final int size = quorumSize == -1 ? 
quorumManager.getMaxClusterSize() : quorumSize; synchronized (voteGuard) { - while (!stopped && voteAttempts++ < voteRetries) { + for (int voteAttempts = 0; voteAttempts < voteRetries && !stopped; voteAttempts++) { + if (voteAttempts > 0) { + try { + voteGuard.wait(voteRetryWait); + } catch (InterruptedException e) { + //nothing to do here + } + } //the live is dead so lets vote for quorum QuorumVoteServerConnect quorumVote = new QuorumVoteServerConnect(size, targetServerID); @@ -308,19 +336,11 @@ public class SharedNothingBackupQuorum implements Quorum, SessionFailureListener quorumManager.voteComplete(quorumVote); - decision = quorumVote.getDecision(); - - if (decision) { - return decision; - } - try { - voteGuard.wait(voteRetryWait); - } catch (InterruptedException e) { - //nothing to do here + if (quorumVote.getDecision()) { + return true; } } } - - return decision; + return false; } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java index 587b8f0..bbfc2b3 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java @@ -131,7 +131,7 @@ public final class SharedNothingBackupActivation extends Activation { logger.trace("Entered a synchronized"); if (closed) return; - backupQuorum = new SharedNothingBackupQuorum(activeMQServer.getStorageManager(), activeMQServer.getNodeManager(), activeMQServer.getScheduledPool(), networkHealthCheck, replicaPolicy.getQuorumSize(), replicaPolicy.getVoteRetries(), replicaPolicy.getVoteRetryWait(), replicaPolicy.getQuorumVoteWait()); + backupQuorum = new SharedNothingBackupQuorum(activeMQServer.getNodeManager(), activeMQServer.getScheduledPool(), networkHealthCheck, 
replicaPolicy.getQuorumSize(), replicaPolicy.getVoteRetries(), replicaPolicy.getVoteRetryWait(), replicaPolicy.getQuorumVoteWait()); activeMQServer.getClusterManager().getQuorumManager().registerQuorum(backupQuorum); activeMQServer.getClusterManager().getQuorumManager().registerQuorumHandler(new ServerConnectVoteHandler(activeMQServer)); } diff --git a/tests/smoke-tests/pom.xml b/tests/smoke-tests/pom.xml index 2482a8d..1eb888c 100644 --- a/tests/smoke-tests/pom.xml +++ b/tests/smoke-tests/pom.xml @@ -307,6 +307,88 @@ </args> </configuration> </execution> + <!-- START JmxReplicatedMultipleFailbackTest --> + <execution> + <phase>test-compile</phase> + <id>create-replicated-failback-master1</id> + <goals> + <goal>create</goal> + </goals> + <configuration> + <!-- this makes it easier in certain envs --> + <configuration>${basedir}/target/classes/servers/replicated-failback-master1</configuration> + <allowAnonymous>true</allowAnonymous> + <user>admin</user> + <password>admin</password> + <instance>${basedir}/target/replicated-failback-master1</instance> + <args> + <!-- this is needed to run the server remotely --> + <arg>--java-options</arg> + <arg>-Djava.rmi.server.hostname=localhost</arg> + </args> + </configuration> + </execution> + <execution> + <phase>test-compile</phase> + <id>create-replicated-failback-master2</id> + <goals> + <goal>create</goal> + </goals> + <configuration> + <!-- this makes it easier in certain envs --> + <configuration>${basedir}/target/classes/servers/replicated-failback-master2</configuration> + <allowAnonymous>true</allowAnonymous> + <user>admin</user> + <password>admin</password> + <instance>${basedir}/target/replicated-failback-master2</instance> + <args> + <!-- this is needed to run the server remotely --> + <arg>--java-options</arg> + <arg>-Djava.rmi.server.hostname=localhost</arg> + </args> + </configuration> + </execution> + <execution> + <phase>test-compile</phase> + <id>create-replicated-failback-master3</id> + <goals> + 
<goal>create</goal> + </goals> + <configuration> + <!-- this makes it easier in certain envs --> + <configuration>${basedir}/target/classes/servers/replicated-failback-master3</configuration> + <allowAnonymous>true</allowAnonymous> + <user>admin</user> + <password>admin</password> + <instance>${basedir}/target/replicated-failback-master3</instance> + <args> + <!-- this is needed to run the server remotely --> + <arg>--java-options</arg> + <arg>-Djava.rmi.server.hostname=localhost</arg> + </args> + </configuration> + </execution> + <execution> + <phase>test-compile</phase> + <id>create-replicated-failback-slave1</id> + <goals> + <goal>create</goal> + </goals> + <configuration> + <!-- this makes it easier in certain envs --> + <configuration>${basedir}/target/classes/servers/replicated-failback-slave1</configuration> + <allowAnonymous>true</allowAnonymous> + <user>admin</user> + <password>admin</password> + <instance>${basedir}/target/replicated-failback-slave1</instance> + <args> + <!-- this is needed to run the server remotely --> + <arg>--java-options</arg> + <arg>-Djava.rmi.server.hostname=localhost</arg> + </args> + </configuration> + </execution> + <!-- END JmxReplicatedMultipleFailbackTest --> <execution> <phase>test-compile</phase> <id>create-paging</id> diff --git a/tests/smoke-tests/src/main/resources/servers/replicated-failback-master1/broker.xml b/tests/smoke-tests/src/main/resources/servers/replicated-failback-master1/broker.xml new file mode 100644 index 0000000..656eac8 --- /dev/null +++ b/tests/smoke-tests/src/main/resources/servers/replicated-failback-master1/broker.xml @@ -0,0 +1,137 @@ +<?xml version="1.0" encoding="UTF-8" standalone="no"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. 
The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--><configuration xmlns="urn:activemq" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:activemq /schema/artemis-server.xsd"> + + <core xmlns="urn:activemq:core"> + + <name>master1</name> + + <bindings-directory>./data/bindings</bindings-directory> + + <journal-directory>./data/journal</journal-directory> + + <large-messages-directory>./data/largemessages</large-messages-directory> + + <paging-directory>./data/paging</paging-directory> + + <ha-policy> + <replication> + <master> + <group-name>a</group-name> + <check-for-live-server>true</check-for-live-server> + <vote-on-replication-failure>true</vote-on-replication-failure> + </master> + </replication> + </ha-policy> + + <connectors> + <!-- Connector used to be announced through cluster connections and notifications --> + <connector name="artemis">tcp://localhost:61616</connector> + <connector name="master2">tcp://localhost:61716</connector> + <connector name="master3">tcp://localhost:61816</connector> + <connector name="slave1">tcp://localhost:61916</connector> + </connectors> + + <!-- Acceptors --> + <acceptors> + <acceptor name="artemis">tcp://localhost:61616</acceptor> + </acceptors> + + <cluster-user>admin</cluster-user> + + <cluster-password>password</cluster-password> + + <cluster-connections> + <cluster-connection name="my-cluster"> + <connector-ref>artemis</connector-ref> + <message-load-balancing>OFF</message-load-balancing> + 
<max-hops>1</max-hops> + <static-connectors> + <connector-ref>slave1</connector-ref> + <connector-ref>master2</connector-ref> + <connector-ref>master3</connector-ref> + </static-connectors> + </cluster-connection> + </cluster-connections> + + <!-- Other config --> + + <security-settings> + <!--security for example queue--> + <security-setting match="#"> + <permission type="createNonDurableQueue" roles="amq, guest"/> + <permission type="deleteNonDurableQueue" roles="amq, guest"/> + <permission type="createDurableQueue" roles="amq, guest"/> + <permission type="deleteDurableQueue" roles="amq, guest"/> + <permission type="createAddress" roles="amq, guest"/> + <permission type="deleteAddress" roles="amq, guest"/> + <permission type="consume" roles="amq, guest"/> + <permission type="browse" roles="amq, guest"/> + <permission type="send" roles="amq, guest"/> + <!-- we need this otherwise ./artemis data imp wouldn't work --> + <permission type="manage" roles="amq"/> + </security-setting> + </security-settings> + + <address-settings> + <!-- if you define auto-create on certain queues, management has to be auto-create --> + <address-setting match="activemq.management#"> + <dead-letter-address>DLQ</dead-letter-address> + <expiry-address>ExpiryQueue</expiry-address> + <redelivery-delay>0</redelivery-delay> + <!-- with -1 only the global-max-size is in use for limiting --> + <max-size-bytes>-1</max-size-bytes> + <message-counter-history-day-limit>10</message-counter-history-day-limit> + <address-full-policy>PAGE</address-full-policy> + <auto-create-queues>true</auto-create-queues> + <auto-create-addresses>true</auto-create-addresses> + <auto-create-jms-queues>true</auto-create-jms-queues> + <auto-create-jms-topics>true</auto-create-jms-topics> + </address-setting> + <!--default for catch all--> + <address-setting match="#"> + <dead-letter-address>DLQ</dead-letter-address> + <expiry-address>ExpiryQueue</expiry-address> + <redelivery-delay>0</redelivery-delay> + <!-- with -1 only 
the global-max-size is in use for limiting --> + <max-size-bytes>10MB</max-size-bytes> + <page-size-bytes>1MB</page-size-bytes> + + <message-counter-history-day-limit>10</message-counter-history-day-limit> + <address-full-policy>PAGE</address-full-policy> + <auto-create-queues>true</auto-create-queues> + <auto-create-addresses>true</auto-create-addresses> + <auto-create-jms-queues>true</auto-create-jms-queues> + <auto-create-jms-topics>true</auto-create-jms-topics> + </address-setting> + </address-settings> + + <addresses> + <address name="exampleTopic"> + <multicast> + </multicast> + </address> + <address name="exampleQueue"> + <anycast> + <queue name="exampleQueue"/> + </anycast> + </address> + </addresses> + </core> +</configuration> diff --git a/tests/smoke-tests/src/main/resources/servers/replicated-failback-master1/management.xml b/tests/smoke-tests/src/main/resources/servers/replicated-failback-master1/management.xml new file mode 100644 index 0000000..576f1e5 --- /dev/null +++ b/tests/smoke-tests/src/main/resources/servers/replicated-failback-master1/management.xml @@ -0,0 +1,20 @@ +<?xml version="1.0" encoding="UTF-8" standalone="yes"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. 
+ --> +<management-context xmlns="http://activemq.org/schema"> + <connector connector-port="10099" connector-host="localhost"/> +</management-context> \ No newline at end of file diff --git a/tests/smoke-tests/src/main/resources/servers/replicated-failback-master2/broker.xml b/tests/smoke-tests/src/main/resources/servers/replicated-failback-master2/broker.xml new file mode 100644 index 0000000..4a692ae --- /dev/null +++ b/tests/smoke-tests/src/main/resources/servers/replicated-failback-master2/broker.xml @@ -0,0 +1,138 @@ +<?xml version="1.0" encoding="UTF-8" standalone="no"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+--><configuration xmlns="urn:activemq" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:activemq /schema/artemis-server.xsd"> + + <core xmlns="urn:activemq:core"> + + <name>master2</name> + + <bindings-directory>./data/bindings</bindings-directory> + + <journal-directory>./data/journal</journal-directory> + + <large-messages-directory>./data/largemessages</large-messages-directory> + + <paging-directory>./data/paging</paging-directory> + + <ha-policy> + <replication> + <master> + <group-name>b</group-name> + <check-for-live-server>true</check-for-live-server> + <vote-on-replication-failure>true</vote-on-replication-failure> + </master> + </replication> + </ha-policy> + + <connectors> + <!-- Connector used to be announced through cluster connections and notifications --> + <connector name="artemis">tcp://localhost:61716</connector> + <connector name="master1">tcp://localhost:61616</connector> + <connector name="master3">tcp://localhost:61816</connector> + <connector name="slave1">tcp://localhost:61816</connector> + </connectors> + + + <!-- Acceptors --> + <acceptors> + <acceptor name="artemis">tcp://localhost:61716</acceptor> + </acceptors> + + <cluster-user>admin</cluster-user> + + <cluster-password>password</cluster-password> + + <cluster-connections> + <cluster-connection name="my-cluster"> + <connector-ref>artemis</connector-ref> + <message-load-balancing>OFF</message-load-balancing> + <max-hops>1</max-hops> + <static-connectors> + <connector-ref>master3</connector-ref> + <connector-ref>master1</connector-ref> + <connector-ref>slave1</connector-ref> + </static-connectors> + </cluster-connection> + </cluster-connections> + + <!-- Other config --> + + <security-settings> + <!--security for example queue--> + <security-setting match="#"> + <permission type="createNonDurableQueue" roles="amq, guest"/> + <permission type="deleteNonDurableQueue" roles="amq, guest"/> + <permission type="createDurableQueue" roles="amq, guest"/> + 
<permission type="deleteDurableQueue" roles="amq, guest"/> + <permission type="createAddress" roles="amq, guest"/> + <permission type="deleteAddress" roles="amq, guest"/> + <permission type="consume" roles="amq, guest"/> + <permission type="browse" roles="amq, guest"/> + <permission type="send" roles="amq, guest"/> + <!-- we need this otherwise ./artemis data imp wouldn't work --> + <permission type="manage" roles="amq"/> + </security-setting> + </security-settings> + + <address-settings> + <!-- if you define auto-create on certain queues, management has to be auto-create --> + <address-setting match="activemq.management#"> + <dead-letter-address>DLQ</dead-letter-address> + <expiry-address>ExpiryQueue</expiry-address> + <redelivery-delay>0</redelivery-delay> + <!-- with -1 only the global-max-size is in use for limiting --> + <max-size-bytes>-1</max-size-bytes> + <message-counter-history-day-limit>10</message-counter-history-day-limit> + <address-full-policy>PAGE</address-full-policy> + <auto-create-queues>true</auto-create-queues> + <auto-create-addresses>true</auto-create-addresses> + <auto-create-jms-queues>true</auto-create-jms-queues> + <auto-create-jms-topics>true</auto-create-jms-topics> + </address-setting> + <!--default for catch all--> + <address-setting match="#"> + <dead-letter-address>DLQ</dead-letter-address> + <expiry-address>ExpiryQueue</expiry-address> + <redelivery-delay>0</redelivery-delay> + <!-- with -1 only the global-max-size is in use for limiting --> + <max-size-bytes>10MB</max-size-bytes> + <page-size-bytes>1MB</page-size-bytes> + + <message-counter-history-day-limit>10</message-counter-history-day-limit> + <address-full-policy>PAGE</address-full-policy> + <auto-create-queues>true</auto-create-queues> + <auto-create-addresses>true</auto-create-addresses> + <auto-create-jms-queues>true</auto-create-jms-queues> + <auto-create-jms-topics>true</auto-create-jms-topics> + </address-setting> + </address-settings> + + <addresses> + <address 
name="exampleTopic"> + <multicast> + </multicast> + </address> + <address name="exampleQueue"> + <anycast> + <queue name="exampleQueue"/> + </anycast> + </address> + </addresses> + </core> +</configuration> diff --git a/tests/smoke-tests/src/main/resources/servers/replicated-failback-master2/management.xml b/tests/smoke-tests/src/main/resources/servers/replicated-failback-master2/management.xml new file mode 100644 index 0000000..14bbaf2 --- /dev/null +++ b/tests/smoke-tests/src/main/resources/servers/replicated-failback-master2/management.xml @@ -0,0 +1,20 @@ +<?xml version="1.0" encoding="UTF-8" standalone="yes"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. 
+ --> +<management-context xmlns="http://activemq.org/schema"> + <connector connector-port="10199" connector-host="localhost"/> +</management-context> \ No newline at end of file diff --git a/tests/smoke-tests/src/main/resources/servers/replicated-failback-master3/broker.xml b/tests/smoke-tests/src/main/resources/servers/replicated-failback-master3/broker.xml new file mode 100644 index 0000000..c9852ab --- /dev/null +++ b/tests/smoke-tests/src/main/resources/servers/replicated-failback-master3/broker.xml @@ -0,0 +1,138 @@ +<?xml version="1.0" encoding="UTF-8" standalone="no"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+--><configuration xmlns="urn:activemq" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:activemq /schema/artemis-server.xsd"> + + <core xmlns="urn:activemq:core"> + + <name>master3</name> + + <bindings-directory>./data/bindings</bindings-directory> + + <journal-directory>./data/journal</journal-directory> + + <large-messages-directory>./data/largemessages</large-messages-directory> + + <paging-directory>./data/paging</paging-directory> + + <ha-policy> + <replication> + <master> + <group-name>c</group-name> + <check-for-live-server>true</check-for-live-server> + <vote-on-replication-failure>true</vote-on-replication-failure> + </master> + </replication> + </ha-policy> + + <connectors> + <!-- Connector used to be announced through cluster connections and notifications --> + <connector name="artemis">tcp://localhost:61816</connector> + <connector name="master1">tcp://localhost:61616</connector> + <connector name="master2">tcp://localhost:61716</connector> + <connector name="slave1">tcp://localhost:61916</connector> + </connectors> + + + <!-- Acceptors --> + <acceptors> + <acceptor name="artemis">tcp://localhost:61816</acceptor> + </acceptors> + + <cluster-user>admin</cluster-user> + + <cluster-password>password</cluster-password> + + <cluster-connections> + <cluster-connection name="my-cluster"> + <connector-ref>artemis</connector-ref> + <message-load-balancing>OFF</message-load-balancing> + <max-hops>1</max-hops> + <static-connectors> + <connector-ref>master2</connector-ref> + <connector-ref>master1</connector-ref> + <connector-ref>slave1</connector-ref> + </static-connectors> + </cluster-connection> + </cluster-connections> + + <!-- Other config --> + + <security-settings> + <!--security for example queue--> + <security-setting match="#"> + <permission type="createNonDurableQueue" roles="amq, guest"/> + <permission type="deleteNonDurableQueue" roles="amq, guest"/> + <permission type="createDurableQueue" roles="amq, guest"/> + 
<permission type="deleteDurableQueue" roles="amq, guest"/> + <permission type="createAddress" roles="amq, guest"/> + <permission type="deleteAddress" roles="amq, guest"/> + <permission type="consume" roles="amq, guest"/> + <permission type="browse" roles="amq, guest"/> + <permission type="send" roles="amq, guest"/> + <!-- we need this otherwise ./artemis data imp wouldn't work --> + <permission type="manage" roles="amq"/> + </security-setting> + </security-settings> + + <address-settings> + <!-- if you define auto-create on certain queues, management has to be auto-create --> + <address-setting match="activemq.management#"> + <dead-letter-address>DLQ</dead-letter-address> + <expiry-address>ExpiryQueue</expiry-address> + <redelivery-delay>0</redelivery-delay> + <!-- with -1 only the global-max-size is in use for limiting --> + <max-size-bytes>-1</max-size-bytes> + <message-counter-history-day-limit>10</message-counter-history-day-limit> + <address-full-policy>PAGE</address-full-policy> + <auto-create-queues>true</auto-create-queues> + <auto-create-addresses>true</auto-create-addresses> + <auto-create-jms-queues>true</auto-create-jms-queues> + <auto-create-jms-topics>true</auto-create-jms-topics> + </address-setting> + <!--default for catch all--> + <address-setting match="#"> + <dead-letter-address>DLQ</dead-letter-address> + <expiry-address>ExpiryQueue</expiry-address> + <redelivery-delay>0</redelivery-delay> + <!-- with -1 only the global-max-size is in use for limiting --> + <max-size-bytes>10MB</max-size-bytes> + <page-size-bytes>1MB</page-size-bytes> + + <message-counter-history-day-limit>10</message-counter-history-day-limit> + <address-full-policy>PAGE</address-full-policy> + <auto-create-queues>true</auto-create-queues> + <auto-create-addresses>true</auto-create-addresses> + <auto-create-jms-queues>true</auto-create-jms-queues> + <auto-create-jms-topics>true</auto-create-jms-topics> + </address-setting> + </address-settings> + + <addresses> + <address 
name="exampleTopic"> + <multicast> + </multicast> + </address> + <address name="exampleQueue"> + <anycast> + <queue name="exampleQueue"/> + </anycast> + </address> + </addresses> + </core> +</configuration> diff --git a/tests/smoke-tests/src/main/resources/servers/replicated-failback-master3/management.xml b/tests/smoke-tests/src/main/resources/servers/replicated-failback-master3/management.xml new file mode 100644 index 0000000..9de3c59 --- /dev/null +++ b/tests/smoke-tests/src/main/resources/servers/replicated-failback-master3/management.xml @@ -0,0 +1,20 @@ +<?xml version="1.0" encoding="UTF-8" standalone="yes"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. 
+ --> +<management-context xmlns="http://activemq.org/schema"> + <connector connector-port="10299" connector-host="localhost"/> +</management-context> \ No newline at end of file diff --git a/tests/smoke-tests/src/main/resources/servers/replicated-failback-slave1/broker.xml b/tests/smoke-tests/src/main/resources/servers/replicated-failback-slave1/broker.xml new file mode 100644 index 0000000..601f3fd --- /dev/null +++ b/tests/smoke-tests/src/main/resources/servers/replicated-failback-slave1/broker.xml @@ -0,0 +1,137 @@ +<?xml version="1.0" encoding="UTF-8" standalone="no"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+--><configuration xmlns="urn:activemq" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:activemq /schema/artemis-server.xsd"> + + <core xmlns="urn:activemq:core"> + + <name>slave1</name> + + <bindings-directory>./data/bindings</bindings-directory> + + <journal-directory>./data/journal</journal-directory> + + <large-messages-directory>./data/largemessages</large-messages-directory> + + <paging-directory>./data/paging</paging-directory> + + <ha-policy> + <replication> + <slave> + <group-name>a</group-name> + <allow-failback>true</allow-failback> + </slave> + </replication> + </ha-policy> + + <connectors> + <!-- Connector used to be announced through cluster connections and notifications --> + <connector name="artemis">tcp://localhost:61916</connector> + <connector name="master1">tcp://localhost:61616</connector> + <connector name="master2">tcp://localhost:61716</connector> + <connector name="master3">tcp://localhost:61816</connector> + </connectors> + + + <!-- Acceptors --> + <acceptors> + <acceptor name="artemis">tcp://localhost:61916</acceptor> + </acceptors> + + <cluster-user>admin</cluster-user> + + <cluster-password>password</cluster-password> + + <cluster-connections> + <cluster-connection name="my-cluster"> + <connector-ref>artemis</connector-ref> + <message-load-balancing>OFF</message-load-balancing> + <max-hops>1</max-hops> + <static-connectors> + <connector-ref>master1</connector-ref> + <connector-ref>master2</connector-ref> + <connector-ref>master3</connector-ref> + </static-connectors> + </cluster-connection> + </cluster-connections> + + <!-- Other config --> + + <security-settings> + <!--security for example queue--> + <security-setting match="#"> + <permission type="createNonDurableQueue" roles="amq, guest"/> + <permission type="deleteNonDurableQueue" roles="amq, guest"/> + <permission type="createDurableQueue" roles="amq, guest"/> + <permission type="deleteDurableQueue" roles="amq, guest"/> + <permission 
type="createAddress" roles="amq, guest"/> + <permission type="deleteAddress" roles="amq, guest"/> + <permission type="consume" roles="amq, guest"/> + <permission type="browse" roles="amq, guest"/> + <permission type="send" roles="amq, guest"/> + <!-- we need this otherwise ./artemis data imp wouldn't work --> + <permission type="manage" roles="amq"/> + </security-setting> + </security-settings> + + <address-settings> + <!-- if you define auto-create on certain queues, management has to be auto-create --> + <address-setting match="activemq.management#"> + <dead-letter-address>DLQ</dead-letter-address> + <expiry-address>ExpiryQueue</expiry-address> + <redelivery-delay>0</redelivery-delay> + <!-- with -1 only the global-max-size is in use for limiting --> + <max-size-bytes>-1</max-size-bytes> + <message-counter-history-day-limit>10</message-counter-history-day-limit> + <address-full-policy>PAGE</address-full-policy> + <auto-create-queues>true</auto-create-queues> + <auto-create-addresses>true</auto-create-addresses> + <auto-create-jms-queues>true</auto-create-jms-queues> + <auto-create-jms-topics>true</auto-create-jms-topics> + </address-setting> + <!--default for catch all--> + <address-setting match="#"> + <dead-letter-address>DLQ</dead-letter-address> + <expiry-address>ExpiryQueue</expiry-address> + <redelivery-delay>0</redelivery-delay> + <!-- with -1 only the global-max-size is in use for limiting --> + <max-size-bytes>10MB</max-size-bytes> + <page-size-bytes>1MB</page-size-bytes> + + <message-counter-history-day-limit>10</message-counter-history-day-limit> + <address-full-policy>PAGE</address-full-policy> + <auto-create-queues>true</auto-create-queues> + <auto-create-addresses>true</auto-create-addresses> + <auto-create-jms-queues>true</auto-create-jms-queues> + <auto-create-jms-topics>true</auto-create-jms-topics> + </address-setting> + </address-settings> + + <addresses> + <address name="exampleTopic"> + <multicast> + </multicast> + </address> + <address 
name="exampleQueue"> + <anycast> + <queue name="exampleQueue"/> + </anycast> + </address> + </addresses> + </core> +</configuration> diff --git a/tests/smoke-tests/src/main/resources/servers/replicated-failback-slave1/management.xml b/tests/smoke-tests/src/main/resources/servers/replicated-failback-slave1/management.xml new file mode 100644 index 0000000..f9ae2f9 --- /dev/null +++ b/tests/smoke-tests/src/main/resources/servers/replicated-failback-slave1/management.xml @@ -0,0 +1,20 @@ +<?xml version="1.0" encoding="UTF-8" standalone="yes"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. 
+ --> +<management-context xmlns="http://activemq.org/schema"> + <connector connector-port="10399" connector-host="localhost"/> +</management-context> \ No newline at end of file diff --git a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/common/SmokeTestBase.java b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/common/SmokeTestBase.java index e6c2a6e..e35ffa5 100644 --- a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/common/SmokeTestBase.java +++ b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/common/SmokeTestBase.java @@ -18,14 +18,15 @@ package org.apache.activemq.artemis.tests.smoke.common; import java.io.File; -import java.util.ArrayList; +import java.util.HashSet; +import java.util.Set; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.util.ServerUtil; import org.junit.After; public class SmokeTestBase extends ActiveMQTestBase { - ArrayList<Process> processes = new ArrayList(); + Set<Process> processes = new HashSet<>(); public static final String basedir = System.getProperty("basedir"); @@ -38,13 +39,23 @@ public class SmokeTestBase extends ActiveMQTestBase { e.printStackTrace(); } } + processes.clear(); } - public String getServerLocation(String serverName) { + public void killServer(Process process) { + processes.remove(process); + try { + ServerUtil.killServer(process); + } catch (Throwable e) { + e.printStackTrace(); + } + } + + public static String getServerLocation(String serverName) { return basedir + "/target/" + serverName; } - public void cleanupData(String serverName) { + public static void cleanupData(String serverName) { String location = getServerLocation(serverName); deleteDirectory(new File(location, "data")); } diff --git a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/jmxmultiplefailback/ReplicatedMultipleFailbackTest.java 
b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/jmxmultiplefailback/ReplicatedMultipleFailbackTest.java new file mode 100644 index 0000000..f354fe6 --- /dev/null +++ b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/jmxmultiplefailback/ReplicatedMultipleFailbackTest.java @@ -0,0 +1,300 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.activemq.artemis.tests.smoke.jmxmultiplefailback;
+
+import javax.json.Json;
+import javax.json.JsonArray;
+import javax.json.JsonObject;
+import javax.json.JsonReader;
+import javax.management.MBeanServerInvocationHandler;
+import javax.management.ObjectName;
+import javax.management.remote.JMXConnector;
+import javax.management.remote.JMXConnectorFactory;
+import javax.management.remote.JMXServiceURL;
+import java.io.StringReader;
+import java.net.MalformedURLException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.stream.Stream;
+
+import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
+import org.apache.activemq.artemis.api.core.Pair;
+import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl;
+import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder;
+import org.apache.activemq.artemis.tests.smoke.common.SmokeTestBase;
+import org.apache.activemq.artemis.utils.Wait;
+import org.jboss.logging.Logger;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Starts three replicated masters (master1, master2, master3) and one slave (slave1, the backup of master1), then
+ * repeatedly kills master1, waits for slave1 to become live, restarts master1 and waits for it to take over again.
+ * After every step the cluster topology reported over JMX by each running broker is validated.
+ */
+public class ReplicatedMultipleFailbackTest extends SmokeTestBase {
+
+   private static final Logger LOGGER = Logger.getLogger(ReplicatedMultipleFailbackTest.class);
+
+   /**
+    * A {@link Function} variant whose body is allowed to throw any {@link Throwable}.
+    */
+   @FunctionalInterface
+   interface ThrowableFunction<T, R> {
+
+      R apply(T t) throws Throwable;
+   }
+
+   /**
+    * Connects to {@code serviceURI}, builds a {@code controlClass} proxy for {@code objectName} and applies
+    * {@code queryControl} to it.
+    * <p>
+    * Failures are deliberately not propagated: brokers are polled while they are started, killed and failing over,
+    * so any {@link Throwable} (connect, query or close) is mapped through {@code onThrowable} instead.
+    *
+    * @return the query result (or the {@code onThrowable} result on failure); empty when that result is {@code null}
+    */
+   private static <C, T> Optional<T> queryControl(JMXServiceURL serviceURI,
+                                                  ObjectName objectName,
+                                                  ThrowableFunction<C, T> queryControl,
+                                                  Class<C> controlClass,
+                                                  Function<Throwable, T> onThrowable) {
+      try (JMXConnector jmx = JMXConnectorFactory.connect(serviceURI)) {
+         final C control = MBeanServerInvocationHandler.newProxyInstance(jmx.getMBeanServerConnection(), objectName, controlClass, false);
+         return Optional.ofNullable(queryControl.apply(control));
+      } catch (Throwable t) {
+         if (t instanceof InterruptedException) {
+            // don't lose the interrupt while turning the failure into a "not available" answer
+            Thread.currentThread().interrupt();
+         }
+         return Optional.ofNullable(onThrowable.apply(t));
+      }
+   }
+
+   /**
+    * @return whether the broker is currently a backup, or empty if it could not be queried
+    */
+   private static Optional<Boolean> isBackup(JMXServiceURL serviceURI, ObjectNameBuilder builder) throws Exception {
+      return queryControl(serviceURI, builder.getActiveMQServerObjectName(), ActiveMQServerControl::isBackup, ActiveMQServerControl.class, throwable -> null);
+   }
+
+   /**
+    * @return the broker node ID, or empty if it could not be queried
+    */
+   private static Optional<String> getNodeID(JMXServiceURL serviceURI, ObjectNameBuilder builder) throws Exception {
+      return queryControl(serviceURI, builder.getActiveMQServerObjectName(), ActiveMQServerControl::getNodeID, ActiveMQServerControl.class, throwable -> null);
+   }
+
+   /**
+    * @return the JSON network topology as seen by the broker, or empty if it could not be queried
+    */
+   private static Optional<String> listNetworkTopology(JMXServiceURL serviceURI,
+                                                       ObjectNameBuilder builder) throws Exception {
+      return queryControl(serviceURI, builder.getActiveMQServerObjectName(), ActiveMQServerControl::listNetworkTopology, ActiveMQServerControl.class, throwable -> null);
+   }
+
+   /**
+    * Decodes the JSON array returned by {@link ActiveMQServerControl#listNetworkTopology()} into a
+    * {@code nodeID -> (live, backup)} map.
+    * <p>
+    * Entries without a {@code nodeID} are skipped, and a missing {@code live} or {@code backup} is mapped to
+    * {@code null}. {@link JsonObject#getString(String)} throws {@link NullPointerException} on a missing key, and that
+    * exception would otherwise escape from the polled topology checks while the cluster is still settling.
+    *
+    * @return an empty map for a {@code null} or empty input
+    */
+   private static Map<String, Pair<String, String>> decodeNetworkTopologyJson(String networkTopologyJson) {
+      if (networkTopologyJson == null || networkTopologyJson.isEmpty()) {
+         return Collections.emptyMap();
+      }
+      try (JsonReader jsonReader = Json.createReader(new StringReader(networkTopologyJson))) {
+         final JsonArray nodeIDs = jsonReader.readArray();
+         final int nodeCount = nodeIDs.size();
+         final Map<String, Pair<String, String>> networkTopology = new HashMap<>(nodeCount);
+         for (int i = 0; i < nodeCount; i++) {
+            final JsonObject nodePair = nodeIDs.getJsonObject(i);
+            final String nodeID = nodePair.getString("nodeID", null);
+            if (nodeID == null) {
+               continue;
+            }
+            final String live = nodePair.getString("live", null);
+            final String backup = nodePair.getString("backup", null);
+            networkTopology.put(nodeID, new Pair<>(live, backup));
+         }
+         return networkTopology;
+      }
+   }
+
+   /**
+    * @return how many topology entries have a non-empty live URL
+    */
+   private static long countMembers(Map<String, Pair<String, String>> networkTopology) {
+      return networkTopology.values().stream()
+         .map(Pair::getA)
+         .filter(live -> live != null && !live.isEmpty())
+         .count();
+   }
+
+   /**
+    * @return how many non-empty live plus backup URLs the topology contains
+    */
+   private static long countNodes(Map<String, Pair<String, String>> networkTopology) {
+      return networkTopology.values().stream()
+         .flatMap(pair -> Stream.of(pair.getA(), pair.getB()))
+         .filter(liveOrBackup -> liveOrBackup != null && !liveOrBackup.isEmpty())
+         .count();
+   }
+
+   /**
+    * Decodes {@code networkTopologyJson} and tests it against {@code checkTopology}.
+    */
+   private static boolean validateNetworkTopology(String networkTopologyJson,
+                                                  Predicate<Map<String, Pair<String, String>>> checkTopology) {
+      final Map<String, Pair<String, String>> networkTopology = decodeNetworkTopologyJson(networkTopologyJson);
+      return checkTopology.test(networkTopology);
+   }
+
+   /**
+    * @return the backup URL of {@code nodeID}, or {@code null} if the node is unknown or has no backup
+    */
+   private static String backupOf(String nodeID, Map<String, Pair<String, String>> networkTopology) {
+      final Pair<String, String> liveAndBackup = networkTopology.get(nodeID);
+      return liveAndBackup == null ? null : liveAndBackup.getB();
+   }
+
+   /**
+    * @return the live URL of {@code nodeID}, or {@code null} if the node is unknown or has no live
+    */
+   private static String liveOf(String nodeID, Map<String, Pair<String, String>> networkTopology) {
+      final Pair<String, String> liveAndBackup = networkTopology.get(nodeID);
+      return liveAndBackup == null ? null : liveAndBackup.getA();
+   }
+
+   /**
+    * @return a predicate matching a topology whose node IDs are exactly {@code nodeID}
+    */
+   private static Predicate<Map<String, Pair<String, String>>> containsExactNodeIds(String... nodeID) {
+      Objects.requireNonNull(nodeID);
+      return topology -> topology.size() == nodeID.length && Stream.of(nodeID).allMatch(topology::containsKey);
+   }
+
+   /**
+    * @return a predicate matching a topology with exactly {@code count} live members
+    */
+   private static Predicate<Map<String, Pair<String, String>>> withMembers(int count) {
+      return topology -> countMembers(topology) == count;
+   }
+
+   /**
+    * @return a predicate matching a topology with exactly {@code count} live plus backup URLs
+    */
+   private static Predicate<Map<String, Pair<String, String>>> withNodes(int count) {
+      return topology -> countNodes(topology) == count;
+   }
+
+   /**
+    * @return a predicate testing the backup URL of {@code nodeId} with {@code compare}
+    */
+   private static Predicate<Map<String, Pair<String, String>>> withBackup(String nodeId, Predicate<String> compare) {
+      return topology -> compare.test(backupOf(nodeId, topology));
+   }
+
+   /**
+    * @return a predicate testing the live URL of {@code nodeId} with {@code compare}
+    */
+   private static Predicate<Map<String, Pair<String, String>>> withLive(String nodeId, Predicate<String> compare) {
+      return topology -> compare.test(liveOf(nodeId, topology));
+   }
+
+   private static final String JMX_SERVER_HOSTNAME = "localhost";
+   private static final int JMX_PORT_MASTER_1 = 10099;
+   private static final int JMX_PORT_MASTER_2 = 10199;
+   private static final int JMX_PORT_MASTER_3 = 10299;
+   private static final int JMX_PORT_SLAVE_1 = 10399;
+
+   private static final String MASTER_1_DATA_FOLDER = "replicated-failback-master1";
+   private static final String MASTER_2_DATA_FOLDER = "replicated-failback-master2";
+   private static final String MASTER_3_DATA_FOLDER = "replicated-failback-master3";
+   private static final String SLAVE_1_DATA_FOLDER = "replicated-failback-slave1";
+
+   private static final int MASTER_1_PORT_ID = 0;
+   private static final int MASTER_2_PORT_ID = MASTER_1_PORT_ID + 100;
+   private static final int MASTER_3_PORT_ID = MASTER_2_PORT_ID + 100;
+   private static final int SLAVE_1_PORT_ID = MASTER_3_PORT_ID + 100;
+
+   /**
+    * The brokers under test; each constant's {@link #name()} is used as the broker name in its JMX object names.
+    */
+   private enum Broker {
+      master1(JMX_PORT_MASTER_1, MASTER_1_DATA_FOLDER, MASTER_1_PORT_ID),
+      master2(JMX_PORT_MASTER_2, MASTER_2_DATA_FOLDER, MASTER_2_PORT_ID),
+      master3(JMX_PORT_MASTER_3, MASTER_3_DATA_FOLDER, MASTER_3_PORT_ID),
+      slave1(JMX_PORT_SLAVE_1, SLAVE_1_DATA_FOLDER, SLAVE_1_PORT_ID);
+
+      final ObjectNameBuilder objectNameBuilder;
+      final String dataFolder;
+      final JMXServiceURL jmxServiceURL;
+      final int portID;
+
+      Broker(int jmxPort, String dataFolder, int portID) {
+         this.portID = portID;
+         this.dataFolder = dataFolder;
+         try {
+            jmxServiceURL = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://" + JMX_SERVER_HOSTNAME + ":" + jmxPort + "/jmxrmi");
+         } catch (MalformedURLException e) {
+            throw new RuntimeException(e);
+         }
+         this.objectNameBuilder = ObjectNameBuilder.create(ActiveMQDefaultConfiguration.getDefaultJmxDomain(), name(), true);
+      }
+
+      public Process startServer(SmokeTestBase env, int millisTimeout) throws Exception {
+         return env.startServer(dataFolder, portID, millisTimeout);
+      }
+
+      public void cleanupData() {
+         ReplicatedMultipleFailbackTest.cleanupData(dataFolder);
+      }
+
+      public Optional<Boolean> isBackup() throws Exception {
+         return ReplicatedMultipleFailbackTest.isBackup(jmxServiceURL, objectNameBuilder);
+      }
+
+      public Optional<String> getNodeID() throws Exception {
+         return ReplicatedMultipleFailbackTest.getNodeID(jmxServiceURL, objectNameBuilder);
+      }
+
+      public Optional<String> listNetworkTopology() throws Exception {
+         return ReplicatedMultipleFailbackTest.listNetworkTopology(jmxServiceURL, objectNameBuilder);
+      }
+   }
+
+   @Before
+   public void before() {
+      Stream.of(Broker.values()).forEach(Broker::cleanupData);
+      disableCheckThread();
+   }
+
+   @Test
+   public void testMultipleFailback() throws Exception {
+      LOGGER.infof("TEST BOOTSTRAPPING START: STARTING brokers %s", Arrays.toString(Broker.values()));
+      final int failbackRetries = 10;
+      final int timeout = (int) TimeUnit.SECONDS.toMillis(30);
+      Process master1 = Broker.master1.startServer(this, timeout);
+      Wait.assertTrue(() -> !Broker.master1.isBackup().orElse(true), timeout);
+      Broker.master2.startServer(this, timeout);
+      Wait.assertTrue(() -> !Broker.master2.isBackup().orElse(true), timeout);
+      Broker.master3.startServer(this, timeout);
+      Wait.assertTrue(() -> !Broker.master3.isBackup().orElse(true), timeout);
+      // no port wait for the slave: it is expected to come up as a backup (checked just below)
+      Broker.slave1.startServer(this, 0);
+      Wait.assertTrue(() -> Broker.slave1.isBackup().orElse(false), timeout);
+
+      final String nodeIDlive1 = Broker.master1.getNodeID().orElseThrow(() -> new AssertionError("cannot read master1 node ID"));
+      final String nodeIDlive2 = Broker.master2.getNodeID().orElseThrow(() -> new AssertionError("cannot read master2 node ID"));
+      final String nodeIDlive3 = Broker.master3.getNodeID().orElseThrow(() -> new AssertionError("cannot read master3 node ID"));
+
+      for (Broker broker : Broker.values()) {
+         LOGGER.infof("CHECKING NETWORK TOPOLOGY FOR %s", broker);
+         Wait.assertTrue(() -> validateNetworkTopology(broker.listNetworkTopology().orElse(""),
+                                                       containsExactNodeIds(nodeIDlive1, nodeIDlive2, nodeIDlive3)
+                                                          .and(withLive(nodeIDlive1, Objects::nonNull))
+                                                          .and(withBackup(nodeIDlive1, Objects::nonNull))
+                                                          .and(withMembers(3))
+                                                          .and(withNodes(4))), timeout);
+      }
+
+      final String slave1Topology = Broker.slave1.listNetworkTopology().orElseThrow(() -> new AssertionError("cannot read slave1 topology"));
+      final String urlSlave1 = backupOf(nodeIDlive1, decodeNetworkTopologyJson(slave1Topology));
+      Assert.assertNotNull(urlSlave1);
+      final String master1Topology = Broker.master1.listNetworkTopology().orElseThrow(() -> new AssertionError("cannot read master1 topology"));
+      final String urlMaster1 = liveOf(nodeIDlive1, decodeNetworkTopologyJson(master1Topology));
+      Assert.assertNotNull(urlMaster1);
+      Assert.assertNotEquals(urlMaster1, urlSlave1);
+
+      LOGGER.infof("Node ID live 1 is %s", nodeIDlive1);
+      LOGGER.infof("Node ID live 2 is %s", nodeIDlive2);
+      LOGGER.infof("Node ID live 3 is %s", nodeIDlive3);
+
+      LOGGER.infof("%s has url: %s", Broker.master1, urlMaster1);
+      LOGGER.infof("%s has url: %s", Broker.slave1, urlSlave1);
+
+      LOGGER.info("BOOTSTRAPPING ENDED: READ nodeIds and master1/slave1 urls");
+
+      for (int i = 0; i < failbackRetries; i++) {
+         LOGGER.infof("START TEST %d", i + 1);
+         LOGGER.info("KILLING master1");
+         killServer(master1);
+         // wait until slave1 became live
+         Wait.assertTrue(() -> !Broker.slave1.isBackup().orElse(true), timeout);
+         LOGGER.info("slave1 is LIVE");
+         LOGGER.info("VALIDATE TOPOLOGY OF ALIVE BROKERS");
+         Stream.of(Broker.master2, Broker.master3, Broker.slave1).forEach(
+            broker -> Wait.assertTrue(() -> validateNetworkTopology(broker.listNetworkTopology().orElse(""),
+                                                                    containsExactNodeIds(nodeIDlive1, nodeIDlive2, nodeIDlive3)
+                                                                       .and(withLive(nodeIDlive1, urlSlave1::equals))
+                                                                       .and(withBackup(nodeIDlive1, Objects::isNull))
+                                                                       .and(withMembers(3))
+                                                                       .and(withNodes(3))), timeout)
+         );
+         // restart master1
+         LOGGER.info("STARTING master1");
+         master1 = Broker.master1.startServer(this, 0);
+         Wait.assertTrue(() -> Broker.slave1.isBackup().orElse(false), timeout);
+         LOGGER.info("slave1 is BACKUP");
+         Wait.assertTrue(() -> !Broker.master1.isBackup().orElse(true), timeout);
+         LOGGER.info("master1 is LIVE");
+         for (Broker broker : Broker.values()) {
+            LOGGER.infof("CHECKING NETWORK TOPOLOGY FOR %s", broker);
+            Wait.assertTrue(() -> validateNetworkTopology(broker.listNetworkTopology().orElse(""),
+                                                          containsExactNodeIds(nodeIDlive1, nodeIDlive2, nodeIDlive3)
+                                                             .and(withLive(nodeIDlive1, urlMaster1::equals))
+                                                             .and(withBackup(nodeIDlive1, urlSlave1::equals))
+                                                             .and(withMembers(3))
+                                                             .and(withNodes(4))), timeout);
+         }
+      }
+      LOGGER.info("TEST COMPLETED");
+   }
+}
+
