This is an automated email from the ASF dual-hosted git repository.

mhubail pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
The following commit(s) were added to refs/heads/master by this push: new d6adadc [NO ISSUE][STO] Ensure resources file operations are synchronized d6adadc is described below commit d6adadc8fd4fb5c04f41097aca8a433528a560d6 Author: Murtadha Hubail <murtadha.hub...@couchbase.com> AuthorDate: Sun Sep 12 01:35:58 2021 +0300 [NO ISSUE][STO] Ensure resources file operations are synchronized - user model changes: no - storage format changes: no - interface changes: no Details: - To avoid an operation reading a partially written resource file, ensure all such operations are synchronized. - Limit partition resources search to the partition's root directory. Change-Id: I95f8565780675798393d7d43c0051c04e2b0a98c Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/13164 Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Reviewed-by: Murtadha Hubail <mhub...@apache.org> Reviewed-by: Ali Alsuliman <ali.al.solai...@gmail.com> --- .../org/apache/asterix/app/nc/RecoveryManager.java | 5 +- .../replication/logging/RemoteLogsNotifier.java | 4 +- .../replication/messaging/ReplicateFileTask.java | 5 +- .../PersistentLocalResourceRepository.java | 80 ++++++++++++++-------- 4 files changed, 60 insertions(+), 34 deletions(-) diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java index 4f5a9f8..6736642 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java @@ -291,7 +291,7 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent { final IIndexCheckpointManagerProvider indexCheckpointManagerProvider = ((INcApplicationContext) (serviceCtx.getApplicationContext())).getIndexCheckpointManagerProvider(); - Map<Long, LocalResource> resourcesMap = 
localResourceRepository.loadAndGetAllResources(); + Map<Long, LocalResource> resourcesMap = localResourceRepository.getResources(r -> true, partitions); final Map<Long, Long> resourceId2MaxLSNMap = new HashMap<>(); TxnEntityId tempKeyTxnEntityId = new TxnEntityId(-1, -1, -1, null, -1, false); @@ -503,7 +503,8 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent { final List<DatasetResourceReference> partitionResources = localResourceRepository.getResources(resource -> { DatasetLocalResource dsResource = (DatasetLocalResource) resource.getResource(); return dsResource.getPartition() == partition; - }).values().stream().map(DatasetResourceReference::of).collect(Collectors.toList()); + }, Collections.singleton(partition)).values().stream().map(DatasetResourceReference::of) + .collect(Collectors.toList()); for (DatasetResourceReference indexRef : partitionResources) { try { final IIndexCheckpointManager idxCheckpointMgr = idxCheckpointMgrProvider.get(indexRef); diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsNotifier.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsNotifier.java index 57463cb..004b640 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsNotifier.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsNotifier.java @@ -19,6 +19,7 @@ package org.apache.asterix.replication.logging; import java.io.IOException; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Set; @@ -91,7 +92,8 @@ class RemoteLogsNotifier implements Runnable { return dls.getDatasetId() == datasetId && dls.getPartition() == resourcePartition && !masterPartitions.contains(dls.getPartition()); }; - final Map<Long, LocalResource> resources = localResourceRep.getResources(replicaIndexesPredicate); + final Map<Long, 
LocalResource> resources = + localResourceRep.getResources(replicaIndexesPredicate, Collections.singleton(resourcePartition)); final List<DatasetResourceReference> replicaIndexesRef = resources.values().stream().map(DatasetResourceReference::of).collect(Collectors.toList()); for (DatasetResourceReference replicaIndexRef : replicaIndexesRef) { diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java index 3ee3094..7f26b96 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java @@ -67,7 +67,7 @@ public class ReplicateFileTask implements IReplicaTask { @Override public void perform(INcApplicationContext appCtx, IReplicationWorker worker) { try { - LOGGER.info("attempting to replicate {}", this); + LOGGER.debug("attempting to receive file {} from master", this); final IIOManager ioManager = appCtx.getIoManager(); // resolve path final FileReference localPath = ioManager.resolve(file); @@ -76,7 +76,6 @@ public class ReplicateFileTask implements IReplicaTask { final Path maskPath = Paths.get(resourceDir.toString(), StorageConstants.MASK_FILE_PREFIX + localPath.getFile().getName()); Files.createFile(maskPath); - // receive actual file final Path filePath = Paths.get(resourceDir.toString(), localPath.getFile().getName()); Files.createFile(filePath); @@ -91,7 +90,7 @@ public class ReplicateFileTask implements IReplicaTask { } //delete mask Files.delete(maskPath); - LOGGER.info(() -> "Replicated file: " + localPath); + LOGGER.info("received file {} from master", localPath); ReplicationProtocol.sendAck(worker.getChannel(), worker.getReusableBuffer()); } catch (IOException e) { throw new ReplicationException(e); diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java index 4d15385..ee5b16e 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java @@ -35,6 +35,7 @@ import java.text.ParseException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -138,7 +139,7 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito private boolean isReplicationEnabled = false; private Set<String> filesToBeReplicated; private IReplicationManager replicationManager; - private final Path[] storageRoots; + private final List<Path> storageRoots; private final IIndexCheckpointManagerProvider indexCheckpointManagerProvider; private final IPersistedResourceRegistry persistedResourceRegistry; @@ -148,11 +149,11 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito this.ioManager = ioManager; this.indexCheckpointManagerProvider = indexCheckpointManagerProvider; this.persistedResourceRegistry = persistedResourceRegistry; - storageRoots = new Path[ioManager.getIODevices().size()]; + storageRoots = new ArrayList<>(); final List<IODeviceHandle> ioDevices = ioManager.getIODevices(); for (int i = 0; i < ioDevices.size(); i++) { - storageRoots[i] = - Paths.get(ioDevices.get(i).getMount().getAbsolutePath(), StorageConstants.STORAGE_ROOT_DIR_NAME); + storageRoots.add( + Paths.get(ioDevices.get(i).getMount().getAbsolutePath(), StorageConstants.STORAGE_ROOT_DIR_NAME)); } 
createStorageRoots(); resourceCache = CacheBuilder.newBuilder().maximumSize(MAX_CACHED_RESOURCES).build(); @@ -262,10 +263,13 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito return ioManager.resolve(fileName); } - public synchronized Map<Long, LocalResource> getResources(Predicate<LocalResource> filter) + public synchronized Map<Long, LocalResource> getResources(Predicate<LocalResource> filter, List<Path> roots) throws HyracksDataException { Map<Long, LocalResource> resourcesMap = new HashMap<>(); - for (Path root : storageRoots) { + for (Path root : roots) { + if (!Files.exists(root) || !Files.isDirectory(root)) { + continue; + } final Collection<File> files = FileUtils.listFiles(root.toFile(), METADATA_FILES_FILTER, ALL_DIR_FILTER); try { for (File file : files) { @@ -281,6 +285,20 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito return resourcesMap; } + public synchronized Map<Long, LocalResource> getResources(Predicate<LocalResource> filter) + throws HyracksDataException { + return getResources(filter, storageRoots); + } + + public synchronized Map<Long, LocalResource> getResources(Predicate<LocalResource> filter, Set<Integer> partitions) + throws HyracksDataException { + List<Path> partitionsRoots = new ArrayList<>(); + for (Integer partition : partitions) { + partitionsRoots.add(getPartitionRoot(partition)); + } + return getResources(filter, partitionsRoots); + } + public synchronized void deleteInvalidIndexes(Predicate<LocalResource> filter) throws HyracksDataException { for (Path root : storageRoots) { final Collection<File> files = FileUtils.listFiles(root.toFile(), METADATA_FILES_FILTER, ALL_DIR_FILTER); @@ -304,7 +322,7 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito } @Override - public long maxId() throws HyracksDataException { + public synchronized long maxId() throws HyracksDataException { final Map<Long, LocalResource> allResources = 
loadAndGetAllResources(); final Optional<Long> max = allResources.keySet().stream().max(Long::compare); return max.isPresent() ? max.get() : 0; @@ -330,7 +348,7 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito } } - public void setReplicationManager(IReplicationManager replicationManager) { + public synchronized void setReplicationManager(IReplicationManager replicationManager) { this.replicationManager = replicationManager; isReplicationEnabled = replicationManager.isReplicationEnabled(); @@ -357,7 +375,7 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito * * @throws IOException */ - public void deleteStorageData() throws IOException { + public synchronized void deleteStorageData() throws IOException { for (Path root : storageRoots) { final File rootFile = root.toFile(); if (rootFile.exists()) { @@ -367,13 +385,13 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito createStorageRoots(); } - public Set<Integer> getAllPartitions() throws HyracksDataException { + public synchronized Set<Integer> getAllPartitions() throws HyracksDataException { return loadAndGetAllResources().values().stream().map(LocalResource::getResource) .map(DatasetLocalResource.class::cast).map(DatasetLocalResource::getPartition) .collect(Collectors.toSet()); } - public Optional<DatasetResourceReference> getLocalResourceReference(String absoluteFilePath) + public synchronized Optional<DatasetResourceReference> getLocalResourceReference(String absoluteFilePath) throws HyracksDataException { final String localResourcePath = StoragePathUtil.getIndexFileRelativePath(absoluteFilePath); final LocalResource lr = get(localResourcePath); @@ -388,11 +406,12 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito * @return The set of indexes files * @throws HyracksDataException */ - public Set<File> getPartitionIndexes(int partition) throws HyracksDataException { + public 
synchronized Set<File> getPartitionIndexes(int partition) throws HyracksDataException { + Path partitionRoot = getPartitionRoot(partition); final Map<Long, LocalResource> partitionResourcesMap = getResources(resource -> { DatasetLocalResource dsResource = (DatasetLocalResource) resource.getResource(); return dsResource.getPartition() == partition; - }); + }, Collections.singletonList(partitionRoot)); Set<File> indexes = new HashSet<>(); for (LocalResource localResource : partitionResourcesMap.values()) { indexes.add(ioManager.resolve(localResource.getPath()).getFile()); @@ -400,14 +419,11 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito return indexes; } - public Map<Long, LocalResource> getPartitionResources(int partition) throws HyracksDataException { - return getResources(resource -> { - DatasetLocalResource dsResource = (DatasetLocalResource) resource.getResource(); - return dsResource.getPartition() == partition; - }); + public synchronized Map<Long, LocalResource> getPartitionResources(int partition) throws HyracksDataException { + return getResources(r -> true, Collections.singleton(partition)); } - public Map<String, Long> getPartitionReplicatedResources(int partition, IReplicationStrategy strategy) + public synchronized Map<String, Long> getPartitionReplicatedResources(int partition, IReplicationStrategy strategy) throws HyracksDataException { final Map<String, Long> partitionReplicatedResources = new HashMap<>(); final Map<Long, LocalResource> partitionResources = getPartitionResources(partition); @@ -421,7 +437,7 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito return partitionReplicatedResources; } - public List<String> getPartitionReplicatedFiles(int partition, IReplicationStrategy strategy) + public synchronized List<String> getPartitionReplicatedFiles(int partition, IReplicationStrategy strategy) throws HyracksDataException { final List<String> partitionReplicatedFiles = new 
ArrayList<>(); final Set<File> replicatedIndexes = new HashSet<>(); @@ -438,7 +454,7 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito return partitionReplicatedFiles; } - public long getReplicatedIndexesMaxComponentId(int partition, IReplicationStrategy strategy) + public synchronized long getReplicatedIndexesMaxComponentId(int partition, IReplicationStrategy strategy) throws HyracksDataException { long maxComponentId = LSMComponentId.MIN_VALID_COMPONENT_ID; final Map<Long, LocalResource> partitionResources = getPartitionResources(partition); @@ -474,7 +490,7 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito } } - public void cleanup(int partition) throws HyracksDataException { + public synchronized void cleanup(int partition) throws HyracksDataException { final Set<File> partitionIndexes = getPartitionIndexes(partition); try { for (File index : partitionIndexes) { @@ -501,7 +517,7 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito return resourcesStats; } - public void deleteCorruptedResources() throws HyracksDataException { + public synchronized void deleteCorruptedResources() throws HyracksDataException { for (Path root : storageRoots) { final Collection<File> metadataMaskFiles = FileUtils.listFiles(root.toFile(), METADATA_MASK_FILES_FILTER, ALL_DIR_FILTER); @@ -601,12 +617,13 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito return null; } - public long getDatasetSize(DatasetCopyIdentifier datasetIdentifier) throws HyracksDataException { + public long getDatasetSize(DatasetCopyIdentifier datasetIdentifier, Set<Integer> nodePartitions) + throws HyracksDataException { long totalSize = 0; final Map<Long, LocalResource> dataverse = getResources(lr -> { final ResourceReference resourceReference = ResourceReference.ofIndex(lr.getPath()); return datasetIdentifier.isMatch(resourceReference); - }); + }, nodePartitions); final 
List<DatasetResourceReference> allResources = dataverse.values().stream().map(DatasetResourceReference::of).collect(Collectors.toList()); for (DatasetResourceReference res : allResources) { @@ -644,11 +661,11 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito return COMPONENT_FILES_FILTER.accept(indexDir, fileName); } - public Path[] getStorageRoots() { + public List<Path> getStorageRoots() { return storageRoots; } - public void keepPartitions(Set<Integer> keepPartitions) { + public synchronized void keepPartitions(Set<Integer> keepPartitions) { List<File> onDiskPartitions = getOnDiskPartitions(); for (File onDiskPartition : onDiskPartitions) { int partitionNum = StoragePathUtil.getPartitionNumFromRelativePath(onDiskPartition.getAbsolutePath()); @@ -660,7 +677,7 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito } } - public List<File> getOnDiskPartitions() { + public synchronized List<File> getOnDiskPartitions() { List<File> onDiskPartitions = new ArrayList<>(); for (Path root : storageRoots) { File[] partitions = root.toFile().listFiles( @@ -671,4 +688,11 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito } return onDiskPartitions; } + + public Path getPartitionRoot(int partition) throws HyracksDataException { + Path path = + Paths.get(StorageConstants.STORAGE_ROOT_DIR_NAME, StorageConstants.PARTITION_DIR_PREFIX + partition); + FileReference resolve = ioManager.resolve(path.toString()); + return resolve.getFile().toPath(); + } }