This is an automated email from the ASF dual-hosted git repository.

alsuliman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 16e138d  [NO ISSUE][REP] Sync replicas for different partitions concurrently
16e138d is described below

commit 16e138d15dbe70c644980cc890db7c724d127b40
Author: Ali Alsuliman <[email protected]>
AuthorDate: Wed Sep 15 03:57:28 2021 +0300

    [NO ISSUE][REP] Sync replicas for different partitions concurrently
    
    - user model changes: no
    - storage format changes: no
    - interface changes: yes
    
    Details:
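    - Get the replica sync lock per partition instead of using a single node-wide lock, so replicas of different partitions can be synchronized concurrently.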
    
    Change-Id: I6693fbca51d8e13f9740941f797f63fae7b1d2d0
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/13244
    Tested-by: Jenkins <[email protected]>
    Integration-Tests: Jenkins <[email protected]>
    Reviewed-by: Murtadha Hubail <[email protected]>
---
 .../org/apache/asterix/app/nc/ReplicaManager.java  | 30 +++++++++++++---------
 .../message/RegistrationTasksResponseMessage.java  |  4 ++-
 .../asterix/common/storage/IReplicaManager.java    |  6 +++--
 .../asterix/replication/api/PartitionReplica.java  | 13 +++++++++-
 .../replication/sync/ReplicaSynchronizer.java      |  3 ++-
 .../service/recovery/CheckpointManager.java        | 14 ++++++----
 6 files changed, 48 insertions(+), 22 deletions(-)

diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
index f6de92d..7c4b59c 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
@@ -54,17 +54,18 @@ public class ReplicaManager implements IReplicaManager {
     /**
      * the partitions to which the current node is master
      */
-    private final Set<Integer> partitions = new HashSet<>();
+    private final Map<Integer, Object> partitions = new HashMap<>();
     /**
      * current replicas
      */
     private final Map<ReplicaIdentifier, PartitionReplica> replicas = new 
HashMap<>();
-    private final Object replicaSyncLock = new Object();
     private final Set<Integer> nodeOwnedPartitions = new HashSet<>();
 
     public ReplicaManager(INcApplicationContext appCtx, Set<Integer> 
partitions) {
         this.appCtx = appCtx;
-        this.partitions.addAll(partitions);
+        for (Integer partition : partitions) {
+            this.partitions.put(partition, new Object());
+        }
         setNodeOwnedPartitions(appCtx);
     }
 
@@ -77,7 +78,7 @@ public class ReplicaManager implements IReplicaManager {
             LOGGER.warn("Ignoring request to add replica. Node is not ACTIVE 
yet. Current status: {}", nodeStatus);
             return;
         }
-        if (!partitions.contains(id.getPartition())) {
+        if (!partitions.containsKey(id.getPartition())) {
             throw new IllegalStateException(
                     "This node is not the current master of partition(" + 
id.getPartition() + ")");
         }
@@ -96,7 +97,6 @@ public class ReplicaManager implements IReplicaManager {
         }
         PartitionReplica replica = replicas.remove(id);
         appCtx.getReplicationManager().unregister(replica);
-
     }
 
     @Override
@@ -112,18 +112,20 @@ public class ReplicaManager implements IReplicaManager {
 
     @Override
     public synchronized Set<Integer> getPartitions() {
-        return Collections.unmodifiableSet(partitions);
+        return Collections.unmodifiableSet(partitions.keySet());
     }
 
     @Override
     public synchronized void setActivePartitions(Set<Integer> 
activePartitions) {
         partitions.clear();
-        partitions.addAll(activePartitions);
+        for (Integer partition : activePartitions) {
+            partitions.put(partition, new Object());
+        }
     }
 
     @Override
     public synchronized void promote(int partition) throws 
HyracksDataException {
-        if (partitions.contains(partition)) {
+        if (partitions.containsKey(partition)) {
             return;
         }
         LOGGER.warn("promoting partition {}", partition);
@@ -132,12 +134,12 @@ public class ReplicaManager implements IReplicaManager {
         localResourceRepository.cleanup(partition);
         final IRecoveryManager recoveryManager = 
appCtx.getTransactionSubsystem().getRecoveryManager();
         
recoveryManager.replayReplicaPartitionLogs(Stream.of(partition).collect(Collectors.toSet()),
 true);
-        partitions.add(partition);
+        partitions.put(partition, new Object());
     }
 
     @Override
     public synchronized void release(int partition) throws 
HyracksDataException {
-        if (!partitions.contains(partition)) {
+        if (!partitions.containsKey(partition)) {
             return;
         }
         closePartitionResources(partition);
@@ -149,8 +151,12 @@ public class ReplicaManager implements IReplicaManager {
     }
 
     @Override
-    public Object getReplicaSyncLock() {
-        return replicaSyncLock;
+    public synchronized Object getPartitionSyncLock(int partition) {
+        Object syncLock = partitions.get(partition);
+        if (syncLock == null) {
+            throw new IllegalStateException("partition " + partition + " is 
not active on this node");
+        }
+        return syncLock;
     }
 
     @Override
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
index b3546cc..f0a4a7c 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
@@ -18,6 +18,7 @@
  */
 package org.apache.asterix.app.replication.message;
 
+import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
@@ -73,7 +74,8 @@ public class RegistrationTasksResponseMessage extends CcIdentifiedMessage
             }
             NcLocalCounters localCounter = success ? 
NcLocalCounters.collect(getCcId(),
                     (NodeControllerService) 
appCtx.getServiceContext().getControllerService()) : null;
-            Set<Integer> nodeActivePartitions = 
appCtx.getReplicaManager().getPartitions();
+            // wrap the returned partitions in a hash set to make it 
serializable
+            Set<Integer> nodeActivePartitions = new 
HashSet<>(appCtx.getReplicaManager().getPartitions());
             NCLifecycleTaskReportMessage result =
                     new NCLifecycleTaskReportMessage(nodeId, success, 
localCounter, nodeActivePartitions);
             result.setException(exception);
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IReplicaManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IReplicaManager.java
index 88d3113..a4d56ce 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IReplicaManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IReplicaManager.java
@@ -80,12 +80,14 @@ public interface IReplicaManager {
     void release(int partition) throws HyracksDataException;
 
     /**
-     * A lock that can be used to ensure a single replica is being 
synchronized at a time
+     * A lock that can be used to ensure a single partition replica is being 
synchronized at a time
      * by this {@link IReplicaManager}
      *
+     * @param partition partition
+     *
      * @return the synchronization lock
      */
-    Object getReplicaSyncLock();
+    Object getPartitionSyncLock(int partition);
 
     /**
      * Gets the partition replicas matching {@code id}
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java
index 3b10700..c49bb7b 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/api/PartitionReplica.java
@@ -24,6 +24,8 @@ import static org.apache.asterix.common.replication.IPartitionReplica.PartitionR
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 
 import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.exceptions.ReplicationException;
@@ -57,6 +59,7 @@ public class PartitionReplica implements IPartitionReplica {
     private ByteBuffer reusbaleBuf;
     private PartitionReplicaStatus status = DISCONNECTED;
     private ISocketChannel sc;
+    private Future<?> syncFuture;
 
     public PartitionReplica(ReplicaIdentifier id, INcApplicationContext 
appCtx) {
         this.id = id;
@@ -87,7 +90,8 @@ public class PartitionReplica implements IPartitionReplica {
             return;
         }
         setStatus(CATCHING_UP);
-        appCtx.getThreadExecutor().execute(() -> {
+        ExecutorService threadExecutor = (ExecutorService) 
appCtx.getThreadExecutor();
+        syncFuture = threadExecutor.submit(() -> {
             try {
                 new ReplicaSynchronizer(appCtx, this).sync(register, 
deltaRecovery);
                 setStatus(IN_SYNC);
@@ -100,6 +104,13 @@ public class PartitionReplica implements IPartitionReplica {
         });
     }
 
+    public synchronized void abort() {
+        if (syncFuture != null) {
+            syncFuture.cancel(true);
+        }
+        syncFuture = null;
+    }
+
     public synchronized ISocketChannel getChannel() {
         try {
             if (!NetworkingUtil.isHealthy(sc)) {
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
index 05e2e75..2434686 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
@@ -44,7 +44,8 @@ public class ReplicaSynchronizer {
     }
 
     public void sync(boolean register, boolean deltaRecovery) throws 
IOException {
-        synchronized (appCtx.getReplicaManager().getReplicaSyncLock()) {
+        Object partitionLock = 
appCtx.getReplicaManager().getPartitionSyncLock(replica.getIdentifier().getPartition());
+        synchronized (partitionLock) {
             final ICheckpointManager checkpointManager = 
appCtx.getTransactionSubsystem().getCheckpointManager();
             try {
                 // suspend checkpointing datasets to prevent async IO 
operations while sync'ing replicas
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java
index 6582670..f09248f 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java
@@ -46,7 +46,7 @@ public class CheckpointManager extends AbstractCheckpointManager {
     private static final long NO_SECURED_LSN = -1L;
     private final long datasetCheckpointIntervalNanos;
     private final Map<TxnId, Long> securedLSNs;
-    private boolean suspended = false;
+    private int suspendCount = 0;
 
     public CheckpointManager(ITransactionSubsystem txnSubsystem, 
CheckpointProperties checkpointProperties) {
         super(txnSubsystem, checkpointProperties);
@@ -84,7 +84,7 @@ public class CheckpointManager extends AbstractCheckpointManager {
         }
         final long minFirstLSN = 
txnSubsystem.getRecoveryManager().getMinFirstLSN();
         boolean checkpointSucceeded = minFirstLSN >= checkpointTargetLSN;
-        if (!checkpointSucceeded && !suspended) {
+        if (!checkpointSucceeded && !isSuspended()) {
             // Flush datasets with indexes behind target checkpoint LSN
             final IDatasetLifecycleManager dlcm = 
txnSubsystem.getApplicationContext().getDatasetLifecycleManager();
             
dlcm.asyncFlushMatchingIndexes(newLaggingDatasetPredicate(checkpointTargetLSN));
@@ -109,21 +109,25 @@ public class CheckpointManager extends AbstractCheckpointManager {
 
     @Override
     public synchronized void checkpointIdleDatasets() throws 
HyracksDataException {
-        if (suspended) {
+        if (isSuspended()) {
             return;
         }
         final IDatasetLifecycleManager dlcm = 
txnSubsystem.getApplicationContext().getDatasetLifecycleManager();
         dlcm.asyncFlushMatchingIndexes(newIdleDatasetPredicate());
     }
 
+    private synchronized boolean isSuspended() {
+        return suspendCount != 0;
+    }
+
     @Override
     public synchronized void suspend() {
-        suspended = true;
+        suspendCount++;
     }
 
     @Override
     public synchronized void resume() {
-        suspended = false;
+        suspendCount--;
     }
 
     private synchronized long getMinSecuredLSN() {

Reply via email to