This is an automated email from the ASF dual-hosted git repository. mhubail pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit b38d2366d029293025d8eadcb6cb1bc39c8a1137 Author: Ritik Raj <[email protected]> AuthorDate: Sat Feb 15 00:10:24 2025 +0530 [ASTERIXDB-3563][STO] Delay activation of dataset until accessed - user model changes: no - storage format changes: no - interface changes: yes Details: activate the dataset when it is being accessed in cloud mode. Ext-ref: MB-63037 Change-Id: If64a7fbd9701772ffa42761d1557223a3eea95be Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19443 Integration-Tests: Jenkins <[email protected]> Reviewed-by: Ritik Raj <[email protected]> Reviewed-by: Murtadha Hubail <[email protected]> Tested-by: Ritik Raj <[email protected]> --- .../apache/asterix/app/nc/NCAppRuntimeContext.java | 5 +- .../org/apache/asterix/app/nc/RecoveryManager.java | 10 ++ .../common/context/DatasetLifecycleManager.java | 134 ++++++++++++++++++--- .../asterix/common/context/DatasetResource.java | 17 +++ .../common/transactions/IRecoveryManager.java | 17 +++ .../asterix/common/utils/StoragePathUtil.java | 19 ++- .../PersistentLocalResourceRepository.java | 1 + .../storage/common/ILocalResourceRepository.java | 8 ++ .../common/TransientLocalResourceRepository.java | 9 ++ 9 files changed, 202 insertions(+), 18 deletions(-) diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java index de4a066b1b..007826ba4e 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java @@ -286,8 +286,9 @@ public class NCAppRuntimeContext implements INcApplicationContext { // Must start vbc now instead of by life cycle component manager (lccm) because lccm happens after // the metadata bootstrap task ((ILifeCycleComponent) virtualBufferCache).start(); - datasetLifecycleManager = new DatasetLifecycleManager(storageProperties, 
localResourceRepository, - txnSubsystem.getLogManager(), virtualBufferCache, indexCheckpointManagerProvider, lockNotifier); + datasetLifecycleManager = + new DatasetLifecycleManager(ncServiceContext, storageProperties, localResourceRepository, recoveryMgr, + txnSubsystem.getLogManager(), virtualBufferCache, indexCheckpointManagerProvider, lockNotifier); localResourceRepository.setDatasetLifecycleManager(datasetLifecycleManager); final String nodeId = getServiceContext().getNodeId(); final Set<Integer> nodePartitions = metadataProperties.getNodePartitions(nodeId); diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java index 179b996742..269f6f7224 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java @@ -901,6 +901,16 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent { return maxDiskLastLsn; } + @Override + public boolean isLazyRecoveryEnabled() { + return false; + } + + @Override + public void recoverIndexes(List<ILSMIndex> datasetPartitionIndexes) throws HyracksDataException { + // do-nothing + } + private class JobEntityCommits { private static final String PARTITION_FILE_NAME_SEPARATOR = "_"; private final long txnId; diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java index 43b5d1b10e..85916b077d 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java @@ -36,6 +36,7 @@ import org.apache.asterix.common.config.StorageProperties; import 
org.apache.asterix.common.dataflow.DatasetLocalResource; import org.apache.asterix.common.dataflow.LSMIndexUtil; import org.apache.asterix.common.ioopcallbacks.LSMIOOperationCallback; +import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties; import org.apache.asterix.common.replication.IReplicationStrategy; import org.apache.asterix.common.storage.DatasetResourceReference; import org.apache.asterix.common.storage.IIndexCheckpointManager; @@ -43,12 +44,16 @@ import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider; import org.apache.asterix.common.storage.ResourceReference; import org.apache.asterix.common.storage.StorageIOStats; import org.apache.asterix.common.transactions.ILogManager; +import org.apache.asterix.common.transactions.IRecoveryManager; import org.apache.asterix.common.transactions.LogRecord; import org.apache.asterix.common.transactions.LogType; import org.apache.asterix.common.utils.StoragePathUtil; +import org.apache.hyracks.api.application.INCServiceContext; import org.apache.hyracks.api.exceptions.ErrorCode; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.io.FileReference; import org.apache.hyracks.api.lifecycle.ILifeCycleComponent; +import org.apache.hyracks.storage.am.lsm.btree.dataflow.LSMBTreeLocalResource; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker; @@ -67,22 +72,26 @@ import org.apache.logging.log4j.Logger; public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeCycleComponent { private static final Logger LOGGER = LogManager.getLogger(); - private final Map<Integer, DatasetResource> datasets = new ConcurrentHashMap<>(); + protected final Map<Integer, DatasetResource> datasets = new ConcurrentHashMap<>(); private final StorageProperties storageProperties; - private final 
ILocalResourceRepository resourceRepository; + protected final ILocalResourceRepository resourceRepository; private final IVirtualBufferCache vbc; + protected final INCServiceContext serviceCtx; + protected final IRecoveryManager recoveryMgr; private final ILogManager logManager; private final LogRecord waitLog; - private final IDiskResourceCacheLockNotifier lockNotifier; + protected final IDiskResourceCacheLockNotifier lockNotifier; private volatile boolean stopped = false; private final IIndexCheckpointManagerProvider indexCheckpointManagerProvider; // all LSM-trees share the same virtual buffer cache list private final List<IVirtualBufferCache> vbcs; - public DatasetLifecycleManager(StorageProperties storageProperties, ILocalResourceRepository resourceRepository, - ILogManager logManager, IVirtualBufferCache vbc, - IIndexCheckpointManagerProvider indexCheckpointManagerProvider, + public DatasetLifecycleManager(INCServiceContext serviceCtx, StorageProperties storageProperties, + ILocalResourceRepository resourceRepository, IRecoveryManager recoveryMgr, ILogManager logManager, + IVirtualBufferCache vbc, IIndexCheckpointManagerProvider indexCheckpointManagerProvider, IDiskResourceCacheLockNotifier lockNotifier) { + this.serviceCtx = serviceCtx; + this.recoveryMgr = recoveryMgr; this.logManager = logManager; this.storageProperties = storageProperties; this.resourceRepository = resourceRepository; @@ -130,7 +139,7 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC datasetResource.register(resource, (ILSMIndex) index); } - private int getDIDfromResourcePath(String resourcePath) throws HyracksDataException { + protected int getDIDfromResourcePath(String resourcePath) throws HyracksDataException { LocalResource lr = resourceRepository.get(resourcePath); if (lr == null) { return -1; @@ -138,7 +147,7 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC return ((DatasetLocalResource) 
lr.getResource()).getDatasetId(); } - private long getResourceIDfromResourcePath(String resourcePath) throws HyracksDataException { + protected long getResourceIDfromResourcePath(String resourcePath) throws HyracksDataException { LocalResource lr = resourceRepository.get(resourcePath); if (lr == null) { return -1; @@ -146,6 +155,14 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC return lr.getId(); } + private DatasetLocalResource getDatasetLocalResource(String resourcePath) throws HyracksDataException { + LocalResource lr = resourceRepository.get(resourcePath); + if (lr == null) { + return null; + } + return (DatasetLocalResource) lr.getResource(); + } + @Override public synchronized void unregister(String resourcePath) throws HyracksDataException { validateDatasetLifecycleManagerState(); @@ -192,6 +209,72 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC @Override public synchronized void open(String resourcePath) throws HyracksDataException { + validateDatasetLifecycleManagerState(); + DatasetLocalResource localResource = getDatasetLocalResource(resourcePath); + if (localResource == null) { + throw HyracksDataException.create(ErrorCode.INDEX_DOES_NOT_EXIST, resourcePath); + } + int did = getDIDfromResourcePath(resourcePath); + long resourceID = getResourceIDfromResourcePath(resourcePath); + + lockNotifier.onOpen(resourceID); + try { + DatasetResource datasetResource = datasets.get(did); + int partition = localResource.getPartition(); + if (shouldRecoverLazily(datasetResource, partition)) { + performLocalRecovery(resourcePath, datasetResource, partition); + } else { + openResource(resourcePath, false); + } + } finally { + lockNotifier.onClose(resourceID); + } + } + + private void performLocalRecovery(String resourcePath, DatasetResource datasetResource, int partition) + throws HyracksDataException { + LOGGER.debug("performing local recovery for dataset {} partition {}", 
datasetResource.getDatasetInfo(), + partition); + FileReference indexRootRef = StoragePathUtil.getIndexRootPath(serviceCtx.getIoManager(), resourcePath); + Map<Long, LocalResource> resources = resourceRepository.getResources(r -> true, List.of(indexRootRef)); + + List<ILSMIndex> indexes = new ArrayList<>(); + for (LocalResource resource : resources.values()) { + if (shouldSkipResource(resource)) { + continue; + } + + ILSMIndex index = getOrCreateIndex(resource); + boolean undoTouch = !resourcePath.equals(resource.getPath()); + openResource(resource.getPath(), undoTouch); + indexes.add(index); + } + + if (!indexes.isEmpty()) { + recoveryMgr.recoverIndexes(indexes); + } + + datasetResource.markRecovered(partition); + } + + private boolean shouldSkipResource(LocalResource resource) { + DatasetLocalResource lr = (DatasetLocalResource) resource.getResource(); + return MetadataIndexImmutableProperties.isMetadataDataset(lr.getDatasetId()) + || (lr.getResource() instanceof LSMBTreeLocalResource + && ((LSMBTreeLocalResource) lr.getResource()).isSecondaryNoIncrementalMaintenance()); + } + + private ILSMIndex getOrCreateIndex(LocalResource resource) throws HyracksDataException { + ILSMIndex index = get(resource.getPath()); + if (index == null) { + DatasetLocalResource lr = (DatasetLocalResource) resource.getResource(); + index = (ILSMIndex) lr.createInstance(serviceCtx); + register(resource.getPath(), index); + } + return index; + } + + private void openResource(String resourcePath, boolean undoTouch) throws HyracksDataException { validateDatasetLifecycleManagerState(); int did = getDIDfromResourcePath(resourcePath); long resourceID = getResourceIDfromResourcePath(resourcePath); @@ -214,15 +297,36 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC dsr.open(true); dsr.touch(); - - if (!iInfo.isOpen()) { - ILSMOperationTracker opTracker = iInfo.getIndex().getOperationTracker(); - synchronized (opTracker) { - iInfo.getIndex().activate(); + 
boolean indexTouched = false; + try { + if (!iInfo.isOpen()) { + ILSMOperationTracker opTracker = iInfo.getIndex().getOperationTracker(); + synchronized (opTracker) { + iInfo.getIndex().activate(); + } + iInfo.setOpen(true); + } + iInfo.touch(); + indexTouched = true; + } finally { + if (undoTouch) { + dsr.untouch(); + if (indexTouched) { + iInfo.untouch(); + } + lockNotifier.onClose(resourceID); } - iInfo.setOpen(true); } - iInfo.touch(); + } + + private boolean shouldRecoverLazily(DatasetResource resource, int partition) { + // Perform lazy recovery only if the following conditions are met: + // 1. Lazy recovery is enabled. + // 2. The resource does not belong to the Metadata dataverse. + // 3. The partition is being accessed for the first time. + return recoveryMgr.isLazyRecoveryEnabled() + && !MetadataIndexImmutableProperties.isMetadataDataset(resource.getDatasetID()) + && !resource.isRecovered(partition); } public DatasetResource getDatasetLifecycle(int did) { diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java index db9eabb1ce..8e3081deed 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java @@ -20,7 +20,9 @@ package org.apache.asterix.common.context; import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; +import java.util.Set; import org.apache.asterix.common.dataflow.DatasetLocalResource; import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties; @@ -48,12 +50,14 @@ public class DatasetResource implements Comparable<DatasetResource> { private final Map<Integer, PrimaryIndexOperationTracker> datasetPrimaryOpTrackers; private final Map<Integer, ILSMComponentIdGenerator> 
datasetComponentIdGenerators; private final Map<Integer, IRateLimiter> datasetRateLimiters; + private final Set<Integer> recoveredPartitions; public DatasetResource(DatasetInfo datasetInfo) { this.datasetInfo = datasetInfo; this.datasetPrimaryOpTrackers = new HashMap<>(); this.datasetComponentIdGenerators = new HashMap<>(); this.datasetRateLimiters = new HashMap<>(); + this.recoveredPartitions = new HashSet<>(); } public boolean isRegistered() { @@ -127,6 +131,10 @@ public class DatasetResource implements Comparable<DatasetResource> { return datasetComponentIdGenerators.get(partition); } + public boolean isRecovered(int partitionId) { + return recoveredPartitions.contains(partitionId); + } + public IRateLimiter getRateLimiter(int partition) { return datasetRateLimiters.get(partition); } @@ -139,6 +147,14 @@ public class DatasetResource implements Comparable<DatasetResource> { datasetPrimaryOpTrackers.put(partition, opTracker); } + public void markRecovered(int partition) { + if (recoveredPartitions.contains(partition)) { + throw new IllegalStateException( + "Index has already been recovered for dataset" + getDatasetID() + "partition " + partition); + } + recoveredPartitions.add(partition); + } + public void setIdGenerator(int partition, ILSMComponentIdGenerator idGenerator) { if (datasetComponentIdGenerators.containsKey(partition)) { throw new IllegalStateException("LSMComponentIdGenerator has already been set for partition " + partition); @@ -187,5 +203,6 @@ public class DatasetResource implements Comparable<DatasetResource> { datasetPrimaryOpTrackers.remove(partitionId); datasetComponentIdGenerators.remove(partitionId); datasetRateLimiters.remove(partitionId); + recoveredPartitions.remove(partitionId); } } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java index a5f79ac850..3437d4202d 100644 --- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java @@ -20,10 +20,12 @@ package org.apache.asterix.common.transactions; import java.io.File; import java.io.IOException; +import java.util.List; import java.util.Set; import org.apache.asterix.common.exceptions.ACIDException; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; /** * Provides API for failure recovery. Failure could be at application level and @@ -128,4 +130,19 @@ public interface IRecoveryManager { */ void replayReplicaPartitionLogs(Set<Integer> partitions, boolean flush) throws HyracksDataException; + /** + * Ensures that {@code datasetPartitionIndexes} are consistent by performing component id level recovery + * + * @param datasetPartitionIndexes A list of indexes associated with a specific + * dataset partition that require recovery. + * @throws HyracksDataException If an error occurs during the recovery or rollback + * process, indicating a failure to achieve consistency. 
+ */ + void recoverIndexes(List<ILSMIndex> datasetPartitionIndexes) throws HyracksDataException; + + /** + * determines if the indexes need to be recovered lazily at the time of their first access + * @return + */ + boolean isLazyRecoveryEnabled(); } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java index 28fd27ec22..2fdfba4bfe 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java @@ -139,6 +139,13 @@ public class StoragePathUtil { return ResourceReference.of(fileAbsolutePath).getFileRelativePath().toString(); } + public static FileReference getIndexRootPath(IIOManager ioManager, String relativePath) + throws HyracksDataException { + int separatorIndex = relativePath.lastIndexOf(File.separatorChar); + String parentDirectory = relativePath.substring(0, separatorIndex); + return ioManager.resolve(parentDirectory); + } + /** * Create a file * Note: this method is not thread safe. 
It is the responsibility of the caller to ensure no path conflict when @@ -229,7 +236,17 @@ public class StoragePathUtil { } public static boolean isRelativeParent(FileReference parent, FileReference child) { - return child.getRelativePath().startsWith(parent.getRelativePath()); + String childPath = child.getRelativePath(); + String parentPath = parent.getRelativePath(); + boolean isMatch = childPath.startsWith(parentPath); + if (isMatch) { + int parentPathLength = parentPath.length(); + if (childPath.length() == parentPathLength) { + return true; + } + return childPath.charAt(parentPathLength) == File.separatorChar; + } + return false; } public static String getNamespacePath(INamespacePathResolver nsPathResolver, Namespace namespace, int partition) { diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java index 5c7d5ac366..cb4e068aab 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java @@ -251,6 +251,7 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito return ioManager.resolve(fileName); } + @Override public Map<Long, LocalResource> getResources(Predicate<LocalResource> filter, List<FileReference> roots) throws HyracksDataException { beforeReadAccess(); diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/ILocalResourceRepository.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/ILocalResourceRepository.java index 491d47613b..46eb1d56c4 100644 --- 
a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/ILocalResourceRepository.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/ILocalResourceRepository.java @@ -18,7 +18,12 @@ */ package org.apache.hyracks.storage.common; +import java.util.List; +import java.util.Map; +import java.util.function.Predicate; + import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.io.FileReference; public interface ILocalResourceRepository { @@ -28,5 +33,8 @@ public interface ILocalResourceRepository { void delete(String name) throws HyracksDataException; + Map<Long, LocalResource> getResources(Predicate<LocalResource> filter, List<FileReference> resourceFolderRoot) + throws HyracksDataException; + long maxId() throws HyracksDataException; } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/TransientLocalResourceRepository.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/TransientLocalResourceRepository.java index 2e756eacab..5f2ccc6243 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/TransientLocalResourceRepository.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/TransientLocalResourceRepository.java @@ -19,9 +19,12 @@ package org.apache.hyracks.storage.common; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.function.Predicate; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.io.FileReference; public class TransientLocalResourceRepository implements ILocalResourceRepository { @@ -54,6 +57,12 @@ public class TransientLocalResourceRepository implements ILocalResourceRepositor name2ResourceMap.remove(path); } + @Override + 
public Map<Long, LocalResource> getResources(Predicate<LocalResource> filter, List<FileReference> roots) + throws HyracksDataException { + return Map.of(); + } + @Override public long maxId() throws HyracksDataException { long maxResourceId = 0;
