This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 537e0023fc ARTEMIS-4986 Compatibility issue on Quorum Voting
537e0023fc is described below
commit 537e0023fc786ad947dc30c5d11c779749d0cbad
Author: Clebert Suconic <[email protected]>
AuthorDate: Wed Aug 7 21:20:43 2024 -0400
ARTEMIS-4986 Compatibility issue on Quorum Voting
---
.../activemq/artemis/utils/SpawnedVMSupport.java | 19 +-
.../artemis/core/protocol/core/Channel.java | 10 +
.../core/protocol/core/impl/ChannelImpl.java | 28 +-
.../core/protocol/core/impl/PacketImpl.java | 2 +
.../core/impl/wireformat/QuorumVoteMessage.java | 4 +
.../artemis/core/server/ActiveMQServerLogger.java | 4 +-
.../core/server/cluster/quorum/QuorumManager.java | 77 ++++-
.../core/server/cluster/quorum/QuorumVote.java | 11 +-
.../cluster/quorum/QuorumVoteServerConnect.java | 7 +-
.../core/server/impl/ColocatedActivation.java | 3 +-
pom.xml | 2 +-
.../activemq/artemis/utils/RealServerTestBase.java | 6 +
.../artemis/utils/cli/helper/HelperBase.java | 31 +-
tests/compatibility-tests/pom.xml | 59 ++++
.../distribution/RollingUpgradeTest.java | 354 +++++++++++++++++++++
.../integration/cluster/util/BackupSyncDelay.java | 10 +-
16 files changed, 592 insertions(+), 35 deletions(-)
diff --git
a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/SpawnedVMSupport.java
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/SpawnedVMSupport.java
index 393682d03f..2cb8bbab2c 100644
---
a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/SpawnedVMSupport.java
+++
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/SpawnedVMSupport.java
@@ -221,17 +221,26 @@ public class SpawnedVMSupport {
Process process = builder.start();
+ spawnLoggers(wordMatch, wordRunning, className, logOutput,
logErrorOutput, process);
+
+ if (startedProcesses != null) {
+ startedProcesses.put(process, className);
+ }
+ return process;
+ }
+
+ public static void spawnLoggers(String wordMatch,
+ Runnable wordRunning,
+ String className,
+ boolean logOutput,
+ boolean logErrorOutput,
+ Process process) throws
ClassNotFoundException {
SpawnedVMSupport.startLogger(logOutput, wordMatch, wordRunning,
className, process);
// Adding a reader to System.err, so the VM won't hang on a
System.err.println as identified on this forum thread:
// http://www.jboss.org/index.html?module=bb&op=viewtopic&t=151815
ProcessLogger errorLogger = new ProcessLogger(logErrorOutput,
process.getErrorStream(), className, wordMatch, wordRunning);
errorLogger.start();
-
- if (startedProcesses != null) {
- startedProcesses.put(process, className);
- }
- return process;
}
/**
diff --git
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java
index 12817294c6..d8785952c5 100644
---
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java
+++
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/Channel.java
@@ -57,6 +57,14 @@ public interface Channel {
*/
boolean supports(byte packetID, int version);
+ /*
+ * Due to ARTEMIS-4986, older versions (2.30.0 in particular) will require
a special voting handling,
+ * where we would perform specific retries at older values.
+ */
+ default boolean requireSpecialVotingHandling() {
+ return false;
+ }
+
/**
* Sends a packet on this channel.
*
@@ -134,6 +142,8 @@ public interface Channel {
*/
Packet sendBlocking(Packet packet, int reconnectID, byte expectedPacket)
throws ActiveMQException;
+ Packet sendBlocking(Packet packet, int reconnectID, byte expectedPacket,
long timeout, boolean failOnTimeout) throws ActiveMQException;
+
/**
* Sets the {@link
org.apache.activemq.artemis.core.protocol.core.ChannelHandler} that this
channel should
* forward received packets to.
diff --git
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
index 4f8e17f5b0..e33d603ffa 100644
---
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
+++
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java
@@ -200,6 +200,11 @@ public final class ChannelImpl implements Channel {
}
}
+ @Override
+ public boolean requireSpecialVotingHandling() {
+ return connection.getChannelVersion() <
PacketImpl.ARTEMIS_2_36_0_VERSION;
+ }
+
@Override
public long getID() {
return id;
@@ -452,6 +457,15 @@ public final class ChannelImpl implements Channel {
return sendBlocking(packet, -1, expectedPacket);
}
+ @Override
+ public Packet sendBlocking(final Packet packet,
+ final int reconnectID,
+ final byte expectedPacket) throws
ActiveMQException {
+
+ return sendBlocking(packet, reconnectID, expectedPacket,
connection.getBlockingCallTimeout(), true);
+
+ }
+
/**
* Due to networking issues or server issues the server may take longer to
answer than expected.. the client may timeout the call throwing an exception
* and the client could eventually retry another call, but the server could
then answer a previous command issuing a class-cast-exception.
@@ -460,7 +474,9 @@ public final class ChannelImpl implements Channel {
@Override
public Packet sendBlocking(final Packet packet,
final int reconnectID,
- byte expectedPacket) throws ActiveMQException {
+ final byte expectedPacket,
+ final long timeout,
+ final boolean failOnTimeout) throws
ActiveMQException {
String interceptionResult = invokeInterceptors(packet, interceptors,
connection);
if (interceptionResult != null) {
@@ -478,7 +494,7 @@ public final class ChannelImpl implements Channel {
throw ActiveMQClientMessageBundle.BUNDLE.connectionDestroyed();
}
- if (connection.getBlockingCallTimeout() == -1) {
+ if (timeout == -1) {
if (logger.isTraceEnabled()) {
logger.trace("RemotingConnectionID={} Cannot do a blocking call
timeout on a server side connection", (connection == null ? "NULL" :
connection.getID()));
}
@@ -515,7 +531,7 @@ public final class ChannelImpl implements Channel {
connection.getTransportConnection().write(buffer, false, false);
- long toWait = connection.getBlockingCallTimeout();
+ long toWait = timeout;
long start = System.currentTimeMillis();
@@ -546,8 +562,12 @@ public final class ChannelImpl implements Channel {
throw ActiveMQClientMessageBundle.BUNDLE.unblockingACall(cause);
}
+ if (!failOnTimeout && response == null) {
+ return null;
+ }
+
if (response == null || (response.getType() !=
PacketImpl.EXCEPTION && response.getCorrelationID() !=
packet.getCorrelationID())) {
- ActiveMQException e =
ActiveMQClientMessageBundle.BUNDLE.timedOutSendingPacket(connection.getBlockingCallTimeout(),
packet.getType());
+ ActiveMQException e =
ActiveMQClientMessageBundle.BUNDLE.timedOutSendingPacket(timeout,
packet.getType());
connection.asyncFail(e);
throw e;
}
diff --git
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
index 8b59358b62..137898b4c7 100644
---
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
+++
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java
@@ -53,6 +53,8 @@ public class PacketImpl implements Packet {
// 2.29.0
public static final int ARTEMIS_2_29_0_VERSION = 135;
+ public static final int ARTEMIS_2_36_0_VERSION = 136;
+
public static final SimpleString OLD_QUEUE_PREFIX =
SimpleString.of("jms.queue.");
public static final SimpleString OLD_TEMP_QUEUE_PREFIX =
SimpleString.of("jms.tempqueue.");
public static final SimpleString OLD_TOPIC_PREFIX =
SimpleString.of("jms.topic.");
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/QuorumVoteMessage.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/QuorumVoteMessage.java
index cedeae8262..3e31f583ca 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/QuorumVoteMessage.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/QuorumVoteMessage.java
@@ -60,6 +60,10 @@ public class QuorumVoteMessage extends PacketImpl {
return handler;
}
+ public void setHandler(SimpleString handler) {
+ this.handler = handler;
+ }
+
public Vote getVote() {
return vote;
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
index 33e2887a12..5a88f63244 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
@@ -1473,8 +1473,8 @@ public interface ActiveMQServerLogger {
@LogMessage(id = 224089, value = "Failed to calculate persistent size",
level = LogMessage.Level.WARN)
void errorCalculatePersistentSize(Throwable e);
- @LogMessage(id = 224090, value = "This node is not configured for Quorum
Voting, all nodes must be configured for HA", level = LogMessage.Level.WARN)
- void noVoteHandlerConfigured();
+ @LogMessage(id = 224090, value = "This node is not configured for the
proper Quorum Voting, all nodes must be configured for the same policy. Handler
received = {}", level = LogMessage.Level.WARN)
+ void noVoteHandlerConfigured(SimpleString handlerReceived);
@LogMessage(id = 224091, value = "Bridge {} is unable to connect to
destination. Retrying", level = LogMessage.Level.WARN)
void errorConnectingBridgeRetry(Bridge bridge);
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/quorum/QuorumManager.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/quorum/QuorumManager.java
index 6c1f1bdbf1..bb7deb5407 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/quorum/QuorumManager.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/quorum/QuorumManager.java
@@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.core.server.cluster.quorum;
+import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -41,6 +42,8 @@ import
org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.cluster.ClusterControl;
import org.apache.activemq.artemis.core.server.cluster.ClusterController;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import static org.apache.activemq.artemis.utils.Preconditions.checkNotNull;
@@ -51,6 +54,21 @@ import static
org.apache.activemq.artemis.utils.Preconditions.checkNotNull;
*/
public final class QuorumManager implements ClusterTopologyListener,
ActiveMQComponent {
+ private static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ private static final int VOTE_RESPONSE_TIMEOUT;
+
+ static {
+ int parsedTimeout = 2000; // default value
+ try {
+ parsedTimeout =
Integer.parseInt(System.getProperty(Quorum.class.getName() +
".VOTE_RESPONSE_TIMEOUT", "2000"));
+ } catch (Exception e) {
+ logger.warn(e.getMessage(), e);
+ }
+ VOTE_RESPONSE_TIMEOUT = parsedTimeout;
+ }
+
+
private final ExecutorService executor;
private final ClusterController clusterController;
@@ -276,6 +294,7 @@ public final class QuorumManager implements
ClusterTopologyListener, ActiveMQCom
* @return the updated vote
*/
private Vote vote(SimpleString handler, Vote vote) {
+ logger.trace("Vote for {}", handler);
QuorumVoteHandler quorumVoteHandler = handlers.get(handler);
return quorumVoteHandler.vote(vote);
}
@@ -310,21 +329,31 @@ public final class QuorumManager implements
ClusterTopologyListener, ActiveMQCom
}
private QuorumVoteHandler getVoteHandler(SimpleString handler) {
+ if (handler.equals(QuorumVoteServerConnect.OLD_PRIMARY_FAILOVER_VOTE)) {
+ handler = QuorumVoteServerConnect.PRIMARY_FAILOVER_VOTE;
+ }
return handlers.get(handler);
}
public void handleQuorumVote(Channel clusterChannel, Packet packet) {
QuorumVoteMessage quorumVoteMessage = (QuorumVoteMessage) packet;
+ SimpleString handlerUsed = quorumVoteMessage.getHandler();
+ if
(quorumVoteMessage.getHandler().equals(QuorumVoteServerConnect.OLD_PRIMARY_FAILOVER_VOTE))
{
+
quorumVoteMessage.setHandler(QuorumVoteServerConnect.PRIMARY_FAILOVER_VOTE);
+ }
+
QuorumVoteHandler voteHandler =
getVoteHandler(quorumVoteMessage.getHandler());
if (voteHandler == null) {
- ActiveMQServerLogger.LOGGER.noVoteHandlerConfigured();
+ logger.trace("Could not find {}", quorumVoteMessage.getHandler());
+ ActiveMQServerLogger.LOGGER.noVoteHandlerConfigured(handlerUsed);
return;
}
quorumVoteMessage.decode(voteHandler);
ActiveMQServerLogger.LOGGER.receivedQuorumVoteRequest(quorumVoteMessage.getVote().toString());
+ logger.trace("Receiving handler = {}", quorumVoteMessage.getHandler());
Vote vote = vote(quorumVoteMessage.getHandler(),
quorumVoteMessage.getVote());
ActiveMQServerLogger.LOGGER.sendingQuorumVoteResponse(vote.toString());
- clusterChannel.send(new
QuorumVoteReplyMessage(quorumVoteMessage.getHandler(), vote));
+ clusterChannel.send(new QuorumVoteReplyMessage(handlerUsed, vote));
}
private final class VoteRunnableHolder {
@@ -348,23 +377,59 @@ public final class QuorumManager implements
ClusterTopologyListener, ActiveMQCom
}
}
- private Vote sendQuorumVote(ClusterControl clusterControl, SimpleString
handler, Vote vote) {
+ private Vote sendQuorumVote(ClusterControl clusterControl, SimpleString
handler, SimpleString oldHandlerName, Vote vote) {
try {
final ClientSessionFactoryInternal sessionFactory =
clusterControl.getSessionFactory();
final String remoteAddress =
sessionFactory.getConnection().getRemoteAddress();
ActiveMQServerLogger.LOGGER.sendingQuorumVoteRequest(remoteAddress,
vote.toString());
- QuorumVoteReplyMessage replyMessage = (QuorumVoteReplyMessage)
clusterControl.getClusterChannel().get()
- .sendBlocking(new QuorumVoteMessage(handler, vote),
PacketImpl.QUORUM_VOTE_REPLY);
+
+ QuorumVoteReplyMessage replyMessage = null;
+
+ Channel clusterChannel = clusterControl.getClusterChannel().get();
+
+ if (clusterChannel.requireSpecialVotingHandling()) {
+ logger.trace("Using special handling mechanism for Voting to cope
with previous version");
+ // if the wire versioning is from before 2.36.0, we try the vote
for former versions
+ replyMessage = voteFormerVersions(handler, oldHandlerName, vote,
clusterChannel);
+ } else {
+ logger.trace("Using REGULAR voting");
+ }
+
+ if (replyMessage == null) {
+ // if still no response, or if we are using a current version, we
will try the regular voting as usual
+ replyMessage = (QuorumVoteReplyMessage)
clusterChannel.sendBlocking(new QuorumVoteMessage(handler, vote),
PacketImpl.QUORUM_VOTE_REPLY);
+ }
+
+ logger.trace("Got reply message {}", replyMessage);
+
QuorumVoteHandler voteHandler =
getVoteHandler(replyMessage.getHandler());
replyMessage.decodeRest(voteHandler);
Vote voteResponse = replyMessage.getVote();
ActiveMQServerLogger.LOGGER.receivedQuorumVoteResponse(remoteAddress,
voteResponse.toString());
return voteResponse;
} catch (ActiveMQException e) {
+ logger.debug("{}", e.getMessage(), e);
return null;
}
}
+ private static QuorumVoteReplyMessage voteFormerVersions(SimpleString
handler,
+
SimpleString oldHandlerName,
+ Vote vote,
+ Channel
clusterChannel) throws ActiveMQException {
+ QuorumVoteReplyMessage replyMessage;
+ // We first try the current packet with a medium timeout
+ replyMessage = (QuorumVoteReplyMessage) clusterChannel.sendBlocking(new
QuorumVoteMessage(handler, vote), -1, PacketImpl.QUORUM_VOTE_REPLY,
VOTE_RESPONSE_TIMEOUT, false);
+ logger.trace("This is the reply message from the current version = {}",
replyMessage);
+
+ // if no response, we try the previous versions, with still a medium
timeout
+ if (replyMessage == null && oldHandlerName != null) {
+ replyMessage = (QuorumVoteReplyMessage)
clusterChannel.sendBlocking(new QuorumVoteMessage(oldHandlerName, vote), -1,
PacketImpl.QUORUM_VOTE_REPLY, VOTE_RESPONSE_TIMEOUT, false);
+ logger.trace("This is the reply message from the older version = {}",
replyMessage);
+ }
+ return replyMessage;
+ }
+
/**
* this will connect to a node and then cast a vote. whether or not this
vote is asked of the target node is dependent
* on {@link
org.apache.activemq.artemis.core.server.cluster.quorum.Vote#isRequestServerVote()}
@@ -394,7 +459,7 @@ public final class QuorumManager implements
ClusterTopologyListener, ActiveMQCom
vote = quorumVote.connected();
if (vote.isRequestServerVote()) {
- vote = sendQuorumVote(clusterControl, quorumVote.getName(),
vote);
+ vote = sendQuorumVote(clusterControl, quorumVote.getName(),
quorumVote.getOldName(), vote);
quorumVote.vote(vote);
} else {
quorumVote.vote(vote);
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/quorum/QuorumVote.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/quorum/QuorumVote.java
index 0c974413b0..b553bea417 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/quorum/QuorumVote.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/quorum/QuorumVote.java
@@ -24,10 +24,13 @@ import
org.apache.activemq.artemis.core.client.impl.Topology;
*/
public abstract class QuorumVote<V extends Vote, T> {
- private SimpleString name;
+ private final SimpleString name;
- public QuorumVote(SimpleString name) {
+ private final SimpleString oldName;
+
+ public QuorumVote(SimpleString name, SimpleString oldName) {
this.name = name;
+ this.oldName = oldName;
}
/**
@@ -77,4 +80,8 @@ public abstract class QuorumVote<V extends Vote, T> {
public SimpleString getName() {
return name;
}
+
+ public SimpleString getOldName() {
+ return oldName;
+ }
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/quorum/QuorumVoteServerConnect.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/quorum/QuorumVoteServerConnect.java
index 7b1aa55c49..c009927869 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/quorum/QuorumVoteServerConnect.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/quorum/QuorumVoteServerConnect.java
@@ -28,7 +28,12 @@ import
org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
*/
public class QuorumVoteServerConnect extends QuorumVote<ServerConnectVote,
Boolean> {
+
+ /** NOTE: The following String is used to identify the targetNode
implementation at other servers.
+ * Renaming such string would cause incompatibility changes. */
public static final SimpleString PRIMARY_FAILOVER_VOTE =
SimpleString.of("PrimaryFailoverQuorumVote");
+ public static final SimpleString OLD_PRIMARY_FAILOVER_VOTE =
SimpleString.of("LiveFailoverQuorumVote");
+
// this flag mark the end of the vote
private final CountDownLatch voteCompleted;
private final String targetNodeId;
@@ -49,7 +54,7 @@ public class QuorumVoteServerConnect extends
QuorumVote<ServerConnectVote, Boole
* n | n-1 | n/2 + 1 | n/2 + 1 rounded
*/
public QuorumVoteServerConnect(int size, String targetNodeId, boolean
requestToStayActive, String connector) {
- super(PRIMARY_FAILOVER_VOTE);
+ super(PRIMARY_FAILOVER_VOTE, OLD_PRIMARY_FAILOVER_VOTE);
this.targetNodeId = targetNodeId;
this.connector = connector;
double majority;
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ColocatedActivation.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ColocatedActivation.java
index 381fc66bf6..79c5a04446 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ColocatedActivation.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ColocatedActivation.java
@@ -165,7 +165,8 @@ public class ColocatedActivation extends PrimaryActivation {
private final List<Pair<String, Integer>> nodes = new ArrayList<>();
private RequestBackupQuorumVote() {
- super(REQUEST_BACKUP_QUORUM_VOTE);
+ // there's no old name, we never renamed the ColocatedQuorumVote
String
+ super(REQUEST_BACKUP_QUORUM_VOTE, null);
}
@Override
diff --git a/pom.xml b/pom.xml
index afe5562e41..ef6c23c674 100644
--- a/pom.xml
+++ b/pom.xml
@@ -195,7 +195,7 @@
<activemq.version.majorVersion>1</activemq.version.majorVersion>
<activemq.version.minorVersion>0</activemq.version.minorVersion>
<activemq.version.microVersion>0</activemq.version.microVersion>
-
<activemq.version.incrementingVersion>135,134,133,132,131,130,129,128,127,126,125,124,123,122</activemq.version.incrementingVersion>
+
<activemq.version.incrementingVersion>136,135,134,133,132,131,130,129,128,127,126,125,124,123,122</activemq.version.incrementingVersion>
<activemq.version.versionTag>${project.version}</activemq.version.versionTag>
<ActiveMQ-Version>${project.version}(${activemq.version.incrementingVersion})</ActiveMQ-Version>
diff --git
a/tests/artemis-test-support/src/main/java/org/apache/activemq/artemis/utils/RealServerTestBase.java
b/tests/artemis-test-support/src/main/java/org/apache/activemq/artemis/utils/RealServerTestBase.java
index 70fe9b2d4a..b7d2f46373 100644
---
a/tests/artemis-test-support/src/main/java/org/apache/activemq/artemis/utils/RealServerTestBase.java
+++
b/tests/artemis-test-support/src/main/java/org/apache/activemq/artemis/utils/RealServerTestBase.java
@@ -37,6 +37,7 @@ import java.net.MalformedURLException;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
@@ -95,6 +96,11 @@ public class RealServerTestBase extends ActiveMQTestBase {
assertTrue(stopMe.createNewFile());
}
+ protected static void stopServerWithFile(String serverLocation, Process
process, int timeout, TimeUnit unit) throws Exception {
+ stopServerWithFile(serverLocation);
+ process.waitFor(timeout, unit);
+ }
+
public static String getServerLocation(String serverName) {
return basedir + "/target/" + serverName;
}
diff --git
a/tests/artemis-test-support/src/main/java/org/apache/activemq/artemis/utils/cli/helper/HelperBase.java
b/tests/artemis-test-support/src/main/java/org/apache/activemq/artemis/utils/cli/helper/HelperBase.java
index 28c81333b8..8444ff5456 100644
---
a/tests/artemis-test-support/src/main/java/org/apache/activemq/artemis/utils/cli/helper/HelperBase.java
+++
b/tests/artemis-test-support/src/main/java/org/apache/activemq/artemis/utils/cli/helper/HelperBase.java
@@ -33,20 +33,20 @@ public class HelperBase {
File artemisInstance;
HelperBase(String homeProperty) {
- String propertyHome = System.getProperty(homeProperty);
- if (propertyHome == null) {
- throw new IllegalArgumentException("System property " + propertyHome
+ " not defined");
- }
- if (propertyHome != null) {
- artemisHome = new File(propertyHome);
- }
+ setArtemisHome(getHome(homeProperty));
logger.debug("using artemisHome as {}", artemisHome);
- if (!artemisHome.exists()) {
- throw new IllegalArgumentException(artemisHome + " folder does not
exist in the file system");
- }
- if (!new File(artemisHome, "/bin").exists() || !new File(artemisHome,
"/bin/artemis").exists()) {
- throw new IllegalArgumentException("invalid bin folder");
+ }
+
+ public static File getHome() {
+ return getHome(ARTEMIS_HOME_PROPERTY);
+ }
+
+ public static File getHome(String homeProperty) {
+ String valueHome = System.getProperty(homeProperty);
+ if (valueHome == null) {
+ throw new IllegalArgumentException("System property " + valueHome + "
not defined");
}
+ return new File(valueHome);
}
public File getArtemisHome() {
@@ -55,6 +55,13 @@ public class HelperBase {
public HelperBase setArtemisHome(File artemisHome) {
this.artemisHome = artemisHome;
+ if (!artemisHome.exists()) {
+ throw new IllegalArgumentException(artemisHome + " folder does not
exist in the file system");
+ }
+ if (!new File(artemisHome, "/bin").exists() || !new File(artemisHome,
"/bin/artemis").exists()) {
+ throw new IllegalArgumentException("invalid bin folder");
+ }
+
return this;
}
diff --git a/tests/compatibility-tests/pom.xml
b/tests/compatibility-tests/pom.xml
index 0302c4a2e3..b5cfb1b199 100644
--- a/tests/compatibility-tests/pom.xml
+++ b/tests/compatibility-tests/pom.xml
@@ -105,6 +105,12 @@
<artifactId>artemis-unit-test-support</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.activemq.tests</groupId>
+ <artifactId>artemis-test-support</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-features</artifactId>
@@ -265,6 +271,59 @@
</plugins>
</build>
<profiles>
+ <profile>
+ <id>compatibility-tests-distribution</id>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>unpack-2.30.0</id>
+ <phase>package</phase>
+ <goals>
+ <goal>unpack</goal>
+ </goals>
+ <configuration>
+ <artifactItems>
+ <artifactItem>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>apache-artemis</artifactId>
+ <version>2.30.0</version>
+ <classifier>bin</classifier>
+ <type>zip</type>
+
<outputDirectory>${basedir}/target/old-releases</outputDirectory>
+ </artifactItem>
+ </artifactItems>
+ </configuration>
+ </execution>
+ <!-- TODO: Switch this to 2.37.0 as soon as we release it.
+ It will be important to test the switch at the
current and previous version. -->
+ <execution>
+ <id>unpack-2.36.0</id>
+ <phase>package</phase>
+ <goals>
+ <goal>unpack</goal>
+ </goals>
+ <configuration>
+ <artifactItems>
+ <artifactItem>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>apache-artemis</artifactId>
+ <version>2.36.0</version>
+ <classifier>bin</classifier>
+ <type>zip</type>
+
<outputDirectory>${basedir}/target/old-releases</outputDirectory>
+ </artifactItem>
+ </artifactItems>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
<profile>
<id>compatibility-tests</id>
<properties>
diff --git
a/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/distribution/RollingUpgradeTest.java
b/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/distribution/RollingUpgradeTest.java
new file mode 100644
index 0000000000..cdf4b39a8c
--- /dev/null
+++
b/tests/compatibility-tests/src/test/java/org/apache/activemq/artemis/tests/compatibility/distribution/RollingUpgradeTest.java
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.compatibility.distribution;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.io.File;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.api.core.Pair;
+import org.apache.activemq.artemis.api.core.management.SimpleManagement;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.util.ServerUtil;
+import org.apache.activemq.artemis.utils.FileUtil;
+import org.apache.activemq.artemis.utils.RealServerTestBase;
+import org.apache.activemq.artemis.utils.SpawnedVMSupport;
+import org.apache.activemq.artemis.utils.Wait;
+import org.apache.activemq.artemis.utils.cli.helper.HelperBase;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assumptions.assumeTrue;
+
+public class RollingUpgradeTest extends RealServerTestBase {
+
+ private static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ private static final String TWO_THIRTY =
"./target/old-releases/apache-artemis-2.30.0";
+
+ // TODO: Upgrade this towards 2.37 as soon as we release it
+ private static final String TWO_THIRTY_SIX =
"./target/old-releases/apache-artemis-2.36.0";
+
+ private static final String QUEUE_NAME = "RollQueue";
+
+ public static final String LIVE_0 = "RolledUpgradeTest/live0";
+ public static final String LIVE_1 = "RolledUpgradeTest/live1";
+ public static final String LIVE_2 = "RolledUpgradeTest/live2";
+
+ public static final String BKP_0 = "RolledUpgradeTest/bkp0";
+ public static final String BKP_1 = "RolledUpgradeTest/bkp1";
+ public static final String BKP_2 = "RolledUpgradeTest/bkp2";
+
+ private static final String LIVE0_URI = "tcp://localhost:61616";
+ private static final String LIVE1_URI = "tcp://localhost:61617";
+ private static final String LIVE2_URI = "tcp://localhost:61618";
+ private static final String BACKUP0_URI = "tcp://localhost:61619";
+ private static final String BACKUP1_URI = "tcp://localhost:61620";
+ private static final String BACKUP2_URI = "tcp://localhost:61621";
+
+ Process live0Process;
+ Process live1Process;
+ Process live2Process;
+
+ Process bkp0Process;
+ Process bkp1Process;
+ Process bkp2Process;
+
+ private static String getHost(int nodeID) {
+ return "localhost";
+ }
+
+ private static int getPort(int nodeID) {
+ return 61616 + nodeID;
+ }
+
+ private static int getPortOffeset(int nodeID) {
+ return nodeID;
+ }
+
+ private static void createServer(File homeLocation,
+ String location,
+ int serverID,
+ String replicaGroupName,
+ boolean live,
+ int[] connectedNodes) throws Exception {
+ File serverLocation = getFileServerLocation(location);
+ List<String> parameters = new ArrayList<>();
+ deleteDirectory(serverLocation);
+
+ StringBuilder clusterList = new StringBuilder();
+ for (int i = 0; i < connectedNodes.length; i++) {
+ if (i > 0) {
+ clusterList.append(",");
+ }
+ clusterList.append("tcp://" + getHost(connectedNodes[i]) + ":" +
getPort(connectedNodes[i]));
+ }
+
+ parameters.add(homeLocation.getAbsolutePath() + "/bin/artemis");
+ parameters.add("create");
+ parameters.add("--silent");
+ parameters.add("--queues");
+ parameters.add(QUEUE_NAME);
+ parameters.add("--user");
+ parameters.add("guest");
+ parameters.add("--password");
+ parameters.add("guest");
+ parameters.add("--port-offset");
+ parameters.add(String.valueOf(getPortOffeset(serverID)));
+ parameters.add("--allow-anonymous");
+ parameters.add("--no-web");
+ parameters.add("--no-autotune");
+ parameters.add("--host");
+ parameters.add("localhost");
+ parameters.add("--clustered");
+ parameters.add("--staticCluster");
+ parameters.add(clusterList.toString());
+ parameters.add("--replicated");
+ if (!live) {
+ parameters.add("--slave");
+ }
+ parameters.add("--no-amqp-acceptor");
+ parameters.add("--no-mqtt-acceptor");
+ parameters.add("--no-hornetq-acceptor");
+ parameters.add("--no-stomp-acceptor");
+ parameters.add(serverLocation.getAbsolutePath());
+
+ ProcessBuilder processBuilder = new ProcessBuilder();
+ processBuilder.command(parameters.toArray(new
String[parameters.size()]));
+ Process process = processBuilder.start();
+ SpawnedVMSupport.spawnLoggers(null, null, "ArtemisCreate", true, true,
process);
+ assertTrue(process.waitFor(10, TimeUnit.SECONDS));
+
+ File brokerXml = new File(serverLocation, "/etc/broker.xml");
+
+ if (live) {
+ boolean replacedMaster = FileUtil.findReplace(brokerXml,
"</primary>", " <group-name>" + replicaGroupName + "</group-name>\n" + "
<check-for-live-server>true</check-for-live-server>\n" + "
<quorum-size>2</quorum-size>\n" + " </primary>");
+ replacedMaster |= FileUtil.findReplace(brokerXml, "</master>", "
<group-name>" + replicaGroupName + "</group-name>\n" + "
<check-for-live-server>true</check-for-live-server>\n" + "
<quorum-size>2</quorum-size>\n" + " </master>");
+
+ assertTrue(replacedMaster, "couldn't find either master or primary on
broker.xml");
+ } else {
+ boolean replacedSlave = FileUtil.findReplace(brokerXml, "<slave/>",
"<slave>\n" + " <group-name>" + replicaGroupName +
"</group-name>\n" + " <allow-failback>false</allow-failback>\n" +
" <quorum-size>2</quorum-size>\n" + " </slave>");
+
+ replacedSlave |= FileUtil.findReplace(brokerXml, "<backup/>",
"<backup>\n" + " <group-name>" + replicaGroupName +
"</group-name>\n" + " <allow-failback>false</allow-failback>\n" +
" <quorum-size>2</quorum-size>\n" + " </backup>");
+ assertTrue(replacedSlave, "couldn't find slave on backup to replace
on broker.xml");
+ }
+
+ }
+
+ public static void createServers(File sourceHome) throws Exception {
+ createServer(sourceHome, LIVE_0, 0, "live0", true, new int[]{1, 2, 3, 4,
5});
+ createServer(sourceHome, LIVE_1, 1, "live1", true, new int[]{0, 2, 3, 4,
5});
+ createServer(sourceHome, LIVE_2, 2, "live2", true, new int[]{0, 1, 3, 4,
5});
+ createServer(sourceHome, BKP_0, 3, "live0", false, new int[]{0, 1, 2, 4,
5});
+ createServer(sourceHome, BKP_1, 4, "live1", false, new int[]{0, 1, 2, 3,
5});
+ createServer(sourceHome, BKP_2, 5, "live2", false, new int[]{0, 1, 2, 3,
4});
+ }
+
+ private void upgrade(File home, File instance) throws Exception {
+ ProcessBuilder upgradeBuilder = new ProcessBuilder();
+ upgradeBuilder.command(home.getAbsolutePath() + "/bin/artemis",
"upgrade", instance.getAbsolutePath());
+ Process process = upgradeBuilder.start();
+ SpawnedVMSupport.spawnLoggers(null, null, null, true, true, process);
+ assertTrue(process.waitFor(10, TimeUnit.SECONDS));
+ }
+
+ @Test
+ public void testRollUpgrade_2_30() throws Exception {
+ testRollUpgrade(new File(TWO_THIRTY));
+ }
+
+ @Test
+ public void testRollUpgrade_2_36() throws Exception {
+ testRollUpgrade(new File(TWO_THIRTY_SIX));
+ }
+
+ private void testRollUpgrade(File artemisHome) throws Exception {
+ assumeTrue(artemisHome.exists());
+ createServers(artemisHome);
+
+ live0Process = startServer(LIVE_0, 0, -1);
+ live1Process = startServer(LIVE_1, 1, -1);
+ live2Process = startServer(LIVE_2, 2, -1);
+
+ ServerUtil.waitForServerToStart(0, 30_000);
+ ServerUtil.waitForServerToStart(1, 30_000);
+ ServerUtil.waitForServerToStart(2, 30_000);
+
+ SimpleManagement managementLive0 = new SimpleManagement(LIVE0_URI, null,
null);
+ SimpleManagement managementLive1 = new SimpleManagement(LIVE1_URI, null,
null);
+ SimpleManagement managementLive2 = new SimpleManagement(LIVE2_URI, null,
null);
+ runAfter(managementLive0::close);
+ runAfter(managementLive1::close);
+ runAfter(managementLive2::close);
+
+ SimpleManagement managementBKP0 = new SimpleManagement(BACKUP0_URI,
null, null);
+ SimpleManagement managementBKP1 = new SimpleManagement(BACKUP1_URI,
null, null);
+ SimpleManagement managementBKP2 = new SimpleManagement(BACKUP2_URI,
null, null);
+ runAfter(managementBKP0::close);
+ runAfter(managementBKP1::close);
+ runAfter(managementBKP2::close);
+
+ bkp0Process = startServer(BKP_0, 0, 0);
+ bkp1Process = startServer(BKP_1, 0, 0);
+ bkp2Process = startServer(BKP_2, 0, 0);
+
+ Wait.assertTrue(managementLive0::isReplicaSync, 10_000, 100);
+ Wait.assertTrue(managementLive1::isReplicaSync, 10_000, 100);
+ Wait.assertTrue(managementLive2::isReplicaSync, 10_000, 100);
+
+ sendMessage(LIVE0_URI, "AMQP");
+ sendMessage(LIVE0_URI, "CORE");
+ sendMessage(LIVE0_URI, "OPENWIRE");
+
+ sendMessage(LIVE1_URI, "AMQP");
+ sendMessage(LIVE1_URI, "CORE");
+ sendMessage(LIVE1_URI, "OPENWIRE");
+
+ sendMessage(LIVE2_URI, "AMQP");
+ sendMessage(LIVE2_URI, "CORE");
+ sendMessage(LIVE2_URI, "OPENWIRE");
+
+ Pair<Process, Process> updatedProcess;
+
+ updatedProcess = rollUpgrade(0, LIVE_0, managementLive0, live0Process,
3, BKP_0, managementBKP0, bkp0Process);
+ this.live0Process = updatedProcess.getA();
+ this.bkp0Process = updatedProcess.getB();
+
+ updatedProcess = rollUpgrade(1, LIVE_1, managementLive1, live1Process,
4, BKP_1, managementBKP1, bkp1Process);
+ this.live1Process = updatedProcess.getA();
+ this.bkp1Process = updatedProcess.getB();
+
+ updatedProcess = rollUpgrade(2, LIVE_2, managementLive2, live2Process,
5, BKP_2, managementBKP2, bkp2Process);
+ this.live2Process = updatedProcess.getA();
+ this.bkp2Process = updatedProcess.getB();
+
+ consumeMessage(LIVE0_URI, "AMQP");
+ consumeMessage(LIVE0_URI, "CORE");
+ consumeMessage(LIVE0_URI, "OPENWIRE");
+
+ consumeMessage(LIVE1_URI, "AMQP");
+ consumeMessage(LIVE1_URI, "CORE");
+ consumeMessage(LIVE1_URI, "OPENWIRE");
+
+ consumeMessage(LIVE2_URI, "AMQP");
+ consumeMessage(LIVE2_URI, "CORE");
+ consumeMessage(LIVE2_URI, "OPENWIRE");
+
+ }
+
+ private void sendMessage(String uri, String protocol) throws Exception {
+ ConnectionFactory cf = CFUtil.createConnectionFactory(protocol, uri);
+ try (Connection connection = cf.createConnection();
+ Session session =
connection.createSession(false,Session.AUTO_ACKNOWLEDGE)) {
+ MessageProducer producer =
session.createProducer(session.createQueue(QUEUE_NAME));
+ TextMessage message = session.createTextMessage("hello from protocol
" + protocol);
+ message.setStringProperty("protocolUsed", protocol);
+ logger.info("sending message {}", message.getText());
+ producer.send(message);
+ producer.close();
+ }
+ }
+
+ private void consumeMessage(String uri, String protocol) throws Exception {
+ ConnectionFactory cf = CFUtil.createConnectionFactory(protocol, uri);
+ try (Connection connection = cf.createConnection();
+ Session session =
connection.createSession(false,Session.AUTO_ACKNOWLEDGE)) {
+ MessageConsumer consumer =
session.createConsumer(session.createQueue(QUEUE_NAME));
+ connection.start();
+ TextMessage message = (TextMessage) consumer.receive(5_000);
+ assertNotNull(message);
+ logger.info("receiving message {}", message.getText());
+ assertEquals(protocol, message.getStringProperty("protocolUsed"));
+ consumer.close();
+ }
+ }
+
+ private Pair<Process, Process> rollUpgrade(int liveID,
+ String liveServerToStop,
+ SimpleManagement liveManagement,
+ Process liveProcess,
+ int backupID,
+ String backupToStop,
+ SimpleManagement
backupManagement,
+ Process backupProcess) throws
Exception {
+
+ logger.info("Stopping server {}", liveServerToStop);
+ stopServerWithFile(getServerLocation(liveServerToStop), liveProcess, 10,
TimeUnit.SECONDS);
+
+ // waiting backup to activate after the live stop
+ logger.info("Waiting backup {} to become live", backupID);
+ ServerUtil.waitForServerToStart(backupID, 30_000);
+
+ logger.info("Upgrading {}", liveServerToStop);
+ upgrade(HelperBase.getHome(), getFileServerLocation(liveServerToStop));
+
+ logger.info("Starting server {}", liveServerToStop);
+ Process newLiveProcess = startServer(liveServerToStop, 0, 0);
+
+ logger.info("Waiting replica to be sync");
+ Wait.assertTrue(() ->
repeatCallUntilConnected(backupManagement::isReplicaSync), 10_000, 100);
+
+ logger.info("Stopping backup {}", backupToStop);
+ stopServerWithFile(getServerLocation(backupToStop), backupProcess, 10,
TimeUnit.SECONDS);
+
+ logger.info("Waiting Activation on serverID {}", liveID);
+ // waiting former live to activate after stopping backup
+ ServerUtil.waitForServerToStart(liveID, 30_000);
+
+ logger.info("upgrading {}", backupToStop);
+ upgrade(HelperBase.getHome(), getFileServerLocation(backupToStop));
+
+ Process newBackupProcess = startServer(backupToStop, 0, 0);
+
+ logger.info("Waiting live to have its replica sync");
+ Wait.assertTrue(() ->
repeatCallUntilConnected(liveManagement::isReplicaSync), 10_000, 100);
+
+ return new Pair<>(newLiveProcess, newBackupProcess);
+ }
+
+ boolean repeatCallUntilConnected(Wait.Condition condition) {
+ Throwable lastException = null;
+ long timeLimit = System.currentTimeMillis() + 10_000;
+ do {
+ try {
+ return condition.isSatisfied();
+ } catch (Throwable e) {
+ lastException = e;
+ }
+ }
+ while (System.currentTimeMillis() > timeLimit);
+
+ if (lastException != null) {
+ throw new RuntimeException(lastException.getMessage(), lastException);
+ } else {
+ // if it gets here I'm a bad programmer!
+ throw new IllegalStateException("didn't complete for some reason");
+ }
+ }
+}
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java
index 287d730efc..7d88f7a458 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java
@@ -249,6 +249,15 @@ public class BackupSyncDelay implements Interceptor {
throw new UnsupportedOperationException();
}
+ @Override
+ public Packet sendBlocking(Packet packet,
+ int reconnectID,
+ byte expectedPacket,
+ long timeout,
+ boolean failOnTimeout) throws
ActiveMQException {
+ throw new UnsupportedOperationException();
+ }
+
@Override
public void setHandler(ChannelHandler handler) {
throw new UnsupportedOperationException();
@@ -272,7 +281,6 @@ public class BackupSyncDelay implements Interceptor {
@Override
public void transferConnection(CoreRemotingConnection newConnection) {
throw new UnsupportedOperationException();
-
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact