This is an automated email from the ASF dual-hosted git repository.
mhubail pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git
The following commit(s) were added to refs/heads/master by this push:
new ae3c273a54 [ASTERIXDB-3196][*DB] Cluster state for compute-storage separation
ae3c273a54 is described below
commit ae3c273a549a92501f47ccf8713cb2aba1beee58
Author: Murtadha Hubail <[email protected]>
AuthorDate: Wed May 24 15:17:30 2023 +0300
[ASTERIXDB-3196][*DB] Cluster state for compute-storage separation
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
- Implement changes required to drive cluster state based on
compute-storage partitions map.
- Persist index checkpoints to cloud storage.
- Remove eager caching from NC startup tasks.
- Fixes for static data partitioning.
Change-Id: I217da04d06884d841c4a56aee3ab9815cc659de7
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17553
Integration-Tests: Jenkins <[email protected]>
Tested-by: Jenkins <[email protected]>
Reviewed-by: Murtadha Hubail <[email protected]>
Reviewed-by: Ali Alsuliman <[email protected]>
---
.../apache/asterix/app/nc/NCAppRuntimeContext.java | 3 +-
.../app/replication/NcLifecycleCoordinator.java | 7 ---
.../common/cluster/IClusterStateManager.java | 6 +++
.../cluster/StorageComputePartitionsMap.java | 11 +++-
.../metadata/declared/MetadataProvider.java | 9 +---
.../asterix/runtime/utils/ClusterStateManager.java | 62 +++++++++++++++++-----
.../PersistentLocalResourceRepository.java | 4 +-
7 files changed, 67 insertions(+), 35 deletions(-)
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index f60ed63872..7f0b529480 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -204,8 +204,7 @@ public class NCAppRuntimeContext implements
INcApplicationContext {
storageProperties.getBufferCachePageSize(),
storageProperties.getBufferCacheNumPages());
lsmIOScheduler = createIoScheduler(storageProperties);
metadataMergePolicyFactory = new ConcurrentMergePolicyFactory();
- // TODO do we want to write checkpoints for cloud?
- indexCheckpointManagerProvider = new
IndexCheckpointManagerProvider(ioManager);
+ indexCheckpointManagerProvider = new
IndexCheckpointManagerProvider(persistenceIOManager);
ILocalResourceRepositoryFactory
persistentLocalResourceRepositoryFactory =
new
PersistentLocalResourceRepositoryFactory(persistenceIOManager,
indexCheckpointManagerProvider,
persistedResourceRegistry);
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
index 4f9613d2a7..449dd27845 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
@@ -35,7 +35,6 @@ import java.util.Set;
import org.apache.asterix.app.nc.task.BindMetadataNodeTask;
import org.apache.asterix.app.nc.task.CheckpointTask;
-import org.apache.asterix.app.nc.task.CloudToLocalStorageCachingTask;
import org.apache.asterix.app.nc.task.ExportMetadataNodeTask;
import org.apache.asterix.app.nc.task.LocalRecoveryTask;
import org.apache.asterix.app.nc.task.LocalStorageCleanupTask;
@@ -52,7 +51,6 @@ import
org.apache.asterix.app.replication.message.RegistrationTasksResponseMessa
import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
import org.apache.asterix.common.api.INCLifecycleTask;
import org.apache.asterix.common.cluster.IClusterStateManager;
-import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.asterix.common.messaging.api.ICCMessageBroker;
@@ -222,11 +220,6 @@ public class NcLifecycleCoordinator implements
INcLifecycleCoordinator {
tasks.add(new UpdateNodeStatusTask(NodeStatus.BOOTING,
nodeActivePartitions));
int metadataPartitionId =
clusterManager.getMetadataPartition().getPartitionId();
tasks.add(new LocalStorageCleanupTask(metadataPartitionId));
-
- if (((ICcApplicationContext)
(serviceContext.getControllerService()).getApplicationContext())
- .isCloudDeployment()) {
- tasks.add(new CloudToLocalStorageCachingTask(activePartitions));
- }
if (state == SystemState.CORRUPTED) {
// need to perform local recovery for node active partitions
LocalRecoveryTask rt = new LocalRecoveryTask(nodeActivePartitions);
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
index 3d0fee8e22..5f714f436d 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
@@ -286,4 +286,10 @@ public interface IClusterStateManager {
* @return the count of storage partitions
*/
int getStoragePartitionsCount();
+
+ /**
+ * Sets the compute-storage partitions map
+ * @param map
+ */
+ void setComputeStoragePartitionsMap(StorageComputePartitionsMap map);
}
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/StorageComputePartitionsMap.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/StorageComputePartitionsMap.java
index 48b2ea1a93..6561d05b78 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/StorageComputePartitionsMap.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/StorageComputePartitionsMap.java
@@ -23,6 +23,8 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
public class StorageComputePartitionsMap {
@@ -53,9 +55,10 @@ public class StorageComputePartitionsMap {
}
}
int[][] computerToStoArray = new
int[computeToStoragePartitions.size()][];
+ int partitionIdx = 0;
for (Map.Entry<Integer, List<Integer>> integerListEntry :
computeToStoragePartitions.entrySet()) {
- computerToStoArray[integerListEntry.getKey()] =
- integerListEntry.getValue().stream().mapToInt(i ->
i).toArray();
+ computerToStoArray[partitionIdx] =
integerListEntry.getValue().stream().mapToInt(i -> i).toArray();
+ partitionIdx++;
}
return computerToStoArray;
}
@@ -94,4 +97,8 @@ public class StorageComputePartitionsMap {
}
return newMap;
}
+
+ public Set<String> getComputeNodes() {
+ return
stoToComputeLocation.values().stream().map(ComputePartition::getNodeId).collect(Collectors.toSet());
+ }
}
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index b07a03eddc..ea1b9e675e 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -32,7 +32,6 @@ import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
-import org.apache.asterix.common.cluster.IClusterStateManager;
import org.apache.asterix.common.cluster.PartitioningProperties;
import org.apache.asterix.common.config.DatasetConfig.DatasetType;
import org.apache.asterix.common.config.DatasetConfig.IndexType;
@@ -891,13 +890,7 @@ public class MetadataProvider implements
IMetadataProvider<DataSourceId, String>
} else {
numElementsHint = Long.parseLong(numElementsHintString);
}
- int numPartitions = 0;
- List<String> nodeGroup =
- MetadataManager.INSTANCE.getNodegroup(mdTxnCtx,
dataset.getNodeGroupName()).getNodeNames();
- IClusterStateManager csm = appCtx.getClusterStateManager();
- for (String nd : nodeGroup) {
- numPartitions += csm.getNodePartitionsCount(nd);
- }
+ int numPartitions =
getPartitioningProperties(dataset).getNumberOfPartitions();
return numElementsHint / numPartitions;
}
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
index 92ea173fb4..0128478647 100644
---
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
@@ -30,6 +30,7 @@ import java.util.SortedMap;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
+import java.util.stream.Collectors;
import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
import org.apache.asterix.common.cluster.ClusterPartition;
@@ -155,7 +156,16 @@ public class ClusterStateManager implements
IClusterStateManager {
if (active) {
updateClusterCounters(nodeId, localCounters);
participantNodes.add(nodeId);
- activateNodePartitions(nodeId, activePartitions);
+ if (appCtx.isCloudDeployment()) {
+ // node compute partitions never change
+ ClusterPartition[] nodePartitions = getNodePartitions(nodeId);
+ activePartitions =
+
Arrays.stream(nodePartitions).map(ClusterPartition::getPartitionId).collect(Collectors.toSet());
+ activateNodePartitions(nodeId, activePartitions);
+ } else {
+ activateNodePartitions(nodeId, activePartitions);
+ }
+
} else {
participantNodes.remove(nodeId);
deactivateNodePartitions(nodeId);
@@ -183,16 +193,7 @@ public class ClusterStateManager implements
IClusterStateManager {
return;
}
resetClusterPartitionConstraint();
- // if the cluster has no registered partitions or all partitions are
pending activation -> UNUSABLE
- if (clusterPartitions.isEmpty()
- ||
clusterPartitions.values().stream().allMatch(ClusterPartition::isPendingActivation))
{
- LOGGER.info("Cluster does not have any registered partitions");
- setState(ClusterState.UNUSABLE);
- return;
- }
-
- // exclude partitions that are pending activation
- if (clusterPartitions.values().stream().anyMatch(p -> !p.isActive() &&
!p.isPendingActivation())) {
+ if (isClusterUnusable()) {
setState(ClusterState.UNUSABLE);
return;
}
@@ -310,9 +311,7 @@ public class ClusterStateManager implements
IClusterStateManager {
clusterActiveLocations.removeAll(pendingRemoval);
clusterPartitionConstraint =
new
AlgebricksAbsolutePartitionConstraint(clusterActiveLocations.toArray(new
String[] {}));
- if (appCtx.getStorageProperties().getPartitioningScheme() ==
PartitioningScheme.STATIC) {
- storageComputePartitionsMap =
StorageComputePartitionsMap.computePartitionsMap(this);
- }
+ resetStorageComputeMap();
}
@Override
@@ -512,6 +511,11 @@ public class ClusterStateManager implements
IClusterStateManager {
return storageComputePartitionsMap;
}
+ @Override
+ public synchronized void
setComputeStoragePartitionsMap(StorageComputePartitionsMap map) {
+ this.storageComputePartitionsMap = map;
+ }
+
private void updateClusterCounters(String nodeId, NcLocalCounters
localCounters) {
final IResourceIdManager resourceIdManager =
appCtx.getResourceIdManager();
resourceIdManager.report(nodeId, localCounters.getMaxResourceId());
@@ -543,6 +547,36 @@ public class ClusterStateManager implements
IClusterStateManager {
false));
}
+ private synchronized boolean isClusterUnusable() {
+ // if the cluster has no registered partitions or all partitions are
pending activation -> UNUSABLE
+ if (clusterPartitions.isEmpty()
+ ||
clusterPartitions.values().stream().allMatch(ClusterPartition::isPendingActivation))
{
+ LOGGER.info("Cluster does not have any registered partitions");
+ return true;
+ }
+ if (appCtx.isCloudDeployment() && storageComputePartitionsMap != null)
{
+ Set<String> computeNodes =
storageComputePartitionsMap.getComputeNodes();
+ if (!participantNodes.containsAll(computeNodes)) {
+ LOGGER.info("Cluster missing compute nodes; required {},
current {}", computeNodes, participantNodes);
+ return true;
+ }
+ } else {
+ // exclude partitions that are pending activation
+ if (clusterPartitions.values().stream().anyMatch(p ->
!p.isActive() && !p.isPendingActivation())) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private synchronized void resetStorageComputeMap() {
+ if (storageComputePartitionsMap == null
+ && appCtx.getStorageProperties().getPartitioningScheme() ==
PartitioningScheme.STATIC
+ && !isClusterUnusable()) {
+ storageComputePartitionsMap =
StorageComputePartitionsMap.computePartitionsMap(this);
+ }
+ }
+
private static InetSocketAddress getReplicaLocation(IClusterStateManager
csm, String nodeId) {
final Map<IOption, Object> ncConfig =
csm.getActiveNcConfiguration().get(nodeId);
if (ncConfig == null) {
diff --git
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index 080204eb0a..25b961017c 100644
---
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -646,8 +646,8 @@ public class PersistentLocalResourceRepository implements
ILocalResourceReposito
public synchronized List<FileReference> getOnDiskPartitions() throws
HyracksDataException {
List<FileReference> onDiskPartitions = new ArrayList<>();
for (FileReference root : storageRoots) {
- Collection<FileReference> partitions = ioManager.list(root,
- (dir, name) -> dir.isDirectory() &&
name.startsWith(StorageConstants.PARTITION_DIR_PREFIX));
+ Collection<FileReference> partitions = ioManager.list(root, (dir,
name) -> dir != null && dir.isDirectory()
+ && name.startsWith(StorageConstants.PARTITION_DIR_PREFIX));
if (partitions != null) {
onDiskPartitions.addAll(partitions);
}