This is an automated email from the ASF dual-hosted git repository.

mblow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git

commit d86e2327d1c961c1bf6c666f6947fdab1560bc72
Author: Michael Blow <[email protected]>
AuthorDate: Sun Oct 19 20:43:09 2025 -0400

    [NO ISSUE][*DB][STO] Cloud, logging misc improvements
    
    Ext-ref: MB-69042,MB-68946
    Change-Id: I6b10c08e74e664f1b59ed0df3bf16b871360d66f
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20504
    Reviewed-by: Ali Alsuliman <[email protected]>
    Integration-Tests: Jenkins <[email protected]>
    Tested-by: Jenkins <[email protected]>
    Reviewed-by: Michael Blow <[email protected]>
---
 .../asterix/api/http/server/StorageApiServlet.java |  11 +-
 .../app/message/StorageCleanupRequestMessage.java  |   4 +-
 .../apache/asterix/app/nc/NCAppRuntimeContext.java |   4 +-
 .../org/apache/asterix/app/nc/RecoveryManager.java |  34 +++--
 .../org/apache/asterix/app/nc/ReplicaManager.java  |  37 +++---
 .../nc/task/CloudToLocalStorageCachingTask.java    |  14 +-
 .../asterix/app/nc/task/LocalRecoveryTask.java     |  12 +-
 .../app/nc/task/LocalStorageCleanupTask.java       |   5 +-
 .../asterix/app/nc/task/UpdateNodeStatusTask.java  |   9 +-
 .../app/replication/NcLifecycleCoordinator.java    |  13 +-
 .../message/NCLifecycleTaskReportMessage.java      |  11 +-
 .../message/RegistrationTasksRequestMessage.java   |  12 +-
 .../message/RegistrationTasksResponseMessage.java  |   5 +-
 .../asterix/runtime/ClusterStateManagerTest.java   |   7 +-
 .../asterix/cloud/AbstractCloudIOManager.java      |   2 +-
 .../asterix/cloud/LocalPartitionBootstrapper.java  |   4 +-
 .../cloud/clients/aws/s3/S3ClientConfig.java       |  18 +--
 .../cloud/clients/aws/s3/S3ParallelDownloader.java |   2 +-
 .../org/apache/asterix/cloud/s3/LSMS3Test.java     |   2 +-
 .../common/cloud/IPartitionBootstrapper.java       |   4 +-
 .../common/cluster/IClusterStateManager.java       |   3 +-
 .../cluster/StorageComputePartitionsMap.java       |   7 +-
 .../asterix/common/config/CloudProperties.java     |  75 ++++++-----
 .../asterix/common/config/MetadataProperties.java  |   4 +-
 .../asterix/common/config/PropertiesAccessor.java  |   7 +-
 .../asterix/common/storage/IReplicaManager.java    |  10 +-
 .../common/transactions/IRecoveryManager.java      |   8 +-
 .../apache/asterix/common/utils/Partitions.java    | 146 +++++++++++++++++----
 .../replication/logging/RemoteLogsNotifier.java    |   7 +-
 .../replication/sync/ReplicaSynchronizer.java      |   4 +-
 .../asterix/runtime/utils/ClusterStateManager.java |  12 +-
 .../PersistentLocalResourceRepository.java         |  11 +-
 32 files changed, 305 insertions(+), 199 deletions(-)

diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/StorageApiServlet.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/StorageApiServlet.java
index 26fee28e10..b93938120b 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/StorageApiServlet.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/StorageApiServlet.java
@@ -24,16 +24,15 @@ import java.io.IOException;
 import java.io.PrintWriter;
 import java.net.InetSocketAddress;
 import java.util.List;
-import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
-import java.util.function.Predicate;
-import java.util.stream.Collectors;
+import java.util.function.IntPredicate;
 
 import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.replication.IPartitionReplica;
 import org.apache.asterix.common.storage.IReplicaManager;
 import org.apache.asterix.common.storage.ReplicaIdentifier;
 import org.apache.asterix.common.storage.ResourceStorageStats;
+import org.apache.asterix.common.utils.Partitions;
 import 
org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.http.api.IServletRequest;
@@ -121,11 +120,11 @@ public class StorageApiServlet extends AbstractServlet {
         return getStatus(partition::equals);
     }
 
