This is an automated email from the ASF dual-hosted git repository. mblow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit d86e2327d1c961c1bf6c666f6947fdab1560bc72 Author: Michael Blow <[email protected]> AuthorDate: Sun Oct 19 20:43:09 2025 -0400 [NO ISSUE][*DB][STO] Cloud, logging misc improvements Ext-ref: MB-69042,MB-68946 Change-Id: I6b10c08e74e664f1b59ed0df3bf16b871360d66f Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20504 Reviewed-by: Ali Alsuliman <[email protected]> Integration-Tests: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> Reviewed-by: Michael Blow <[email protected]> --- .../asterix/api/http/server/StorageApiServlet.java | 11 +- .../app/message/StorageCleanupRequestMessage.java | 4 +- .../apache/asterix/app/nc/NCAppRuntimeContext.java | 4 +- .../org/apache/asterix/app/nc/RecoveryManager.java | 34 +++-- .../org/apache/asterix/app/nc/ReplicaManager.java | 37 +++--- .../nc/task/CloudToLocalStorageCachingTask.java | 14 +- .../asterix/app/nc/task/LocalRecoveryTask.java | 12 +- .../app/nc/task/LocalStorageCleanupTask.java | 5 +- .../asterix/app/nc/task/UpdateNodeStatusTask.java | 9 +- .../app/replication/NcLifecycleCoordinator.java | 13 +- .../message/NCLifecycleTaskReportMessage.java | 11 +- .../message/RegistrationTasksRequestMessage.java | 12 +- .../message/RegistrationTasksResponseMessage.java | 5 +- .../asterix/runtime/ClusterStateManagerTest.java | 7 +- .../asterix/cloud/AbstractCloudIOManager.java | 2 +- .../asterix/cloud/LocalPartitionBootstrapper.java | 4 +- .../cloud/clients/aws/s3/S3ClientConfig.java | 18 +-- .../cloud/clients/aws/s3/S3ParallelDownloader.java | 2 +- .../org/apache/asterix/cloud/s3/LSMS3Test.java | 2 +- .../common/cloud/IPartitionBootstrapper.java | 4 +- .../common/cluster/IClusterStateManager.java | 3 +- .../cluster/StorageComputePartitionsMap.java | 7 +- .../asterix/common/config/CloudProperties.java | 75 ++++++----- .../asterix/common/config/MetadataProperties.java | 4 +- .../asterix/common/config/PropertiesAccessor.java | 7 +- .../asterix/common/storage/IReplicaManager.java | 10 +- 
.../common/transactions/IRecoveryManager.java | 8 +- .../apache/asterix/common/utils/Partitions.java | 146 +++++++++++++++++---- .../replication/logging/RemoteLogsNotifier.java | 7 +- .../replication/sync/ReplicaSynchronizer.java | 4 +- .../asterix/runtime/utils/ClusterStateManager.java | 12 +- .../PersistentLocalResourceRepository.java | 11 +- 32 files changed, 305 insertions(+), 199 deletions(-) diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/StorageApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/StorageApiServlet.java index 26fee28e10..b93938120b 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/StorageApiServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/StorageApiServlet.java @@ -24,16 +24,15 @@ import java.io.IOException; import java.io.PrintWriter; import java.net.InetSocketAddress; import java.util.List; -import java.util.Set; import java.util.concurrent.ConcurrentMap; -import java.util.function.Predicate; -import java.util.stream.Collectors; +import java.util.function.IntPredicate; import org.apache.asterix.common.api.INcApplicationContext; import org.apache.asterix.common.replication.IPartitionReplica; import org.apache.asterix.common.storage.IReplicaManager; import org.apache.asterix.common.storage.ReplicaIdentifier; import org.apache.asterix.common.storage.ResourceStorageStats; +import org.apache.asterix.common.utils.Partitions; import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.http.api.IServletRequest; @@ -121,11 +120,11 @@ public class StorageApiServlet extends AbstractServlet { return getStatus(partition::equals); } - private JsonNode getStatus(Predicate<Integer> predicate) { + private JsonNode getStatus(IntPredicate predicate) { final ArrayNode status = 
OBJECT_MAPPER.createArrayNode(); final IReplicaManager storageSubsystem = appCtx.getReplicaManager(); - final Set<Integer> partitions = - storageSubsystem.getPartitions().stream().filter(predicate).collect(Collectors.toSet()); + final Partitions partitions = storageSubsystem.getPartitions().stream().filter(predicate) + .collect(Partitions::new, Partitions::add, Partitions::addAll); for (Integer partition : partitions) { final ObjectNode partitionJson = OBJECT_MAPPER.createObjectNode(); partitionJson.put("partition", partition); diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/StorageCleanupRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/StorageCleanupRequestMessage.java index beaaac00fd..799510cbef 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/StorageCleanupRequestMessage.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/StorageCleanupRequestMessage.java @@ -21,7 +21,6 @@ package org.apache.asterix.app.message; import static org.apache.hyracks.util.ExitUtil.EC_NC_FAILED_TO_NOTIFY_TASKS_COMPLETED; import java.util.Map; -import java.util.Set; import org.apache.asterix.common.api.IDatasetLifecycleManager; import org.apache.asterix.common.api.INcApplicationContext; @@ -30,6 +29,7 @@ import org.apache.asterix.common.messaging.CcIdentifiedMessage; import org.apache.asterix.common.messaging.api.INCMessageBroker; import org.apache.asterix.common.messaging.api.INcAddressedMessage; import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties; +import org.apache.asterix.common.utils.Partitions; import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.storage.common.IIndex; @@ -57,7 +57,7 @@ public class StorageCleanupRequestMessage extends CcIdentifiedMessage implements INCMessageBroker broker = 
(INCMessageBroker) appContext.getServiceContext().getMessageBroker(); PersistentLocalResourceRepository localResourceRepository = (PersistentLocalResourceRepository) appContext.getLocalResourceRepository(); - Set<Integer> nodePartitions = appContext.getReplicaManager().getPartitions(); + Partitions nodePartitions = appContext.getReplicaManager().getPartitions(); Map<Long, LocalResource> localResources = localResourceRepository.getResources(lr -> true, nodePartitions); for (LocalResource resource : localResources.values()) { DatasetLocalResource lr = (DatasetLocalResource) resource.getResource(); diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java index e0277aaf81..14d16515da 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java @@ -26,7 +26,6 @@ import java.rmi.server.UnicastRemoteObject; import java.util.Collection; import java.util.HashMap; import java.util.Map; -import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; @@ -78,6 +77,7 @@ import org.apache.asterix.common.transactions.IRecoveryManager; import org.apache.asterix.common.transactions.IRecoveryManager.SystemState; import org.apache.asterix.common.transactions.IRecoveryManagerFactory; import org.apache.asterix.common.transactions.ITransactionSubsystem; +import org.apache.asterix.common.utils.Partitions; import org.apache.asterix.external.library.ExternalLibraryManager; import org.apache.asterix.file.StorageComponentProvider; import org.apache.asterix.metadata.MetadataManager; @@ -296,7 +296,7 @@ public class NCAppRuntimeContext implements INcApplicationContext { txnSubsystem.getLogManager(), virtualBufferCache, 
indexCheckpointManagerProvider, lockNotifier); localResourceRepository.setDatasetLifecycleManager(datasetLifecycleManager); final String nodeId = getServiceContext().getNodeId(); - final Set<Integer> nodePartitions = metadataProperties.getNodePartitions(nodeId); + final Partitions nodePartitions = metadataProperties.getNodePartitions(nodeId); replicaManager = new ReplicaManager(this, nodePartitions); isShuttingdown = false; activeManager = new ActiveManager(threadExecutor, getServiceContext().getNodeId(), diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java index 269f6f7224..1b2c17d595 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java @@ -39,7 +39,6 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; -import java.util.stream.Collectors; import org.apache.asterix.common.api.IDatasetLifecycleManager; import org.apache.asterix.common.api.INcApplicationContext; @@ -61,6 +60,7 @@ import org.apache.asterix.common.transactions.ITransactionContext; import org.apache.asterix.common.transactions.ITransactionSubsystem; import org.apache.asterix.common.transactions.LogType; import org.apache.asterix.common.transactions.TxnId; +import org.apache.asterix.common.utils.Partitions; import org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback; import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository; import org.apache.asterix.transaction.management.service.logging.LogManager; @@ -155,7 +155,7 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent { } @Override - public void startLocalRecovery(Set<Integer> partitions) throws IOException, ACIDException { + public void 
startLocalRecovery(Partitions partitions) throws IOException, ACIDException { state = SystemState.RECOVERING; LOGGER.info("starting recovery for partitions {}", partitions); Checkpoint systemCheckpoint = checkpointManager.getLatest(); @@ -175,7 +175,7 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent { replayPartitionsLogs(partitions, logMgr.getLogReader(true), lowWaterMarkLSN, true); } - public synchronized void replayPartitionsLogs(Set<Integer> partitions, ILogReader logReader, long lowWaterMarkLSN, + public synchronized void replayPartitionsLogs(Partitions partitions, ILogReader logReader, long lowWaterMarkLSN, boolean closeOnFlushRedo) throws IOException, ACIDException { try { Set<Long> winnerJobSet = startRecoverysAnalysisPhase(partitions, logReader, lowWaterMarkLSN); @@ -186,7 +186,7 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent { } } - private synchronized Set<Long> startRecoverysAnalysisPhase(Set<Integer> partitions, ILogReader logReader, + private synchronized Set<Long> startRecoverysAnalysisPhase(Partitions partitions, ILogReader logReader, long lowWaterMarkLSN) throws IOException, ACIDException { int updateLogCount = 0; int entityCommitLogCount = 0; @@ -274,8 +274,8 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent { jobEntityWinners.add(logRecord); } - private synchronized void startRecoveryRedoPhase(Set<Integer> partitions, ILogReader logReader, - long lowWaterMarkLSN, Set<Long> winnerTxnSet, boolean closeOnFlushRedo) throws IOException, ACIDException { + private synchronized void startRecoveryRedoPhase(Partitions partitions, ILogReader logReader, long lowWaterMarkLSN, + Set<Long> winnerTxnSet, boolean closeOnFlushRedo) throws IOException, ACIDException { int redoCount = 0; long txnId = 0; @@ -491,21 +491,20 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent { private long getRemoteMinFirstLSN() throws HyracksDataException { // 
find the min first lsn of partitions that are replicated on this node - final Set<Integer> allPartitions = localResourceRepository.getAllPartitions(); - final Set<Integer> masterPartitions = appCtx.getReplicaManager().getPartitions(); + final Partitions allPartitions = localResourceRepository.getAllPartitions(); + final Partitions masterPartitions = appCtx.getReplicaManager().getPartitions(); allPartitions.removeAll(masterPartitions); return getPartitionsMinLSN(allPartitions); } - private long getPartitionsMinLSN(Set<Integer> partitions) throws HyracksDataException { + private long getPartitionsMinLSN(Partitions partitions) throws HyracksDataException { final IIndexCheckpointManagerProvider idxCheckpointMgrProvider = appCtx.getIndexCheckpointManagerProvider(); long minRemoteLSN = Long.MAX_VALUE; for (Integer partition : partitions) { final List<DatasetResourceReference> partitionResources = localResourceRepository.getResources(resource -> { DatasetLocalResource dsResource = (DatasetLocalResource) resource.getResource(); return dsResource.getPartition() == partition; - }, Collections.singleton(partition)).values().stream().map(DatasetResourceReference::of) - .collect(Collectors.toList()); + }, Partitions.singleton(partition)).values().stream().map(DatasetResourceReference::of).toList(); for (DatasetResourceReference indexRef : partitionResources) { try { final IIndexCheckpointManager idxCheckpointMgr = idxCheckpointMgrProvider.get(indexRef); @@ -524,15 +523,14 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent { } @Override - public long getPartitionsMaxLSN(Set<Integer> partitions) throws HyracksDataException { + public long getPartitionsMaxLSN(Partitions partitions) throws HyracksDataException { final IIndexCheckpointManagerProvider idxCheckpointMgrProvider = appCtx.getIndexCheckpointManagerProvider(); long maxLSN = 0; - for (Integer partition : partitions) { + for (int partition : partitions) { final List<DatasetResourceReference> 
partitionResources = localResourceRepository.getResources(resource -> { DatasetLocalResource dsResource = (DatasetLocalResource) resource.getResource(); return dsResource.getPartition() == partition; - }, Collections.singleton(partition)).values().stream().map(DatasetResourceReference::of) - .collect(Collectors.toList()); + }, Partitions.singleton(partition)).values().stream().map(DatasetResourceReference::of).toList(); for (DatasetResourceReference indexRef : partitionResources) { try { final IIndexCheckpointManager idxCheckpointMgr = idxCheckpointMgrProvider.get(indexRef); @@ -549,7 +547,7 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent { } @Override - public synchronized void replayReplicaPartitionLogs(Set<Integer> partitions, boolean flush) + public synchronized void replayReplicaPartitionLogs(Partitions partitions, boolean flush) throws HyracksDataException { //replay logs > minLSN that belong to these partitions try { @@ -597,7 +595,7 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent { FileUtils.deleteQuietly(recoveryFolderPath.toFile()); } - protected void cleanUp(Set<Integer> partitions) throws HyracksDataException { + protected void cleanUp(Partitions partitions) throws HyracksDataException { // the cleanup is currently done by PersistentLocalResourceRepository#clean } @@ -657,7 +655,7 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent { TxnEntityId loserEntity; List<Long> undoLSNSet = null; //get active partitions on this node - Set<Integer> activePartitions = appCtx.getReplicaManager().getPartitions(); + Partitions activePartitions = appCtx.getReplicaManager().getPartitions(); ILogReader logReader = logMgr.getLogReader(false); try { logReader.setPosition(firstLSN); diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java index 
101b1b0fe2..a1eee08225 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java @@ -19,12 +19,9 @@ package org.apache.asterix.app.nc; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -34,6 +31,7 @@ import org.apache.asterix.common.replication.IPartitionReplica; import org.apache.asterix.common.storage.IReplicaManager; import org.apache.asterix.common.storage.ReplicaIdentifier; import org.apache.asterix.common.transactions.IRecoveryManager; +import org.apache.asterix.common.utils.Partitions; import org.apache.asterix.replication.api.PartitionReplica; import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository; import org.apache.hyracks.api.client.NodeStatus; @@ -44,6 +42,9 @@ import org.apache.hyracks.util.annotations.ThreadSafe; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import it.unimi.dsi.fastutil.ints.Int2ObjectMap; +import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; + @ThreadSafe public class ReplicaManager implements IReplicaManager { private static final Logger LOGGER = LogManager.getLogger(); @@ -52,17 +53,17 @@ public class ReplicaManager implements IReplicaManager { /** * the partitions to which the current node is master */ - private final Map<Integer, Object> partitions = new HashMap<>(); + private final Int2ObjectMap<PartitionSyncLock> partitions = new Int2ObjectOpenHashMap<>(); /** * current replicas */ private final Map<ReplicaIdentifier, PartitionReplica> replicas = new HashMap<>(); - private final Set<Integer> nodeOriginatedPartitions = new HashSet<>(); + private final Partitions nodeOriginatedPartitions = new Partitions(); - public 
ReplicaManager(INcApplicationContext appCtx, Set<Integer> partitions) { + public ReplicaManager(INcApplicationContext appCtx, Partitions partitions) { this.appCtx = appCtx; - for (Integer partition : partitions) { - this.partitions.put(partition, new Object()); + for (int partition : partitions) { + this.partitions.put(partition, new PartitionSyncLock()); } setNodeOriginatedPartitions(appCtx); } @@ -109,15 +110,15 @@ public class ReplicaManager implements IReplicaManager { } @Override - public synchronized Set<Integer> getPartitions() { - return Collections.unmodifiableSet(partitions.keySet()); + public synchronized Partitions getPartitions() { + return new Partitions(partitions.keySet()).unmodifiable(); } @Override - public synchronized void setActivePartitions(Set<Integer> activePartitions) { + public synchronized void setActivePartitions(Partitions activePartitions) { partitions.clear(); - for (Integer partition : activePartitions) { - partitions.put(partition, new Object()); + for (int partition : activePartitions) { + partitions.put(partition, new PartitionSyncLock()); } } @@ -132,9 +133,9 @@ public class ReplicaManager implements IReplicaManager { if (!appCtx.isCloudDeployment()) { localResourceRepository.cleanup(partition); final IRecoveryManager recoveryManager = appCtx.getTransactionSubsystem().getRecoveryManager(); - recoveryManager.replayReplicaPartitionLogs(Stream.of(partition).collect(Collectors.toSet()), true); + recoveryManager.replayReplicaPartitionLogs(Stream.of(partition).collect(Partitions.collector()), true); } - partitions.put(partition, new Object()); + partitions.put(partition, new PartitionSyncLock()); } @Override @@ -151,8 +152,8 @@ public class ReplicaManager implements IReplicaManager { } @Override - public synchronized Object getPartitionSyncLock(int partition) { - Object syncLock = partitions.get(partition); + public synchronized PartitionSyncLock getPartitionSyncLock(int partition) { + PartitionSyncLock syncLock = 
partitions.get(partition); if (syncLock == null) { throw new IllegalStateException("partition " + partition + " is not active on this node"); } @@ -187,7 +188,7 @@ public class ReplicaManager implements IReplicaManager { } private void setNodeOriginatedPartitions(INcApplicationContext appCtx) { - Set<Integer> nodePartitions = + Partitions nodePartitions = appCtx.getMetadataProperties().getNodePartitions(appCtx.getServiceContext().getNodeId()); nodeOriginatedPartitions.addAll(nodePartitions); } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/CloudToLocalStorageCachingTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/CloudToLocalStorageCachingTask.java index cc594884ba..00205a64c6 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/CloudToLocalStorageCachingTask.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/CloudToLocalStorageCachingTask.java @@ -18,13 +18,11 @@ */ package org.apache.asterix.app.nc.task; -import java.util.Arrays; -import java.util.Set; - import org.apache.asterix.common.api.INCLifecycleTask; import org.apache.asterix.common.api.INcApplicationContext; import org.apache.asterix.common.cloud.IPartitionBootstrapper; import org.apache.asterix.common.transactions.Checkpoint; +import org.apache.asterix.common.utils.Partitions; import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository; import org.apache.hyracks.api.control.CcId; import org.apache.hyracks.api.exceptions.HyracksDataException; @@ -37,13 +35,12 @@ public class CloudToLocalStorageCachingTask implements INCLifecycleTask { private static final Logger LOGGER = LogManager.getLogger(); - private static final long serialVersionUID = 1L; - private final Set<Integer> storagePartitions; + private static final long serialVersionUID = 2L; + private final Partitions storagePartitions; private final boolean metadataNode; private final int 
metadataPartitionId; - public CloudToLocalStorageCachingTask(Set<Integer> storagePartitions, boolean metadataNode, - int metadataPartitionId) { + public CloudToLocalStorageCachingTask(Partitions storagePartitions, boolean metadataNode, int metadataPartitionId) { this.storagePartitions = storagePartitions; this.metadataNode = metadataNode; this.metadataPartitionId = metadataPartitionId; @@ -78,7 +75,6 @@ public class CloudToLocalStorageCachingTask implements INCLifecycleTask { @Override public String toString() { - return "{ \"class\" : \"" + getClass().getSimpleName() + "\", \"partitions\" : " - + Arrays.toString(storagePartitions.toArray()) + " }"; + return "{ \"class\" : \"" + getClass().getSimpleName() + "\", \"partitions\" : " + storagePartitions + " }"; } } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/LocalRecoveryTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/LocalRecoveryTask.java index b8addb29bc..e906de77f0 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/LocalRecoveryTask.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/LocalRecoveryTask.java @@ -19,22 +19,21 @@ package org.apache.asterix.app.nc.task; import java.io.IOException; -import java.util.Arrays; -import java.util.Set; import org.apache.asterix.common.api.INCLifecycleTask; import org.apache.asterix.common.api.INcApplicationContext; import org.apache.asterix.common.exceptions.ACIDException; +import org.apache.asterix.common.utils.Partitions; import org.apache.hyracks.api.control.CcId; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.service.IControllerService; public class LocalRecoveryTask implements INCLifecycleTask { - private static final long serialVersionUID = 1L; - private final Set<Integer> partitions; + private static final long serialVersionUID = 2L; + private final Partitions partitions; - public 
LocalRecoveryTask(Set<Integer> partitions) { + public LocalRecoveryTask(Partitions partitions) { this.partitions = partitions; } @@ -52,7 +51,6 @@ public class LocalRecoveryTask implements INCLifecycleTask { @Override public String toString() { - return "{ \"class\" : \"" + getClass().getSimpleName() + "\", \"partitions\" : " - + Arrays.toString(partitions.toArray()) + " }"; + return "{ \"class\" : \"" + getClass().getSimpleName() + "\", \"partitions\" : " + partitions + " }"; } } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/LocalStorageCleanupTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/LocalStorageCleanupTask.java index 61a07d15ab..673e401cf5 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/LocalStorageCleanupTask.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/LocalStorageCleanupTask.java @@ -18,12 +18,11 @@ */ package org.apache.asterix.app.nc.task; -import java.util.Set; - import org.apache.asterix.common.api.INCLifecycleTask; import org.apache.asterix.common.api.INcApplicationContext; import org.apache.asterix.common.dataflow.DatasetLocalResource; import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties; +import org.apache.asterix.common.utils.Partitions; import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository; import org.apache.hyracks.api.control.CcId; import org.apache.hyracks.api.exceptions.HyracksDataException; @@ -45,7 +44,7 @@ public class LocalStorageCleanupTask implements INCLifecycleTask { (PersistentLocalResourceRepository) appContext.getLocalResourceRepository(); localResourceRepository.deleteCorruptedResources(); deleteInvalidMetadataIndexes(localResourceRepository); - final Set<Integer> nodePartitions = appContext.getReplicaManager().getPartitions(); + final Partitions nodePartitions = appContext.getReplicaManager().getPartitions(); INcApplicationContext appCtx = 
(INcApplicationContext) cs.getApplicationContext(); if (appCtx.isCloudDeployment() && nodePartitions.contains(metadataPartitionId)) { appCtx.getTransactionSubsystem().getTransactionManager().rollbackMetadataTransactionsWithoutWAL(); diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/UpdateNodeStatusTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/UpdateNodeStatusTask.java index 17eff4ad81..b4b2d0987a 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/UpdateNodeStatusTask.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/UpdateNodeStatusTask.java @@ -18,11 +18,10 @@ */ package org.apache.asterix.app.nc.task; -import java.util.Set; - import org.apache.asterix.common.api.INCLifecycleTask; import org.apache.asterix.common.api.INcApplicationContext; import org.apache.asterix.common.storage.IReplicaManager; +import org.apache.asterix.common.utils.Partitions; import org.apache.hyracks.api.client.NodeStatus; import org.apache.hyracks.api.control.CcId; import org.apache.hyracks.api.exceptions.HyracksDataException; @@ -34,11 +33,11 @@ import org.apache.logging.log4j.Logger; public class UpdateNodeStatusTask implements INCLifecycleTask { private static final Logger LOGGER = LogManager.getLogger(); - private static final long serialVersionUID = 2L; + private static final long serialVersionUID = 3L; private final NodeStatus status; - private final Set<Integer> activePartitions; + private final Partitions activePartitions; - public UpdateNodeStatusTask(NodeStatus status, Set<Integer> activePartitions) { + public UpdateNodeStatusTask(NodeStatus status, Partitions activePartitions) { this.status = status; this.activePartitions = activePartitions; } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java index 
fff1bc118c..9ccb66a0b7 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java @@ -60,6 +60,7 @@ import org.apache.asterix.common.messaging.api.ICCMessageBroker; import org.apache.asterix.common.replication.INCLifecycleMessage; import org.apache.asterix.common.replication.INcLifecycleCoordinator; import org.apache.asterix.common.transactions.IRecoveryManager.SystemState; +import org.apache.asterix.common.utils.Partitions; import org.apache.asterix.metadata.MetadataManager; import org.apache.asterix.replication.messaging.ReplicaFailedMessage; import org.apache.http.client.utils.URIBuilder; @@ -172,7 +173,7 @@ public class NcLifecycleCoordinator implements INcLifecycleCoordinator { } protected List<INCLifecycleTask> buildNCRegTasks(String nodeId, NodeStatus nodeStatus, SystemState state, - Set<Integer> activePartitions) { + Partitions activePartitions) { LOGGER.info( "Building registration tasks for node {} with status {} and system state: {} and active partitions {}", nodeId, nodeStatus, state, activePartitions); @@ -217,9 +218,9 @@ public class NcLifecycleCoordinator implements INcLifecycleCoordinator { } protected List<INCLifecycleTask> buildIdleNcRegTasks(String newNodeId, boolean metadataNode, SystemState state, - Set<Integer> activePartitions) { + Partitions activePartitions) { final List<INCLifecycleTask> tasks = new ArrayList<>(); - Set<Integer> nodeActivePartitions = getNodeActivePartitions(newNodeId, activePartitions, metadataNode, state); + Partitions nodeActivePartitions = getNodeActivePartitions(newNodeId, activePartitions, metadataNode, state); tasks.add(new UpdateNodeStatusTask(NodeStatus.BOOTING, nodeActivePartitions)); int metadataPartitionId = clusterManager.getMetadataPartition().getPartitionId(); // Add any cloud-related tasks @@ -256,7 +257,7 @@ public class NcLifecycleCoordinator 
implements INcLifecycleCoordinator { return tasks; } - protected void addCloudTasks(List<INCLifecycleTask> tasks, Set<Integer> computePartitions, boolean metadataNode, + protected void addCloudTasks(List<INCLifecycleTask> tasks, Partitions computePartitions, boolean metadataNode, int metadataPartitionId) { IApplicationContext appCtx = (IApplicationContext) serviceContext.getApplicationContext(); if (!appCtx.isCloudDeployment()) { @@ -265,7 +266,7 @@ public class NcLifecycleCoordinator implements INcLifecycleCoordinator { StorageComputePartitionsMap map = clusterManager.getStorageComputeMap(); map = map == null ? StorageComputePartitionsMap.computePartitionsMap(clusterManager) : map; - Set<Integer> storagePartitions = map.getStoragePartitions(computePartitions); + Partitions storagePartitions = map.getStoragePartitions(computePartitions); tasks.add(new CloudToLocalStorageCachingTask(storagePartitions, metadataNode, metadataPartitionId)); } @@ -322,7 +323,7 @@ public class NcLifecycleCoordinator implements INcLifecycleCoordinator { return true; } - protected Set<Integer> getNodeActivePartitions(String nodeId, Set<Integer> nodePartitions, boolean metadataNode, + protected Partitions getNodeActivePartitions(String nodeId, Partitions nodePartitions, boolean metadataNode, SystemState state) { if (metadataNode) { nodePartitions.add(clusterManager.getMetadataPartition().getPartitionId()); diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java index b2a2dd771a..082f5b5462 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/NCLifecycleTaskReportMessage.java @@ -18,25 +18,24 @@ */ package org.apache.asterix.app.replication.message; 
-import java.util.Set; - import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.common.messaging.api.ICcAddressedMessage; import org.apache.asterix.common.replication.INCLifecycleMessage; import org.apache.asterix.common.utils.NcLocalCounters; +import org.apache.asterix.common.utils.Partitions; import org.apache.hyracks.api.exceptions.HyracksDataException; public class NCLifecycleTaskReportMessage implements INCLifecycleMessage, ICcAddressedMessage { - private static final long serialVersionUID = 1L; + private static final long serialVersionUID = 2L; private final String nodeId; private final boolean success; private Throwable exception; private final NcLocalCounters localCounters; - private final Set<Integer> activePartitions; + private final Partitions activePartitions; public NCLifecycleTaskReportMessage(String nodeId, boolean success, NcLocalCounters localCounters, - Set<Integer> activePartitions) { + Partitions activePartitions) { this.nodeId = nodeId; this.success = success; this.localCounters = localCounters; @@ -73,7 +72,7 @@ public class NCLifecycleTaskReportMessage implements INCLifecycleMessage, ICcAdd return MessageType.REGISTRATION_TASKS_RESULT; } - public Set<Integer> getActivePartitions() { + public Partitions getActivePartitions() { return activePartitions; } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java index 9e95ac644d..976308c81d 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksRequestMessage.java @@ -19,13 +19,13 @@ package org.apache.asterix.app.replication.message; import java.util.Map; -import java.util.Set; import 
org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.common.messaging.api.ICcAddressedMessage; import org.apache.asterix.common.messaging.api.INCMessageBroker; import org.apache.asterix.common.replication.INCLifecycleMessage; import org.apache.asterix.common.transactions.IRecoveryManager.SystemState; +import org.apache.asterix.common.utils.Partitions; import org.apache.hyracks.api.client.NodeStatus; import org.apache.hyracks.api.control.CcId; import org.apache.hyracks.api.exceptions.HyracksDataException; @@ -37,15 +37,15 @@ import org.apache.logging.log4j.Logger; public class RegistrationTasksRequestMessage implements INCLifecycleMessage, ICcAddressedMessage { private static final Logger LOGGER = LogManager.getLogger(); - private static final long serialVersionUID = 2L; + private static final long serialVersionUID = 3L; private final SystemState state; private final String nodeId; private final NodeStatus nodeStatus; private final Map<String, Object> secrets; - private final Set<Integer> activePartitions; + private final Partitions activePartitions; public RegistrationTasksRequestMessage(String nodeId, NodeStatus nodeStatus, SystemState state, - Map<String, Object> secretsEphemeral, Set<Integer> activePartitions) { + Map<String, Object> secretsEphemeral, Partitions activePartitions) { this.state = state; this.nodeId = nodeId; this.nodeStatus = nodeStatus; @@ -54,7 +54,7 @@ public class RegistrationTasksRequestMessage implements INCLifecycleMessage, ICc } public static void send(CcId ccId, NodeControllerService cs, NodeStatus nodeStatus, SystemState systemState, - Map<String, Object> secretsEphemeral, Set<Integer> activePartitions) throws HyracksDataException { + Map<String, Object> secretsEphemeral, Partitions activePartitions) throws HyracksDataException { try { RegistrationTasksRequestMessage msg = new RegistrationTasksRequestMessage(cs.getId(), nodeStatus, systemState, secretsEphemeral, activePartitions); @@ -92,7 +92,7 @@ public 
class RegistrationTasksRequestMessage implements INCLifecycleMessage, ICc return secrets; } - public Set<Integer> getActivePartitions() { + public Partitions getActivePartitions() { return activePartitions; } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java index 9d378f5a41..285563e645 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java @@ -18,9 +18,7 @@ */ package org.apache.asterix.app.replication.message; -import java.util.HashSet; import java.util.List; -import java.util.Set; import org.apache.asterix.common.api.INCLifecycleTask; import org.apache.asterix.common.api.INcApplicationContext; @@ -29,6 +27,7 @@ import org.apache.asterix.common.messaging.api.INCMessageBroker; import org.apache.asterix.common.messaging.api.INcAddressedMessage; import org.apache.asterix.common.replication.INCLifecycleMessage; import org.apache.asterix.common.utils.NcLocalCounters; +import org.apache.asterix.common.utils.Partitions; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.service.IControllerService; import org.apache.hyracks.control.nc.NodeControllerService; @@ -74,7 +73,7 @@ public class RegistrationTasksResponseMessage extends CcIdentifiedMessage LOGGER.debug("returning local counters to cc: {}", localCounter); } // wrap the returned partitions in a hash set to make it serializable - Set<Integer> nodeActivePartitions = new HashSet<>(appCtx.getReplicaManager().getPartitions()); + Partitions nodeActivePartitions = new Partitions(appCtx.getReplicaManager().getPartitions()); NCLifecycleTaskReportMessage result = new 
NCLifecycleTaskReportMessage(nodeId, success, localCounter, nodeActivePartitions); result.setException(exception); diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java index 53b9294e2b..6b99253791 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java @@ -19,9 +19,7 @@ package org.apache.asterix.runtime; import java.util.Collections; -import java.util.HashSet; import java.util.Map; -import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; @@ -37,6 +35,7 @@ import org.apache.asterix.common.config.StorageProperties; import org.apache.asterix.common.metadata.IMetadataBootstrap; import org.apache.asterix.common.utils.NcLocalCounters; import org.apache.asterix.common.utils.PartitioningScheme; +import org.apache.asterix.common.utils.Partitions; import org.apache.asterix.hyracks.bootstrap.CCApplication; import org.apache.asterix.runtime.transaction.ResourceIdManager; import org.apache.asterix.runtime.utils.BulkTxnIdFactory; @@ -277,8 +276,8 @@ public class ClusterStateManagerTest { return localCounters; } - private static Set<Integer> getNodeActivePartitions(String nodeId) { - Set<Integer> activePartitions = new HashSet<>(); + private static Partitions getNodeActivePartitions(String nodeId) { + Partitions activePartitions = new Partitions(); switch (nodeId) { case NC1: activePartitions.add(0); diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java index 7712f08653..6fc271e382 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java 
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java @@ -126,7 +126,7 @@ public abstract class AbstractCloudIOManager extends IOManager implements IParti } @Override - public final void bootstrap(Set<Integer> activePartitions, List<FileReference> currentOnDiskPartitions, + public final void bootstrap(Partitions activePartitions, List<FileReference> currentOnDiskPartitions, boolean metadataNode, int metadataPartition, boolean ensureCompleteBootstrap) throws HyracksDataException { partitions.clear(); partitions.addAll(activePartitions); diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LocalPartitionBootstrapper.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LocalPartitionBootstrapper.java index f2451ee04f..d5d0d87f3d 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LocalPartitionBootstrapper.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LocalPartitionBootstrapper.java @@ -20,10 +20,10 @@ package org.apache.asterix.cloud; import java.util.List; -import java.util.Set; import org.apache.asterix.common.cloud.IPartitionBootstrapper; import org.apache.asterix.common.transactions.IRecoveryManager; +import org.apache.asterix.common.utils.Partitions; import org.apache.asterix.common.utils.StoragePathUtil; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.io.FileReference; @@ -48,7 +48,7 @@ public class LocalPartitionBootstrapper implements IPartitionBootstrapper { } @Override - public void bootstrap(Set<Integer> activePartitions, List<FileReference> currentOnDiskPartitions, + public void bootstrap(Partitions activePartitions, List<FileReference> currentOnDiskPartitions, boolean metadataNode, int metadataPartition, boolean ensureCompleteBootstrap) throws HyracksDataException { for (FileReference onDiskPartition : currentOnDiskPartitions) { int partitionNum = 
StoragePathUtil.getPartitionNumFromRelativePath(onDiskPartition.getAbsolutePath()); diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java index f05a0e5820..b6683ad20c 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java @@ -47,18 +47,19 @@ public final class S3ClientConfig { private final boolean forcePathStyle; private final boolean disableSslVerify; private final boolean storageListEventuallyConsistent; + private final boolean enableCrtClient; public S3ClientConfig(String region, String endpoint, String prefix, boolean anonymousAuth, - long profilerLogInterval, int writeBufferSize) { + long profilerLogInterval, int writeBufferSize, boolean enableCrtClient) { this(region, endpoint, prefix, anonymousAuth, profilerLogInterval, writeBufferSize, 1, 0, 0, 0, false, false, - false, 0, 0); + false, 0, 0, enableCrtClient); } private S3ClientConfig(String region, String endpoint, String prefix, boolean anonymousAuth, long profilerLogInterval, int writeBufferSize, long tokenAcquireTimeout, int writeMaxRequestsPerSeconds, int readMaxRequestsPerSeconds, int requestsMaxHttpConnections, boolean forcePathStyle, boolean disableSslVerify, boolean storageListEventuallyConsistent, int requestsMaxPendingHttpConnections, - int requestsHttpConnectionAcquireTimeout) { + int requestsHttpConnectionAcquireTimeout, boolean enableCrtClient) { this.region = Objects.requireNonNull(region, "region"); this.endpoint = endpoint; this.prefix = Objects.requireNonNull(prefix, "prefix"); @@ -74,6 +75,7 @@ public final class S3ClientConfig { this.forcePathStyle = forcePathStyle; this.disableSslVerify = disableSslVerify; this.storageListEventuallyConsistent = 
storageListEventuallyConsistent; + this.enableCrtClient = enableCrtClient; } public static S3ClientConfig of(CloudProperties cloudProperties) { @@ -85,7 +87,7 @@ public final class S3ClientConfig { cloudProperties.isStorageForcePathStyle(), cloudProperties.isStorageDisableSSLVerify(), cloudProperties.isStorageListEventuallyConsistent(), cloudProperties.getRequestsMaxPendingHttpConnections(), - cloudProperties.getRequestsHttpConnectionAcquireTimeout()); + cloudProperties.getRequestsHttpConnectionAcquireTimeout(), cloudProperties.isS3EnableCrtClient()); } public static S3ClientConfig of(Map<String, String> configuration, int writeBufferSize) { @@ -99,7 +101,7 @@ public final class S3ClientConfig { String prefix = ""; boolean anonymousAuth = false; - return new S3ClientConfig(region, endPoint, prefix, anonymousAuth, profilerLogInterval, writeBufferSize); + return new S3ClientConfig(region, endPoint, prefix, anonymousAuth, profilerLogInterval, writeBufferSize, false); } public String getRegion() { @@ -116,7 +118,7 @@ public final class S3ClientConfig { public boolean isLocalS3Provider() { // to workaround https://github.com/findify/s3mock/issues/187 in our S3Mock, we encode/decode keys - return isS3Mock(); + return false; //isS3Mock(); } public AwsCredentialsProvider createCredentialsProvider() { @@ -167,8 +169,8 @@ public final class S3ClientConfig { return storageListEventuallyConsistent; } - private boolean isS3Mock() { - return endpoint != null && !endpoint.isEmpty(); + public boolean isCrtClientEnabled() { + return enableCrtClient; } } diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ParallelDownloader.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ParallelDownloader.java index 2acc12e7ba..50d05c66d5 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ParallelDownloader.java +++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ParallelDownloader.java @@ -181,7 +181,7 @@ class S3ParallelDownloader implements IParallelDownloader { private static S3AsyncClient createAsyncClient(S3ClientConfig config) { // CRT client is not supported by all local S3 providers, but provides a better performance with AWS S3 - if (!config.isLocalS3Provider()) { + if (config.isCrtClientEnabled()) { return createS3CrtAsyncClient(config); } return createS3AsyncClient(config); diff --git a/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/s3/LSMS3Test.java b/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/s3/LSMS3Test.java index fad8081d99..1bb0f746e4 100644 --- a/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/s3/LSMS3Test.java +++ b/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/s3/LSMS3Test.java @@ -68,7 +68,7 @@ public class LSMS3Test extends AbstractLSMTest { LOGGER.info("Client created successfully"); int writeBufferSize = StorageUtil.getIntSizeInBytes(5, StorageUtil.StorageUnit.MEGABYTE); S3ClientConfig config = - new S3ClientConfig(MOCK_SERVER_REGION, MOCK_SERVER_HOSTNAME, "", true, 0, writeBufferSize); + new S3ClientConfig(MOCK_SERVER_REGION, MOCK_SERVER_HOSTNAME, "", true, 0, writeBufferSize, false); CLOUD_CLIENT = new S3CloudClient(config, ICloudGuardian.NoOpCloudGuardian.INSTANCE); } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cloud/IPartitionBootstrapper.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cloud/IPartitionBootstrapper.java index 88e88abac4..3cca8f5d25 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cloud/IPartitionBootstrapper.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cloud/IPartitionBootstrapper.java @@ -19,9 +19,9 @@ package org.apache.asterix.common.cloud; import java.util.List; -import java.util.Set; import 
org.apache.asterix.common.transactions.IRecoveryManager; +import org.apache.asterix.common.utils.Partitions; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.io.FileReference; @@ -52,6 +52,6 @@ public interface IPartitionBootstrapper { * @param metadataPartition metadata partition number * @param ensureCompleteBootstrap ensures the metadata catalog was fully bootstrapped */ - void bootstrap(Set<Integer> activePartitions, List<FileReference> currentOnDiskPartitions, boolean metadataNode, + void bootstrap(Partitions activePartitions, List<FileReference> currentOnDiskPartitions, boolean metadataNode, int metadataPartition, boolean ensureCompleteBootstrap) throws HyracksDataException; } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java index 6c0982a38e..ffc554c33e 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java @@ -27,6 +27,7 @@ import org.apache.asterix.common.api.IClusterManagementWork.ClusterState; import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.common.utils.NcLocalCounters; +import org.apache.asterix.common.utils.Partitions; import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.api.config.IOption; @@ -65,7 +66,7 @@ public interface IClusterStateManager { * @param activePartitions * @throws HyracksDataException */ - void updateNodeState(String nodeId, boolean active, NcLocalCounters ncLocalCounters, Set<Integer> activePartitions) + void 
updateNodeState(String nodeId, boolean active, NcLocalCounters ncLocalCounters, Partitions activePartitions) throws HyracksDataException; /** diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/StorageComputePartitionsMap.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/StorageComputePartitionsMap.java index dc62cac667..ef440eb678 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/StorageComputePartitionsMap.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/StorageComputePartitionsMap.java @@ -21,13 +21,14 @@ package org.apache.asterix.common.cluster; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; import java.util.stream.Collectors; +import org.apache.asterix.common.utils.Partitions; + public class StorageComputePartitionsMap { private final Map<Integer, ComputePartition> storageComputeMap = new HashMap<>(); @@ -83,8 +84,8 @@ public class StorageComputePartitionsMap { * @param computePartitions the current active compute partitions * @return computePartitions's corresponding storage partitions */ - public Set<Integer> getStoragePartitions(Set<Integer> computePartitions) { - Set<Integer> storagePartitions = new HashSet<>(); + public Partitions getStoragePartitions(Partitions computePartitions) { + Partitions storagePartitions = new Partitions(); for (Map.Entry<Integer, ComputePartition> entry : storageComputeMap.entrySet()) { ComputePartition computePartition = entry.getValue(); if (computePartitions.contains(computePartition.getId())) { diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java index f556fcbb94..392282f6ec 100644 --- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/CloudProperties.java @@ -28,8 +28,10 @@ import static org.apache.hyracks.control.common.config.OptionTypes.getRangedInte import static org.apache.hyracks.util.StorageUtil.StorageUnit.GIGABYTE; import java.util.concurrent.TimeUnit; +import java.util.function.Function; import org.apache.asterix.common.cloud.CloudCachePolicy; +import org.apache.hyracks.api.config.IApplicationConfig; import org.apache.hyracks.api.config.IOption; import org.apache.hyracks.api.config.IOptionType; import org.apache.hyracks.api.config.Section; @@ -70,7 +72,11 @@ public class CloudProperties extends AbstractProperties { CLOUD_REQUESTS_HTTP_CONNECTION_ACQUIRE_TIMEOUT(POSITIVE_INTEGER, 120), CLOUD_STORAGE_FORCE_PATH_STYLE(BOOLEAN, false), CLOUD_STORAGE_DISABLE_SSL_VERIFY(BOOLEAN, false), - CLOUD_STORAGE_LIST_EVENTUALLY_CONSISTENT(BOOLEAN, false); + CLOUD_STORAGE_LIST_EVENTUALLY_CONSISTENT(BOOLEAN, false), + CLOUD_STORAGE_S3_ENABLE_CRT_CLIENT(BOOLEAN, (Function<IApplicationConfig, Boolean>) app -> { + String endpoint = app.getString(CLOUD_STORAGE_ENDPOINT); + return endpoint == null || endpoint.isEmpty(); + }); private final IOptionType interpreter; private final Object defaultValue; @@ -80,6 +86,11 @@ public class CloudProperties extends AbstractProperties { this.defaultValue = defaultValue; } + <T> Option(IOptionType<T> interpreter, Function<IApplicationConfig, T> defaultOption) { + this.interpreter = interpreter; + this.defaultValue = defaultOption; + } + @Override public Section section() { switch (this) { @@ -134,62 +145,55 @@ public class CloudProperties extends AbstractProperties { + "all partitions upon booting, whereas 'lazy' caching will download a file upon" + " request to open it. 
'selective' caching will act as the 'lazy' policy; however, " + " it allows to use the local disk(s) as a cache, where pages and indexes can be " - + " cached or evicted according to the pressure imposed on the local disks." - + " (default: 'selective')"; + + " cached or evicted according to the pressure imposed on the local disks"; case CLOUD_STORAGE_ALLOCATION_PERCENTAGE: return "The percentage of the total disk space that should be allocated for data storage when the" + " 'selective' caching policy is used. The remaining will act as a buffer for " - + " query workspace (i.e., for query operations that require spilling to disk)." - + " (default: 80% of the total disk space)"; + + " query workspace (i.e., for query operations that require spilling to disk)"; case CLOUD_STORAGE_SWEEP_THRESHOLD_PERCENTAGE: return "The percentage of the used storage space at which the disk sweeper starts freeing space by" + " punching holes in stored indexes or by evicting them entirely, " - + " when the 'selective' caching policy is used." - + " (default: 90% of the allocated space for storage)"; + + " when the 'selective' caching policy is used"; case CLOUD_STORAGE_DISK_MONITOR_INTERVAL: return "The disk monitoring interval time (in seconds): determines how often the system" - + " checks for pressure on disk space when using the 'selective' caching policy." - + " (default : 120 seconds)"; + + " checks for pressure on disk space when using the 'selective' caching policy"; case CLOUD_STORAGE_INDEX_INACTIVE_DURATION_THRESHOLD: - return "The duration in minutes to consider an index is inactive. (default: 360 or 6 hours)"; + return "The duration in minutes to consider an index is inactive"; case CLOUD_STORAGE_DEBUG_MODE_ENABLED: - return "Whether or not the debug mode is enabled when using the 'selective' caching policy." 
- + "(default: false)"; + return "Whether or not the debug mode is enabled when using the 'selective' caching policy"; case CLOUD_STORAGE_DEBUG_SWEEP_THRESHOLD_SIZE: return "For debugging only. Pressure size will be the current used space + the additional bytes" + " provided by this configuration option instead of using " - + " CLOUD_STORAGE_SWEEP_THRESHOLD_PERCENTAGE." - + " (default: 0. I.e., CLOUD_STORAGE_SWEEP_THRESHOLD_PERCENTAGE will be used by default)"; + + " CLOUD_STORAGE_SWEEP_THRESHOLD_PERCENTAGE"; case CLOUD_PROFILER_LOG_INTERVAL: - return "The waiting time (in minutes) to log cloud request statistics. The minimum is 1 minute." - + " Note: by default, the logging is disabled. Enabling it could perturb the performance of cloud requests"; + return "The waiting time (in minutes) to log cloud request statistics. The minimum is 1 minute" + + " Note: enabling this logging may perturb the performance of cloud requests"; case CLOUD_ACQUIRE_TOKEN_TIMEOUT: return "The waiting time (in milliseconds) if a requesting thread failed to acquire a token if the" - + " rate limit of cloud requests exceeded (default: 100, min: 1, and max: 5000)"; + + " rate limit of cloud requests exceeded (min: 1, max: 5000)"; case CLOUD_MAX_WRITE_REQUESTS_PER_SECOND: - return "The maximum number of write requests per second (default: 250, 0 means unlimited)"; + return "The maximum number of write requests per second (0 means unlimited)"; case CLOUD_MAX_READ_REQUESTS_PER_SECOND: - return "The maximum number of read requests per second (default: 1500, 0 means unlimited)"; + return "The maximum number of read requests per second (0 means unlimited)"; case CLOUD_WRITE_BUFFER_SIZE: - return "The write buffer size in bytes. (default: 8MB, min: 5MB)"; + return "The write buffer size in bytes. (min: 5MiB)"; case CLOUD_EVICTION_PLAN_REEVALUATE_THRESHOLD: - return "The number of cloud reads for re-evaluating an eviction plan. 
(default: 50)"; + return "The number of cloud reads for re-evaluating an eviction plan"; case CLOUD_REQUESTS_MAX_HTTP_CONNECTIONS: - return "The maximum number of HTTP connections to use concurrently for cloud requests per node. (default: 1000)"; + return "The maximum number of HTTP connections to use concurrently for cloud requests per node"; case CLOUD_REQUESTS_MAX_PENDING_HTTP_CONNECTIONS: - return "The maximum number of HTTP connections allowed to wait for a connection per node. (default: 10000)"; + return "The maximum number of HTTP connections allowed to wait for a connection per node"; case CLOUD_REQUESTS_HTTP_CONNECTION_ACQUIRE_TIMEOUT: - return "The waiting time (in seconds) to acquire an HTTP connection before failing the request." - + " (default: 120 seconds)"; + return "The waiting time (in seconds) to acquire an HTTP connection before failing the request"; case CLOUD_STORAGE_FORCE_PATH_STYLE: - return "Indicates whether or not to force path style when accessing the cloud storage. (default:" - + " false)"; + return "Indicates whether or not to force path style when accessing the cloud storage"; case CLOUD_STORAGE_DISABLE_SSL_VERIFY: - return "Indicates whether or not to disable SSL certificate verification on the cloud storage. " - + "(default: false)"; + return "Indicates whether or not to disable SSL certificate verification on the cloud storage"; case CLOUD_STORAGE_LIST_EVENTUALLY_CONSISTENT: return "Indicates whether or not deleted objects may be contained in list operations for some time" - + "after they are deleted. 
(default: false)"; + + " after they are deleted"; + case CLOUD_STORAGE_S3_ENABLE_CRT_CLIENT: + return "Indicates whether or not to use the AWS CRT S3 client for async requests"; default: throw new IllegalStateException("NYI: " + this); } @@ -205,6 +209,13 @@ public class CloudProperties extends AbstractProperties { return defaultValue; } + @Override + public String usageDefaultOverride(IApplicationConfig accessor, Function<IOption, String> optionPrinter) { + if (this == CLOUD_STORAGE_S3_ENABLE_CRT_CLIENT) { + return "true when no custom endpoint is set, otherwise false"; + } + return IOption.super.usageDefaultOverride(accessor, optionPrinter); + } } public String getStorageScheme() { @@ -309,4 +320,8 @@ public class CloudProperties extends AbstractProperties { return accessor.getBoolean(Option.CLOUD_STORAGE_LIST_EVENTUALLY_CONSISTENT); } + + public boolean isS3EnableCrtClient() { + return accessor.getBoolean(Option.CLOUD_STORAGE_S3_ENABLE_CRT_CLIENT); + } } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/MetadataProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/MetadataProperties.java index 8d18bfd8e9..568802c059 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/MetadataProperties.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/MetadataProperties.java @@ -24,10 +24,10 @@ import static org.apache.hyracks.control.common.config.OptionTypes.STRING; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.SortedMap; import org.apache.asterix.common.cluster.ClusterPartition; +import org.apache.asterix.common.utils.Partitions; import org.apache.hyracks.api.config.IApplicationConfig; import org.apache.hyracks.api.config.IOption; import org.apache.hyracks.api.config.IOptionType; @@ -124,7 +124,7 @@ public class MetadataProperties extends
AbstractProperties { return accessor.getClusterPartitions(); } - public Set<Integer> getNodePartitions(String nodeId) { + public Partitions getNodePartitions(String nodeId) { return accessor.getNodePartitions(nodeId); } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/PropertiesAccessor.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/PropertiesAccessor.java index 80f9a17f99..84e2ff00fe 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/PropertiesAccessor.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/PropertiesAccessor.java @@ -36,11 +36,11 @@ import java.util.SortedMap; import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; import java.util.function.Predicate; -import java.util.stream.Collectors; import org.apache.asterix.common.cluster.ClusterPartition; import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.common.exceptions.ErrorCode; +import org.apache.asterix.common.utils.Partitions; import org.apache.asterix.common.utils.PrintUtil; import org.apache.commons.lang3.mutable.MutableInt; import org.apache.hyracks.algebricks.common.utils.Pair; @@ -196,9 +196,10 @@ public class PropertiesAccessor implements IApplicationConfig { return clusterPartitions; } - public Set<Integer> getNodePartitions(String nodeId) { + public Partitions getNodePartitions(String nodeId) { ClusterPartition[] nodeClusterPartitions = nodePartitionsMap.get(nodeId); - return Arrays.stream(nodeClusterPartitions).map(ClusterPartition::getPartitionId).collect(Collectors.toSet()); + return Arrays.stream(nodeClusterPartitions).map(ClusterPartition::getPartitionId) + .collect(Partitions.collector()); } public List<AsterixExtension> getExtensions() { diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IReplicaManager.java 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IReplicaManager.java index 14fc9d8e0c..daedf8774e 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IReplicaManager.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IReplicaManager.java @@ -19,9 +19,9 @@ package org.apache.asterix.common.storage; import java.util.List; -import java.util.Set; import org.apache.asterix.common.replication.IPartitionReplica; +import org.apache.asterix.common.utils.Partitions; import org.apache.hyracks.api.exceptions.HyracksDataException; public interface IReplicaManager { @@ -54,14 +54,14 @@ public interface IReplicaManager { * * @return The list of partition */ - Set<Integer> getPartitions(); + Partitions getPartitions(); /** * Sets the node active partitions * * @param activePartitions */ - void setActivePartitions(Set<Integer> activePartitions); + void setActivePartitions(Partitions activePartitions); /** * Promotes a partition by making this node its master replica @@ -87,7 +87,7 @@ public interface IReplicaManager { * * @return the synchronization lock */ - Object getPartitionSyncLock(int partition); + PartitionSyncLock getPartitionSyncLock(int partition); /** * Gets the partition replicas matching {@code id} @@ -110,4 +110,6 @@ public interface IReplicaManager { * @return true if the partition is originated by this node, otherwise false. 
*/ boolean isPartitionOrigin(int partition); + + record PartitionSyncLock() {} } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java index 3437d4202d..cf96816ee9 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java @@ -21,9 +21,9 @@ package org.apache.asterix.common.transactions; import java.io.File; import java.io.IOException; import java.util.List; -import java.util.Set; import org.apache.asterix.common.exceptions.ACIDException; +import org.apache.asterix.common.utils.Partitions; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; @@ -109,7 +109,7 @@ public interface IRecoveryManager { * @throws IOException * @throws ACIDException */ - void startLocalRecovery(Set<Integer> partitions) throws IOException, ACIDException; + void startLocalRecovery(Partitions partitions) throws IOException, ACIDException; /** * Gets the LSN used in {@code partitions} or 0 if no LSNs are found @@ -118,7 +118,7 @@ public interface IRecoveryManager { * @return the maximum used LSN * @throws HyracksDataException */ - long getPartitionsMaxLSN(Set<Integer> partitions) throws HyracksDataException; + long getPartitionsMaxLSN(Partitions partitions) throws HyracksDataException; /** * Replay the commited transactions' logs belonging to {@code partitions}. 
if {@code flush} is true, @@ -128,7 +128,7 @@ public interface IRecoveryManager { * @param flush * @throws HyracksDataException */ - void replayReplicaPartitionLogs(Set<Integer> partitions, boolean flush) throws HyracksDataException; + void replayReplicaPartitionLogs(Partitions partitions, boolean flush) throws HyracksDataException; /** * Ensures that {@code datasetPartitionIndexes} are consistent by performing component id level recovery diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Partitions.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Partitions.java index e939ce13b7..1405661a7b 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Partitions.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Partitions.java @@ -21,38 +21,58 @@ package org.apache.asterix.common.utils; import java.io.Serial; import java.io.Serializable; import java.util.Set; +import java.util.stream.Collector; +import java.util.stream.IntStream; import org.apache.commons.lang3.mutable.MutableBoolean; +import it.unimi.dsi.fastutil.ints.IntIterable; import it.unimi.dsi.fastutil.ints.IntIterator; +import it.unimi.dsi.fastutil.ints.IntSet; import it.unimi.dsi.fastutil.ints.IntSortedSet; import it.unimi.dsi.fastutil.ints.IntSortedSets; /** - * A specialized bit set for storing partition IDs. - * Partition IDs are in the range [-1, 32766] (inclusive). - * The value -1 is used to represent the "unpartitioned" partition. - * The value 32767 is reserved for the internal shift and cannot be used as a partition ID. - * This class internally shifts the partition IDs by +1 to fit into an unsigned short range [0, 32767]. + * A specialized data structure for efficiently storing partitions. + * Partitions are in the range [-1, 2_147_483_646] (inclusive). + * The value -1 is used to represent the "metadata" partition. 
+ * The value 2_147_483_647 is reserved for the internal shift and cannot be used as a partition. + * This class internally shifts the partitions by +1 to fit into the signed int range [0, 2_147_483_647]. */ -public class Partitions implements Serializable { +public class Partitions implements IntIterable, Serializable { @Serial private static final long serialVersionUID = 1L; - private static final int MAX_PARTITION_ID = Integer.MAX_VALUE - 1; - private static final int MIN_PARTITION_ID = -1; + private static final int DELTA = -1; + private static final int MAX_PARTITION = Integer.MAX_VALUE + DELTA; + private static final int MIN_PARTITION = DELTA; + private static final int DEFAULT_MAX_PARTITIONS = 128; private static final Partitions EMPTY = new Partitions(IntSortedSets.EMPTY_SET); - private static final short MINUS_ONE = (short) -1; private final IntSortedSet delegate; public Partitions() { - this(new IntSortedBitSet()); + this(DEFAULT_MAX_PARTITIONS); } - public Partitions(int initialMaxValue) { - this(new IntSortedBitSet(initialMaxValue + 1)); + public Partitions(int initialMaxPartition) { + this(new IntSortedBitSet(encodePartition(initialMaxPartition))); } - public Partitions(IntSortedSet delegate) { + public Partitions(Set<Integer> initialValues) { + this(); + initialValues.forEach(this::add); + } + + public Partitions(IntSet initialValues) { + this(); + initialValues.forEach(this::add); + } + + public Partitions(Partitions initialValues) { + this(); + addAll(initialValues); + } + + private Partitions(IntSortedSet delegate) { this.delegate = delegate; } @@ -63,17 +83,43 @@ public class Partitions implements Serializable { return EMPTY; } + public static Partitions singleton(int partition) { + return new Partitions(IntSortedSets.singleton(encodePartition(partition))); + } + + public static Collector<? 
super Integer, Partitions, Partitions> collector() { + return Collector.of(Partitions::new, Partitions::add, (l, r) -> { + l.addAll(r); + return l; + }, Collector.Characteristics.UNORDERED, Collector.Characteristics.IDENTITY_FINISH); + } + /** - * Adds a partition ID to the set. Partition ID must be in the range [-1, 32766]. - * @param k the partition ID to add - * @return true if the partition ID was added, false if it was already present - * @throws IllegalArgumentException if the partition ID is out of range + * Adds a partition to the set. Partition must be in the range [-1, 2_147_483_646]. + * @param partition the partition to add + * @return true if the partition was added, false if it was already present + * @throws IllegalArgumentException if the partition is out of range */ - public boolean add(int k) { - if (k > MAX_PARTITION_ID || k < MIN_PARTITION_ID) { - throw new IllegalArgumentException("Partition number " + k + " out of range"); + public boolean add(int partition) { + checkRange(partition); + return delegate.add(encodePartition(partition)); + } + + /** + * Removes a partition from the set. Partition must be in the range [-1, 2_147_483_646]. 
+ * @param partition the partition to remove + * @return true if the partition was removed, false if it was not present + * @throws IllegalArgumentException if the partition is out of range + */ + public boolean remove(int partition) { + checkRange(partition); + return delegate.remove(encodePartition(partition)); + } + + private static void checkRange(int partition) { + if (partition > MAX_PARTITION || partition < MIN_PARTITION) { + throw new IllegalArgumentException("Partition number " + partition + " out of range"); } - return delegate.add((short) (k + 1)); } public boolean isEmpty() { @@ -94,15 +140,44 @@ public class Partitions implements Serializable { return retval.booleanValue(); } + public boolean addAll(Partitions activePartitions) { + return delegate.addAll(activePartitions.delegate); + } + public int size() { return delegate.size(); } - public boolean contains(int partitionNum) { - if (partitionNum > MAX_PARTITION_ID || partitionNum < MIN_PARTITION_ID) { + public boolean contains(int partition) { + if (partition > MAX_PARTITION || partition < MIN_PARTITION) { return false; } - return delegate.contains((short) (partitionNum + 1)); + return delegate.contains(encodePartition(partition)); + } + + @Override + public IntIterator iterator() { + return new IntIterator() { + private final IntIterator delegateIter = delegate.iterator(); + + @Override + public boolean hasNext() { + return delegateIter.hasNext(); + } + + @Override + public int nextInt() { + return decodePartition(delegateIter.nextInt()); + } + }; + } + + public int getMinPartition() { + return decodePartition(delegate.getFirst()); + } + + public int getMaxPartition() { + return decodePartition(delegate.getLast()); } @Override @@ -130,17 +205,36 @@ public class Partitions implements Serializable { StringBuilder builder = new StringBuilder(); builder.append('['); if (delegate.firstInt() == 0) { - builder.append(MINUS_ONE); + builder.append(DELTA); iter.nextInt(); // this is the zero we just printed 
if (iter.hasNext()) { builder.append(','); } } if (iter.hasNext()) { - IntUtil.appendCompact(iter, builder, MINUS_ONE); + IntUtil.appendCompact(iter, builder, DELTA); } builder.append(']'); return builder.toString(); } + public void removeAll(Partitions masterPartitions) { + delegate.removeAll(masterPartitions.delegate); + } + + public Partitions unmodifiable() { + return new Partitions(IntSortedSets.unmodifiable(delegate)); + } + + public IntStream stream() { + return delegate.intStream().map(Partitions::decodePartition); + } + + private static int encodePartition(int partition) { + return partition - DELTA; + } + + private static int decodePartition(int value) { + return value + DELTA; + } } diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsNotifier.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsNotifier.java index 42a3c0b212..13bd31b3c0 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsNotifier.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsNotifier.java @@ -19,12 +19,10 @@ package org.apache.asterix.replication.logging; import java.io.IOException; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.LinkedBlockingQueue; import java.util.function.Predicate; -import java.util.stream.Collectors; import org.apache.asterix.common.api.INcApplicationContext; import org.apache.asterix.common.dataflow.DatasetLocalResource; @@ -32,6 +30,7 @@ import org.apache.asterix.common.storage.DatasetResourceReference; import org.apache.asterix.common.storage.IIndexCheckpointManager; import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider; import org.apache.asterix.common.transactions.LogType; +import org.apache.asterix.common.utils.Partitions; import 
org.apache.asterix.replication.messaging.ReplicationProtocol; import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository; import org.apache.hyracks.api.exceptions.HyracksDataException; @@ -90,9 +89,9 @@ class RemoteLogsNotifier implements Runnable { return dls.getDatasetId() == datasetId && dls.getPartition() == resourcePartition; }; final Map<Long, LocalResource> resources = - localResourceRep.getResources(replicaIndexesPredicate, Collections.singleton(resourcePartition)); + localResourceRep.getResources(replicaIndexesPredicate, Partitions.singleton(resourcePartition)); final List<DatasetResourceReference> replicaIndexesRef = - resources.values().stream().map(DatasetResourceReference::of).collect(Collectors.toList()); + resources.values().stream().map(DatasetResourceReference::of).toList(); for (DatasetResourceReference replicaIndexRef : replicaIndexesRef) { final IIndexCheckpointManager indexCheckpointManager = indexCheckpointManagerProvider.get(replicaIndexRef); synchronized (indexCheckpointManager) { diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java index 68ccd54e50..b0a8c54a47 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java @@ -22,6 +22,7 @@ import java.io.IOException; import org.apache.asterix.common.api.INcApplicationContext; import org.apache.asterix.common.replication.IReplicationStrategy; +import org.apache.asterix.common.storage.IReplicaManager.PartitionSyncLock; import org.apache.asterix.common.transactions.ICheckpointManager; import org.apache.asterix.replication.api.PartitionReplica; import org.apache.asterix.replication.messaging.CheckpointPartitionIndexesTask; @@ 
-48,7 +49,8 @@ public class ReplicaSynchronizer { public void sync(boolean register, boolean deltaRecovery) throws IOException { LOGGER.debug("starting replica sync process for replica {}", replica); - Object partitionLock = appCtx.getReplicaManager().getPartitionSyncLock(replica.getIdentifier().getPartition()); + PartitionSyncLock partitionLock = + appCtx.getReplicaManager().getPartitionSyncLock(replica.getIdentifier().getPartition()); synchronized (partitionLock) { LOGGER.trace("acquired partition replica lock"); final ICheckpointManager checkpointManager = appCtx.getTransactionSubsystem().getCheckpointManager(); diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java index 668decfa3d..cb97929807 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java @@ -30,7 +30,6 @@ import java.util.SortedMap; import java.util.TreeSet; import java.util.concurrent.TimeUnit; import java.util.function.Predicate; -import java.util.stream.Collectors; import org.apache.asterix.common.api.IClusterManagementWork.ClusterState; import org.apache.asterix.common.cluster.ClusterPartition; @@ -43,6 +42,7 @@ import org.apache.asterix.common.replication.INcLifecycleCoordinator; import org.apache.asterix.common.transactions.IResourceIdManager; import org.apache.asterix.common.utils.NcLocalCounters; import org.apache.asterix.common.utils.PartitioningScheme; +import org.apache.asterix.common.utils.Partitions; import org.apache.asterix.common.utils.StorageConstants; import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; @@ -152,15 +152,15 @@ public class ClusterStateManager 
implements IClusterStateManager { @Override public synchronized void updateNodeState(String nodeId, boolean active, NcLocalCounters localCounters, - Set<Integer> activePartitions) { + Partitions activePartitions) { if (active) { updateClusterCounters(nodeId, localCounters); participantNodes.add(nodeId); if (appCtx.isCloudDeployment()) { // node compute partitions never change ClusterPartition[] nodePartitions = getNodePartitions(nodeId); - activePartitions = - Arrays.stream(nodePartitions).map(ClusterPartition::getPartitionId).collect(Collectors.toSet()); + activePartitions = Arrays.stream(nodePartitions).map(ClusterPartition::getPartitionId) + .collect(Partitions.collector()); activateNodePartitions(nodeId, activePartitions); } else { activateNodePartitions(nodeId, activePartitions); @@ -548,8 +548,8 @@ public class ClusterStateManager implements IClusterStateManager { }); } - private synchronized void activateNodePartitions(String nodeId, Set<Integer> activePartitions) { - for (Integer partitionId : activePartitions) { + private synchronized void activateNodePartitions(String nodeId, Partitions activePartitions) { + for (int partitionId : activePartitions) { updateClusterPartition(partitionId, nodeId, true); } } diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java index cb4e068aab..d53c258a76 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java @@ -56,6 +56,7 @@ import org.apache.asterix.common.storage.IIndexCheckpointManager; import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider; 
import org.apache.asterix.common.storage.ResourceReference; import org.apache.asterix.common.storage.ResourceStorageStats; +import org.apache.asterix.common.utils.Partitions; import org.apache.asterix.common.utils.StorageConstants; import org.apache.asterix.common.utils.StoragePathUtil; import org.apache.hyracks.api.exceptions.HyracksDataException; @@ -288,7 +289,7 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito } } - public Map<Long, LocalResource> getResources(Predicate<LocalResource> filter, Set<Integer> partitions) + public Map<Long, LocalResource> getResources(Predicate<LocalResource> filter, Partitions partitions) throws HyracksDataException { beforeReadAccess(); try { @@ -427,12 +428,12 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito } } - public Set<Integer> getAllPartitions() throws HyracksDataException { + public Partitions getAllPartitions() throws HyracksDataException { beforeReadAccess(); try { return loadAndGetAllResources().values().stream().map(LocalResource::getResource) .map(DatasetLocalResource.class::cast).map(DatasetLocalResource::getPartition) - .collect(Collectors.toSet()); + .collect(Partitions.collector()); } finally { afterReadAccess(); } @@ -479,7 +480,7 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito public Map<Long, LocalResource> getPartitionResources(int partition) throws HyracksDataException { beforeReadAccess(); try { - return getResources(r -> true, Collections.singleton(partition)); + return getResources(r -> true, Partitions.singleton(partition)); } finally { afterReadAccess(); } @@ -712,7 +713,7 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito return null; } - public long getDatasetSize(DatasetCopyIdentifier datasetIdentifier, Set<Integer> nodePartitions) + public long getDatasetSize(DatasetCopyIdentifier datasetIdentifier, Partitions nodePartitions) throws HyracksDataException { 
beforeReadAccess(); try {
