This is an automated email from the ASF dual-hosted git repository. imaxon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit 8882c2c8d948eefd2f740adf9c7019b8ee135656 Merge: e2fbccbd7a 5b5c7d76df Author: Wail Alkowaileet <[email protected]> AuthorDate: Thu Nov 9 13:59:28 2023 -0800 Merge branch 'trinity' into 'master' Change-Id: Ief24403066b4e9dbd1d1e72a5cdba6eab1338fdd .../main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java | 1 + .../management/resource/PersistentLocalResourceRepository.java | 8 ++++++++ 2 files changed, 9 insertions(+) diff --cc asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java index 96ae699588,a80545d9eb..39a88413de --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java @@@ -39,12 -42,14 +39,14 @@@ import java.util.List import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Predicate; import java.util.stream.Collectors; -import java.util.stream.Stream; + import org.apache.asterix.common.api.IDatasetLifecycleManager; import org.apache.asterix.common.dataflow.DatasetLocalResource; import org.apache.asterix.common.exceptions.AsterixException; + import org.apache.asterix.common.replication.AllDatasetsReplicationStrategy; import org.apache.asterix.common.replication.IReplicationManager; import org.apache.asterix.common.replication.IReplicationStrategy; import org.apache.asterix.common.replication.ReplicationJob; @@@ -101,11 -107,12 +103,12 @@@ public class PersistentLocalResourceRep private final Cache<String, LocalResource> resourceCache; // Mutables private boolean isReplicationEnabled = false; - private Set<String> filesToBeReplicated; private IReplicationManager replicationManager; - private final List<Path> storageRoots; + private final List<FileReference> storageRoots; private final IIndexCheckpointManagerProvider indexCheckpointManagerProvider; private final IPersistedResourceRegistry persistedResourceRegistry; + private final ReentrantReadWriteLock resourcesAccessLock = new ReentrantReadWriteLock(true); + private IDatasetLifecycleManager datasetLifecycleManager; public PersistentLocalResourceRepository(IIOManager ioManager, IIndexCheckpointManagerProvider indexCheckpointManagerProvider, @@@ -391,10 -349,16 +394,14 @@@ } } + public void setDatasetLifecycleManager(IDatasetLifecycleManager datasetLifecycleManager) { + this.datasetLifecycleManager = datasetLifecycleManager; + } + private void createReplicationJob(ReplicationOperation operation, FileReference fileRef) throws HyracksDataException { - filesToBeReplicated.clear(); - filesToBeReplicated.add(fileRef.getAbsolutePath()); ReplicationJob job = new ReplicationJob(ReplicationJobType.METADATA, operation, ReplicationExecutionType.SYNC, - filesToBeReplicated); + Set.of(fileRef.getAbsolutePath())); try { replicationManager.submitJob(job); } catch (IOException e) { @@@ -551,23 -486,18 +558,24 @@@ } } - public synchronized void cleanup(int partition) throws HyracksDataException { - datasetLifecycleManager.waitForIO(AllDatasetsReplicationStrategy.INSTANCE, partition); - final Set<File> partitionIndexes = getPartitionIndexes(partition); + public void cleanup(int partition) throws HyracksDataException { + beforeReadAccess(); try { - for (File index : partitionIndexes) { - deleteIndexMaskedFiles(index); - if (isValidIndex(index)) { - deleteIndexInvalidComponents(index); ++ datasetLifecycleManager.waitForIO(AllDatasetsReplicationStrategy.INSTANCE, partition); + final Set<FileReference> partitionIndexes = getPartitionIndexes(partition); + try { + for (FileReference index : partitionIndexes) { + deleteIndexMaskedFiles(index); + if (isValidIndex(index)) { + deleteIndexInvalidComponents(index); + } } + } catch (IOException | ParseException e) { + throw HyracksDataException.create(e); } - } catch (IOException | ParseException e) { - throw HyracksDataException.create(e); + } finally { + clearResourcesCache(); + afterReadAccess(); } }
