This is an automated email from the ASF dual-hosted git repository.
mhubail pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git
The following commit(s) were added to refs/heads/master by this push:
new 8e51145806 [NO ISSUE][STO] Allow concurrent modification on persisted
resources
8e51145806 is described below
commit 8e51145806fb54ea8536487b484dd05049462f8d
Author: Murtadha Hubail <[email protected]>
AuthorDate: Fri Sep 29 22:09:59 2023 +0300
[NO ISSUE][STO] Allow concurrent modification on persisted resources
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- To allow an index's resources to be created on different partitions
concurrently, replace the synchronization with a read/write lock in
PersistentLocalResourceRepository.
- Any operation that might modify the persisted files will acquire
the read (shared) lock, so that modifications on different partitions
can proceed concurrently.
- Any operation that attempts to read the persisted files will acquire
the write (exclusive) lock to wait for any on-going modifications.
Change-Id: Id435bfc113a0b8e3e2a1f75712f0ded74ae0ee6f
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17824
Integration-Tests: Jenkins <[email protected]>
Tested-by: Jenkins <[email protected]>
Reviewed-by: Ali Alsuliman <[email protected]>
---
.../PersistentLocalResourceRepository.java | 502 +++++++++++++--------
.../storage/am/common/build/IndexBuilder.java | 82 ++--
2 files changed, 355 insertions(+), 229 deletions(-)
diff --git
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index 1eef182c06..33cbf2dc04 100644
---
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -39,6 +39,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Predicate;
import java.util.stream.Collectors;
@@ -83,7 +84,6 @@ public class PersistentLocalResourceRepository implements
ILocalResourceReposito
private static final Logger LOGGER = LogManager.getLogger();
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-
private static final String METADATA_FILE_MASK_NAME =
StorageConstants.MASK_FILE_PREFIX +
StorageConstants.METADATA_FILE_NAME;
private static final FilenameFilter LSM_INDEX_FILES_FILTER =
@@ -94,7 +94,6 @@ public class PersistentLocalResourceRepository implements
ILocalResourceReposito
(dir, name) -> name.equals(StorageConstants.METADATA_FILE_NAME);
private static final FilenameFilter METADATA_MASK_FILES_FILTER =
(dir, name) -> name.equals(METADATA_FILE_MASK_NAME);
-
private static final int MAX_CACHED_RESOURCES = 1000;
// Finals
@@ -102,11 +101,11 @@ public class PersistentLocalResourceRepository implements
ILocalResourceReposito
private final Cache<String, LocalResource> resourceCache;
// Mutables
private boolean isReplicationEnabled = false;
- private Set<String> filesToBeReplicated;
private IReplicationManager replicationManager;
private final List<FileReference> storageRoots;
private final IIndexCheckpointManagerProvider
indexCheckpointManagerProvider;
private final IPersistedResourceRegistry persistedResourceRegistry;
+ private final ReentrantReadWriteLock resourcesAccessLock = new
ReentrantReadWriteLock(true);
public PersistentLocalResourceRepository(IIOManager ioManager,
IIndexCheckpointManagerProvider indexCheckpointManagerProvider,
@@ -135,23 +134,29 @@ public class PersistentLocalResourceRepository implements
ILocalResourceReposito
}
@Override
- public synchronized LocalResource get(String relativePath) throws
HyracksDataException {
- LocalResource resource = resourceCache.getIfPresent(relativePath);
- if (resource == null) {
- FileReference resourceFile = getLocalResourceFileByName(ioManager,
relativePath);
- resource = readLocalResource(resourceFile);
- if (resource != null) {
- resourceCache.put(relativePath, resource);
+ public LocalResource get(String relativePath) throws HyracksDataException {
+ beforeReadAccess();
+ try {
+ LocalResource resource = resourceCache.getIfPresent(relativePath);
+ if (resource == null) {
+ FileReference resourceFile =
getLocalResourceFileByName(ioManager, relativePath);
+ resource = readLocalResource(resourceFile);
+ if (resource != null) {
+ resourceCache.put(relativePath, resource);
+ }
}
+ return resource;
+ } finally {
+ afterReadAccess();
}
- return resource;
}
@SuppressWarnings("squid:S1181")
@Override
public void insert(LocalResource resource) throws HyracksDataException {
FileReference resourceFile;
- synchronized (this) {
+ beforeWriteAccess();
+ try {
String relativePath = getFileName(resource.getPath());
resourceFile = ioManager.resolve(relativePath);
if (resourceFile.getFile().exists()) {
@@ -178,6 +183,8 @@ public class PersistentLocalResourceRepository implements
ILocalResourceReposito
ExitUtil.halt(ExitUtil.EC_ERROR_CREATING_RESOURCES);
}
resourceCache.put(resource.getPath(), resource);
+ } finally {
+ afterWriteAccess();
}
// do not do the replication operation on the synchronized to avoid
blocking other threads
// on network operations
@@ -216,7 +223,8 @@ public class PersistentLocalResourceRepository implements
ILocalResourceReposito
LOGGER.error("failed to delete resource file {} from
replicas", resourceFile);
}
}
- synchronized (this) {
+ beforeWriteAccess();
+ try {
try {
if (resourceExists) {
ioManager.delete(resourceFile);
@@ -229,6 +237,8 @@ public class PersistentLocalResourceRepository implements
ILocalResourceReposito
} finally {
invalidateResource(relativePath);
}
+ } finally {
+ afterWriteAccess();
}
}
@@ -238,72 +248,101 @@ public class PersistentLocalResourceRepository
implements ILocalResourceReposito
return ioManager.resolve(fileName);
}
- public synchronized Map<Long, LocalResource>
getResources(Predicate<LocalResource> filter,
- List<FileReference> roots) throws HyracksDataException {
- Map<Long, LocalResource> resourcesMap = new HashMap<>();
- for (FileReference root : roots) {
- final Collection<FileReference> files = ioManager.list(root,
METADATA_FILES_FILTER);
- try {
- for (FileReference file : files) {
- final LocalResource localResource =
readLocalResource(file);
- if (localResource != null && filter.test(localResource)) {
- LocalResource duplicate =
resourcesMap.putIfAbsent(localResource.getId(), localResource);
- if (duplicate != null) {
- LOGGER.warn("found duplicate resource ids {} and
{}", localResource, duplicate);
+ public Map<Long, LocalResource> getResources(Predicate<LocalResource>
filter, List<FileReference> roots)
+ throws HyracksDataException {
+ beforeReadAccess();
+ try {
+ Map<Long, LocalResource> resourcesMap = new HashMap<>();
+ for (FileReference root : roots) {
+ final Collection<FileReference> files = ioManager.list(root,
METADATA_FILES_FILTER);
+ try {
+ for (FileReference file : files) {
+ final LocalResource localResource =
readLocalResource(file);
+ if (localResource != null &&
filter.test(localResource)) {
+ LocalResource duplicate =
resourcesMap.putIfAbsent(localResource.getId(), localResource);
+ if (duplicate != null) {
+ LOGGER.warn("found duplicate resource ids {}
and {}", localResource, duplicate);
+ }
}
}
+ } catch (IOException e) {
+ throw HyracksDataException.create(e);
}
- } catch (IOException e) {
- throw HyracksDataException.create(e);
}
+ return resourcesMap;
+ } finally {
+ afterReadAccess();
}
- return resourcesMap;
}
- public synchronized Map<Long, LocalResource>
getResources(Predicate<LocalResource> filter)
- throws HyracksDataException {
- return getResources(filter, storageRoots);
+ public Map<Long, LocalResource> getResources(Predicate<LocalResource>
filter) throws HyracksDataException {
+ beforeReadAccess();
+ try {
+ return getResources(filter, storageRoots);
+ } finally {
+ afterReadAccess();
+ }
}
- public synchronized Map<Long, LocalResource>
getResources(Predicate<LocalResource> filter, Set<Integer> partitions)
+ public Map<Long, LocalResource> getResources(Predicate<LocalResource>
filter, Set<Integer> partitions)
throws HyracksDataException {
- List<FileReference> partitionsRoots = new ArrayList<>();
- for (Integer partition : partitions) {
- partitionsRoots.add(getPartitionRoot(partition));
+ beforeReadAccess();
+ try {
+ List<FileReference> partitionsRoots = new ArrayList<>();
+ for (Integer partition : partitions) {
+ partitionsRoots.add(getPartitionRoot(partition));
+ }
+ return getResources(filter, partitionsRoots);
+ } finally {
+ afterReadAccess();
}
- return getResources(filter, partitionsRoots);
}
- public synchronized void deleteInvalidIndexes(Predicate<LocalResource>
filter) throws HyracksDataException {
- IIOBulkOperation bulkDelete = ioManager.createDeleteBulkOperation();
- for (FileReference root : storageRoots) {
- final Collection<FileReference> files = ioManager.list(root,
METADATA_FILES_FILTER);
- try {
- for (FileReference file : files) {
- final LocalResource localResource =
readLocalResource(file);
- if (localResource != null && filter.test(localResource)) {
- FileReference parent = file.getParent();
- LOGGER.warn("deleting invalid metadata index {}",
parent);
- bulkDelete.add(parent);
+ public void deleteInvalidIndexes(Predicate<LocalResource> filter) throws
HyracksDataException {
+ beforeReadAccess();
+ try {
+ IIOBulkOperation bulkDelete =
ioManager.createDeleteBulkOperation();
+ for (FileReference root : storageRoots) {
+ final Collection<FileReference> files = ioManager.list(root,
METADATA_FILES_FILTER);
+ try {
+ for (FileReference file : files) {
+ final LocalResource localResource =
readLocalResource(file);
+ if (localResource != null &&
filter.test(localResource)) {
+ FileReference parent = file.getParent();
+ LOGGER.warn("deleting invalid metadata index {}",
parent);
+ bulkDelete.add(parent);
+ }
}
+ } catch (IOException e) {
+ throw HyracksDataException.create(e);
}
- } catch (IOException e) {
- throw HyracksDataException.create(e);
}
+ ioManager.performBulkOperation(bulkDelete);
+ resourceCache.invalidateAll();
+ } finally {
+ afterReadAccess();
}
- ioManager.performBulkOperation(bulkDelete);
- resourceCache.invalidateAll();
}
public Map<Long, LocalResource> loadAndGetAllResources() throws
HyracksDataException {
- return getResources(p -> true);
+ beforeReadAccess();
+ try {
+ return getResources(p -> true);
+ } finally {
+ afterReadAccess();
+ }
}
@Override
- public synchronized long maxId() throws HyracksDataException {
- final Map<Long, LocalResource> allResources = loadAndGetAllResources();
- final Optional<Long> max =
allResources.keySet().stream().max(Long::compare);
- return max.isPresent() ? max.get() : 0;
+ public long maxId() throws HyracksDataException {
+ beforeReadAccess();
+ try {
+ final Map<Long, LocalResource> allResources =
loadAndGetAllResources();
+ final Optional<Long> max =
allResources.keySet().stream().max(Long::compare);
+ return max.isPresent() ? max.get() : 0;
+ } finally {
+ afterReadAccess();
+ }
}
public void invalidateResource(String relativePath) {
@@ -320,39 +359,42 @@ public class PersistentLocalResourceRepository implements
ILocalResourceReposito
}
private LocalResource readLocalResource(FileReference fileRef) throws
HyracksDataException {
- byte[] bytes = ioManager.readAllBytes(fileRef);
- if (bytes == null) {
- return null;
- }
-
+ beforeReadAccess();
try {
- final JsonNode jsonNode = OBJECT_MAPPER.readValue(bytes,
JsonNode.class);
- LocalResource resource = (LocalResource)
persistedResourceRegistry.deserialize(jsonNode);
- if (resource.getVersion() == ITreeIndexFrame.Constants.VERSION) {
- return resource;
- } else {
- throw new AsterixException("Storage version mismatch.");
+ byte[] bytes = ioManager.readAllBytes(fileRef);
+ if (bytes == null) {
+ return null;
}
- } catch (Exception e) {
- throw HyracksDataException.create(e);
+ try {
+ final JsonNode jsonNode = OBJECT_MAPPER.readValue(bytes,
JsonNode.class);
+ LocalResource resource = (LocalResource)
persistedResourceRegistry.deserialize(jsonNode);
+ if (resource.getVersion() ==
ITreeIndexFrame.Constants.VERSION) {
+ return resource;
+ } else {
+ throw new AsterixException("Storage version mismatch.");
+ }
+ } catch (Exception e) {
+ throw HyracksDataException.create(e);
+ }
+ } finally {
+ afterReadAccess();
}
}
- public synchronized void setReplicationManager(IReplicationManager
replicationManager) {
- this.replicationManager = replicationManager;
- isReplicationEnabled = replicationManager.isReplicationEnabled();
-
- if (isReplicationEnabled) {
- filesToBeReplicated = new HashSet<>();
+ public void setReplicationManager(IReplicationManager replicationManager) {
+ beforeWriteAccess();
+ try {
+ this.replicationManager = replicationManager;
+ isReplicationEnabled = replicationManager.isReplicationEnabled();
+ } finally {
+ afterWriteAccess();
}
}
private void createReplicationJob(ReplicationOperation operation,
FileReference fileRef)
throws HyracksDataException {
- filesToBeReplicated.clear();
- filesToBeReplicated.add(fileRef.getAbsolutePath());
ReplicationJob job = new ReplicationJob(ReplicationJobType.METADATA,
operation, ReplicationExecutionType.SYNC,
- filesToBeReplicated);
+ Set.of(fileRef.getAbsolutePath()));
try {
replicationManager.submitJob(job);
} catch (IOException e) {
@@ -363,26 +405,41 @@ public class PersistentLocalResourceRepository implements
ILocalResourceReposito
/**
* Deletes physical files of all data verses.
*/
- public synchronized void deleteStorageData() throws HyracksDataException {
- IIOBulkOperation bulkDelete = ioManager.createDeleteBulkOperation();
- for (FileReference root : storageRoots) {
- bulkDelete.add(root);
+ public void deleteStorageData() throws HyracksDataException {
+ beforeWriteAccess();
+ try {
+ IIOBulkOperation bulkDelete =
ioManager.createDeleteBulkOperation();
+ for (FileReference root : storageRoots) {
+ bulkDelete.add(root);
+ }
+ ioManager.performBulkOperation(bulkDelete);
+ createStorageRoots();
+ } finally {
+ afterWriteAccess();
}
- ioManager.performBulkOperation(bulkDelete);
- createStorageRoots();
}
- public synchronized Set<Integer> getAllPartitions() throws
HyracksDataException {
- return
loadAndGetAllResources().values().stream().map(LocalResource::getResource)
-
.map(DatasetLocalResource.class::cast).map(DatasetLocalResource::getPartition)
- .collect(Collectors.toSet());
+ public Set<Integer> getAllPartitions() throws HyracksDataException {
+ beforeReadAccess();
+ try {
+ return
loadAndGetAllResources().values().stream().map(LocalResource::getResource)
+
.map(DatasetLocalResource.class::cast).map(DatasetLocalResource::getPartition)
+ .collect(Collectors.toSet());
+ } finally {
+ afterReadAccess();
+ }
}
- public synchronized Optional<DatasetResourceReference>
getLocalResourceReference(String absoluteFilePath)
+ public Optional<DatasetResourceReference> getLocalResourceReference(String
absoluteFilePath)
throws HyracksDataException {
- final String localResourcePath =
StoragePathUtil.getIndexFileRelativePath(absoluteFilePath);
- final LocalResource lr = get(localResourcePath);
- return lr != null ? Optional.of(DatasetResourceReference.of(lr)) :
Optional.empty();
+ beforeReadAccess();
+ try {
+ final String localResourcePath =
StoragePathUtil.getIndexFileRelativePath(absoluteFilePath);
+ final LocalResource lr = get(localResourcePath);
+ return lr != null ? Optional.of(DatasetResourceReference.of(lr)) :
Optional.empty();
+ } finally {
+ afterReadAccess();
+ }
}
/**
@@ -393,67 +450,92 @@ public class PersistentLocalResourceRepository implements
ILocalResourceReposito
* @return The set of indexes files
* @throws HyracksDataException
*/
- public synchronized Set<FileReference> getPartitionIndexes(int partition)
throws HyracksDataException {
- FileReference partitionRoot = getPartitionRoot(partition);
- final Map<Long, LocalResource> partitionResourcesMap =
getResources(resource -> {
- DatasetLocalResource dsResource = (DatasetLocalResource)
resource.getResource();
- return dsResource.getPartition() == partition;
- }, Collections.singletonList(partitionRoot));
- Set<FileReference> indexes = new HashSet<>();
- for (LocalResource localResource : partitionResourcesMap.values()) {
- indexes.add(ioManager.resolve(localResource.getPath()));
+ public Set<FileReference> getPartitionIndexes(int partition) throws
HyracksDataException {
+ beforeReadAccess();
+ try {
+ FileReference partitionRoot = getPartitionRoot(partition);
+ final Map<Long, LocalResource> partitionResourcesMap =
getResources(resource -> {
+ DatasetLocalResource dsResource = (DatasetLocalResource)
resource.getResource();
+ return dsResource.getPartition() == partition;
+ }, Collections.singletonList(partitionRoot));
+ Set<FileReference> indexes = new HashSet<>();
+ for (LocalResource localResource : partitionResourcesMap.values())
{
+ indexes.add(ioManager.resolve(localResource.getPath()));
+ }
+ return indexes;
+ } finally {
+ afterReadAccess();
}
- return indexes;
}
- public synchronized Map<Long, LocalResource> getPartitionResources(int
partition) throws HyracksDataException {
- return getResources(r -> true, Collections.singleton(partition));
+ public Map<Long, LocalResource> getPartitionResources(int partition)
throws HyracksDataException {
+ beforeReadAccess();
+ try {
+ return getResources(r -> true, Collections.singleton(partition));
+ } finally {
+ afterReadAccess();
+ }
}
- public synchronized Map<String, Long> getPartitionReplicatedResources(int
partition, IReplicationStrategy strategy)
+ public Map<String, Long> getPartitionReplicatedResources(int partition,
IReplicationStrategy strategy)
throws HyracksDataException {
- final Map<String, Long> partitionReplicatedResources = new HashMap<>();
- final Map<Long, LocalResource> partitionResources =
getPartitionResources(partition);
- for (LocalResource lr : partitionResources.values()) {
- DatasetLocalResource datasetLocalResource = (DatasetLocalResource)
lr.getResource();
- if (strategy.isMatch(datasetLocalResource.getDatasetId())) {
- DatasetResourceReference drr = DatasetResourceReference.of(lr);
-
partitionReplicatedResources.put(drr.getFileRelativePath().toString(),
lr.getId());
+ beforeReadAccess();
+ try {
+ final Map<String, Long> partitionReplicatedResources = new
HashMap<>();
+ final Map<Long, LocalResource> partitionResources =
getPartitionResources(partition);
+ for (LocalResource lr : partitionResources.values()) {
+ DatasetLocalResource datasetLocalResource =
(DatasetLocalResource) lr.getResource();
+ if (strategy.isMatch(datasetLocalResource.getDatasetId())) {
+ DatasetResourceReference drr =
DatasetResourceReference.of(lr);
+
partitionReplicatedResources.put(drr.getFileRelativePath().toString(),
lr.getId());
+ }
}
+ return partitionReplicatedResources;
+ } finally {
+ afterReadAccess();
}
- return partitionReplicatedResources;
}
- public synchronized List<String> getPartitionReplicatedFiles(int
partition, IReplicationStrategy strategy)
+ public List<String> getPartitionReplicatedFiles(int partition,
IReplicationStrategy strategy)
throws HyracksDataException {
- final List<String> partitionReplicatedFiles = new ArrayList<>();
- final Set<FileReference> replicatedIndexes = new HashSet<>();
- final Map<Long, LocalResource> partitionResources =
getPartitionResources(partition);
- for (LocalResource lr : partitionResources.values()) {
- DatasetLocalResource datasetLocalResource = (DatasetLocalResource)
lr.getResource();
- if (strategy.isMatch(datasetLocalResource.getDatasetId())) {
- replicatedIndexes.add(ioManager.resolve(lr.getPath()));
+ beforeReadAccess();
+ try {
+ final List<String> partitionReplicatedFiles = new ArrayList<>();
+ final Set<FileReference> replicatedIndexes = new HashSet<>();
+ final Map<Long, LocalResource> partitionResources =
getPartitionResources(partition);
+ for (LocalResource lr : partitionResources.values()) {
+ DatasetLocalResource datasetLocalResource =
(DatasetLocalResource) lr.getResource();
+ if (strategy.isMatch(datasetLocalResource.getDatasetId())) {
+ replicatedIndexes.add(ioManager.resolve(lr.getPath()));
+ }
}
+ for (FileReference indexDir : replicatedIndexes) {
+ partitionReplicatedFiles.addAll(getIndexFiles(indexDir));
+ }
+ return partitionReplicatedFiles;
+ } finally {
+ afterReadAccess();
}
- for (FileReference indexDir : replicatedIndexes) {
- partitionReplicatedFiles.addAll(getIndexFiles(indexDir));
- }
- return partitionReplicatedFiles;
}
- public synchronized long getReplicatedIndexesMaxComponentId(int partition,
IReplicationStrategy strategy)
+ public long getReplicatedIndexesMaxComponentId(int partition,
IReplicationStrategy strategy)
throws HyracksDataException {
- long maxComponentId = LSMComponentId.MIN_VALID_COMPONENT_ID;
- final Map<Long, LocalResource> partitionResources =
getPartitionResources(partition);
- for (LocalResource lr : partitionResources.values()) {
- DatasetLocalResource datasetLocalResource = (DatasetLocalResource)
lr.getResource();
- if (strategy.isMatch(datasetLocalResource.getDatasetId())) {
- final IIndexCheckpointManager indexCheckpointManager =
-
indexCheckpointManagerProvider.get(DatasetResourceReference.of(lr));
- maxComponentId = Math.max(maxComponentId,
indexCheckpointManager.getLatest().getLastComponentId());
+ beforeReadAccess();
+ try {
+ long maxComponentId = LSMComponentId.MIN_VALID_COMPONENT_ID;
+ final Map<Long, LocalResource> partitionResources =
getPartitionResources(partition);
+ for (LocalResource lr : partitionResources.values()) {
+ DatasetLocalResource datasetLocalResource =
(DatasetLocalResource) lr.getResource();
+ if (strategy.isMatch(datasetLocalResource.getDatasetId())) {
+ final IIndexCheckpointManager indexCheckpointManager =
+
indexCheckpointManagerProvider.get(DatasetResourceReference.of(lr));
+ maxComponentId = Math.max(maxComponentId,
indexCheckpointManager.getLatest().getLastComponentId());
+ }
}
+ return maxComponentId;
+ } finally {
+ afterReadAccess();
}
- return maxComponentId;
}
private List<String> getIndexFiles(FileReference indexDir) throws
HyracksDataException {
@@ -469,44 +551,59 @@ public class PersistentLocalResourceRepository implements
ILocalResourceReposito
}
}
- public synchronized void cleanup(int partition) throws
HyracksDataException {
- final Set<FileReference> partitionIndexes =
getPartitionIndexes(partition);
+ public void cleanup(int partition) throws HyracksDataException {
+ beforeReadAccess();
try {
- for (FileReference index : partitionIndexes) {
- deleteIndexMaskedFiles(index);
- if (isValidIndex(index)) {
- deleteIndexInvalidComponents(index);
+ final Set<FileReference> partitionIndexes =
getPartitionIndexes(partition);
+ try {
+ for (FileReference index : partitionIndexes) {
+ deleteIndexMaskedFiles(index);
+ if (isValidIndex(index)) {
+ deleteIndexInvalidComponents(index);
+ }
}
+ } catch (IOException | ParseException e) {
+ throw HyracksDataException.create(e);
}
- } catch (IOException | ParseException e) {
- throw HyracksDataException.create(e);
+ } finally {
+ afterReadAccess();
}
}
public List<ResourceStorageStats> getStorageStats() throws
HyracksDataException {
- final List<DatasetResourceReference> allResources =
loadAndGetAllResources().values().stream()
-
.map(DatasetResourceReference::of).collect(Collectors.toList());
- final List<ResourceStorageStats> resourcesStats = new ArrayList<>();
- for (DatasetResourceReference res : allResources) {
- final ResourceStorageStats resourceStats = getResourceStats(res);
- if (resourceStats != null) {
- resourcesStats.add(resourceStats);
+ beforeReadAccess();
+ try {
+ final List<DatasetResourceReference> allResources =
loadAndGetAllResources().values().stream()
+
.map(DatasetResourceReference::of).collect(Collectors.toList());
+ final List<ResourceStorageStats> resourcesStats = new
ArrayList<>();
+ for (DatasetResourceReference res : allResources) {
+ final ResourceStorageStats resourceStats =
getResourceStats(res);
+ if (resourceStats != null) {
+ resourcesStats.add(resourceStats);
+ }
}
+ return resourcesStats;
+ } finally {
+ afterReadAccess();
}
- return resourcesStats;
}
- public synchronized void deleteCorruptedResources() throws
HyracksDataException {
- IIOBulkOperation bulkDelete = ioManager.createDeleteBulkOperation();
- for (FileReference root : storageRoots) {
- final Collection<FileReference> metadataMaskFiles =
ioManager.list(root, METADATA_MASK_FILES_FILTER);
- for (FileReference metadataMaskFile : metadataMaskFiles) {
- final FileReference resourceFile =
metadataMaskFile.getParent().getChild(METADATA_FILE_NAME);
- bulkDelete.add(resourceFile);
- bulkDelete.add(metadataMaskFile);
+ public void deleteCorruptedResources() throws HyracksDataException {
+ beforeWriteAccess();
+ try {
+ IIOBulkOperation bulkDelete =
ioManager.createDeleteBulkOperation();
+ for (FileReference root : storageRoots) {
+ final Collection<FileReference> metadataMaskFiles =
ioManager.list(root, METADATA_MASK_FILES_FILTER);
+ for (FileReference metadataMaskFile : metadataMaskFiles) {
+ final FileReference resourceFile =
metadataMaskFile.getParent().getChild(METADATA_FILE_NAME);
+ bulkDelete.add(resourceFile);
+ bulkDelete.add(metadataMaskFile);
+ }
}
+ ioManager.performBulkOperation(bulkDelete);
+ } finally {
+ afterWriteAccess();
}
- ioManager.performBulkOperation(bulkDelete);
}
private void deleteIndexMaskedFiles(FileReference index) throws
IOException {
@@ -601,20 +698,25 @@ public class PersistentLocalResourceRepository implements
ILocalResourceReposito
public long getDatasetSize(DatasetCopyIdentifier datasetIdentifier,
Set<Integer> nodePartitions)
throws HyracksDataException {
- long totalSize = 0;
- final Map<Long, LocalResource> dataverse = getResources(lr -> {
- final ResourceReference resourceReference =
ResourceReference.ofIndex(lr.getPath());
- return datasetIdentifier.isMatch(resourceReference);
- }, nodePartitions);
- final List<DatasetResourceReference> allResources =
-
dataverse.values().stream().map(DatasetResourceReference::of).collect(Collectors.toList());
- for (DatasetResourceReference res : allResources) {
- final ResourceStorageStats resourceStats = getResourceStats(res);
- if (resourceStats != null) {
- totalSize += resourceStats.getTotalSize();
+ beforeReadAccess();
+ try {
+ long totalSize = 0;
+ final Map<Long, LocalResource> dataverse = getResources(lr -> {
+ final ResourceReference resourceReference =
ResourceReference.ofIndex(lr.getPath());
+ return datasetIdentifier.isMatch(resourceReference);
+ }, nodePartitions);
+ final List<DatasetResourceReference> allResources =
+
dataverse.values().stream().map(DatasetResourceReference::of).collect(Collectors.toList());
+ for (DatasetResourceReference res : allResources) {
+ final ResourceStorageStats resourceStats =
getResourceStats(res);
+ if (resourceStats != null) {
+ totalSize += resourceStats.getTotalSize();
+ }
}
+ return totalSize;
+ } finally {
+ afterReadAccess();
}
- return totalSize;
}
private void createResourceFileMask(FileReference resourceFile) throws
HyracksDataException {
@@ -644,13 +746,18 @@ public class PersistentLocalResourceRepository implements
ILocalResourceReposito
return COMPONENT_FILES_FILTER.accept(indexDir.getFile(), fileName);
}
- public synchronized List<FileReference> getOnDiskPartitions() {
- List<FileReference> onDiskPartitions = new ArrayList<>();
- for (FileReference root : storageRoots) {
- onDiskPartitions.addAll(IoUtil.getMatchingChildren(root, (dir,
name) -> dir != null && dir.isDirectory()
- &&
name.startsWith(StorageConstants.PARTITION_DIR_PREFIX)));
+ public List<FileReference> getOnDiskPartitions() {
+ beforeReadAccess();
+ try {
+ List<FileReference> onDiskPartitions = new ArrayList<>();
+ for (FileReference root : storageRoots) {
+ onDiskPartitions.addAll(IoUtil.getMatchingChildren(root, (dir,
name) -> dir != null && dir.isDirectory()
+ &&
name.startsWith(StorageConstants.PARTITION_DIR_PREFIX)));
+ }
+ return onDiskPartitions;
+ } finally {
+ afterReadAccess();
}
- return onDiskPartitions;
}
public FileReference getPartitionRoot(int partition) throws
HyracksDataException {
@@ -660,16 +767,37 @@ public class PersistentLocalResourceRepository implements
ILocalResourceReposito
}
public void deletePartition(int partitionId) throws HyracksDataException {
- Collection<FileReference> onDiskPartitions = getOnDiskPartitions();
- IIOBulkOperation bulkDelete = ioManager.createDeleteBulkOperation();
- for (FileReference onDiskPartition : onDiskPartitions) {
- int partitionNum =
StoragePathUtil.getPartitionNumFromRelativePath(onDiskPartition.getAbsolutePath());
- if (partitionNum == partitionId) {
- LOGGER.warn("deleting partition {}", partitionNum);
- bulkDelete.add(onDiskPartition);
- break;
+ beforeReadAccess();
+ try {
+ Collection<FileReference> onDiskPartitions = getOnDiskPartitions();
+ IIOBulkOperation bulkDelete =
ioManager.createDeleteBulkOperation();
+ for (FileReference onDiskPartition : onDiskPartitions) {
+ int partitionNum =
StoragePathUtil.getPartitionNumFromRelativePath(onDiskPartition.getAbsolutePath());
+ if (partitionNum == partitionId) {
+ LOGGER.warn("deleting partition {}", partitionNum);
+ bulkDelete.add(onDiskPartition);
+ break;
+ }
}
+ ioManager.performBulkOperation(bulkDelete);
+ } finally {
+ afterReadAccess();
}
- ioManager.performBulkOperation(bulkDelete);
+ }
+
+ private void beforeWriteAccess() {
+ resourcesAccessLock.readLock().lock();
+ }
+
+ private void afterWriteAccess() {
+ resourcesAccessLock.readLock().unlock();
+ }
+
+ private void beforeReadAccess() {
+ resourcesAccessLock.writeLock().lock();
+ }
+
+ private void afterReadAccess() {
+ resourcesAccessLock.writeLock().unlock();
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/build/IndexBuilder.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/build/IndexBuilder.java
index 1c0d7b49d5..8a67c71d09 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/build/IndexBuilder.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/build/IndexBuilder.java
@@ -68,49 +68,47 @@ public class IndexBuilder implements IIndexBuilder {
@Override
public void build() throws HyracksDataException {
IResourceLifecycleManager<IIndex> lcManager =
storageManager.getLifecycleManager(ctx);
- synchronized (lcManager) {
- // The previous resource Id needs to be removed since calling
IIndex.create() may possibly destroy any
- // physical artifact that the LocalResourceRepository is managing
(e.g. a file containing the resource Id).
- // Once the index has been created, a new resource Id can be
generated.
- ILocalResourceRepository localResourceRepository =
storageManager.getLocalResourceRepository(ctx);
- LocalResource lr = localResourceRepository.get(resourceRelPath);
- long resourceId = lr == null ? -1 : lr.getId();
- if (resourceId != -1) {
- localResourceRepository.delete(resourceRelPath);
- }
- resourceId = resourceIdFactory.createId();
- IResource resource =
localResourceFactory.createResource(resourceRef);
- lr = new LocalResource(resourceId,
ITreeIndexFrame.Constants.VERSION, durable, resource);
- IIndex index = lcManager.get(resourceRelPath);
- if (index != null) {
- //how is this right?????????? <needs to be fixed>
- //The reason for this is to handle many cases such as:
- //1. Crash while delete index is running (we don't do global
cleanup on restart)
- //2. Node leaves and then join with old data
- LOGGER.log(Level.WARN, "Removing existing index on index
create for the index: " + resourceRelPath);
- lcManager.unregister(resourceRelPath);
- index.destroy();
- } else {
- final FileReference resolvedResourceRef =
ctx.getIoManager().resolve(resourceRelPath);
- if (resolvedResourceRef.getFile().exists()) {
- // Index is not registered but the index file exists
- // This is another big problem that we need to disallow
soon
- // We can only disallow this if we have a global cleanup
after crash
- // on reboot
- LOGGER.warn(
- "Deleting {} on index create. The index is not
registered but the file exists in the filesystem",
- resolvedResourceRef);
- ctx.getIoManager().delete(resolvedResourceRef);
- }
- index = resource.createInstance(ctx);
- }
- index.create();
- try {
- localResourceRepository.insert(lr);
- } catch (IOException e) {
- throw HyracksDataException.create(e);
+ // The previous resource Id needs to be removed since calling
IIndex.create() may possibly destroy any
+ // physical artifact that the LocalResourceRepository is managing
(e.g. a file containing the resource Id).
+ // Once the index has been created, a new resource Id can be generated.
+ ILocalResourceRepository localResourceRepository =
storageManager.getLocalResourceRepository(ctx);
+ LocalResource lr = localResourceRepository.get(resourceRelPath);
+ long resourceId = lr == null ? -1 : lr.getId();
+ if (resourceId != -1) {
+ localResourceRepository.delete(resourceRelPath);
+ }
+ resourceId = resourceIdFactory.createId();
+ IResource resource = localResourceFactory.createResource(resourceRef);
+ lr = new LocalResource(resourceId, ITreeIndexFrame.Constants.VERSION,
durable, resource);
+ IIndex index = lcManager.get(resourceRelPath);
+ if (index != null) {
+ //how is this right?????????? <needs to be fixed>
+ //The reason for this is to handle many cases such as:
+ //1. Crash while delete index is running (we don't do global
cleanup on restart)
+ //2. Node leaves and then join with old data
+ LOGGER.log(Level.WARN, "Removing existing index on index create
for the index: " + resourceRelPath);
+ lcManager.unregister(resourceRelPath);
+ index.destroy();
+ } else {
+ final FileReference resolvedResourceRef =
ctx.getIoManager().resolve(resourceRelPath);
+ if (resolvedResourceRef.getFile().exists()) {
+ // Index is not registered but the index file exists
+ // This is another big problem that we need to disallow soon
+ // We can only disallow this if we have a global cleanup after
crash
+ // on reboot
+ LOGGER.warn(
+ "Deleting {} on index create. The index is not
registered but the file exists in the filesystem",
+ resolvedResourceRef);
+ ctx.getIoManager().delete(resolvedResourceRef);
}
- lcManager.register(resourceRelPath, index);
+ index = resource.createInstance(ctx);
+ }
+ index.create();
+ try {
+ localResourceRepository.insert(lr);
+ } catch (IOException e) {
+ throw HyracksDataException.create(e);
}
+ lcManager.register(resourceRelPath, index);
}
}