-    private JsonNode getStatus(Predicate<Integer> predicate) {
+    private JsonNode getStatus(IntPredicate predicate) {
         final ArrayNode status = OBJECT_MAPPER.createArrayNode();
         final IReplicaManager storageSubsystem = appCtx.getReplicaManager();
-        final Set<Integer> partitions =
-                
storageSubsystem.getPartitions().stream().filter(predicate).collect(Collectors.toSet());
+        final Partitions partitions = 
storageSubsystem.getPartitions().stream().filter(predicate)
+                .collect(Partitions::new, Partitions::add, Partitions::addAll);
         for (Integer partition : partitions) {
             final ObjectNode partitionJson = OBJECT_MAPPER.createObjectNode();
             partitionJson.put("partition", partition);
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/StorageCleanupRequestMessage.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/StorageCleanupRequestMessage.java
index beaaac00fd..799510cbef 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/StorageCleanupRequestMessage.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/StorageCleanupRequestMessage.java
@@ -21,7 +21,6 @@ package org.apache.asterix.app.message;
 import static 
org.apache.hyracks.util.ExitUtil.EC_NC_FAILED_TO_NOTIFY_TASKS_COMPLETED;
 
 import java.util.Map;
-import java.util.Set;
 
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.asterix.common.api.INcApplicationContext;
@@ -30,6 +29,7 @@ import 
org.apache.asterix.common.messaging.CcIdentifiedMessage;
 import org.apache.asterix.common.messaging.api.INCMessageBroker;
 import org.apache.asterix.common.messaging.api.INcAddressedMessage;
 import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
+import org.apache.asterix.common.utils.Partitions;
 import 
org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.common.IIndex;
@@ -57,7 +57,7 @@ public class StorageCleanupRequestMessage extends 
CcIdentifiedMessage implements
         INCMessageBroker broker = (INCMessageBroker) 
appContext.getServiceContext().getMessageBroker();
         PersistentLocalResourceRepository localResourceRepository =
                 (PersistentLocalResourceRepository) 
appContext.getLocalResourceRepository();
-        Set<Integer> nodePartitions = 
appContext.getReplicaManager().getPartitions();
+        Partitions nodePartitions = 
appContext.getReplicaManager().getPartitions();
         Map<Long, LocalResource> localResources = 
localResourceRepository.getResources(lr -> true, nodePartitions);
         for (LocalResource resource : localResources.values()) {
             DatasetLocalResource lr = (DatasetLocalResource) 
resource.getResource();
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index e0277aaf81..14d16515da 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -26,7 +26,6 @@ import java.rmi.server.UnicastRemoteObject;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
@@ -78,6 +77,7 @@ import 
org.apache.asterix.common.transactions.IRecoveryManager;
 import org.apache.asterix.common.transactions.IRecoveryManager.SystemState;
 import org.apache.asterix.common.transactions.IRecoveryManagerFactory;
 import org.apache.asterix.common.transactions.ITransactionSubsystem;
+import org.apache.asterix.common.utils.Partitions;
 import org.apache.asterix.external.library.ExternalLibraryManager;
 import org.apache.asterix.file.StorageComponentProvider;
 import org.apache.asterix.metadata.MetadataManager;
@@ -296,7 +296,7 @@ public class NCAppRuntimeContext implements 
INcApplicationContext {
                         txnSubsystem.getLogManager(), virtualBufferCache, 
indexCheckpointManagerProvider, lockNotifier);
         
localResourceRepository.setDatasetLifecycleManager(datasetLifecycleManager);
         final String nodeId = getServiceContext().getNodeId();
-        final Set<Integer> nodePartitions = 
metadataProperties.getNodePartitions(nodeId);
+        final Partitions nodePartitions = 
metadataProperties.getNodePartitions(nodeId);
         replicaManager = new ReplicaManager(this, nodePartitions);
         isShuttingdown = false;
         activeManager = new ActiveManager(threadExecutor, 
getServiceContext().getNodeId(),
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
index 269f6f7224..1b2c17d595 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
@@ -39,7 +39,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
-import java.util.stream.Collectors;
 
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.asterix.common.api.INcApplicationContext;
@@ -61,6 +60,7 @@ import 
org.apache.asterix.common.transactions.ITransactionContext;
 import org.apache.asterix.common.transactions.ITransactionSubsystem;
 import org.apache.asterix.common.transactions.LogType;
 import org.apache.asterix.common.transactions.TxnId;
+import org.apache.asterix.common.utils.Partitions;
 import 
org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback;
 import 
org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 import org.apache.asterix.transaction.management.service.logging.LogManager;
@@ -155,7 +155,7 @@ public class RecoveryManager implements IRecoveryManager, 
ILifeCycleComponent {
     }
 
     @Override
-    public void startLocalRecovery(Set<Integer> partitions) throws 
IOException, ACIDException {
+    public void startLocalRecovery(Partitions partitions) throws IOException, 
ACIDException {
         state = SystemState.RECOVERING;
         LOGGER.info("starting recovery for partitions {}", partitions);
         Checkpoint systemCheckpoint = checkpointManager.getLatest();
@@ -175,7 +175,7 @@ public class RecoveryManager implements IRecoveryManager, 
ILifeCycleComponent {
         replayPartitionsLogs(partitions, logMgr.getLogReader(true), 
lowWaterMarkLSN, true);
     }
 
-    public synchronized void replayPartitionsLogs(Set<Integer> partitions, 
ILogReader logReader, long lowWaterMarkLSN,
+    public synchronized void replayPartitionsLogs(Partitions partitions, 
ILogReader logReader, long lowWaterMarkLSN,
             boolean closeOnFlushRedo) throws IOException, ACIDException {
         try {
             Set<Long> winnerJobSet = startRecoverysAnalysisPhase(partitions, 
logReader, lowWaterMarkLSN);
@@ -186,7 +186,7 @@ public class RecoveryManager implements IRecoveryManager, 
ILifeCycleComponent {
         }
     }
 
-    private synchronized Set<Long> startRecoverysAnalysisPhase(Set<Integer> 
partitions, ILogReader logReader,
+    private synchronized Set<Long> startRecoverysAnalysisPhase(Partitions 
partitions, ILogReader logReader,
             long lowWaterMarkLSN) throws IOException, ACIDException {
         int updateLogCount = 0;
         int entityCommitLogCount = 0;
@@ -274,8 +274,8 @@ public class RecoveryManager implements IRecoveryManager, 
ILifeCycleComponent {
         jobEntityWinners.add(logRecord);
     }
 
-    private synchronized void startRecoveryRedoPhase(Set<Integer> partitions, 
ILogReader logReader,
-            long lowWaterMarkLSN, Set<Long> winnerTxnSet, boolean 
closeOnFlushRedo) throws IOException, ACIDException {
+    private synchronized void startRecoveryRedoPhase(Partitions partitions, 
ILogReader logReader, long lowWaterMarkLSN,
+            Set<Long> winnerTxnSet, boolean closeOnFlushRedo) throws 
IOException, ACIDException {
         int redoCount = 0;
         long txnId = 0;
 
@@ -491,21 +491,20 @@ public class RecoveryManager implements IRecoveryManager, 
ILifeCycleComponent {
 
     private long getRemoteMinFirstLSN() throws HyracksDataException {
         // find the min first lsn of partitions that are replicated on this 
node
-        final Set<Integer> allPartitions = 
localResourceRepository.getAllPartitions();
-        final Set<Integer> masterPartitions = 
appCtx.getReplicaManager().getPartitions();
+        final Partitions allPartitions = 
localResourceRepository.getAllPartitions();
+        final Partitions masterPartitions = 
appCtx.getReplicaManager().getPartitions();
         allPartitions.removeAll(masterPartitions);
         return getPartitionsMinLSN(allPartitions);
     }
 
-    private long getPartitionsMinLSN(Set<Integer> partitions) throws 
HyracksDataException {
+    private long getPartitionsMinLSN(Partitions partitions) throws 
HyracksDataException {
         final IIndexCheckpointManagerProvider idxCheckpointMgrProvider = 
appCtx.getIndexCheckpointManagerProvider();
         long minRemoteLSN = Long.MAX_VALUE;
         for (Integer partition : partitions) {
             final List<DatasetResourceReference> partitionResources = 
localResourceRepository.getResources(resource -> {
                 DatasetLocalResource dsResource = (DatasetLocalResource) 
resource.getResource();
                 return dsResource.getPartition() == partition;
-            }, 
Collections.singleton(partition)).values().stream().map(DatasetResourceReference::of)
-                    .collect(Collectors.toList());
+            }, 
Partitions.singleton(partition)).values().stream().map(DatasetResourceReference::of).toList();
             for (DatasetResourceReference indexRef : partitionResources) {
                 try {
                     final IIndexCheckpointManager idxCheckpointMgr = 
idxCheckpointMgrProvider.get(indexRef);
@@ -524,15 +523,14 @@ public class RecoveryManager implements IRecoveryManager, 
ILifeCycleComponent {
     }
 
     @Override
-    public long getPartitionsMaxLSN(Set<Integer> partitions) throws 
HyracksDataException {
+    public long getPartitionsMaxLSN(Partitions partitions) throws 
HyracksDataException {
         final IIndexCheckpointManagerProvider idxCheckpointMgrProvider = 
appCtx.getIndexCheckpointManagerProvider();
         long maxLSN = 0;
-        for (Integer partition : partitions) {
+        for (int partition : partitions) {
             final List<DatasetResourceReference> partitionResources = 
localResourceRepository.getResources(resource -> {
                 DatasetLocalResource dsResource = (DatasetLocalResource) 
resource.getResource();
                 return dsResource.getPartition() == partition;
-            }, 
Collections.singleton(partition)).values().stream().map(DatasetResourceReference::of)
-                    .collect(Collectors.toList());
+            }, 
Partitions.singleton(partition)).values().stream().map(DatasetResourceReference::of).toList();
             for (DatasetResourceReference indexRef : partitionResources) {
                 try {
                     final IIndexCheckpointManager idxCheckpointMgr = 
idxCheckpointMgrProvider.get(indexRef);
@@ -549,7 +547,7 @@ public class RecoveryManager implements IRecoveryManager, 
ILifeCycleComponent {
     }
 
     @Override
-    public synchronized void replayReplicaPartitionLogs(Set<Integer> 
partitions, boolean flush)
+    public synchronized void replayReplicaPartitionLogs(Partitions partitions, 
boolean flush)
             throws HyracksDataException {
         //replay logs > minLSN that belong to these partitions
         try {
@@ -597,7 +595,7 @@ public class RecoveryManager implements IRecoveryManager, 
ILifeCycleComponent {
         FileUtils.deleteQuietly(recoveryFolderPath.toFile());
     }
 
-    protected void cleanUp(Set<Integer> partitions) throws 
HyracksDataException {
+    protected void cleanUp(Partitions partitions) throws HyracksDataException {
         // the cleanup is currently done by 
PersistentLocalResourceRepository#clean
     }
 
@@ -657,7 +655,7 @@ public class RecoveryManager implements IRecoveryManager, 
ILifeCycleComponent {
         TxnEntityId loserEntity;
         List<Long> undoLSNSet = null;
         //get active partitions on this node
-        Set<Integer> activePartitions = 
appCtx.getReplicaManager().getPartitions();
+        Partitions activePartitions = 
appCtx.getReplicaManager().getPartitions();
         ILogReader logReader = logMgr.getLogReader(false);
         try {
             logReader.setPosition(firstLSN);
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
index 101b1b0fe2..a1eee08225 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
@@ -19,12 +19,9 @@
 package org.apache.asterix.app.nc;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -34,6 +31,7 @@ import 
org.apache.asterix.common.replication.IPartitionReplica;
 import org.apache.asterix.common.storage.IReplicaManager;
 import org.apache.asterix.common.storage.ReplicaIdentifier;
 import org.apache.asterix.common.transactions.IRecoveryManager;
+import org.apache.asterix.common.utils.Partitions;
 import org.apache.asterix.replication.api.PartitionReplica;
 import 
org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 import org.apache.hyracks.api.client.NodeStatus;
@@ -44,6 +42,9 @@ import org.apache.hyracks.util.annotations.ThreadSafe;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
+import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
+
 @ThreadSafe
 public class ReplicaManager implements IReplicaManager {
     private static final Logger LOGGER = LogManager.getLogger();
@@ -52,17 +53,17 @@ public class ReplicaManager implements IReplicaManager {
     /**
      * the partitions to which the current node is master
      */
-    private final Map<Integer, Object> partitions = new HashMap<>();
+    private final Int2ObjectMap<PartitionSyncLock> partitions = new 
Int2ObjectOpenHashMap<>();
     /**
      * current replicas
      */
     private final Map<ReplicaIdentifier, PartitionReplica> replicas = new 
HashMap<>();
-    private final Set<Integer> nodeOriginatedPartitions = new HashSet<>();
+    private final Partitions nodeOriginatedPartitions = new Partitions();
 
-    public ReplicaManager(INcApplicationContext appCtx, Set<Integer> 
partitions) {
+    public ReplicaManager(INcApplicationContext appCtx, Partitions partitions) 
{
         this.appCtx = appCtx;
-        for (Integer partition : partitions) {
-            this.partitions.put(partition, new Object());
+        for (int partition : partitions) {
+            this.partitions.put(partition, new PartitionSyncLock());
         }
         setNodeOriginatedPartitions(appCtx);
     }
@@ -109,15 +110,15 @@ public class ReplicaManager implements IReplicaManager {
     }
 
     @Override
-    public synchronized Set<Integer> getPartitions() {
-        return Collections.unmodifiableSet(partitions.keySet());
+    public synchronized Partitions getPartitions() {
+        return new Partitions(partitions.keySet()).unmodifiable();
     }
 
     @Override
-    public synchronized void setActivePartitions(Set<Integer> 
activePartitions) {
+    public synchronized void setActivePartitions(Partitions activePartitions) {
         partitions.clear();
-        for (Integer partition : activePartitions) {
-            partitions.put(partition, new Object());
+        for (int partition : activePartitions) {
+            partitions.put(partition, new PartitionSyncLock());
         }
     }
 
@@ -132,9 +133,9 @@ public class ReplicaManager implements IReplicaManager {
         if (!appCtx.isCloudDeployment()) {
             localResourceRepository.cleanup(partition);
             final IRecoveryManager recoveryManager = 
appCtx.getTransactionSubsystem().getRecoveryManager();
-            
recoveryManager.replayReplicaPartitionLogs(Stream.of(partition).collect(Collectors.toSet()),
 true);
+            
recoveryManager.replayReplicaPartitionLogs(Stream.of(partition).collect(Partitions.collector()),
 true);
         }
-        partitions.put(partition, new Object());
+        partitions.put(partition, new PartitionSyncLock());
     }
 
     @Override
@@ -151,8 +152,8 @@ public class ReplicaManager implements IReplicaManager {
     }
 
     @Override
-    public synchronized Object getPartitionSyncLock(int partition) {
-        Object syncLock = partitions.get(partition);
+    public synchronized PartitionSyncLock getPartitionSyncLock(int partition) {
+        PartitionSyncLock syncLock = partitions.get(partition);
         if (syncLock == null) {
             throw new IllegalStateException("partition " + partition + " is 
not active on this node");
         }
@@ -187,7 +188,7 @@ public class ReplicaManager implements IReplicaManager {
     }
 
     private void setNodeOriginatedPartitions(INcApplicationContext appCtx) {
-        Set<Integer> nodePartitions =
+        Partitions nodePartitions =
                 
appCtx.getMetadataProperties().getNodePartitions(appCtx.getServiceContext().getNodeId());
         nodeOriginatedPartitions.addAll(nodePartitions);
     }
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/CloudToLocalStorageCachingTask.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/CloudToLocalStorageCachingTask.java
index cc594884ba..00205a64c6 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/CloudToLocalStorageCachingTask.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/CloudToLocalStorageCachingTask.java
@@ -18,13 +18,11 @@
  */
 package org.apache.asterix.app.nc.task;
 
-import java.util.Arrays;
-import java.util.Set;
-
 import org.apache.asterix.common.api.INCLifecycleTask;
 import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.cloud.IPartitionBootstrapper;
 import org.apache.asterix.common.transactions.Checkpoint;
+import org.apache.asterix.common.utils.Partitions;
 import 
org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 import org.apache.hyracks.api.control.CcId;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -37,13 +35,12 @@ public class CloudToLocalStorageCachingTask implements 
INCLifecycleTask {
 
     private static final Logger LOGGER = LogManager.getLogger();
 
-    private static final long serialVersionUID = 1L;
-    private final Set<Integer> storagePartitions;
+    private static final long serialVersionUID = 2L;
+    private final Partitions storagePartitions;
     private final boolean metadataNode;
     private final int metadataPartitionId;
 
-    public CloudToLocalStorageCachingTask(Set<Integer> storagePartitions, 
boolean metadataNode,
-            int metadataPartitionId) {
+    public CloudToLocalStorageCachingTask(Partitions storagePartitions, 
boolean metadataNode, int metadataPartitionId) {
         this.storagePartitions = storagePartitions;
         this.metadataNode = metadataNode;
         this.metadataPartitionId = metadataPartitionId;
@@ -78,7 +75,6 @@ public class CloudToLocalStorageCachingTask implements 
INCLifecycleTask {
 
     @Override
     public String toString() {
-        return "{ \"class\" : \"" + getClass().getSimpleName() + "\", 
\"partitions\" : "
-                + Arrays.toString(storagePartitions.toArray()) + " }";
+        return "{ \"class\" : \"" + getClass().getSimpleName() + "\", 
\"partitions\" : " + storagePartitions + " }";
     }
 }
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/LocalRecoveryTask.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/LocalRecoveryTask.java
index b8addb29bc..e906de77f0 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/LocalRecoveryTask.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/LocalRecoveryTask.java
@@ -19,22 +19,21 @@
 package org.apache.asterix.app.nc.task;
 
 import java.io.IOException;
-import java.util.Arrays;
-import java.util.Set;
 
 import org.apache.asterix.common.api.INCLifecycleTask;
 import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.utils.Partitions;
 import org.apache.hyracks.api.control.CcId;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.service.IControllerService;
 
 public class LocalRecoveryTask implements INCLifecycleTask {
 
-    private static final long serialVersionUID = 1L;
-    private final Set<Integer> partitions;
+    private static final long serialVersionUID = 2L;
+    private final Partitions partitions;
 
-    public LocalRecoveryTask(Set<Integer> partitions) {
+    public LocalRecoveryTask(Partitions partitions) {
         this.partitions = partitions;
     }
 
@@ -52,7 +51,6 @@ public class LocalRecoveryTask implements INCLifecycleTask {
 
     @Override
     public String toString() {
-        return "{ \"class\" : \"" + getClass().getSimpleName() + "\", 
\"partitions\" : "
-                + Arrays.toString(partitions.toArray()) + " }";
+        return "{ \"class\" : \"" + getClass().getSimpleName() + "\", 
\"partitions\" : " + partitions + " }";
     }
 }
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/LocalStorageCleanupTask.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/LocalStorageCleanupTask.java
index 61a07d15ab..673e401cf5 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/LocalStorageCleanupTask.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/LocalStorageCleanupTask.java
@@ -18,12 +18,11 @@
  */
 package org.apache.asterix.app.nc.task;
 
-import java.util.Set;
-
 import org.apache.asterix.common.api.INCLifecycleTask;
 import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.dataflow.DatasetLocalResource;
 import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
+import org.apache.asterix.common.utils.Partitions;
 import 
org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 import org.apache.hyracks.api.control.CcId;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -45,7 +44,7 @@ public class LocalStorageCleanupTask implements 
INCLifecycleTask {
                 (PersistentLocalResourceRepository) 
appContext.getLocalResourceRepository();
         localResourceRepository.deleteCorruptedResources();
         deleteInvalidMetadataIndexes(localResourceRepository);
-        final Set<Integer> nodePartitions = 
appContext.getReplicaManager().getPartitions();
+        final Partitions nodePartitions = 
appContext.getReplicaManager().getPartitions();
         INcApplicationContext appCtx = (INcApplicationContext) 
cs.getApplicationContext();
         if (appCtx.isCloudDeployment() && 
nodePartitions.contains(metadataPartitionId)) {
             
appCtx.getTransactionSubsystem().getTransactionManager().rollbackMetadataTransactionsWithoutWAL();
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/UpdateNodeStatusTask.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/UpdateNodeStatusTask.java
index 17eff4ad81..b4b2d0987a 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/UpdateNodeStatusTask.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/UpdateNodeStatusTask.java
@@ -18,11 +18,10 @@
  */
 package org.apache.asterix.app.nc.task;
 
-import java.util.Set;
-
 import org.apache.asterix.common.api.INCLifecycleTask;
 import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.storage.IReplicaManager;
+import org.apache.asterix.common.utils.Partitions;
 import org.apache.hyracks.api.client.NodeStatus;
 import org.apache.hyracks.api.control.CcId;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -34,11 +33,11 @@ import org.apache.logging.log4j.Logger;
 public class UpdateNodeStatusTask implements INCLifecycleTask {
 
     private static final Logger LOGGER = LogManager.getLogger();
-    private static final long serialVersionUID = 2L;
+    private static final long serialVersionUID = 3L;
     private final NodeStatus status;
-    private final Set<Integer> activePartitions;
+    private final Partitions activePartitions;
 
-    public UpdateNodeStatusTask(NodeStatus status, Set<Integer> 
activePartitions) {
+    public UpdateNodeStatusTask(NodeStatus status, Partitions 
activePartitions) {
         this.status = status;
         this.activePartitions = activePartitions;
     }
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
index fff1bc118c..9ccb66a0b7 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
@@ -60,6 +60,7 @@ import 
org.apache.asterix.common.messaging.api.ICCMessageBroker;
 import org.apache.asterix.common.replication.INCLifecycleMessage;
 import org.apache.asterix.common.replication.INcLifecycleCoordinator;
 import org.apache.asterix.common.transactions.IRecoveryManager.SystemState;
+import org.apache.asterix.common.utils.Partitions;
 import org.apache.asterix.metadata.MetadataManager;
 import org.apache.asterix.replication.messaging.ReplicaFailedMessage;
 import org.apache.http.client.utils.URIBuilder;
@@ -172,7 +173,7 @@ public class NcLifecycleCoordinator implements 
INcLifecycleCoordinator {
     }
 
     protected List<INCLifecycleTask> buildNCRegTasks(String nodeId, NodeStatus 
nodeStatus, SystemState state,
-            Set<Integer> activePartitions) {
+            Partitions activePartitions) {
         LOGGER.info(
                 "Building registration tasks for node {} with status {} and 
system state: {} and active partitions {}",
                 nodeId, nodeStatus, state, activePartitions);
@@ -217,9 +218,9 @@ public class NcLifecycleCoordinator implements 
INcLifecycleCoordinator {
     }
 
     protected List<INCLifecycleTask> buildIdleNcRegTasks(String newNodeId, 
boolean metadataNode, SystemState state,
-            Set<Integer> activePartitions) {
+            Partitions activePartitions) {
         final List<INCLifecycleTask> tasks = new ArrayList<>();
-        Set<Integer> nodeActivePartitions = getNodeActivePartitions(newNodeId, 
activePartitions, metadataNode, state);
+        Partitions nodeActivePartitions = getNodeActivePartitions(newNodeId, 
activePartitions, metadataNode, state);
         tasks.add(new UpdateNodeStatusTask(NodeStatus.BOOTING, 
nodeActivePartitions));
         int metadataPartitionId = 
clusterManager.getMetadataPartition().getPartitionId();
         // Add any cloud-related tasks
@@ -256,7 +257,7 @@ public class NcLifecycleCoordinator implements 
INcLifecycleCoordinator {
         return tasks;
     }
 
-    protected void addCloudTasks(List<INCLifecycleTask> tasks, Set<Integer> 
computePartitions, boolean metadataNode,
+    protected void addCloudTasks(List<INCLifecycleTask> tasks, Partitions 
computePartitions, boolean metadataNode,
             int metadataPartitionId) {
         IApplicationContext appCtx = (IApplicationContext) 
serviceContext.getApplicationContext();
         if (!appCtx.isCloudDeployment()) {
@@ -265,7 +266,7 @@ public class NcLifecycleCoordinator implements 
INcLifecycleCoordinator {
 
         StorageComputePartitionsMap map = 
clusterManager.getStorageComputeMap();
         map = map == null ? 
StorageComputePartitionsMap.computePartitionsMap(clusterManager) : map;
-        Set<Integer> storagePartitions = 
map.getStoragePartitions(computePartitions);
+        Partitions storagePartitions = 
map.getStoragePartitions(computePartitions);
         tasks.add(new CloudToLocalStorageCachingTask(storagePartitions, 
metadataNode, metadataPartitionId));
     }
 
@@ -322,7 +323,7 @@ public class NcLifecycleCoordinator implements 
INcLifecycleCoordinator {
         return true;
     }
 
-    protected Set<Integer> getNodeActivePartitions(String nodeId, Set<Integer> 
nodePartitions, boolean metadataNode,
+    protected Partitions getNodeActivePartitions(String nodeId, Partitions 
nodePartitions, boolean metadataNode,
             SystemState state) {
         if (metadataNode) {
             
nodePartitions.add(clusterManager.getMetadataPartition().getPartitionId());
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java
index b2a2dd771a..082f5b5462 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java
@@ -18,25 +18,24 @@
  */
 package org.apache.asterix.app.replication.message;
 
-import java.util.Set;
-
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
 import org.apache.asterix.common.replication.INCLifecycleMessage;
 import org.apache.asterix.common.utils.NcLocalCounters;
+import org.apache.asterix.common.utils.Partitions;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public class NCLifecycleTaskReportMessage implements INCLifecycleMessage, 
ICcAddressedMessage {
 
-    private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 2L;
     private final String nodeId;
     private final boolean success;
     private Throwable exception;
     private final NcLocalCounters localCounters;
-    private final Set<Integer> activePartitions;
+    private final Partitions activePartitions;
 
     public NCLifecycleTaskReportMessage(String nodeId, boolean success, 
NcLocalCounters localCounters,
-            Set<Integer> activePartitions) {
+            Partitions activePartitions) {
         this.nodeId = nodeId;
         this.success = success;
         this.localCounters = localCounters;
@@ -73,7 +72,7 @@ public class NCLifecycleTaskReportMessage implements 
INCLifecycleMessage, ICcAdd
         return MessageType.REGISTRATION_TASKS_RESULT;
     }
 
-    public Set<Integer> getActivePartitions() {
+    public Partitions getActivePartitions() {
         return activePartitions;
     }
 
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java
index 9e95ac644d..976308c81d 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java
@@ -19,13 +19,13 @@
 package org.apache.asterix.app.replication.message;
 
 import java.util.Map;
-import java.util.Set;
 
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.messaging.api.ICcAddressedMessage;
 import org.apache.asterix.common.messaging.api.INCMessageBroker;
 import org.apache.asterix.common.replication.INCLifecycleMessage;
 import org.apache.asterix.common.transactions.IRecoveryManager.SystemState;
+import org.apache.asterix.common.utils.Partitions;
 import org.apache.hyracks.api.client.NodeStatus;
 import org.apache.hyracks.api.control.CcId;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -37,15 +37,15 @@ import org.apache.logging.log4j.Logger;
 public class RegistrationTasksRequestMessage implements INCLifecycleMessage, 
ICcAddressedMessage {
 
     private static final Logger LOGGER = LogManager.getLogger();
-    private static final long serialVersionUID = 2L;
+    private static final long serialVersionUID = 3L;
     private final SystemState state;
     private final String nodeId;
     private final NodeStatus nodeStatus;
     private final Map<String, Object> secrets;
-    private final Set<Integer> activePartitions;
+    private final Partitions activePartitions;
 
     public RegistrationTasksRequestMessage(String nodeId, NodeStatus 
nodeStatus, SystemState state,
-            Map<String, Object> secretsEphemeral, Set<Integer> 
activePartitions) {
+            Map<String, Object> secretsEphemeral, Partitions activePartitions) 
{
         this.state = state;
         this.nodeId = nodeId;
         this.nodeStatus = nodeStatus;
@@ -54,7 +54,7 @@ public class RegistrationTasksRequestMessage implements 
INCLifecycleMessage, ICc
     }
 
     public static void send(CcId ccId, NodeControllerService cs, NodeStatus 
nodeStatus, SystemState systemState,
-            Map<String, Object> secretsEphemeral, Set<Integer> 
activePartitions) throws HyracksDataException {
+            Map<String, Object> secretsEphemeral, Partitions activePartitions) 
throws HyracksDataException {
         try {
             RegistrationTasksRequestMessage msg = new 
RegistrationTasksRequestMessage(cs.getId(), nodeStatus,
                     systemState, secretsEphemeral, activePartitions);
@@ -92,7 +92,7 @@ public class RegistrationTasksRequestMessage implements 
INCLifecycleMessage, ICc
         return secrets;
     }
 
-    public Set<Integer> getActivePartitions() {
+    public Partitions getActivePartitions() {
         return activePartitions;
     }
 
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
index 9d378f5a41..285563e645 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java
@@ -18,9 +18,7 @@
  */
 package org.apache.asterix.app.replication.message;
 
-import java.util.HashSet;
 import java.util.List;
-import java.util.Set;
 
 import org.apache.asterix.common.api.INCLifecycleTask;
 import org.apache.asterix.common.api.INcApplicationContext;
@@ -29,6 +27,7 @@ import 
org.apache.asterix.common.messaging.api.INCMessageBroker;
 import org.apache.asterix.common.messaging.api.INcAddressedMessage;
 import org.apache.asterix.common.replication.INCLifecycleMessage;
 import org.apache.asterix.common.utils.NcLocalCounters;
+import org.apache.asterix.common.utils.Partitions;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.service.IControllerService;
 import org.apache.hyracks.control.nc.NodeControllerService;
@@ -74,7 +73,7 @@ public class RegistrationTasksResponseMessage extends 
CcIdentifiedMessage
                 LOGGER.debug("returning local counters to cc: {}", 
localCounter);
             }
             // wrap the returned partitions in a hash set to make it 
serializable
-            Set<Integer> nodeActivePartitions = new 
HashSet<>(appCtx.getReplicaManager().getPartitions());
+            Partitions nodeActivePartitions = new 
Partitions(appCtx.getReplicaManager().getPartitions());
             NCLifecycleTaskReportMessage result =
                     new NCLifecycleTaskReportMessage(nodeId, success, 
localCounter, nodeActivePartitions);
             result.setException(exception);
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java
index 53b9294e2b..6b99253791 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java
@@ -19,9 +19,7 @@
 package org.apache.asterix.runtime;
 
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.Map;
-import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
@@ -37,6 +35,7 @@ import org.apache.asterix.common.config.StorageProperties;
 import org.apache.asterix.common.metadata.IMetadataBootstrap;
 import org.apache.asterix.common.utils.NcLocalCounters;
 import org.apache.asterix.common.utils.PartitioningScheme;
+import org.apache.asterix.common.utils.Partitions;
 import org.apache.asterix.hyracks.bootstrap.CCApplication;
 import org.apache.asterix.runtime.transaction.ResourceIdManager;
 import org.apache.asterix.runtime.utils.BulkTxnIdFactory;
@@ -277,8 +276,8 @@ public class ClusterStateManagerTest {
         return localCounters;
     }
 
-    private static Set<Integer> getNodeActivePartitions(String nodeId) {
-        Set<Integer> activePartitions = new HashSet<>();
+    private static Partitions getNodeActivePartitions(String nodeId) {
+        Partitions activePartitions = new Partitions();
         switch (nodeId) {
             case NC1:
                 activePartitions.add(0);
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
index 7712f08653..6fc271e382 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
@@ -126,7 +126,7 @@ public abstract class AbstractCloudIOManager extends 
IOManager implements IParti
     }
 
     @Override
-    public final void bootstrap(Set<Integer> activePartitions, 
List<FileReference> currentOnDiskPartitions,
+    public final void bootstrap(Partitions activePartitions, 
List<FileReference> currentOnDiskPartitions,
             boolean metadataNode, int metadataPartition, boolean 
ensureCompleteBootstrap) throws HyracksDataException {
         partitions.clear();
         partitions.addAll(activePartitions);
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LocalPartitionBootstrapper.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LocalPartitionBootstrapper.java
index f2451ee04f..d5d0d87f3d 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LocalPartitionBootstrapper.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LocalPartitionBootstrapper.java
@@ -20,10 +20,10 @@
 package org.apache.asterix.cloud;
 
 import java.util.List;
-import java.util.Set;
 
 import org.apache.asterix.common.cloud.IPartitionBootstrapper;
 import org.apache.asterix.common.transactions.IRecoveryManager;
+import org.apache.asterix.common.utils.Partitions;
 import org.apache.asterix.common.utils.StoragePathUtil;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
@@ -48,7 +48,7 @@ public class LocalPartitionBootstrapper implements 
IPartitionBootstrapper {
     }
 
     @Override
-    public void bootstrap(Set<Integer> activePartitions, List<FileReference> 
currentOnDiskPartitions,
+    public void bootstrap(Partitions activePartitions, List<FileReference> 
currentOnDiskPartitions,
             boolean metadataNode, int metadataPartition, boolean 
ensureCompleteBootstrap) throws HyracksDataException {
         for (FileReference onDiskPartition : currentOnDiskPartitions) {
             int partitionNum = 
StoragePathUtil.getPartitionNumFromRelativePath(onDiskPartition.getAbsolutePath());
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
index f05a0e5820..b6683ad20c 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java
@@ -47,18 +47,19 @@ public final class S3ClientConfig {
     private final boolean forcePathStyle;
     private final boolean disableSslVerify;
     private final boolean storageListEventuallyConsistent;
+    private final boolean enableCrtClient;
 
     public S3ClientConfig(String region, String endpoint, String prefix, 
boolean anonymousAuth,
-            long profilerLogInterval, int writeBufferSize) {
+            long profilerLogInterval, int writeBufferSize, boolean 
enableCrtClient) {
         this(region, endpoint, prefix, anonymousAuth, profilerLogInterval, 
writeBufferSize, 1, 0, 0, 0, false, false,
-                false, 0, 0);
+                false, 0, 0, enableCrtClient);
     }
 
     private S3ClientConfig(String region, String endpoint, String prefix, 
boolean anonymousAuth,
             long profilerLogInterval, int writeBufferSize, long 
tokenAcquireTimeout, int writeMaxRequestsPerSeconds,
             int readMaxRequestsPerSeconds, int requestsMaxHttpConnections, 
boolean forcePathStyle,
             boolean disableSslVerify, boolean storageListEventuallyConsistent, 
int requestsMaxPendingHttpConnections,
-            int requestsHttpConnectionAcquireTimeout) {
+            int requestsHttpConnectionAcquireTimeout, boolean enableCrtClient) 
{
         this.region = Objects.requireNonNull(region, "region");
         this.endpoint = endpoint;
         this.prefix = Objects.requireNonNull(prefix, "prefix");
@@ -74,6 +75,7 @@ public final class S3ClientConfig {
         this.forcePathStyle = forcePathStyle;
         this.disableSslVerify = disableSslVerify;
         this.storageListEventuallyConsistent = storageListEventuallyConsistent;
+        this.enableCrtClient = enableCrtClient;
     }
 
     public static S3ClientConfig of(CloudProperties cloudProperties) {
@@ -85,7 +87,7 @@ public final class S3ClientConfig {
                 cloudProperties.isStorageForcePathStyle(), 
cloudProperties.isStorageDisableSSLVerify(),
                 cloudProperties.isStorageListEventuallyConsistent(),
                 cloudProperties.getRequestsMaxPendingHttpConnections(),
-                cloudProperties.getRequestsHttpConnectionAcquireTimeout());
+                cloudProperties.getRequestsHttpConnectionAcquireTimeout(), 
cloudProperties.isS3EnableCrtClient());
     }
 
     public static S3ClientConfig of(Map<String, String> configuration, int 
writeBufferSize) {
@@ -99,7 +101,7 @@ public final class S3ClientConfig {
         String prefix = "";
         boolean anonymousAuth = false;
 
-        return new S3ClientConfig(region, endPoint, prefix, anonymousAuth, 
profilerLogInterval, writeBufferSize);
+        return new S3ClientConfig(region, endPoint, prefix, anonymousAuth, 
profilerLogInterval, writeBufferSize, false);
     }
 
     public String getRegion() {
@@ -116,7 +118,7 @@ public final class S3ClientConfig {
 
     public boolean isLocalS3Provider() {
         // to workaround https://github.com/findify/s3mock/issues/187 in our 
S3Mock, we encode/decode keys
-        return isS3Mock();
+        return false; //isS3Mock();
     }
 
     public AwsCredentialsProvider createCredentialsProvider() {
@@ -167,8 +169,8 @@ public final class S3ClientConfig {
         return storageListEventuallyConsistent;
     }
 
-    private boolean isS3Mock() {
-        return endpoint != null && !endpoint.isEmpty();
+    public boolean isCrtClientEnabled() {
+        return enableCrtClient;
     }
 
 }
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ParallelDownloader.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ParallelDownloader.java
index 2acc12e7ba..50d05c66d5 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ParallelDownloader.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ParallelDownloader.java
@@ -181,7 +181,7 @@ class S3ParallelDownloader implements IParallelDownloader {
 
     private static S3AsyncClient createAsyncClient(S3ClientConfig config) {
         // CRT client is not supported by all local S3 providers, but provides 
a better performance with AWS S3
-        if (!config.isLocalS3Provider()) {
+        if (config.isCrtClientEnabled()) {
             return createS3CrtAsyncClient(config);
         }
         return createS3AsyncClient(config);
diff --git 
a/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/s3/LSMS3Test.java
 
b/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/s3/LSMS3Test.java
index fad8081d99..1bb0f746e4 100644
--- 
a/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/s3/LSMS3Test.java
+++ 
b/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/s3/LSMS3Test.java
@@ -68,7 +68,7 @@ public class LSMS3Test extends AbstractLSMTest {
         LOGGER.info("Client created successfully");
         int writeBufferSize = StorageUtil.getIntSizeInBytes(5, 
StorageUtil.StorageUnit.MEGABYTE);
         S3ClientConfig config =
-                new S3ClientConfig(MOCK_SERVER_REGION, MOCK_SERVER_HOSTNAME, 
"", true, 0, writeBufferSize);
+                new S3ClientConfig(MOCK_SERVER_REGION, MOCK_SERVER_HOSTNAME, 
"", true, 0, writeBufferSize, false);
         CLOUD_CLIENT = new S3CloudClient(config, 
ICloudGuardian.NoOpCloudGuardian.INSTANCE);
     }
 
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cloud/IPartitionBootstrapper.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cloud/IPartitionBootstrapper.java
index 88e88abac4..3cca8f5d25 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cloud/IPartitionBootstrapper.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cloud/IPartitionBootstrapper.java
@@ -19,9 +19,9 @@
 package org.apache.asterix.common.cloud;
 
 import java.util.List;
-import java.util.Set;
 
 import org.apache.asterix.common.transactions.IRecoveryManager;
+import org.apache.asterix.common.utils.Partitions;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
 
@@ -52,6 +52,6 @@ public interface IPartitionBootstrapper {
      * @param metadataPartition       metadata partition number
      * @param ensureCompleteBootstrap ensures the metadata catalog was fully 
bootstrapped
      */
-    void bootstrap(Set<Integer> activePartitions, List<FileReference> 
currentOnDiskPartitions, boolean metadataNode,
+    void bootstrap(Partitions activePartitions, List<FileReference> 
currentOnDiskPartitions, boolean metadataNode,
             int metadataPartition, boolean ensureCompleteBootstrap) throws 
HyracksDataException;
 }
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
index 6c0982a38e..ffc554c33e 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
@@ -27,6 +27,7 @@ import 
org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.utils.NcLocalCounters;
+import org.apache.asterix.common.utils.Partitions;
 import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.config.IOption;
@@ -65,7 +66,7 @@ public interface IClusterStateManager {
      * @param activePartitions
      * @throws HyracksDataException
      */
-    void updateNodeState(String nodeId, boolean active, NcLocalCounters 
ncLocalCounters, Set<Integer> activePartitions)
+    void updateNodeState(String nodeId, boolean active, NcLocalCounters 
ncLocalCounters, Partitions activePartitions)
             throws HyracksDataException;
 
     /**
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/StorageComputePartitionsMap.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/StorageComputePartitionsMap.java
index dc62cac667..ef440eb678 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/StorageComputePartitionsMap.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/StorageComputePartitionsMap.java
@@ -21,13 +21,14 @@ package org.apache.asterix.common.cluster;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import org.apache.asterix.common.utils.Partitions;
+
 public class StorageComputePartitionsMap {
 
     private final Map<Integer, ComputePartition> storageComputeMap = new 
HashMap<>();
@@ -83,8 +84,8 @@ public class StorageComputePartitionsMap {
      * @param computePartitions the current active compute partitions
      * @return computePartitions's corresponding storage partitions
      */
-    public Set<Integer> getStoragePartitions(Set<Integer> computePartitions) {
-        Set<Integer> storagePartitions = new HashSet<>();
+    public Partitions getStoragePartitions(Partitions computePartitions) {
+        Partitions storagePartitions = new Partitions();
         for (Map.Entry<Integer, ComputePartition> entry : 
storageComputeMap.entrySet()) {
             ComputePartition computePartition = entry.getValue();
             if (computePartitions.contains(computePartition.getId())) {
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java
index f556fcbb94..392282f6ec 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java
@@ -28,8 +28,10 @@ import static 
org.apache.hyracks.control.common.config.OptionTypes.getRangedInte
 import static org.apache.hyracks.util.StorageUtil.StorageUnit.GIGABYTE;
 
 import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
 
 import org.apache.asterix.common.cloud.CloudCachePolicy;
+import org.apache.hyracks.api.config.IApplicationConfig;
 import org.apache.hyracks.api.config.IOption;
 import org.apache.hyracks.api.config.IOptionType;
 import org.apache.hyracks.api.config.Section;
@@ -70,7 +72,11 @@ public class CloudProperties extends AbstractProperties {
         CLOUD_REQUESTS_HTTP_CONNECTION_ACQUIRE_TIMEOUT(POSITIVE_INTEGER, 120),
         CLOUD_STORAGE_FORCE_PATH_STYLE(BOOLEAN, false),
         CLOUD_STORAGE_DISABLE_SSL_VERIFY(BOOLEAN, false),
-        CLOUD_STORAGE_LIST_EVENTUALLY_CONSISTENT(BOOLEAN, false);
+        CLOUD_STORAGE_LIST_EVENTUALLY_CONSISTENT(BOOLEAN, false),
+        CLOUD_STORAGE_S3_ENABLE_CRT_CLIENT(BOOLEAN, 
(Function<IApplicationConfig, Boolean>) app -> {
+            String endpoint = app.getString(CLOUD_STORAGE_ENDPOINT);
+            return endpoint == null || endpoint.isEmpty();
+        });
 
         private final IOptionType interpreter;
         private final Object defaultValue;
@@ -80,6 +86,11 @@ public class CloudProperties extends AbstractProperties {
             this.defaultValue = defaultValue;
         }
 
+        <T> Option(IOptionType<T> interpreter, Function<IApplicationConfig, T> 
defaultOption) {
+            this.interpreter = interpreter;
+            this.defaultValue = defaultOption;
+        }
+
         @Override
         public Section section() {
             switch (this) {
@@ -134,62 +145,55 @@ public class CloudProperties extends AbstractProperties {
                             + "all partitions upon booting, whereas 'lazy' 
caching will download a file upon"
                             + " request to open it. 'selective' caching will 
act as the 'lazy' policy; however, "
                             + " it allows to use the local disk(s) as a cache, 
where pages and indexes can be "
-                            + " cached or evicted according to the pressure 
imposed on the local disks."
-                            + " (default: 'selective')";
+                            + " cached or evicted according to the pressure 
imposed on the local disks";
                 case CLOUD_STORAGE_ALLOCATION_PERCENTAGE:
                     return "The percentage of the total disk space that should 
be allocated for data storage when the"
                             + " 'selective' caching policy is used. The 
remaining will act as a buffer for "
-                            + " query workspace (i.e., for query operations 
that require spilling to disk)."
-                            + " (default: 80% of the total disk space)";
+                            + " query workspace (i.e., for query operations 
that require spilling to disk)";
                 case CLOUD_STORAGE_SWEEP_THRESHOLD_PERCENTAGE:
                     return "The percentage of the used storage space at which 
the disk sweeper starts freeing space by"
                             + " punching holes in stored indexes or by 
evicting them entirely, "
-                            + " when the 'selective' caching policy is used."
-                            + " (default: 90% of the allocated space for 
storage)";
+                            + " when the 'selective' caching policy is used";
                 case CLOUD_STORAGE_DISK_MONITOR_INTERVAL:
                     return "The disk monitoring interval time (in seconds): 
determines how often the system"
-                            + " checks for pressure on disk space when using 
the 'selective' caching policy."
-                            + " (default : 120 seconds)";
+                            + " checks for pressure on disk space when using 
the 'selective' caching policy";
                 case CLOUD_STORAGE_INDEX_INACTIVE_DURATION_THRESHOLD:
-                    return "The duration in minutes to consider an index is 
inactive. (default: 360 or 6 hours)";
+                    return "The duration in minutes to consider an index is 
inactive";
                 case CLOUD_STORAGE_DEBUG_MODE_ENABLED:
-                    return "Whether or not the debug mode is enabled when 
using the 'selective' caching policy."
-                            + "(default: false)";
+                    return "Whether or not the debug mode is enabled when 
using the 'selective' caching policy";
                 case CLOUD_STORAGE_DEBUG_SWEEP_THRESHOLD_SIZE:
                     return "For debugging only. Pressure size will be the 
current used space + the additional bytes"
                             + " provided by this configuration option instead 
of using "
-                            + " CLOUD_STORAGE_SWEEP_THRESHOLD_PERCENTAGE."
-                            + " (default: 0. I.e., 
CLOUD_STORAGE_SWEEP_THRESHOLD_PERCENTAGE will be used by default)";
+                            + " CLOUD_STORAGE_SWEEP_THRESHOLD_PERCENTAGE";
                 case CLOUD_PROFILER_LOG_INTERVAL:
-                    return "The waiting time (in minutes) to log cloud request 
statistics. The minimum is 1 minute."
-                            + " Note: by default, the logging is disabled. 
Enabling it could perturb the performance of cloud requests";
+                    return "The waiting time (in minutes) to log cloud request 
statistics. The minimum is 1 minute"
+                            + " Note: enabling this logging may perturb the 
performance of cloud requests";
                 case CLOUD_ACQUIRE_TOKEN_TIMEOUT:
                     return "The waiting time (in milliseconds) if a requesting 
thread failed to acquire a token if the"
-                            + " rate limit of cloud requests exceeded 
(default: 100, min: 1, and max: 5000)";
+                            + " rate limit of cloud requests exceeded (min: 1, 
max: 5000)";
                 case CLOUD_MAX_WRITE_REQUESTS_PER_SECOND:
-                    return "The maximum number of write requests per second 
(default: 250, 0 means unlimited)";
+                    return "The maximum number of write requests per second (0 
means unlimited)";
                 case CLOUD_MAX_READ_REQUESTS_PER_SECOND:
-                    return "The maximum number of read requests per second 
(default: 1500, 0 means unlimited)";
+                    return "The maximum number of read requests per second (0 
means unlimited)";
                 case CLOUD_WRITE_BUFFER_SIZE:
-                    return "The write buffer size in bytes. (default: 8MB, 
min: 5MB)";
+                    return "The write buffer size in bytes. (min: 5MiB)";
                 case CLOUD_EVICTION_PLAN_REEVALUATE_THRESHOLD:
-                    return "The number of cloud reads for re-evaluating an 
eviction plan. (default: 50)";
+                    return "The number of cloud reads for re-evaluating an 
eviction plan";
                 case CLOUD_REQUESTS_MAX_HTTP_CONNECTIONS:
-                    return "The maximum number of HTTP connections to use 
concurrently for cloud requests per node. (default: 1000)";
+                    return "The maximum number of HTTP connections to use 
concurrently for cloud requests per node";
                 case CLOUD_REQUESTS_MAX_PENDING_HTTP_CONNECTIONS:
-                    return "The maximum number of HTTP connections allowed to 
wait for a connection per node. (default: 10000)";
+                    return "The maximum number of HTTP connections allowed to 
wait for a connection per node";
                 case CLOUD_REQUESTS_HTTP_CONNECTION_ACQUIRE_TIMEOUT:
-                    return "The waiting time (in seconds) to acquire an HTTP 
connection before failing the request."
-                            + " (default: 120 seconds)";
+                    return "The waiting time (in seconds) to acquire an HTTP 
connection before failing the request";
                 case CLOUD_STORAGE_FORCE_PATH_STYLE:
-                    return "Indicates whether or not to force path style when 
accessing the cloud storage. (default:"
-                            + " false)";
+                    return "Indicates whether or not to force path style when 
accessing the cloud storage";
                 case CLOUD_STORAGE_DISABLE_SSL_VERIFY:
-                    return "Indicates whether or not to disable SSL 
certificate verification on the cloud storage. "
-                            + "(default: false)";
+                    return "Indicates whether or not to disable SSL 
certificate verification on the cloud storage";
                 case CLOUD_STORAGE_LIST_EVENTUALLY_CONSISTENT:
                     return "Indicates whether or not deleted objects may be 
contained in list operations for some time"
-                            + "after they are deleted. (default: false)";
+                            + "after they are deleted";
+                case CLOUD_STORAGE_S3_ENABLE_CRT_CLIENT:
+                    return "Indicates whether or not to use the AWS CRT S3 
client for async requests";
                 default:
                     throw new IllegalStateException("NYI: " + this);
             }
@@ -205,6 +209,13 @@ public class CloudProperties extends AbstractProperties {
             return defaultValue;
         }
 
+        @Override
+        public String usageDefaultOverride(IApplicationConfig accessor, 
Function<IOption, String> optionPrinter) {
+            if (this == CLOUD_STORAGE_S3_ENABLE_CRT_CLIENT) {
+                return "true when no custom endpoint is set, otherwise false";
+            }
+            return IOption.super.usageDefaultOverride(accessor, optionPrinter);
+        }
     }
 
     public String getStorageScheme() {
@@ -309,4 +320,8 @@ public class CloudProperties extends AbstractProperties {
     public boolean isStorageListEventuallyConsistent() {
         return 
accessor.getBoolean(Option.CLOUD_STORAGE_LIST_EVENTUALLY_CONSISTENT);
     }
+
+    public boolean isS3EnableCrtClient() {
+        return accessor.getBoolean(Option.CLOUD_STORAGE_S3_ENABLE_CRT_CLIENT);
+    }
 }
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/MetadataProperties.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/MetadataProperties.java
index 8d18bfd8e9..568802c059 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/MetadataProperties.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/MetadataProperties.java
@@ -24,10 +24,10 @@ import static 
org.apache.hyracks.control.common.config.OptionTypes.STRING;
 
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.SortedMap;
 
 import org.apache.asterix.common.cluster.ClusterPartition;
+import org.apache.asterix.common.utils.Partitions;
 import org.apache.hyracks.api.config.IApplicationConfig;
 import org.apache.hyracks.api.config.IOption;
 import org.apache.hyracks.api.config.IOptionType;
@@ -124,7 +124,7 @@ public class MetadataProperties extends AbstractProperties {
         return accessor.getClusterPartitions();
     }
 
-    public Set<Integer> getNodePartitions(String nodeId) {
+    public Partitions getNodePartitions(String nodeId) {
         return accessor.getNodePartitions(nodeId);
     }
 
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/PropertiesAccessor.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/PropertiesAccessor.java
index 80f9a17f99..84e2ff00fe 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/PropertiesAccessor.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/PropertiesAccessor.java
@@ -36,11 +36,11 @@ import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Predicate;
-import java.util.stream.Collectors;
 
 import org.apache.asterix.common.cluster.ClusterPartition;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.utils.Partitions;
 import org.apache.asterix.common.utils.PrintUtil;
 import org.apache.commons.lang3.mutable.MutableInt;
 import org.apache.hyracks.algebricks.common.utils.Pair;
@@ -196,9 +196,10 @@ public class PropertiesAccessor implements 
IApplicationConfig {
         return clusterPartitions;
     }
 
-    public Set<Integer> getNodePartitions(String nodeId) {
+    public Partitions getNodePartitions(String nodeId) {
         ClusterPartition[] nodeClusterPartitions = 
nodePartitionsMap.get(nodeId);
-        return 
Arrays.stream(nodeClusterPartitions).map(ClusterPartition::getPartitionId).collect(Collectors.toSet());
+        return 
Arrays.stream(nodeClusterPartitions).map(ClusterPartition::getPartitionId)
+                .collect(Partitions.collector());
     }
 
     public List<AsterixExtension> getExtensions() {
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IReplicaManager.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IReplicaManager.java
index 14fc9d8e0c..daedf8774e 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IReplicaManager.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IReplicaManager.java
@@ -19,9 +19,9 @@
 package org.apache.asterix.common.storage;
 
 import java.util.List;
-import java.util.Set;
 
 import org.apache.asterix.common.replication.IPartitionReplica;
+import org.apache.asterix.common.utils.Partitions;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public interface IReplicaManager {
@@ -54,14 +54,14 @@ public interface IReplicaManager {
      *
      * @return The list of partition
      */
-    Set<Integer> getPartitions();
+    Partitions getPartitions();
 
     /**
      * Sets the node active partitions
      *
      * @param activePartitions
      */
-    void setActivePartitions(Set<Integer> activePartitions);
+    void setActivePartitions(Partitions activePartitions);
 
     /**
      * Promotes a partition by making this node its master replica
@@ -87,7 +87,7 @@ public interface IReplicaManager {
      *
      * @return the synchronization lock
      */
-    Object getPartitionSyncLock(int partition);
+    PartitionSyncLock getPartitionSyncLock(int partition);
 
     /**
      * Gets the partition replicas matching {@code id}
@@ -110,4 +110,6 @@ public interface IReplicaManager {
      * @return true if the partition is originated by this node, otherwise 
false.
      */
     boolean isPartitionOrigin(int partition);
+
+    record PartitionSyncLock() {}
 }
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
index 3437d4202d..cf96816ee9 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java
@@ -21,9 +21,9 @@ package org.apache.asterix.common.transactions;
 import java.io.File;
 import java.io.IOException;
 import java.util.List;
-import java.util.Set;
 
 import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.utils.Partitions;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 
@@ -109,7 +109,7 @@ public interface IRecoveryManager {
      * @throws IOException
      * @throws ACIDException
      */
-    void startLocalRecovery(Set<Integer> partitions) throws IOException, 
ACIDException;
+    void startLocalRecovery(Partitions partitions) throws IOException, 
ACIDException;
 
     /**
      * Gets the LSN used in {@code partitions} or 0 if no LSNs are found
@@ -118,7 +118,7 @@ public interface IRecoveryManager {
      * @return the maximum used LSN
      * @throws HyracksDataException
      */
-    long getPartitionsMaxLSN(Set<Integer> partitions) throws 
HyracksDataException;
+    long getPartitionsMaxLSN(Partitions partitions) throws 
HyracksDataException;
 
     /**
      * Replay the commited transactions' logs belonging to {@code partitions}. 
if {@code flush} is true,
@@ -128,7 +128,7 @@ public interface IRecoveryManager {
      * @param flush
      * @throws HyracksDataException
      */
-    void replayReplicaPartitionLogs(Set<Integer> partitions, boolean flush) 
throws HyracksDataException;
+    void replayReplicaPartitionLogs(Partitions partitions, boolean flush) 
throws HyracksDataException;
 
     /**
      * Ensures that {@code datasetPartitionIndexes} are consistent by 
performing component id level recovery
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Partitions.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Partitions.java
index e939ce13b7..1405661a7b 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Partitions.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Partitions.java
@@ -21,38 +21,58 @@ package org.apache.asterix.common.utils;
 import java.io.Serial;
 import java.io.Serializable;
 import java.util.Set;
+import java.util.stream.Collector;
+import java.util.stream.IntStream;
 
 import org.apache.commons.lang3.mutable.MutableBoolean;
 
+import it.unimi.dsi.fastutil.ints.IntIterable;
 import it.unimi.dsi.fastutil.ints.IntIterator;
+import it.unimi.dsi.fastutil.ints.IntSet;
 import it.unimi.dsi.fastutil.ints.IntSortedSet;
 import it.unimi.dsi.fastutil.ints.IntSortedSets;
 
 /**
- * A specialized bit set for storing partition IDs.
- * Partition IDs are in the range [-1, 32766] (inclusive).
- * The value -1 is used to represent the "unpartitioned" partition.
- * The value 32767 is reserved for the internal shift and cannot be used as a 
partition ID.
- * This class internally shifts the partition IDs by +1 to fit into an 
unsigned short range [0, 32767].
+ * A specialized data structure for efficiently storing partitions.
+ * Partitions are in the range [-1, 2_147_483_646] (inclusive).
+ * The value -1 is used to represent the "metadata" partition.
+ * The value 2_147_483_647 is reserved for the internal shift and cannot be 
used as a partition.
+ * This class internally shifts the partitions by +1 to fit into the signed 
int range [0, 2_147_483_647].
  */
-public class Partitions implements Serializable {
+public class Partitions implements IntIterable, Serializable {
     @Serial
     private static final long serialVersionUID = 1L;
-    private static final int MAX_PARTITION_ID = Integer.MAX_VALUE - 1;
-    private static final int MIN_PARTITION_ID = -1;
+    private static final int DELTA = -1;
+    private static final int MAX_PARTITION = Integer.MAX_VALUE + DELTA;
+    private static final int MIN_PARTITION = DELTA;
+    private static final int DEFAULT_MAX_PARTITIONS = 128;
     private static final Partitions EMPTY = new 
Partitions(IntSortedSets.EMPTY_SET);
-    private static final short MINUS_ONE = (short) -1;
     private final IntSortedSet delegate;
 
     public Partitions() {
-        this(new IntSortedBitSet());
+        this(DEFAULT_MAX_PARTITIONS);
     }
 
-    public Partitions(int initialMaxValue) {
-        this(new IntSortedBitSet(initialMaxValue + 1));
+    public Partitions(int initialMaxPartition) {
+        this(new IntSortedBitSet(encodePartition(initialMaxPartition)));
     }
 
-    public Partitions(IntSortedSet delegate) {
+    public Partitions(Set<Integer> initialValues) {
+        this();
+        initialValues.forEach(this::add);
+    }
+
+    public Partitions(IntSet initialValues) {
+        this();
+        initialValues.forEach(this::add);
+    }
+
+    public Partitions(Partitions initialValues) {
+        this();
+        addAll(initialValues);
+    }
+
+    private Partitions(IntSortedSet delegate) {
         this.delegate = delegate;
     }
 
@@ -63,17 +83,43 @@ public class Partitions implements Serializable {
         return EMPTY;
     }
 
+    public static Partitions singleton(int partition) {
+        return new 
Partitions(IntSortedSets.singleton(encodePartition(partition)));
+    }
+
+    public static Collector<? super Integer, Partitions, Partitions> 
collector() {
+        return Collector.of(Partitions::new, Partitions::add, (l, r) -> {
+            l.addAll(r);
+            return l;
+        }, Collector.Characteristics.UNORDERED, 
Collector.Characteristics.IDENTITY_FINISH);
+    }
+
     /**
-     * Adds a partition ID to the set. Partition ID must be in the range [-1, 
32766].
-     * @param k the partition ID to add
-     * @return true if the partition ID was added, false if it was already 
present
-     * @throws IllegalArgumentException if the partition ID is out of range
+     * Adds a partition to the set. Partition must be in the range [-1, 
2_147_483_646].
+     * @param partition the partition to add
+     * @return true if the partition was added, false if it was already present
+     * @throws IllegalArgumentException if the partition is out of range
      */
-    public boolean add(int k) {
-        if (k > MAX_PARTITION_ID || k < MIN_PARTITION_ID) {
-            throw new IllegalArgumentException("Partition number " + k + " out 
of range");
+    public boolean add(int partition) {
+        checkRange(partition);
+        return delegate.add(encodePartition(partition));
+    }
+
+    /**
+     * Removes a partition from the set. Partition must be in the range [-1, 
2_147_483_646].
+     * @param partition the partition to remove
+     * @return true if the partition was added, false if it was already present
+     * @throws IllegalArgumentException if the partition is out of range
+     */
+    public boolean remove(int partition) {
+        checkRange(partition);
+        return delegate.remove(encodePartition(partition));
+    }
+
+    private static void checkRange(int partition) {
+        if (partition > MAX_PARTITION || partition < MIN_PARTITION) {
+            throw new IllegalArgumentException("Partition number " + partition 
+ " out of range");
         }
-        return delegate.add((short) (k + 1));
     }
 
     public boolean isEmpty() {
@@ -94,15 +140,44 @@ public class Partitions implements Serializable {
         return retval.booleanValue();
     }
 
+    public boolean addAll(Partitions activePartitions) {
+        return delegate.addAll(activePartitions.delegate);
+    }
+
     public int size() {
         return delegate.size();
     }
 
-    public boolean contains(int partitionNum) {
-        if (partitionNum > MAX_PARTITION_ID || partitionNum < 
MIN_PARTITION_ID) {
+    public boolean contains(int partition) {
+        if (partition > MAX_PARTITION || partition < MIN_PARTITION) {
             return false;
         }
-        return delegate.contains((short) (partitionNum + 1));
+        return delegate.contains(encodePartition(partition));
+    }
+
+    @Override
+    public IntIterator iterator() {
+        return new IntIterator() {
+            private final IntIterator delegateIter = delegate.iterator();
+
+            @Override
+            public boolean hasNext() {
+                return delegateIter.hasNext();
+            }
+
+            @Override
+            public int nextInt() {
+                return decodePartition(delegateIter.nextInt());
+            }
+        };
+    }
+
+    public int getMinPartition() {
+        return decodePartition(delegate.getFirst());
+    }
+
+    public int getMaxPartition() {
+        return decodePartition(delegate.getLast());
     }
 
     @Override
@@ -130,17 +205,36 @@ public class Partitions implements Serializable {
         StringBuilder builder = new StringBuilder();
         builder.append('[');
         if (delegate.firstInt() == 0) {
-            builder.append(MINUS_ONE);
+            builder.append(DELTA);
             iter.nextInt(); // this is the zero we just printed
             if (iter.hasNext()) {
                 builder.append(',');
             }
         }
         if (iter.hasNext()) {
-            IntUtil.appendCompact(iter, builder, MINUS_ONE);
+            IntUtil.appendCompact(iter, builder, DELTA);
         }
         builder.append(']');
         return builder.toString();
     }
 
+    public void removeAll(Partitions masterPartitions) {
+        delegate.removeAll(masterPartitions.delegate);
+    }
+
+    public Partitions unmodifiable() {
+        return new Partitions(IntSortedSets.unmodifiable(delegate));
+    }
+
+    public IntStream stream() {
+        return delegate.intStream().map(Partitions::decodePartition);
+    }
+
+    private static int encodePartition(int partition) {
+        return partition - DELTA;
+    }
+
+    private static int decodePartition(int value) {
+        return value + DELTA;
+    }
 }
diff --git 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsNotifier.java
 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsNotifier.java
index 42a3c0b212..13bd31b3c0 100644
--- 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsNotifier.java
+++ 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsNotifier.java
@@ -19,12 +19,10 @@
 package org.apache.asterix.replication.logging;
 
 import java.io.IOException;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.function.Predicate;
-import java.util.stream.Collectors;
 
 import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.dataflow.DatasetLocalResource;
@@ -32,6 +30,7 @@ import 
org.apache.asterix.common.storage.DatasetResourceReference;
 import org.apache.asterix.common.storage.IIndexCheckpointManager;
 import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
 import org.apache.asterix.common.transactions.LogType;
+import org.apache.asterix.common.utils.Partitions;
 import org.apache.asterix.replication.messaging.ReplicationProtocol;
 import 
org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -90,9 +89,9 @@ class RemoteLogsNotifier implements Runnable {
             return dls.getDatasetId() == datasetId && dls.getPartition() == 
resourcePartition;
         };
         final Map<Long, LocalResource> resources =
-                localResourceRep.getResources(replicaIndexesPredicate, 
Collections.singleton(resourcePartition));
+                localResourceRep.getResources(replicaIndexesPredicate, 
Partitions.singleton(resourcePartition));
         final List<DatasetResourceReference> replicaIndexesRef =
-                
resources.values().stream().map(DatasetResourceReference::of).collect(Collectors.toList());
+                
resources.values().stream().map(DatasetResourceReference::of).toList();
         for (DatasetResourceReference replicaIndexRef : replicaIndexesRef) {
             final IIndexCheckpointManager indexCheckpointManager = 
indexCheckpointManagerProvider.get(replicaIndexRef);
             synchronized (indexCheckpointManager) {
diff --git 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
index 68ccd54e50..b0a8c54a47 100644
--- 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
+++ 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 
 import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.replication.IReplicationStrategy;
+import org.apache.asterix.common.storage.IReplicaManager.PartitionSyncLock;
 import org.apache.asterix.common.transactions.ICheckpointManager;
 import org.apache.asterix.replication.api.PartitionReplica;
 import org.apache.asterix.replication.messaging.CheckpointPartitionIndexesTask;
@@ -48,7 +49,8 @@ public class ReplicaSynchronizer {
 
     public void sync(boolean register, boolean deltaRecovery) throws 
IOException {
         LOGGER.debug("starting replica sync process for replica {}", replica);
-        Object partitionLock = 
appCtx.getReplicaManager().getPartitionSyncLock(replica.getIdentifier().getPartition());
+        PartitionSyncLock partitionLock =
+                
appCtx.getReplicaManager().getPartitionSyncLock(replica.getIdentifier().getPartition());
         synchronized (partitionLock) {
             LOGGER.trace("acquired partition replica lock");
             final ICheckpointManager checkpointManager = 
appCtx.getTransactionSubsystem().getCheckpointManager();
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
index 668decfa3d..cb97929807 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
@@ -30,7 +30,6 @@ import java.util.SortedMap;
 import java.util.TreeSet;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Predicate;
-import java.util.stream.Collectors;
 
 import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
 import org.apache.asterix.common.cluster.ClusterPartition;
@@ -43,6 +42,7 @@ import 
org.apache.asterix.common.replication.INcLifecycleCoordinator;
 import org.apache.asterix.common.transactions.IResourceIdManager;
 import org.apache.asterix.common.utils.NcLocalCounters;
 import org.apache.asterix.common.utils.PartitioningScheme;
+import org.apache.asterix.common.utils.Partitions;
 import org.apache.asterix.common.utils.StorageConstants;
 import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -152,15 +152,15 @@ public class ClusterStateManager implements 
IClusterStateManager {
 
     @Override
     public synchronized void updateNodeState(String nodeId, boolean active, 
NcLocalCounters localCounters,
-            Set<Integer> activePartitions) {
+            Partitions activePartitions) {
         if (active) {
             updateClusterCounters(nodeId, localCounters);
             participantNodes.add(nodeId);
             if (appCtx.isCloudDeployment()) {
                 // node compute partitions never change
                 ClusterPartition[] nodePartitions = getNodePartitions(nodeId);
-                activePartitions =
-                        
Arrays.stream(nodePartitions).map(ClusterPartition::getPartitionId).collect(Collectors.toSet());
+                activePartitions = 
Arrays.stream(nodePartitions).map(ClusterPartition::getPartitionId)
+                        .collect(Partitions.collector());
                 activateNodePartitions(nodeId, activePartitions);
             } else {
                 activateNodePartitions(nodeId, activePartitions);
@@ -548,8 +548,8 @@ public class ClusterStateManager implements 
IClusterStateManager {
         });
     }
 
-    private synchronized void activateNodePartitions(String nodeId, 
Set<Integer> activePartitions) {
-        for (Integer partitionId : activePartitions) {
+    private synchronized void activateNodePartitions(String nodeId, Partitions 
activePartitions) {
+        for (int partitionId : activePartitions) {
             updateClusterPartition(partitionId, nodeId, true);
         }
     }
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index cb4e068aab..d53c258a76 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -56,6 +56,7 @@ import 
org.apache.asterix.common.storage.IIndexCheckpointManager;
 import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
 import org.apache.asterix.common.storage.ResourceReference;
 import org.apache.asterix.common.storage.ResourceStorageStats;
+import org.apache.asterix.common.utils.Partitions;
 import org.apache.asterix.common.utils.StorageConstants;
 import org.apache.asterix.common.utils.StoragePathUtil;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -288,7 +289,7 @@ public class PersistentLocalResourceRepository implements 
ILocalResourceReposito
         }
     }
 
-    public Map<Long, LocalResource> getResources(Predicate<LocalResource> 
filter, Set<Integer> partitions)
+    public Map<Long, LocalResource> getResources(Predicate<LocalResource> 
filter, Partitions partitions)
             throws HyracksDataException {
         beforeReadAccess();
         try {
@@ -427,12 +428,12 @@ public class PersistentLocalResourceRepository implements 
ILocalResourceReposito
         }
     }
 
-    public Set<Integer> getAllPartitions() throws HyracksDataException {
+    public Partitions getAllPartitions() throws HyracksDataException {
         beforeReadAccess();
         try {
             return 
loadAndGetAllResources().values().stream().map(LocalResource::getResource)
                     
.map(DatasetLocalResource.class::cast).map(DatasetLocalResource::getPartition)
-                    .collect(Collectors.toSet());
+                    .collect(Partitions.collector());
         } finally {
             afterReadAccess();
         }
@@ -479,7 +480,7 @@ public class PersistentLocalResourceRepository implements 
ILocalResourceReposito
     public Map<Long, LocalResource> getPartitionResources(int partition) 
throws HyracksDataException {
         beforeReadAccess();
         try {
-            return getResources(r -> true, Collections.singleton(partition));
+            return getResources(r -> true, Partitions.singleton(partition));
         } finally {
             afterReadAccess();
         }
@@ -712,7 +713,7 @@ public class PersistentLocalResourceRepository implements 
ILocalResourceReposito
         return null;
     }
 
-    public long getDatasetSize(DatasetCopyIdentifier datasetIdentifier, 
Set<Integer> nodePartitions)
+    public long getDatasetSize(DatasetCopyIdentifier datasetIdentifier, 
Partitions nodePartitions)
             throws HyracksDataException {
         beforeReadAccess();
         try {


Reply via email to