[
https://issues.apache.org/jira/browse/ARTEMIS-2716?focusedWorklogId=592607&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-592607
]
ASF GitHub Bot logged work on ARTEMIS-2716:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 04/May/21 08:27
Start Date: 04/May/21 08:27
Worklog Time Spent: 10m
Work Description: michaelpearce-gain commented on a change in pull
request #3555:
URL: https://github.com/apache/activemq-artemis/pull/3555#discussion_r625597938
##########
File path:
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ReplicationBackupActivation.java
##########
@@ -0,0 +1,489 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server.impl;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
+import org.apache.activemq.artemis.api.core.Pair;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.core.protocol.core.Channel;
+import org.apache.activemq.artemis.core.replication.ReplicationEndpoint;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
+import org.apache.activemq.artemis.core.server.LiveNodeLocator;
+import org.apache.activemq.artemis.core.server.NodeManager;;
+import org.apache.activemq.artemis.core.server.cluster.ClusterControl;
+import org.apache.activemq.artemis.core.server.cluster.ClusterController;
+import
org.apache.activemq.artemis.core.server.cluster.ha.ReplicationBackupPolicy;
+import org.apache.activemq.artemis.quorum.DistributedLock;
+import org.apache.activemq.artemis.quorum.DistributedPrimitiveManager;
+import org.apache.activemq.artemis.quorum.UnavailableStateException;
+import org.jboss.logging.Logger;
+
+import static
org.apache.activemq.artemis.core.server.impl.ReplicationObserver.ReplicationFailure;
+
+/**
+ * This activation can be used by a primary while trying to fail-back, i.e.
+ * {@code failback == true}, or by a natural-born backup, i.e.
+ * {@code failback == false}.<br>
+ */
+public final class ReplicationBackupActivation extends Activation {
+
+ private static final Logger LOGGER =
Logger.getLogger(ReplicationBackupActivation.class);
+
+ private final ReplicationBackupPolicy policy;
+ private final ActiveMQServerImpl activeMQServer;
+ private final boolean failback;
+ // This field is != null iff this node is a primary during a fail-back, i.e.
+ // acting as a backup in order to become live again.
+ private final String expectedNodeID;
+ @GuardedBy("this")
+ private boolean closed;
+ private final DistributedPrimitiveManager distributedManager;
+ // Used for monitoring purposes
+ private volatile ReplicationObserver replicationObserver;
+ // Used for testing purposes
+ private volatile ReplicationEndpoint replicationEndpoint;
+
/**
 * Creates the backup activation.
 *
 * @param activeMQServer     the server instance being activated
 * @param failback           {@code true} when a primary is trying to regain its live role;
 *                           the activation is then pinned to the server's existing NodeID
 * @param distributedManager the distributed primitive manager started during this activation
 * @param policy             the backup HA policy driving this activation
 * @throws IllegalStateException when {@code failback} is requested but the server has no NodeID yet
 */
public ReplicationBackupActivation(final ActiveMQServerImpl activeMQServer,
                                   final boolean failback,
                                   final DistributedPrimitiveManager distributedManager,
                                   final ReplicationBackupPolicy policy) {
   this.activeMQServer = activeMQServer;
   this.failback = failback;
   if (!failback) {
      // a natural-born backup is not biased towards any NodeID
      this.expectedNodeID = null;
   } else {
      final SimpleString serverNodeID = activeMQServer.getNodeID();
      if (serverNodeID == null || serverNodeID.isEmpty()) {
         throw new IllegalStateException("A failback activation must be biased around a specific NodeID");
      }
      this.expectedNodeID = serverNodeID.toString();
   }
   this.distributedManager = distributedManager;
   this.policy = policy;
   this.replicationObserver = null;
   this.replicationEndpoint = null;
}
+
+ /**
+  * Activation entry point: best-effort starts the distributed primitive
+  * manager, restarts the NodeManager in replicated-backup mode, boots the
+  * backup machinery and then runs the replicate/fail-over loop; when that
+  * loop hands back the live lock this node is promoted via startAsLive.
+  * Returns silently when the activation was closed or startup fails early.
+  */
+ @Override
+ public void run() {
+ synchronized (this) {
+ if (closed) {
+ return;
+ }
+ }
+ try {
+ try {
+ // best effort to add an additional "witness" node in case of
quorum using distributed consensus
+ distributedManager.start(policy.getQuorumVoteWait(),
TimeUnit.SECONDS);
+ } catch (ExecutionException ignore) {
+ // deliberately best-effort: activation proceeds without the manager
+ LOGGER.debug("Failed to start distributed primitive manager",
ignore);
+ }
+ // Stop the previous node manager and create a new one with
NodeManager::replicatedBackup == true:
+ // NodeManager::start skip setup lock file with NodeID, until
NodeManager::stopBackup is called.
+ activeMQServer.resetNodeManager();
+ activeMQServer.getNodeManager().stop();
+ // A primary need to preserve NodeID across runs
+
activeMQServer.moveServerData(policy.getMaxSavedReplicatedJournalsSize(),
failback);
+ activeMQServer.getNodeManager().start();
+ if (!activeMQServer.initialisePart1(false)) {
+ return;
+ }
+ // re-check closed: close() may have raced with the initialisation above
+ synchronized (this) {
+ if (closed)
+ return;
+ }
+ final ClusterController clusterController =
activeMQServer.getClusterManager().getClusterController();
+ clusterController.awaitConnectionToReplicationCluster();
+ activeMQServer.getBackupManager().start();
+
ActiveMQServerLogger.LOGGER.backupServerStarted(activeMQServer.getVersion().getFullVersion(),
+
activeMQServer.getNodeManager().getNodeId());
+ activeMQServer.setState(ActiveMQServerImpl.SERVER_STATE.STARTED);
+ // a null lock means no promotion: closed, restarting as backup, or stopped
+ final DistributedLock liveLock =
replicateAndFailover(clusterController);
+ if (liveLock == null) {
+ return;
+ }
+ startAsLive(liveLock);
+ } catch (Exception e) {
+ if ((e instanceof InterruptedException || e instanceof
IllegalStateException) && !activeMQServer.isStarted()) {
+ // do not log these errors if the server is being stopped.
+ return;
+ }
+ ActiveMQServerLogger.LOGGER.initializationError(e);
+ }
+ }
+
+ /**
+  * Promotes this backup into the live role while holding {@code liveLock}:
+  * installs the live HA policy, completes the NodeManager/storage/backup
+  * manager transition, swaps in a ReplicationPrimaryActivation and finishes
+  * the activation.
+  *
+  * @param liveLock the live lock won by this node; closed (released) when the
+  *                 server is no longer started
+  * @throws Exception when initialisation fails or the live role cannot be
+  *                   confirmed; an ActiveMQIllegalStateException is raised
+  *                   when the lock state is unavailable or no longer held
+  */
+ private void startAsLive(final DistributedLock liveLock) throws Exception {
+ policy.getLivePolicy().setBackupPolicy(policy);
+ activeMQServer.setHAPolicy(policy.getLivePolicy());
+
+ // NOTE(review): synchronizing on the server presumably prevents a concurrent
+ // stop from interleaving with the promotion — confirm; the isStarted check
+ // below releases the lock instead of promoting.
+ synchronized (activeMQServer) {
+ if (!activeMQServer.isStarted()) {
+ liveLock.close();
+ return;
+ }
+ ActiveMQServerLogger.LOGGER.becomingLive(activeMQServer);
+ // stopBackup is going to write the NodeID previously set on the
NodeManager,
+ // because activeMQServer.resetNodeManager() has created a
NodeManager with replicatedBackup == true.
+ activeMQServer.getNodeManager().stopBackup();
+ activeMQServer.getStorageManager().start();
+ activeMQServer.getBackupManager().activated();
+ // IMPORTANT:
+ // we're setting this activation JUST because it would allow the
server to use its
+ // getActivationChannelHandler to handle replication
+ final ReplicationPrimaryActivation primaryActivation = new
ReplicationPrimaryActivation(activeMQServer, distributedManager,
policy.getLivePolicy());
+ liveLock.addListener(primaryActivation);
+ activeMQServer.setActivation(primaryActivation);
+ activeMQServer.initialisePart2(false);
+ // re-validate lock ownership after the (possibly long) initialisation:
+ // if it was lost, another node may have become live in the meantime
+ final boolean stillLive;
+ try {
+ stillLive = liveLock.isHeldByCaller();
+ } catch (UnavailableStateException e) {
+ LOGGER.warn(e);
+ throw new ActiveMQIllegalStateException("This server cannot check
its role as a live: activation is failed");
+ }
+ if (!stillLive) {
+ throw new ActiveMQIllegalStateException("This server is not live
anymore: activation is failed");
+ }
+ if (activeMQServer.getIdentity() != null) {
+
ActiveMQServerLogger.LOGGER.serverIsLive(activeMQServer.getIdentity());
+ } else {
+ ActiveMQServerLogger.LOGGER.serverIsLive();
+ }
+ activeMQServer.completeActivation(true);
+ }
+ }
+
+ private LiveNodeLocator createLiveNodeLocator(final
LiveNodeLocator.BackupRegistrationListener registrationListener) {
+ if (expectedNodeID != null) {
+ assert failback;
+ return new NamedLiveNodeIdLocatorForReplication(expectedNodeID,
registrationListener, policy.getRetryReplicationWait());
+ }
+ return policy.getGroupName() == null ?
+ new AnyLiveNodeLocatorForReplication(registrationListener,
activeMQServer, policy.getRetryReplicationWait()) :
+ new NamedLiveNodeLocatorForReplication(policy.getGroupName(),
registrationListener, policy.getRetryReplicationWait());
+ }
+
+ private DistributedLock replicateAndFailover(final ClusterController
clusterController) throws ActiveMQException, InterruptedException {
+ while (true) {
+ synchronized (this) {
+ if (closed) {
+ return null;
+ }
+ }
+ final ReplicationFailure failure = replicateLive(clusterController);
+ if (failure == null) {
+
Thread.sleep(clusterController.getRetryIntervalForReplicatedCluster());
+ continue;
+ }
+ if (!activeMQServer.isStarted()) {
+ return null;
+ }
+ LOGGER.debugf("ReplicationFailure = %s", failure);
+ switch (failure) {
+ case VoluntaryFailOver:
+ case NonVoluntaryFailover:
+ final DistributedLock liveLock = tryAcquireLiveLock();
+ if (liveLock != null) {
+ return liveLock;
+ }
+ if (!failback) {
+
ActiveMQServerLogger.LOGGER.restartingAsBackupBasedOnQuorumVoteResults();
+ }
+ asyncRestartServer(activeMQServer, true);
+ return null;
+ case RegistrationError:
+ asyncRestartServer(activeMQServer, false);
+ return null;
+ case AlreadyReplicating:
+ // can just retry here, data should be clean
+ continue;
+ case ClosedObserver:
+ return null;
+ case BackupNotInSync:
+ asyncRestartServer(activeMQServer, true);
+ return null;
+ case WrongNodeId:
+ asyncRestartServer(activeMQServer, true);
+ return null;
+ default:
+ throw new AssertionError("Unsupported failure " + failure);
+ }
+ }
+ }
+
+ private DistributedLock tryAcquireLiveLock() throws InterruptedException {
+ assert activeMQServer.getNodeManager().getNodeId() != null;
+ final String liveID =
activeMQServer.getNodeManager().getNodeId().toString();
+ final int voteRetries = policy.getVoteRetries();
+ final long maxAttempts = voteRetries >= 0 ? (voteRetries + 1) : -1;
Review comment:
In regard to replication, I would typically expect:
-1 normally means wait until all members have acked.
0 means do not wait for any replica acks.
1 would wait until 1 member has acked.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 592607)
Time Spent: 2.5h (was: 2h 20m)
> Implements pluggable Quorum Vote
> --------------------------------
>
> Key: ARTEMIS-2716
> URL: https://issues.apache.org/jira/browse/ARTEMIS-2716
> Project: ActiveMQ Artemis
> Issue Type: New Feature
> Reporter: Francesco Nigro
> Assignee: Francesco Nigro
> Priority: Major
> Attachments: backup.png, primary.png
>
> Time Spent: 2.5h
> Remaining Estimate: 0h
>
> This task aims to deliver a new Quorum Vote mechanism for Artemis with the
> objectives:
> # to make it pluggable
> # to cleanly separate the election phase and the cluster member states
> # to simplify most common setups in both amount of configuration and
> requirements (eg "witness" nodes could be implemented to support single
> master-slave pairs)
> Post-actions to help people adopt it, but need to be thought upfront:
> # a clean upgrade path for current HA replication users
> # deprecate or integrate the current HA replication into the new version
--
This message was sent by Atlassian Jira
(v8.3.4#803005)