[ 
https://issues.apache.org/jira/browse/ARTEMIS-2716?focusedWorklogId=592608&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-592608
 ]

ASF GitHub Bot logged work on ARTEMIS-2716:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 04/May/21 08:27
            Start Date: 04/May/21 08:27
    Worklog Time Spent: 10m 
      Work Description: michaelpearce-gain commented on a change in pull 
request #3555:
URL: https://github.com/apache/activemq-artemis/pull/3555#discussion_r625596297



##########
File path: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ReplicationBackupActivation.java
##########
@@ -0,0 +1,489 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server.impl;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
+import org.apache.activemq.artemis.api.core.Pair;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.core.protocol.core.Channel;
+import org.apache.activemq.artemis.core.replication.ReplicationEndpoint;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
+import org.apache.activemq.artemis.core.server.LiveNodeLocator;
+import org.apache.activemq.artemis.core.server.NodeManager;;
+import org.apache.activemq.artemis.core.server.cluster.ClusterControl;
+import org.apache.activemq.artemis.core.server.cluster.ClusterController;
+import 
org.apache.activemq.artemis.core.server.cluster.ha.ReplicationBackupPolicy;
+import org.apache.activemq.artemis.quorum.DistributedLock;
+import org.apache.activemq.artemis.quorum.DistributedPrimitiveManager;
+import org.apache.activemq.artemis.quorum.UnavailableStateException;
+import org.jboss.logging.Logger;
+
+import static 
org.apache.activemq.artemis.core.server.impl.ReplicationObserver.ReplicationFailure;
+
+/**
+ * This activation can be used by a primary while trying to fail-back ie 
{@code failback == true} or
+ * by a natural-born backup ie {@code failback == false}.<br>
+ */
+public final class ReplicationBackupActivation extends Activation {
+
+   private static final Logger LOGGER = 
Logger.getLogger(ReplicationBackupActivation.class);
+
+   private final ReplicationBackupPolicy policy;
+   private final ActiveMQServerImpl activeMQServer;
+   private final boolean failback;
+   // This field is != null iff this node is a primary during a fail-back ie 
acting as a backup in order to become live again.
+   private final String expectedNodeID;
+   @GuardedBy("this")
+   private boolean closed;
+   private final DistributedPrimitiveManager distributedManager;
+   // Used for monitoring purposes
+   private volatile ReplicationObserver replicationObserver;
+   // Used for testing purposes
+   private volatile ReplicationEndpoint replicationEndpoint;
+
+   public ReplicationBackupActivation(final ActiveMQServerImpl activeMQServer,
+                                      final boolean failback,
+                                      final DistributedPrimitiveManager 
distributedManager,
+                                      final ReplicationBackupPolicy policy) {
+      this.activeMQServer = activeMQServer;
+      this.failback = failback;
+      if (failback) {
+         final SimpleString serverNodeID = activeMQServer.getNodeID();
+         if (serverNodeID == null || serverNodeID.isEmpty()) {
+            throw new IllegalStateException("A failback activation must be 
biased around a specific NodeID");
+         }
+         this.expectedNodeID = serverNodeID.toString();
+      } else {
+         this.expectedNodeID = null;
+      }
+      this.distributedManager = distributedManager;
+      this.policy = policy;
+      this.replicationObserver = null;
+      this.replicationEndpoint = null;
+   }
+
+   @Override
+   public void run() {
+      synchronized (this) {
+         if (closed) {
+            return;
+         }
+      }
+      try {
+         try {
+            // best effort to add an additional "witness" node in case of 
quorum using distributed consensus
+            distributedManager.start(policy.getQuorumVoteWait(), 
TimeUnit.SECONDS);
+         } catch (ExecutionException ignore) {
+            LOGGER.debug("Failed to start distributed primitive manager", 
ignore);
+         }
+         // Stop the previous node manager and create a new one with 
NodeManager::replicatedBackup == true:
+         // NodeManager::start skip setup lock file with NodeID, until 
NodeManager::stopBackup is called.
+         activeMQServer.resetNodeManager();
+         activeMQServer.getNodeManager().stop();
+         // A primary need to preserve NodeID across runs
+         
activeMQServer.moveServerData(policy.getMaxSavedReplicatedJournalsSize(), 
failback);
+         activeMQServer.getNodeManager().start();
+         if (!activeMQServer.initialisePart1(false)) {
+            return;
+         }
+         synchronized (this) {
+            if (closed)
+               return;
+         }
+         final ClusterController clusterController = 
activeMQServer.getClusterManager().getClusterController();
+         clusterController.awaitConnectionToReplicationCluster();
+         activeMQServer.getBackupManager().start();
+         
ActiveMQServerLogger.LOGGER.backupServerStarted(activeMQServer.getVersion().getFullVersion(),
+                                                         
activeMQServer.getNodeManager().getNodeId());
+         activeMQServer.setState(ActiveMQServerImpl.SERVER_STATE.STARTED);
+         final DistributedLock liveLock = 
replicateAndFailover(clusterController);
+         if (liveLock == null) {
+            return;
+         }
+         startAsLive(liveLock);
+      } catch (Exception e) {
+         if ((e instanceof InterruptedException || e instanceof 
IllegalStateException) && !activeMQServer.isStarted()) {
+            // do not log these errors if the server is being stopped.
+            return;
+         }
+         ActiveMQServerLogger.LOGGER.initializationError(e);
+      }
+   }
+
+   private void startAsLive(final DistributedLock liveLock) throws Exception {
+      policy.getLivePolicy().setBackupPolicy(policy);
+      activeMQServer.setHAPolicy(policy.getLivePolicy());
+
+      synchronized (activeMQServer) {
+         if (!activeMQServer.isStarted()) {
+            liveLock.close();
+            return;
+         }
+         ActiveMQServerLogger.LOGGER.becomingLive(activeMQServer);
+         // stopBackup is going to write the NodeID previously set on the 
NodeManager,
+         // because activeMQServer.resetNodeManager() has created a 
NodeManager with replicatedBackup == true.
+         activeMQServer.getNodeManager().stopBackup();
+         activeMQServer.getStorageManager().start();
+         activeMQServer.getBackupManager().activated();
+         // IMPORTANT:
+         // we're setting this activation JUST because it would allow the 
server to use its
+         // getActivationChannelHandler to handle replication
+         final ReplicationPrimaryActivation primaryActivation = new 
ReplicationPrimaryActivation(activeMQServer, distributedManager, 
policy.getLivePolicy());
+         liveLock.addListener(primaryActivation);
+         activeMQServer.setActivation(primaryActivation);
+         activeMQServer.initialisePart2(false);
+         final boolean stillLive;
+         try {
+            stillLive = liveLock.isHeldByCaller();
+         } catch (UnavailableStateException e) {
+            LOGGER.warn(e);
+            throw new ActiveMQIllegalStateException("This server cannot check 
its role as a live: activation is failed");
+         }
+         if (!stillLive) {
+            throw new ActiveMQIllegalStateException("This server is not live 
anymore: activation is failed");
+         }
+         if (activeMQServer.getIdentity() != null) {
+            
ActiveMQServerLogger.LOGGER.serverIsLive(activeMQServer.getIdentity());
+         } else {
+            ActiveMQServerLogger.LOGGER.serverIsLive();
+         }
+         activeMQServer.completeActivation(true);
+      }
+   }
+
+   private LiveNodeLocator createLiveNodeLocator(final 
LiveNodeLocator.BackupRegistrationListener registrationListener) {
+      if (expectedNodeID != null) {
+         assert failback;
+         return new NamedLiveNodeIdLocatorForReplication(expectedNodeID, 
registrationListener, policy.getRetryReplicationWait());
+      }
+      return policy.getGroupName() == null ?
+         new AnyLiveNodeLocatorForReplication(registrationListener, 
activeMQServer, policy.getRetryReplicationWait()) :
+         new NamedLiveNodeLocatorForReplication(policy.getGroupName(), 
registrationListener, policy.getRetryReplicationWait());
+   }
+
+   private DistributedLock replicateAndFailover(final ClusterController 
clusterController) throws ActiveMQException, InterruptedException {
+      while (true) {
+         synchronized (this) {
+            if (closed) {
+               return null;
+            }
+         }
+         final ReplicationFailure failure = replicateLive(clusterController);
+         if (failure == null) {
+            
Thread.sleep(clusterController.getRetryIntervalForReplicatedCluster());
+            continue;
+         }
+         if (!activeMQServer.isStarted()) {
+            return null;
+         }
+         LOGGER.debugf("ReplicationFailure = %s", failure);
+         switch (failure) {
+            case VoluntaryFailOver:
+            case NonVoluntaryFailover:
+               final DistributedLock liveLock = tryAcquireLiveLock();
+               if (liveLock != null) {
+                  return liveLock;
+               }
+               if (!failback) {
+                  
ActiveMQServerLogger.LOGGER.restartingAsBackupBasedOnQuorumVoteResults();
+               }
+               asyncRestartServer(activeMQServer, true);
+               return null;
+            case RegistrationError:
+               asyncRestartServer(activeMQServer, false);
+               return null;
+            case AlreadyReplicating:
+               // can just retry here, data should be clean
+               continue;
+            case ClosedObserver:
+               return null;
+            case BackupNotInSync:
+               asyncRestartServer(activeMQServer, true);
+               return null;
+            case WrongNodeId:
+               asyncRestartServer(activeMQServer, true);
+               return null;
+            default:
+               throw new AssertionError("Unsupported failure " + failure);
+         }
+      }
+   }
+
+   private DistributedLock tryAcquireLiveLock() throws InterruptedException {
+      assert activeMQServer.getNodeManager().getNodeId() != null;
+      final String liveID = 
activeMQServer.getNodeManager().getNodeId().toString();
+      final int voteRetries = policy.getVoteRetries();
+      final long maxAttempts = voteRetries >= 0 ? (voteRetries + 1) : -1;

Review comment:
       So, regarding the retry logic:
   
   -1 normally means "unlimited" to me, i.e. never stop, keep retrying forever.
   
   0 means don't do any retry, i.e. fail/error on the first error and do not 
re-attempt.
   
   
   
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 592608)
    Time Spent: 2h 40m  (was: 2.5h)

> Implements pluggable Quorum Vote
> --------------------------------
>
>                 Key: ARTEMIS-2716
>                 URL: https://issues.apache.org/jira/browse/ARTEMIS-2716
>             Project: ActiveMQ Artemis
>          Issue Type: New Feature
>            Reporter: Francesco Nigro
>            Assignee: Francesco Nigro
>            Priority: Major
>         Attachments: backup.png, primary.png
>
>          Time Spent: 2h 40m
>  Remaining Estimate: 0h
>
> This task aims to deliver a new Quorum Vote mechanism for Artemis with the 
> following objectives:
> # to make it pluggable
> # to cleanly separate the election phase and the cluster member states
> # to simplify the most common setups in terms of both the amount of 
> configuration and the requirements (e.g. "witness" nodes could be 
> implemented to support single master-slave pairs)
> Post-actions to help people adopt it, which need to be thought through upfront:
> # a clean upgrade path for current HA replication users
> # deprecate or integrate the current HA replication into the new version



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to