[
https://issues.apache.org/jira/browse/ARTEMIS-3340?focusedWorklogId=627630&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-627630
]
ASF GitHub Bot logged work on ARTEMIS-3340:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 26/Jul/21 10:19
Start Date: 26/Jul/21 10:19
Worklog Time Spent: 10m
Work Description: franz1981 commented on a change in pull request #3646:
URL: https://github.com/apache/activemq-artemis/pull/3646#discussion_r676187972
##########
File path:
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ReplicationBackupActivation.java
##########
@@ -0,0 +1,692 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server.impl;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.Objects;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
+import org.apache.activemq.artemis.api.core.Pair;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.core.protocol.core.Channel;
+import org.apache.activemq.artemis.core.replication.ReplicationEndpoint;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
+import org.apache.activemq.artemis.core.server.LiveNodeLocator;
+import org.apache.activemq.artemis.core.server.NodeManager;
+import org.apache.activemq.artemis.core.server.cluster.ClusterControl;
+import org.apache.activemq.artemis.core.server.cluster.ClusterController;
+import org.apache.activemq.artemis.core.server.cluster.ha.ReplicationBackupPolicy;
+import org.apache.activemq.artemis.quorum.DistributedLock;
+import org.apache.activemq.artemis.quorum.DistributedPrimitiveManager;
+import org.apache.activemq.artemis.quorum.MutableLong;
+import org.apache.activemq.artemis.quorum.UnavailableStateException;
+import org.jboss.logging.Logger;
+
+import static org.apache.activemq.artemis.core.server.impl.ReplicationObserver.ReplicationFailure;
+
+/**
+ * This activation can be used by a primary while trying to fail-back ie {@code failback == true} or
+ * by a natural-born backup ie {@code failback == false}.<br>
+ */
+public final class ReplicationBackupActivation extends Activation implements DistributedPrimitiveManager.UnavailableManagerListener {
+
+ private static final Logger LOGGER = Logger.getLogger(ReplicationBackupActivation.class);
+
+ private final boolean wasLive;
+ private final ReplicationBackupPolicy policy;
+ private final ActiveMQServerImpl activeMQServer;
+ // This field is != null iff this node is a primary during a fail-back ie acting as a backup in order to become live again.
+ private final String expectedNodeID;
+ @GuardedBy("this")
+ private boolean closed;
+ private final DistributedPrimitiveManager distributedManager;
+ // Used for monitoring purposes
+ private volatile ReplicationObserver replicationObserver;
+ // Used for testing purposes
+ private volatile ReplicationEndpoint replicationEndpoint;
+ // Used for testing purposes
+ private Consumer<ReplicationEndpoint> onReplicationEndpointCreation;
+ // Used to arbitrate one-shot server stop/restart
+ private final AtomicBoolean stopping;
+
+ public ReplicationBackupActivation(final ActiveMQServerImpl activeMQServer,
+ final boolean wasLive,
+ final DistributedPrimitiveManager distributedManager,
+ final ReplicationBackupPolicy policy) {
+ this.wasLive = wasLive;
+ this.activeMQServer = activeMQServer;
+ if (policy.isTryFailback()) {
+ final SimpleString serverNodeID = activeMQServer.getNodeID();
+ if (serverNodeID == null || serverNodeID.isEmpty()) {
+ throw new IllegalStateException("A failback activation must be biased around a specific NodeID");
+ }
+ this.expectedNodeID = serverNodeID.toString();
+ } else {
+ this.expectedNodeID = null;
+ }
+ this.distributedManager = distributedManager;
+ this.policy = policy;
+ this.replicationObserver = null;
+ this.replicationEndpoint = null;
+ this.stopping = new AtomicBoolean(false);
+ }
+
+ /**
+ * used for testing purposes.
+ */
+ public DistributedPrimitiveManager getDistributedManager() {
+ return distributedManager;
+ }
+
+ @Override
+ public void onUnavailableManagerEvent() {
+ synchronized (this) {
+ if (closed) {
+ return;
+ }
+ }
+ LOGGER.info("Unavailable quorum service detected: try restart server");
+ asyncRestartServer(activeMQServer, true);
+ }
+
+ /**
+ * This util class exists because {@link LiveNodeLocator} needs a {@link LiveNodeLocator.BackupRegistrationListener}
+ * to forward backup registration failure events: this is used to switch on/off backup registration event listening
+ * on an existing locator.
+ */
+ private static final class RegistrationFailureForwarder implements LiveNodeLocator.BackupRegistrationListener, AutoCloseable {
+
+ private static final LiveNodeLocator.BackupRegistrationListener NOOP_LISTENER = ignore -> {
+ };
+ private volatile LiveNodeLocator.BackupRegistrationListener listener = NOOP_LISTENER;
+
+ public RegistrationFailureForwarder to(LiveNodeLocator.BackupRegistrationListener listener) {
+ this.listener = listener;
+ return this;
+ }
+
+ @Override
+ public void onBackupRegistrationFailed(boolean alreadyReplicating) {
+ listener.onBackupRegistrationFailed(alreadyReplicating);
+ }
+
+ @Override
+ public void close() {
+ listener = NOOP_LISTENER;
+ }
+ }
+
+ @Override
+ public void run() {
+ synchronized (this) {
+ if (closed) {
+ return;
+ }
+ }
+ try {
+ LOGGER.infof("Trying to reach the majority of quorum nodes");
+ distributedManager.start();
+ LOGGER.debug("Quorum service available");
+
+ if (policy.isTryFailback()) {
+ // we are replicating to overwrite our data, transient backup state while trying to be the primary
+ } else {
+ // we may be a valid insync_replica (backup) if our activation sequence is largest for a nodeId
+ // verify that before removing data..
+ final DistributedLock liveLockWithInSyncReplica = checkForInSyncReplica();
+ if (liveLockWithInSyncReplica != null) {
+ // retain state and start as live
+ if (!activeMQServer.initialisePart1(false)) {
+ return;
+ }
+ activeMQServer.setState(ActiveMQServerImpl.SERVER_STATE.STARTED);
+ startAsLive(liveLockWithInSyncReplica);
+ return;
+ }
+ }
+ distributedManager.addUnavailableManagerListener(this);
+ // Stop the previous node manager and create a new one with NodeManager::replicatedBackup == true:
+ // NodeManager::start skip setup lock file with NodeID, until NodeManager::stopBackup is called.
+ activeMQServer.resetNodeManager();
+ // A primary needs to preserve its NodeID across runs
+ activeMQServer.moveServerData(policy.getMaxSavedReplicatedJournalsSize(), policy.isTryFailback());
+ activeMQServer.getNodeManager().start();
+ if (!activeMQServer.initialisePart1(false)) {
+ return;
+ }
+ synchronized (this) {
+ if (closed)
+ return;
+ }
+
+
+ final ClusterController clusterController = activeMQServer.getClusterManager().getClusterController();
+
+ LOGGER.infof("Apache ActiveMQ Artemis Backup Server version %s [%s] started, awaiting connection to a live cluster member to start replication",
+ activeMQServer.getVersion().getFullVersion(), activeMQServer.toString());
+
+ clusterController.awaitConnectionToReplicationCluster();
+ activeMQServer.getBackupManager().start();
+ activeMQServer.setState(ActiveMQServerImpl.SERVER_STATE.STARTED);
+ final DistributedLock liveLock = replicateAndFailover(clusterController);
+ if (liveLock == null) {
+ return;
+ }
+ startAsLive(liveLock);
+ } catch (Exception e) {
+ if ((e instanceof InterruptedException || e instanceof IllegalStateException) && !activeMQServer.isStarted()) {
+ // do not log these errors if the server is being stopped.
+ return;
+ }
+ ActiveMQServerLogger.LOGGER.initializationError(e);
+ }
+ }
+
+ private DistributedLock checkForInSyncReplica() throws InterruptedException, ExecutionException, TimeoutException, UnavailableStateException {
+ final long nodeActivationSequence = activeMQServer.getNodeManager().readNodeActivationSequence();
+ if (nodeActivationSequence > 0) {
+ // not an empty backup (first start), let see if we can get the lock and verify our data activation sequence
+ final String lockAndLongId = activeMQServer.getNodeManager().getNodeId().toString();
+ final DistributedLock activationLock = distributedManager.getDistributedLock(lockAndLongId);
+ try (MutableLong coordinatedNodeSequence = distributedManager.getMutableLong(lockAndLongId)) {
+ while (true) {
+ // dirty read is sufficient to know if we are *not* an in sync replica
+ // typically the lock owner will increment to signal our data is stale and we are happy without any
+ // further coordination at this point
+ final long currentCoordinatedNodeSequence = coordinatedNodeSequence.get();
+ if (nodeActivationSequence != currentCoordinatedNodeSequence) {
+ LOGGER.infof("Not a candidate for NodeID = %s activation, local activation sequence %d does not match coordinated activation sequence %d", lockAndLongId, nodeActivationSequence, currentCoordinatedNodeSequence);
+ activationLock.close();
+ return null;
+ }
+ // our data may be current, start coordinating to verify
+ if (!activationLock.tryLock(policy.getVoteRetryWait(), TimeUnit.MILLISECONDS)) {
+ LOGGER.debugf("Candidate for Node ID = %s, with local activation sequence: %d, cannot acquire live lock within %dms; retrying", lockAndLongId, nodeActivationSequence, policy.getVoteRetryWait());
+ continue;
+ }
+ final long lockedCoordinatedNodeSequence = coordinatedNodeSequence.get();
+ if (nodeActivationSequence != lockedCoordinatedNodeSequence) {
+ LOGGER.infof("Not a candidate for NodeID = %s activation, local activation sequence %d does not match current coordinated activation sequence %d", lockAndLongId, nodeActivationSequence, lockedCoordinatedNodeSequence);
+ activationLock.close();
+ return null;
+ }
+ // we are an in_sync_replica, good to go live as UNREPLICATED
+ LOGGER.infof("Assuming live role for NodeID = %s, local activation sequence %d matches current coordinated activation sequence %d", lockAndLongId, nodeActivationSequence, lockedCoordinatedNodeSequence);
+ return activationLock;
+ }
+ }
+ } else {
+ LOGGER.debugf("Activation sequence is 0");
+ return null;
+ }
+ }
+
+ static void ensureSequentialAccessToNodeData(ActiveMQServer activeMQServer,
+ DistributedPrimitiveManager distributedPrimitiveManager,
+ final Logger logger) throws ActiveMQException, InterruptedException, UnavailableStateException, ExecutionException, TimeoutException {
+
+ final NodeManager nodeManager = activeMQServer.getNodeManager();
+ final String lockAndLongId = nodeManager.getNodeId().toString();
+ final long nodeActivationSequence = nodeManager.readNodeActivationSequence();
+
+ final DistributedLock liveLock = distributedPrimitiveManager.getDistributedLock(lockAndLongId);
+ if (liveLock.isHeldByCaller()) {
+ MutableLong coordinatedNodeActivationSequence = distributedPrimitiveManager.getMutableLong(lockAndLongId);
+ if (nodeActivationSequence != coordinatedNodeActivationSequence.get()) {
+ final String message = String.format("Server [%s], cannot assume live role for NodeID = %s, local activation sequence %d does not match current coordinated sequence %d", activeMQServer, lockAndLongId, nodeActivationSequence, coordinatedNodeActivationSequence.get());
+ logger.info(message);
+ throw new ActiveMQException(message);
+ }
+
+ // UN_REPLICATED STATE ENTER
+ if (coordinatedNodeActivationSequence.compareAndSet(nodeActivationSequence, nodeActivationSequence + 1)) {
+ nodeManager.writeNodeActivationSequence(nodeActivationSequence + 1);
+ logger.infof("Server [%s], incremented coordinated activation sequence to: %d for NodeId = %s", activeMQServer, nodeActivationSequence + 1, lockAndLongId);
+ } else {
+ final String message = String.format("Server [%s], cannot assume live role for NodeID = %s, compareAndSet failed, local activation sequence %d no longer matches current coordinated sequence %d", activeMQServer, lockAndLongId, nodeActivationSequence, coordinatedNodeActivationSequence.get());
+ logger.info(message);
+ throw new ActiveMQException(message);
+ }
+ } else {
+ final String message = String.format("Server [%s], live lock for NodeID = %s, not held, activation sequence cannot be safely incremented to %d", activeMQServer, lockAndLongId, nodeActivationSequence);
+ logger.info(message);
+ throw new UnavailableStateException(message);
+ }
+ }
+
+ private void startAsLive(final DistributedLock liveLock) throws Exception {
+ activeMQServer.setHAPolicy(policy.getLivePolicy());
+
+ synchronized (activeMQServer) {
+ if (!activeMQServer.isStarted()) {
+ liveLock.close();
+ return;
+ }
+ ensureSequentialAccessToNodeData(activeMQServer, distributedManager, LOGGER);
+
+ ActiveMQServerLogger.LOGGER.becomingLive(activeMQServer);
+ // stopBackup is going to write the NodeID previously set on the NodeManager,
+ // because activeMQServer.resetNodeManager() has created a NodeManager with replicatedBackup == true.
+ activeMQServer.getNodeManager().stopBackup();
+ activeMQServer.getStorageManager().start();
+ activeMQServer.getBackupManager().activated();
+ // IMPORTANT:
+ // we're setting this activation JUST because it would allow the server to use its
+ // getActivationChannelHandler to handle replication
+ final ReplicationPrimaryActivation primaryActivation = new ReplicationPrimaryActivation(activeMQServer, distributedManager, policy.getLivePolicy());
+ liveLock.addListener(primaryActivation);
+ activeMQServer.setActivation(primaryActivation);
+ activeMQServer.initialisePart2(false);
+ // calling primaryActivation.stateChanged !isHeldByCaller is necessary in case the lock was unavailable
+ // before liveLock.addListener: just throwing an exception won't stop the broker.
+ final boolean stillLive;
+ try {
+ stillLive = liveLock.isHeldByCaller();
+ } catch (UnavailableStateException e) {
+ LOGGER.warn(e);
+ primaryActivation.onUnavailableLockEvent();
+ throw new ActiveMQIllegalStateException("This server cannot check its role as a live: activation is failed");
+ }
+ if (!stillLive) {
+ primaryActivation.onUnavailableLockEvent();
+ throw new ActiveMQIllegalStateException("This server is not live anymore: activation is failed");
+ }
+ if (activeMQServer.getIdentity() != null) {
+ ActiveMQServerLogger.LOGGER.serverIsLive(activeMQServer.getIdentity());
+ } else {
+ ActiveMQServerLogger.LOGGER.serverIsLive();
+ }
+ activeMQServer.completeActivation(true);
+ }
+ }
+
+ private LiveNodeLocator createLiveNodeLocator(final LiveNodeLocator.BackupRegistrationListener registrationListener) {
+ if (expectedNodeID != null) {
+ assert policy.isTryFailback();
+ return new NamedLiveNodeIdLocatorForReplication(expectedNodeID, registrationListener, policy.getRetryReplicationWait());
+ }
+ return policy.getGroupName() == null ?
+ new AnyLiveNodeLocatorForReplication(registrationListener, activeMQServer, policy.getRetryReplicationWait()) :
+ new NamedLiveNodeLocatorForReplication(policy.getGroupName(), registrationListener, policy.getRetryReplicationWait());
+ }
+
+ private DistributedLock replicateAndFailover(final ClusterController clusterController) throws ActiveMQException, InterruptedException {
+ final RegistrationFailureForwarder registrationFailureForwarder = new RegistrationFailureForwarder();
+ // node locator isn't stateless and contains a live-list of candidate nodes to connect to, hence
+ // it MUST be reused for each replicateLive attempt
+ final LiveNodeLocator nodeLocator = createLiveNodeLocator(registrationFailureForwarder);
+ clusterController.addClusterTopologyListenerForReplication(nodeLocator);
+ try {
+ while (true) {
+ synchronized (this) {
+ if (closed) {
+ return null;
+ }
+ }
+ final ReplicationFailure failure = replicateLive(clusterController, nodeLocator, registrationFailureForwarder);
+ if (failure == null) {
+ Thread.sleep(clusterController.getRetryIntervalForReplicatedCluster());
+ continue;
+ }
+ if (!activeMQServer.isStarted()) {
+ return null;
+ }
+ LOGGER.debugf("ReplicationFailure = %s", failure);
+ boolean voluntaryFailOver = false;
+ switch (failure) {
+ case VoluntaryFailOver:
+ voluntaryFailOver = true;
+ case NonVoluntaryFailover:
+ final DistributedLock liveLock = tryAcquireLiveLock();
Review comment:
tryAcquireLiveLock, similarly to checkForInSyncReplica, should return
null if the activation sequence is not a match (and can perform a dirty read to
fail fast): `startAsLive` is going to throw an exception if the activation
sequence changes before activating, but that would prevent it from restarting as
an empty backup and replicating a live again.
This can happen if:
1. primary starts and becomes live
2. backup becomes an in-sync replica
3. connection glitch between the pair
4. primary increases its activation sequence
5. backup tries to acquire the live lock for *some* time ie > ZK session
expiration timeout
6. primary serves some clients and dies due to OOM/whatever
7. backup succeeds in acquiring the live lock, but the sequence has changed, so
it throws an exception without becoming live
8. primary is restarted and is able to become live again
9. backup won't become an in-sync replica because it is in limbo
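The fail-fast shape being suggested can be sketched with in-memory stand-ins: an `AtomicLong` for the coordinated activation sequence and a `ReentrantLock` for the live lock. The names `FailFastLiveLock`, `tryAcquireLiveLock`, and `primaryBumpsSequence` below are illustrative, not the Artemis API.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;

// In-memory model of the suggested fail-fast check: dirty-read the coordinated
// activation sequence before (and re-check after) acquiring the live lock,
// returning null instead of throwing so the caller can restart as an empty backup.
public final class FailFastLiveLock {

   private final ReentrantLock liveLock = new ReentrantLock();
   private final AtomicLong coordinatedSequence = new AtomicLong();

   /** Stands in for the primary bumping the coordinated sequence on a replica drop. */
   public void primaryBumpsSequence() {
      coordinatedSequence.incrementAndGet();
   }

   /** Returns the lock only while localSequence still matches the coordinated one; null otherwise. */
   public ReentrantLock tryAcquireLiveLock(long localSequence, long timeoutMs) {
      if (coordinatedSequence.get() != localSequence) {
         return null; // dirty read: we already know we are not an in-sync replica
      }
      try {
         if (!liveLock.tryLock(timeoutMs, TimeUnit.MILLISECONDS)) {
            return null;
         }
      } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         return null;
      }
      if (coordinatedSequence.get() != localSequence) {
         liveLock.unlock(); // sequence moved while we waited: give up the lock
         return null;
      }
      return liveLock;
   }
}
```

With this shape, in step 7 of the scenario above the backup would get null and fall back to replicating the restarted primary instead of throwing from `startAsLive`.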
--
This is an automated message from the Apache Git Service.
Issue Time Tracking
-------------------
Worklog Id: (was: 627630)
Time Spent: 4.5h (was: 4h 20m)
> Replicated Journal quorum-based logical timestamp/version
> ---------------------------------------------------------
>
> Key: ARTEMIS-3340
> URL: https://issues.apache.org/jira/browse/ARTEMIS-3340
> Project: ActiveMQ Artemis
> Issue Type: Improvement
> Reporter: Francesco Nigro
> Assignee: Gary Tully
> Priority: Major
> Time Spent: 4.5h
> Remaining Estimate: 0h
>
> Shared-nothing replication can cause journal misalignment despite no
> split-brain events.
> There are several ways this can happen.
> Below are some scenarios that don't involve network partitions or drastic outages.
> Scenario 1:
> # Master/Primary starts as live, clients connect to it
> # Backup becomes an in-sync replica
> # User stops the live and the backup fails over
> # *Backup serves clients alone, modifying its journal*
> # User stops the backup
> # User starts the master/primary: it becomes live with a journal misaligned
> with the most up-to-date one ie the one on the stopped backup
> Scenario 2 (involving a network glitch):
> # Master/Primary starts as live, clients connect to it
> # Backup becomes an in-sync replica
> # Connection glitch between backup -> live
> # Backup starts trying to fail over (for {{vote-retries * vote-retry-wait}}
> milliseconds)
> # *Live serves clients alone, modifying its journal*
> # User stops the live
> # Backup succeeds in failing over: it becomes live with a journal misaligned
> with the most up-to-date one ie the one on the stopped live
> The main cause of this issue is that we allow *a single broker to serve
> clients*, despite being configured for HA, generating the journal misalignment.
> The quorum service (classic or pluggable) just takes care of mutually exclusive
> ownership of the live role (vs a NodeID), without considering live role
> ordering based on the most up-to-date journal.
> A possible solution is to use
> https://issues.apache.org/jira/browse/ARTEMIS-2716 and a quorum "logical
> timestamp/version" marking the age/ownership changes of the journal, in order
> to force the live to always have the most up-to-date journal. It means that
> such a value has to be locally saved and exchanged during the initial replica
> sync, involving both journal data and core message protocol changes (just for
> the replication channel, without impacting real clients).
> In case of a quorum service restart/outage, an admin must use a
> command/configuration to let a broker ignore the age of its journal and just
> force it to start.
> In addition, new journal CLI commands should be implemented to inspect the age
> of a (local) broker journal or to query/force the quorum journal version too,
> for troubleshooting reasons.
> It's very important to capture every possible event that causes the journal
> age/ownership to change.
> Now let's take a look at Scenario 2 with journal versioning/timestamps:
> # the live broker starts because it matches the most up-to-date journal
> version, increasing it (locally and remotely) when it becomes fully live
> # the backup finds it and trusts that, given that it's live, it already has the
> most up-to-date journal for a specific NodeID
> # the live broker sends its journal files to the backup, along with its local
> journal version
> # the backup is now ready to fail over at any moment: it stores the sent
> journal version in its local storage
> # a network glitch happens
> # the backup tries to become live for vote-retries times
> # the live detects the replication disconnection and *increments the journal
> version* (both the quorum and the local one)
> # the live serves clients alone, modifying its journal
> # an outage/stop causes the live to die
> # the backup detects that the *quorum journal version no longer matches its own
> local journal version*, meaning that something has happened in the meantime: it
> stops trying to become live
> The key parts related to journal age/version are:
> * only whoever is live can change the quorum (and local) journal version (with
> a monotonic increment)
> * every ownership change event must cause the journal age/version to change eg
> starting as live, losing its backup, etc
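Those two rules can be modeled with a compare-and-set on the version; a plain `AtomicLong` stands in for the quorum-side counter here, and the class and method names are made up for illustration:

```java
import java.util.concurrent.atomic.AtomicLong;

// Stand-in for the quorum journal version: only a broker that knows the current
// value (i.e. the live) can move it forward, and every move is a monotonic +1.
public final class JournalVersion {

   private final AtomicLong quorumVersion = new AtomicLong();

   /** Ownership-change event (becoming live, losing the backup, ...): CAS +1. */
   public boolean bumpOnOwnershipChange(long expectedLocalVersion) {
      return quorumVersion.compareAndSet(expectedLocalVersion, expectedLocalVersion + 1);
   }

   /** A backup may only fail over while its local copy matches the quorum one. */
   public boolean mayFailover(long localVersion) {
      return quorumVersion.get() == localVersion;
   }
}
```

The CAS is what makes the "only who's live" rule enforceable: a broker holding a stale local version simply cannot increment, and a backup with a stale copy sees the mismatch before failing over.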
> Re the RI implementation using Apache Curator, this could use a separate
> [DistributedAtomicLong|https://curator.apache.org/apidocs/org/apache/curator/framework/recipes/atomic/DistributedAtomicLong.html]
> to manage the journal version.
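Assuming curator-recipes on the classpath, that recipe could look roughly like the sketch below; the connect string, ZNode path, and retry settings are placeholders, and this is not the PR's actual code (it needs a reachable ZooKeeper ensemble to run):

```java
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.atomic.AtomicValue;
import org.apache.curator.framework.recipes.atomic.DistributedAtomicLong;
import org.apache.curator.retry.ExponentialBackoffRetry;

// Sketch: manage the journal version for a NodeID with Curator's
// DistributedAtomicLong, kept separate from the live-lock recipe.
public final class CuratorJournalVersion implements AutoCloseable {

   private final CuratorFramework client;
   private final DistributedAtomicLong version;

   public CuratorJournalVersion(String connectString, String nodeId) {
      this.client = CuratorFrameworkFactory.newClient(connectString, new ExponentialBackoffRetry(100, 3));
      this.client.start();
      // one counter per NodeID, e.g. under a dedicated parent path
      this.version = new DistributedAtomicLong(client, "/journal-version/" + nodeId, new ExponentialBackoffRetry(100, 3));
   }

   /** Dirty read of the quorum journal version. */
   public long read() throws Exception {
      final AtomicValue<Long> value = version.get();
      if (!value.succeeded()) {
         throw new IllegalStateException("cannot read quorum journal version");
      }
      return value.postValue();
   }

   /** Monotonic bump, succeeding only if the caller's expected value is current. */
   public boolean bump(long expected) throws Exception {
      return version.compareAndSet(expected, expected + 1).succeeded();
   }

   @Override
   public void close() {
      client.close();
   }
}
```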
> Although tempting, it's not a good idea to just use the data field on
> {{InterProcessSemaphoreV2}}, because:
> * there's no API to query it if no lease has been acquired yet (or created)
> * data cannot change while the lock is acquired: it wouldn't allow increasing
> the journal age because of a replica drop
> Although tempting, it's not a good idea to just use the last alive broker
> connector identity instead of a journal version, because of the ABA problem
> (see https://en.wikipedia.org/wiki/ABA_problem).
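The ABA hazard can be shown in two lines (an illustrative model, not Artemis code): a connector identity can go A -> B -> A and look unchanged, while a monotonic version can never return to a previous value:

```java
// Why the last-alive connector identity is ABA-prone while a monotonic
// version is not: identity comparison is blind to A -> B -> A histories,
// whereas every ownership change bumps the version, so any history is visible.
public final class AbaCheck {

   /** Identity comparison: cannot distinguish "nothing happened" from A -> B -> A. */
   public static boolean identityLooksUnchanged(String lastSeen, String current) {
      return lastSeen.equals(current);
   }

   /** Version comparison: a bumped counter exposes every intermediate ownership change. */
   public static boolean versionUnchanged(long lastSeen, long current) {
      return lastSeen == current;
   }
}
```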
> This versioning mechanism isn't without drawbacks: quorum journal versioning
> requires storing a local copy of the version in order to allow the broker to
> query and compare it with the quorum one on restart; having 2 separate and
> non-atomic operations means that there must be a way to reconcile/fix it in
> case of misalignment. As said above, this could be done with admin
> operations.
> Journal versioning changes the way the roles behave, but they still retain
> their key characteristics:
> - a backup should try to start as live in case it has the most up-to-date
> journal and there is no other live around; otherwise, it can just rotate its
> journal and be available to replicate some live
> - a primary tries to fail back to any existing live with the most up-to-date
> journal, or awaits one to appear, without becoming live if it doesn't have the
> most up-to-date journal
> This would ensure that if both brokers are up and running and the backup allows
> a primary to fail back, the primary eventually becomes live and the backup
> replicates it, preserving the desired broker roles.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)