[ 
https://issues.apache.org/jira/browse/ARTEMIS-3340?focusedWorklogId=627458&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-627458
 ]

ASF GitHub Bot logged work on ARTEMIS-3340:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 25/Jul/21 15:58
            Start Date: 25/Jul/21 15:58
    Worklog Time Spent: 10m 
      Work Description: franz1981 commented on a change in pull request #3646:
URL: https://github.com/apache/activemq-artemis/pull/3646#discussion_r675981524



##########
File path: 
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ReplicationPrimaryActivation.java
##########
@@ -0,0 +1,489 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server.impl;
+
+import javax.annotation.concurrent.GuardedBy;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import 
org.apache.activemq.artemis.api.core.ActiveMQAlreadyReplicatingException;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
+import org.apache.activemq.artemis.api.core.Pair;
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.core.protocol.core.Channel;
+import org.apache.activemq.artemis.core.protocol.core.ChannelHandler;
+import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
+import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
+import 
org.apache.activemq.artemis.core.protocol.core.impl.wireformat.BackupRegistrationMessage;
+import 
org.apache.activemq.artemis.core.protocol.core.impl.wireformat.BackupReplicationStartFailedMessage;
+import 
org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationLiveIsStoppingMessage;
+import org.apache.activemq.artemis.core.remoting.CloseListener;
+import org.apache.activemq.artemis.core.remoting.FailureListener;
+import org.apache.activemq.artemis.core.remoting.server.RemotingService;
+import org.apache.activemq.artemis.core.replication.ReplicationManager;
+import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
+import org.apache.activemq.artemis.core.server.NodeManager;
+import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
+import 
org.apache.activemq.artemis.core.server.cluster.ha.ReplicationPrimaryPolicy;
+import org.apache.activemq.artemis.quorum.DistributedLock;
+import org.apache.activemq.artemis.quorum.DistributedPrimitiveManager;
+import org.apache.activemq.artemis.quorum.MutableLong;
+import org.apache.activemq.artemis.quorum.UnavailableStateException;
+import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
+import org.jboss.logging.Logger;
+
+import static 
org.apache.activemq.artemis.core.server.ActiveMQServer.SERVER_STATE.STARTED;
+import static 
org.apache.activemq.artemis.core.server.impl.ClusterTopologySearch.searchActiveLiveNodeId;
+
+/**
+ * This is going to be {@link #run()} just by natural born primary, at the 
first start.
+ * Both during a failover or a failback, {@link #run()} isn't going to be 
used, but only {@link #getActivationChannelHandler(Channel, Acceptor)}.
+ */
+public class ReplicationPrimaryActivation extends LiveActivation implements 
DistributedLock.UnavailableLockListener {
+
+   private static final Logger LOGGER = 
Logger.getLogger(ReplicationPrimaryActivation.class);
+   private static final long DISTRIBUTED_MANAGER_START_TIMEOUT_MILLIS = 20_000;
+   private static final long BLOCKING_CALLS_TIMEOUT_MILLIS = 5_000;
+
+   private final ReplicationPrimaryPolicy policy;
+
+   private final ActiveMQServerImpl activeMQServer;
+
+   @GuardedBy("replicationLock")
+   private ReplicationManager replicationManager;
+
+   private final Object replicationLock;
+
+   private final DistributedPrimitiveManager distributedManager;
+
+   private volatile boolean stoppingServer;
+
+   public ReplicationPrimaryActivation(final ActiveMQServerImpl activeMQServer,
+                                       final DistributedPrimitiveManager 
distributedManager,
+                                       final ReplicationPrimaryPolicy policy) {
+      this.activeMQServer = activeMQServer;
+      this.policy = policy;
+      this.replicationLock = new Object();
+      this.distributedManager = distributedManager;
+   }
+
+   /**
+    * used for testing purposes.
+    */
+   public DistributedPrimitiveManager getDistributedManager() {
+      return distributedManager;
+   }
+
+   @Override
+   public void freezeConnections(RemotingService remotingService) {
+      final ReplicationManager replicationManager = getReplicationManager();
+
+      if (remotingService != null && replicationManager != null) {
+         remotingService.freeze(null, 
replicationManager.getBackupTransportConnection());
+      } else if (remotingService != null) {
+         remotingService.freeze(null, null);
+      }
+   }
+
+   @Override
+   public void run() {
+      try {
+
+         // we have a common nodeId that we can share and coordinate with 
between peers
+         if (policy.getCoordinationId() != null) {
+            LOGGER.infof("Applying shared peer NodeID=%s to enable coordinated 
live activation", policy.getCoordinationId());
+
+            // REVISIT: this is quite clunky, also in backup activation, we 
just need new nodeID persisted!
+            activeMQServer.resetNodeManager();
+            activeMQServer.getNodeManager().start();
+            
activeMQServer.getNodeManager().setNodeID(policy.getCoordinationId());
+            activeMQServer.getNodeManager().stopBackup();
+         }
+         final String nodeId = 
activeMQServer.getNodeManager().readNodeId().toString();
+
+         final DistributedLock liveLock = searchLiveOrAcquireLiveLock(nodeId, 
BLOCKING_CALLS_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+
+         if (liveLock == null) {
+            return;
+         }
+
+         
ReplicationBackupActivation.ensureSequentialAccessToNodeData(activeMQServer, 
distributedManager, LOGGER);
+
+         activeMQServer.initialisePart1(false);
+
+         activeMQServer.initialisePart2(false);
+
+         // must be registered before checking the caller
+         liveLock.addListener(this);
+
+         // This control is placed here because initialisePart2 is going to 
load the journal that
+         // could pause the JVM for enough time to lose lock ownership
+         if (!liveLock.isHeldByCaller()) {
+            throw new IllegalStateException("This broker isn't live anymore, 
probably due to application pauses eg GC, OS etc: failing now");
+         }
+
+         activeMQServer.completeActivation(true);
+
+         if (activeMQServer.getIdentity() != null) {
+            
ActiveMQServerLogger.LOGGER.serverIsLive(activeMQServer.getIdentity());
+         } else {
+            ActiveMQServerLogger.LOGGER.serverIsLive();
+         }
+      } catch (Exception e) {
+         // async stop it, we don't need to await this to complete
+         distributedManager.stop();
+         ActiveMQServerLogger.LOGGER.initializationError(e);
+         activeMQServer.callActivationFailureListeners(e);
+      }
+   }
+
+   private DistributedLock searchLiveOrAcquireLiveLock(final String nodeId,
+                                                       final long 
blockingCallTimeout,
+                                                       final TimeUnit unit) 
throws ActiveMQException, InterruptedException {
+      if (policy.isCheckForLiveServer()) {
+         LOGGER.infof("Searching for a live server matching NodeID = %s", 
nodeId);
+         if (searchActiveLiveNodeId(policy.getClusterName(), nodeId, 
blockingCallTimeout, unit, activeMQServer.getConfiguration())) {
+            LOGGER.infof("Found a live server with  NodeID = %s: restarting as 
backup", nodeId);
+            activeMQServer.setHAPolicy(policy.getBackupPolicy());
+            return null;
+         }
+      }
+      startDistributedPrimitiveManager();
+      return acquireDistributeLock(getDistributeLock(nodeId), 
blockingCallTimeout, unit);
+   }
+
+   private void startDistributedPrimitiveManager() throws 
InterruptedException, ActiveMQException {
+      LOGGER.infof("Trying to reach the majority of quorum nodes in %d ms.", 
DISTRIBUTED_MANAGER_START_TIMEOUT_MILLIS);
+      try {
+         if 
(distributedManager.start(DISTRIBUTED_MANAGER_START_TIMEOUT_MILLIS, 
TimeUnit.MILLISECONDS)) {
+            return;
+         }
+      } catch (InterruptedException ie) {
+         throw ie;
+      } catch (Throwable t) {
+         LOGGER.debug(t);
+      }
+      assert !distributedManager.isStarted();
+      throw new ActiveMQException("Cannot reach the majority of quorum nodes");
+   }
+
+   private DistributedLock getDistributeLock(final String nodeId) throws 
InterruptedException, ActiveMQException {
+      try {
+         return distributedManager.getDistributedLock(nodeId);
+      } catch (Throwable t) {
+         try {
+            distributedManager.stop();
+         } catch (Throwable ignore) {
+            // don't care
+         }
+         if (t instanceof InterruptedException) {
+            throw (InterruptedException) t;
+         }
+         throw new ActiveMQException("Cannot obtain a live lock instance");
+      }
+   }
+
+   private DistributedLock acquireDistributeLock(final DistributedLock 
liveLock,
+                                                 final long acquireLockTimeout,
+                                                 final TimeUnit unit) throws 
InterruptedException, ActiveMQException {
+      try {
+         if (liveLock.tryLock(acquireLockTimeout, unit)) {
+            return liveLock;
+         }
+      } catch (UnavailableStateException e) {
+         LOGGER.debug(e);
+      }
+      try {
+         distributedManager.stop();
+      } catch (Throwable ignore) {
+         // don't care
+      }
+      throw new ActiveMQException("Failed to become live");
+   }
+
+   @Override
+   public ChannelHandler getActivationChannelHandler(final Channel channel, 
final Acceptor acceptorUsed) {
+      if (stoppingServer) {
+         return null;
+      }
+      return packet -> {
+         if (packet.getType() == PacketImpl.BACKUP_REGISTRATION) {
+            onBackupRegistration(channel, acceptorUsed, 
(BackupRegistrationMessage) packet);
+         }
+      };
+   }
+
+   private void onBackupRegistration(final Channel channel,
+                                     final Acceptor acceptorUsed,
+                                     final BackupRegistrationMessage msg) {
+      try {
+         startAsyncReplication(channel.getConnection(), 
acceptorUsed.getClusterConnection(), msg.getConnector(), 
msg.isFailBackRequest());
+      } catch (ActiveMQAlreadyReplicatingException are) {
+         channel.send(new 
BackupReplicationStartFailedMessage(BackupReplicationStartFailedMessage.BackupRegistrationProblem.ALREADY_REPLICATING));
+      } catch (ActiveMQException e) {
+         LOGGER.debug("Failed to process backup registration packet", e);
+         channel.send(new 
BackupReplicationStartFailedMessage(BackupReplicationStartFailedMessage.BackupRegistrationProblem.EXCEPTION));
+      }
+   }
+
+   private void startAsyncReplication(final CoreRemotingConnection 
remotingConnection,
+                                      final ClusterConnection 
clusterConnection,
+                                      final TransportConfiguration 
backupTransport,
+                                      final boolean isFailBackRequest) throws 
ActiveMQException {
+      synchronized (replicationLock) {
+         if (replicationManager != null) {
+            throw new ActiveMQAlreadyReplicatingException();
+         }
+         if (!activeMQServer.isStarted()) {
+            throw new ActiveMQIllegalStateException();
+         }
+         final ReplicationFailureListener listener = new 
ReplicationFailureListener();
+         remotingConnection.addCloseListener(listener);
+         remotingConnection.addFailureListener(listener);
+         final ReplicationManager replicationManager = new 
ReplicationManager(activeMQServer, remotingConnection, 
clusterConnection.getCallTimeout(), policy.getInitialReplicationSyncTimeout(), 
activeMQServer.getIOExecutorFactory());
+         this.replicationManager = replicationManager;
+         replicationManager.start();
+         final Thread replicatingThread = new Thread(() -> 
replicate(replicationManager, clusterConnection, isFailBackRequest, 
backupTransport));
+         replicatingThread.setName("async-replication-thread");
+         replicatingThread.start();
+      }
+   }
+
+   private void replicate(final ReplicationManager replicationManager,
+                          final ClusterConnection clusterConnection,
+                          final boolean isFailBackRequest,
+                          final TransportConfiguration backupTransport) {
+      try {
+         final String nodeID = activeMQServer.getNodeID().toString();
+         
activeMQServer.getStorageManager().startReplication(replicationManager, 
activeMQServer.getPagingManager(), nodeID, isFailBackRequest && 
policy.isAllowAutoFailBack(), policy.getInitialReplicationSyncTimeout());
+
+         clusterConnection.nodeAnnounced(System.currentTimeMillis(), nodeID, 
policy.getGroupName(), policy.getScaleDownGroupName(), new Pair<>(null, 
backupTransport), true);
+
+         if (isFailBackRequest && policy.isAllowAutoFailBack()) {
+            awaitBackupAnnouncementOnFailbackRequest(clusterConnection);
+         }
+      } catch (Exception e) {
+         if (activeMQServer.getState() == STARTED) {
+            /*
+             * The reasoning here is that the exception was either caused by 
(1) the
+             * (interaction with) the backup, or (2) by an IO Error at the 
storage. If (1), we
+             * can swallow the exception and ignore the replication request. 
If (2) the live
+             * will crash shortly.
+             */
+            ActiveMQServerLogger.LOGGER.errorStartingReplication(e);
+         }
+         try {
+            ActiveMQServerImpl.stopComponent(replicationManager);
+         } catch (Exception amqe) {
+            ActiveMQServerLogger.LOGGER.errorStoppingReplication(amqe);
+         } finally {
+            synchronized (replicationLock) {
+               this.replicationManager = null;
+            }
+         }
+      }
+   }
+
+   /**
+    * This is handling awaiting backup announcement before trying to failover.
+    * This broker is a backup broker, acting as a live and ready to restart as 
a backup
+    */
+   private void awaitBackupAnnouncementOnFailbackRequest(ClusterConnection 
clusterConnection) throws Exception {
+      final String nodeID = activeMQServer.getNodeID().toString();
+      final BackupTopologyListener topologyListener = new 
BackupTopologyListener(nodeID, clusterConnection.getConnector());
+      clusterConnection.addClusterTopologyListener(topologyListener);
+      try {
+         if (topologyListener.waitForBackup()) {
+            restartAsBackupAfterFailback();
+         } else {
+            ActiveMQServerLogger.LOGGER.failbackMissedBackupAnnouncement();
+         }
+      } finally {
+         clusterConnection.removeClusterTopologyListener(topologyListener);
+      }
+   }
+
+   /**
+    * If {@link #asyncStopServer()} happens before this call, the restart just 
won't happen.
+    * If {@link #asyncStopServer()} happens after this call, will make the 
server to stop right after being restarted.
+    */
+   private void restartAsBackupAfterFailback() throws Exception {
+      if (stoppingServer) {
+         return;
+      }
+      synchronized (this) {
+         if (stoppingServer) {
+            return;
+         }
+         final String coordinatedLockAndNodeId = 
activeMQServer.getNodeManager().getNodeId().toString();
+         final long inSyncReplicaActivation = 
activeMQServer.getNodeManager().getNodeActivationSequence();
+         DistributedLock existingLiveLock = 
distributedManager.getDistributedLock(coordinatedLockAndNodeId);
+         // give up our role as 'live' asap, will also happen on server.fail
+         existingLiveLock.close();
+
+         activeMQServer.fail(true);
+
+         // need to restart
+         distributedManager.start();
+         final MutableLong coordinatedActivationSequence = 
distributedManager.getMutableLong(coordinatedLockAndNodeId);
+         // wait for the live to activate and run un replicated with a 
sequence > inSyncReplicaActivation
+         // this read can be dirty b/c we are just looking for an increment, 
we don't care about the actual value
+         final long done = System.currentTimeMillis() + 
policy.getBackupPolicy().getVoteRetryWait();
+         do {
+            final long coordinatedValue = coordinatedActivationSequence.get();
+            if (coordinatedValue > inSyncReplicaActivation) {
+               // all good, activation has gone ahead
+               LOGGER.infof("Detected expected sequential server activation 
after failback, with NodeID = %s: and sequence: %d", coordinatedLockAndNodeId, 
coordinatedValue);
+               break;
+            }
+            try {
+               TimeUnit.MILLISECONDS.sleep(100);
+            } catch (InterruptedException ignored) {
+            }
+         }
+         while (done < System.currentTimeMillis());
+
+         if (coordinatedActivationSequence.get() == inSyncReplicaActivation) {
+            LOGGER.warnf("Timed out waiting for failback server activation 
with NodeID = %s: and sequence > %d: after %dms",

Review comment:
       what happen if the other primary hasn't been able to failback?
   
   In the worst case scenario the replicated candidate live die, but this 
broker, skipping the activation check, will rotate the most up to date data 
just received from it, waiting it to appear, maybe forever (because the died 
live is the only one with the right/same activation sequence).
    
   IMO it makes sense to never skip checking the activation sequence on server 
restart during a failback, in short...
   backup activation is
   ```java
            if (policy.isTryFailback()) {
               // we are replicating to overwrite our data, transient backup 
state while trying to be the primary
            } else {
               // we may be a valid insync_replica (backup) if our activation 
sequence is largest for a nodeId
               // verify that before removing data..
               final DistributedLock liveLockWithInSyncReplica = 
checkForInSyncReplica();
   ```
   and it can just become
   ```java
               // we may be a valid insync_replica (backup) if our activation 
sequence is largest for a nodeId
               // verify that before removing data..
               final DistributedLock liveLockWithInSyncReplica = 
checkForInSyncReplica();
   ```
   because `checkForInSyncReplica` won't give up on becoming live again if the 
data is the most up to date.
   wdyt?
   
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 627458)
    Time Spent: 2h 10m  (was: 2h)

> Replicated Journal quorum-based logical timestamp/version
> ---------------------------------------------------------
>
>                 Key: ARTEMIS-3340
>                 URL: https://issues.apache.org/jira/browse/ARTEMIS-3340
>             Project: ActiveMQ Artemis
>          Issue Type: Improvement
>            Reporter: Francesco Nigro
>            Assignee: Gary Tully
>            Priority: Major
>          Time Spent: 2h 10m
>  Remaining Estimate: 0h
>
> Shared-nothing replication can cause journal misalignment despite no 
> split-brain events.
> There are several ways that can cause this to happen.
> Below some scenario that won't involve network partitions/drastic outages.
> Scenario 1:
>  # Master/Primary start as live, clients connect to it
>  # Backup become an in-sync replica
>  # User stop live and backup failover to it
>  # *Backup serve clients alone, modifying its journal*
>  # User stop backup
>  # User start master/primary: it become live with a journal misaligned to the 
> most up-to-date one ie on the stopped backup
> Scenario 2 (involving network glitch):
>  # Master/Primary start as live, clients connect to it
>  # Backup become an in-sync replica
>  # Connection glitch between backup -> live
>  # backup start trying to failover (for {{vote-retries * vote-retry-wait}} 
> milliseconds)
>  # *Live serve clients alone, modifying its journal*
>  # User stop live
>  # Backup succeed to failover: it become live with a journal misaligned to 
> the most up-to-date one ie on the stopped live
> The main cause of this issue is because we allow *a single broker to serve 
> clients*, despite configured with HA, generating the journal misalignment.
>  The quorum service (classic or pluggable) just take care of mutual exclusive 
> presence of broker for the live role (vs a NodeID), without considering live 
> role ordering based on the most up-to-date journal.
> A possible solution is to use 
> https://issues.apache.org/jira/browse/ARTEMIS-2716 and use a quorum "logical 
> timestamp/version" marking the age/ownership changes of the journal in order 
> to force live to always have the most up-to-date journal. It means that such 
> value has to be locally saved and exchanged during the initial replica sync, 
> involving both journal data and core message protocol changes (just for the 
> replication channel, without impacting real clients).
> In case of quorum service restart/outage, admin must use 
> command/configuration to let a broker to ignore the age of its journal and 
> just force it to start.
> In addition new journal CLI commands should be implemented to inspect the age 
> of a (local) broker journal or query/force the quorum journal version too, 
> for troubleshooting reasons.
> It's very important to capture every possible event that cause the journal 
> age/ownership to change.
> Now let's take a look at Scenario 2 with journal versioning/timestamp:
>  # live broker start because it matches the most up to date journal version, 
> increasing it (locally and remotely) when it become fully alive
>  # backup found it and trust that, given that's live, it already has the 
> most-up-to-date journal for a specific NodeID 
>  # live broker send its journal files to the backup, along with its local 
> journal version
>  # backup is now ready to failover in any moment: it store the sent journal 
> version on its local storage
>  # network glitch happen
>  # backup try to become live for vote-retries times
>  # live detect replication disconnection and *increment the journal version* 
> (both quorum and local one)
>  # live serve clients alone, modifying its journal
>  # outage/stop cause live to die
>  # backup detect that *quorum journal version no longer match its own local 
> journal version*, meaning that something has happened in the meantime: it 
> stop trying to become live
> The key parts related to journal age/version are:
>  * only who's live can change quorum (and local) journal version (with a 
> monotonic increment)
>  * every ownership change event must cause journal age/version to change eg 
> starting as live, loosing its backup, etc etc
> Re the RI implementation using Apache Curator, this could use a separate 
> [DistributedAtomicLong|https://curator.apache.org/apidocs/org/apache/curator/framework/recipes/atomic/DistributedAtomicLong.html]
>   to manage the journal version.
> Although tempting, it's not a good idea to not just use the data field on 
> {{InterProcessSemaphoreV2}}, because:
> * there's no API to query it if no lease is acquired yet (or created)
> * data cannot change while the lock is acquired: it won't allow to increase 
> journal age because of replica drop
> Athough tempting, it's not a good idea to just use the last alive broker 
> connector identity instead of a journal version, because of the ABA problem 
> (see https://en.wikipedia.org/wiki/ABA_problem).
> This versioning mechanism isn't without drawbacks: quorum journal versioning 
> requires to store a local copy of the version in order to allow the broker to 
> query and compare it with the quorum one on restart; having 2 separate and 
> not atomic operations means that there must be a way to reconcile/fix it in 
> case of misalignments. As said above, this could be done with admin 
> operations.
> Journal versioning change the way roles behave, but they still retain theirs 
> key characteristics:
> - backup should try start as live in case it has the most up to date journal 
> and there is no other live around: differently, can just rotate journal and 
> be available to replicate some live
> - primary try to fail-back to any existing live with the most up to date 
> journal or await it to appear, without becoming live if it doesn't have the 
> most up-to-date journal
> This would ensure that If both broker are up and running and backup allow a 
> primary to failback, the primary eventually become live and backup replicates 
> it, preserving the desired broker roles.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to