This is an automated email from the ASF dual-hosted git repository.
mhubail pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git
The following commit(s) were added to refs/heads/master by this push:
new d6adadc [NO ISSUE][STO] Ensure resources file operations are synchronized
d6adadc is described below
commit d6adadc8fd4fb5c04f41097aca8a433528a560d6
Author: Murtadha Hubail <[email protected]>
AuthorDate: Sun Sep 12 01:35:58 2021 +0300
[NO ISSUE][STO] Ensure resources file operations are synchronized
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- To avoid an operation reading a partially written resource file,
  synchronize all operations that read or write resource files.
- Limit the search for a partition's resources to that partition's
  root directory.
Change-Id: I95f8565780675798393d7d43c0051c04e2b0a98c
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/13164
Tested-by: Jenkins <[email protected]>
Integration-Tests: Jenkins <[email protected]>
Reviewed-by: Murtadha Hubail <[email protected]>
Reviewed-by: Ali Alsuliman <[email protected]>
---
.../org/apache/asterix/app/nc/RecoveryManager.java | 5 +-
.../replication/logging/RemoteLogsNotifier.java | 4 +-
.../replication/messaging/ReplicateFileTask.java | 5 +-
.../PersistentLocalResourceRepository.java | 80 ++++++++++++++--------
4 files changed, 60 insertions(+), 34 deletions(-)
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
index 4f5a9f8..6736642 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
@@ -291,7 +291,7 @@ public class RecoveryManager implements IRecoveryManager,
ILifeCycleComponent {
final IIndexCheckpointManagerProvider indexCheckpointManagerProvider =
((INcApplicationContext)
(serviceCtx.getApplicationContext())).getIndexCheckpointManagerProvider();
- Map<Long, LocalResource> resourcesMap =
localResourceRepository.loadAndGetAllResources();
+ Map<Long, LocalResource> resourcesMap =
localResourceRepository.getResources(r -> true, partitions);
final Map<Long, Long> resourceId2MaxLSNMap = new HashMap<>();
TxnEntityId tempKeyTxnEntityId = new TxnEntityId(-1, -1, -1, null, -1,
false);
@@ -503,7 +503,8 @@ public class RecoveryManager implements IRecoveryManager,
ILifeCycleComponent {
final List<DatasetResourceReference> partitionResources =
localResourceRepository.getResources(resource -> {
DatasetLocalResource dsResource = (DatasetLocalResource)
resource.getResource();
return dsResource.getPartition() == partition;
-
}).values().stream().map(DatasetResourceReference::of).collect(Collectors.toList());
+ },
Collections.singleton(partition)).values().stream().map(DatasetResourceReference::of)
+ .collect(Collectors.toList());
for (DatasetResourceReference indexRef : partitionResources) {
try {
final IIndexCheckpointManager idxCheckpointMgr =
idxCheckpointMgrProvider.get(indexRef);
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsNotifier.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsNotifier.java
index 57463cb..004b640 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsNotifier.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/logging/RemoteLogsNotifier.java
@@ -19,6 +19,7 @@
package org.apache.asterix.replication.logging;
import java.io.IOException;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -91,7 +92,8 @@ class RemoteLogsNotifier implements Runnable {
return dls.getDatasetId() == datasetId && dls.getPartition() ==
resourcePartition
&& !masterPartitions.contains(dls.getPartition());
};
- final Map<Long, LocalResource> resources =
localResourceRep.getResources(replicaIndexesPredicate);
+ final Map<Long, LocalResource> resources =
+ localResourceRep.getResources(replicaIndexesPredicate,
Collections.singleton(resourcePartition));
final List<DatasetResourceReference> replicaIndexesRef =
resources.values().stream().map(DatasetResourceReference::of).collect(Collectors.toList());
for (DatasetResourceReference replicaIndexRef : replicaIndexesRef) {
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java
index 3ee3094..7f26b96 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java
@@ -67,7 +67,7 @@ public class ReplicateFileTask implements IReplicaTask {
@Override
public void perform(INcApplicationContext appCtx, IReplicationWorker
worker) {
try {
- LOGGER.info("attempting to replicate {}", this);
+ LOGGER.debug("attempting to receive file {} from master", this);
final IIOManager ioManager = appCtx.getIoManager();
// resolve path
final FileReference localPath = ioManager.resolve(file);
@@ -76,7 +76,6 @@ public class ReplicateFileTask implements IReplicaTask {
final Path maskPath = Paths.get(resourceDir.toString(),
StorageConstants.MASK_FILE_PREFIX +
localPath.getFile().getName());
Files.createFile(maskPath);
-
// receive actual file
final Path filePath = Paths.get(resourceDir.toString(),
localPath.getFile().getName());
Files.createFile(filePath);
@@ -91,7 +90,7 @@ public class ReplicateFileTask implements IReplicaTask {
}
//delete mask
Files.delete(maskPath);
- LOGGER.info(() -> "Replicated file: " + localPath);
+ LOGGER.info("received file {} from master", localPath);
ReplicationProtocol.sendAck(worker.getChannel(),
worker.getReusableBuffer());
} catch (IOException e) {
throw new ReplicationException(e);
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index 4d15385..ee5b16e 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -35,6 +35,7 @@ import java.text.ParseException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -138,7 +139,7 @@ public class PersistentLocalResourceRepository implements
ILocalResourceReposito
private boolean isReplicationEnabled = false;
private Set<String> filesToBeReplicated;
private IReplicationManager replicationManager;
- private final Path[] storageRoots;
+ private final List<Path> storageRoots;
private final IIndexCheckpointManagerProvider
indexCheckpointManagerProvider;
private final IPersistedResourceRegistry persistedResourceRegistry;
@@ -148,11 +149,11 @@ public class PersistentLocalResourceRepository implements
ILocalResourceReposito
this.ioManager = ioManager;
this.indexCheckpointManagerProvider = indexCheckpointManagerProvider;
this.persistedResourceRegistry = persistedResourceRegistry;
- storageRoots = new Path[ioManager.getIODevices().size()];
+ storageRoots = new ArrayList<>();
final List<IODeviceHandle> ioDevices = ioManager.getIODevices();
for (int i = 0; i < ioDevices.size(); i++) {
- storageRoots[i] =
- Paths.get(ioDevices.get(i).getMount().getAbsolutePath(),
StorageConstants.STORAGE_ROOT_DIR_NAME);
+ storageRoots.add(
+ Paths.get(ioDevices.get(i).getMount().getAbsolutePath(),
StorageConstants.STORAGE_ROOT_DIR_NAME));
}
createStorageRoots();
resourceCache =
CacheBuilder.newBuilder().maximumSize(MAX_CACHED_RESOURCES).build();
@@ -262,10 +263,13 @@ public class PersistentLocalResourceRepository implements
ILocalResourceReposito
return ioManager.resolve(fileName);
}
- public synchronized Map<Long, LocalResource>
getResources(Predicate<LocalResource> filter)
+ public synchronized Map<Long, LocalResource>
getResources(Predicate<LocalResource> filter, List<Path> roots)
throws HyracksDataException {
Map<Long, LocalResource> resourcesMap = new HashMap<>();
- for (Path root : storageRoots) {
+ for (Path root : roots) {
+ if (!Files.exists(root) || !Files.isDirectory(root)) {
+ continue;
+ }
final Collection<File> files = FileUtils.listFiles(root.toFile(),
METADATA_FILES_FILTER, ALL_DIR_FILTER);
try {
for (File file : files) {
@@ -281,6 +285,20 @@ public class PersistentLocalResourceRepository implements
ILocalResourceReposito
return resourcesMap;
}
+ public synchronized Map<Long, LocalResource>
getResources(Predicate<LocalResource> filter)
+ throws HyracksDataException {
+ return getResources(filter, storageRoots);
+ }
+
+ public synchronized Map<Long, LocalResource>
getResources(Predicate<LocalResource> filter, Set<Integer> partitions)
+ throws HyracksDataException {
+ List<Path> partitionsRoots = new ArrayList<>();
+ for (Integer partition : partitions) {
+ partitionsRoots.add(getPartitionRoot(partition));
+ }
+ return getResources(filter, partitionsRoots);
+ }
+
public synchronized void deleteInvalidIndexes(Predicate<LocalResource>
filter) throws HyracksDataException {
for (Path root : storageRoots) {
final Collection<File> files = FileUtils.listFiles(root.toFile(),
METADATA_FILES_FILTER, ALL_DIR_FILTER);
@@ -304,7 +322,7 @@ public class PersistentLocalResourceRepository implements
ILocalResourceReposito
}
@Override
- public long maxId() throws HyracksDataException {
+ public synchronized long maxId() throws HyracksDataException {
final Map<Long, LocalResource> allResources = loadAndGetAllResources();
final Optional<Long> max =
allResources.keySet().stream().max(Long::compare);
return max.isPresent() ? max.get() : 0;
@@ -330,7 +348,7 @@ public class PersistentLocalResourceRepository implements
ILocalResourceReposito
}
}
- public void setReplicationManager(IReplicationManager replicationManager) {
+ public synchronized void setReplicationManager(IReplicationManager
replicationManager) {
this.replicationManager = replicationManager;
isReplicationEnabled = replicationManager.isReplicationEnabled();
@@ -357,7 +375,7 @@ public class PersistentLocalResourceRepository implements
ILocalResourceReposito
*
* @throws IOException
*/
- public void deleteStorageData() throws IOException {
+ public synchronized void deleteStorageData() throws IOException {
for (Path root : storageRoots) {
final File rootFile = root.toFile();
if (rootFile.exists()) {
@@ -367,13 +385,13 @@ public class PersistentLocalResourceRepository implements
ILocalResourceReposito
createStorageRoots();
}
- public Set<Integer> getAllPartitions() throws HyracksDataException {
+ public synchronized Set<Integer> getAllPartitions() throws
HyracksDataException {
return
loadAndGetAllResources().values().stream().map(LocalResource::getResource)
.map(DatasetLocalResource.class::cast).map(DatasetLocalResource::getPartition)
.collect(Collectors.toSet());
}
- public Optional<DatasetResourceReference> getLocalResourceReference(String
absoluteFilePath)
+ public synchronized Optional<DatasetResourceReference>
getLocalResourceReference(String absoluteFilePath)
throws HyracksDataException {
final String localResourcePath =
StoragePathUtil.getIndexFileRelativePath(absoluteFilePath);
final LocalResource lr = get(localResourcePath);
@@ -388,11 +406,12 @@ public class PersistentLocalResourceRepository implements
ILocalResourceReposito
* @return The set of indexes files
* @throws HyracksDataException
*/
- public Set<File> getPartitionIndexes(int partition) throws
HyracksDataException {
+ public synchronized Set<File> getPartitionIndexes(int partition) throws
HyracksDataException {
+ Path partitionRoot = getPartitionRoot(partition);
final Map<Long, LocalResource> partitionResourcesMap =
getResources(resource -> {
DatasetLocalResource dsResource = (DatasetLocalResource)
resource.getResource();
return dsResource.getPartition() == partition;
- });
+ }, Collections.singletonList(partitionRoot));
Set<File> indexes = new HashSet<>();
for (LocalResource localResource : partitionResourcesMap.values()) {
indexes.add(ioManager.resolve(localResource.getPath()).getFile());
@@ -400,14 +419,11 @@ public class PersistentLocalResourceRepository implements
ILocalResourceReposito
return indexes;
}
- public Map<Long, LocalResource> getPartitionResources(int partition)
throws HyracksDataException {
- return getResources(resource -> {
- DatasetLocalResource dsResource = (DatasetLocalResource)
resource.getResource();
- return dsResource.getPartition() == partition;
- });
+ public synchronized Map<Long, LocalResource> getPartitionResources(int
partition) throws HyracksDataException {
+ return getResources(r -> true, Collections.singleton(partition));
}
- public Map<String, Long> getPartitionReplicatedResources(int partition,
IReplicationStrategy strategy)
+ public synchronized Map<String, Long> getPartitionReplicatedResources(int
partition, IReplicationStrategy strategy)
throws HyracksDataException {
final Map<String, Long> partitionReplicatedResources = new HashMap<>();
final Map<Long, LocalResource> partitionResources =
getPartitionResources(partition);
@@ -421,7 +437,7 @@ public class PersistentLocalResourceRepository implements
ILocalResourceReposito
return partitionReplicatedResources;
}
- public List<String> getPartitionReplicatedFiles(int partition,
IReplicationStrategy strategy)
+ public synchronized List<String> getPartitionReplicatedFiles(int
partition, IReplicationStrategy strategy)
throws HyracksDataException {
final List<String> partitionReplicatedFiles = new ArrayList<>();
final Set<File> replicatedIndexes = new HashSet<>();
@@ -438,7 +454,7 @@ public class PersistentLocalResourceRepository implements
ILocalResourceReposito
return partitionReplicatedFiles;
}
- public long getReplicatedIndexesMaxComponentId(int partition,
IReplicationStrategy strategy)
+ public synchronized long getReplicatedIndexesMaxComponentId(int partition,
IReplicationStrategy strategy)
throws HyracksDataException {
long maxComponentId = LSMComponentId.MIN_VALID_COMPONENT_ID;
final Map<Long, LocalResource> partitionResources =
getPartitionResources(partition);
@@ -474,7 +490,7 @@ public class PersistentLocalResourceRepository implements
ILocalResourceReposito
}
}
- public void cleanup(int partition) throws HyracksDataException {
+ public synchronized void cleanup(int partition) throws
HyracksDataException {
final Set<File> partitionIndexes = getPartitionIndexes(partition);
try {
for (File index : partitionIndexes) {
@@ -501,7 +517,7 @@ public class PersistentLocalResourceRepository implements
ILocalResourceReposito
return resourcesStats;
}
- public void deleteCorruptedResources() throws HyracksDataException {
+ public synchronized void deleteCorruptedResources() throws
HyracksDataException {
for (Path root : storageRoots) {
final Collection<File> metadataMaskFiles =
FileUtils.listFiles(root.toFile(),
METADATA_MASK_FILES_FILTER, ALL_DIR_FILTER);
@@ -601,12 +617,13 @@ public class PersistentLocalResourceRepository implements
ILocalResourceReposito
return null;
}
- public long getDatasetSize(DatasetCopyIdentifier datasetIdentifier) throws
HyracksDataException {
+ public long getDatasetSize(DatasetCopyIdentifier datasetIdentifier,
Set<Integer> nodePartitions)
+ throws HyracksDataException {
long totalSize = 0;
final Map<Long, LocalResource> dataverse = getResources(lr -> {
final ResourceReference resourceReference =
ResourceReference.ofIndex(lr.getPath());
return datasetIdentifier.isMatch(resourceReference);
- });
+ }, nodePartitions);
final List<DatasetResourceReference> allResources =
dataverse.values().stream().map(DatasetResourceReference::of).collect(Collectors.toList());
for (DatasetResourceReference res : allResources) {
@@ -644,11 +661,11 @@ public class PersistentLocalResourceRepository implements
ILocalResourceReposito
return COMPONENT_FILES_FILTER.accept(indexDir, fileName);
}
- public Path[] getStorageRoots() {
+ public List<Path> getStorageRoots() {
return storageRoots;
}
- public void keepPartitions(Set<Integer> keepPartitions) {
+ public synchronized void keepPartitions(Set<Integer> keepPartitions) {
List<File> onDiskPartitions = getOnDiskPartitions();
for (File onDiskPartition : onDiskPartitions) {
int partitionNum =
StoragePathUtil.getPartitionNumFromRelativePath(onDiskPartition.getAbsolutePath());
@@ -660,7 +677,7 @@ public class PersistentLocalResourceRepository implements
ILocalResourceReposito
}
}
- public List<File> getOnDiskPartitions() {
+ public synchronized List<File> getOnDiskPartitions() {
List<File> onDiskPartitions = new ArrayList<>();
for (Path root : storageRoots) {
File[] partitions = root.toFile().listFiles(
@@ -671,4 +688,11 @@ public class PersistentLocalResourceRepository implements
ILocalResourceReposito
}
return onDiskPartitions;
}
+
+ public Path getPartitionRoot(int partition) throws HyracksDataException {
+ Path path =
+ Paths.get(StorageConstants.STORAGE_ROOT_DIR_NAME,
StorageConstants.PARTITION_DIR_PREFIX + partition);
+ FileReference resolve = ioManager.resolve(path.toString());
+ return resolve.getFile().toPath();
+ }
}