This is an automated email from the ASF dual-hosted git repository.

imaxon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git

commit 8882c2c8d948eefd2f740adf9c7019b8ee135656
Merge: e2fbccbd7a 5b5c7d76df
Author: Wail Alkowaileet <[email protected]>
AuthorDate: Thu Nov 9 13:59:28 2023 -0800

    Merge branch 'trinity' into 'master'
    
    Change-Id: Ief24403066b4e9dbd1d1e72a5cdba6eab1338fdd

 .../main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java  | 1 +
 .../management/resource/PersistentLocalResourceRepository.java    | 8 ++++++++
 2 files changed, 9 insertions(+)

diff --cc asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index 96ae699588,a80545d9eb..39a88413de
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@@ -39,12 -42,14 +39,14 @@@ import java.util.List
  import java.util.Map;
  import java.util.Optional;
  import java.util.Set;
 +import java.util.concurrent.locks.ReentrantReadWriteLock;
  import java.util.function.Predicate;
  import java.util.stream.Collectors;
 -import java.util.stream.Stream;
  
+ import org.apache.asterix.common.api.IDatasetLifecycleManager;
  import org.apache.asterix.common.dataflow.DatasetLocalResource;
  import org.apache.asterix.common.exceptions.AsterixException;
+ import org.apache.asterix.common.replication.AllDatasetsReplicationStrategy;
  import org.apache.asterix.common.replication.IReplicationManager;
  import org.apache.asterix.common.replication.IReplicationStrategy;
  import org.apache.asterix.common.replication.ReplicationJob;
@@@ -101,11 -107,12 +103,12 @@@ public class PersistentLocalResourceRep
      private final Cache<String, LocalResource> resourceCache;
      // Mutables
      private boolean isReplicationEnabled = false;
 -    private Set<String> filesToBeReplicated;
      private IReplicationManager replicationManager;
 -    private final List<Path> storageRoots;
 +    private final List<FileReference> storageRoots;
      private final IIndexCheckpointManagerProvider indexCheckpointManagerProvider;
      private final IPersistedResourceRegistry persistedResourceRegistry;
 +    private final ReentrantReadWriteLock resourcesAccessLock = new ReentrantReadWriteLock(true);
+     private IDatasetLifecycleManager datasetLifecycleManager;
  
      public PersistentLocalResourceRepository(IIOManager ioManager,
              IIndexCheckpointManagerProvider indexCheckpointManagerProvider,
@@@ -391,10 -349,16 +394,14 @@@
          }
      }
  
+     public void setDatasetLifecycleManager(IDatasetLifecycleManager datasetLifecycleManager) {
+         this.datasetLifecycleManager = datasetLifecycleManager;
+     }
+ 
      private void createReplicationJob(ReplicationOperation operation, FileReference fileRef)
              throws HyracksDataException {
 -        filesToBeReplicated.clear();
 -        filesToBeReplicated.add(fileRef.getAbsolutePath());
          ReplicationJob job = new ReplicationJob(ReplicationJobType.METADATA, operation, ReplicationExecutionType.SYNC,
 -                filesToBeReplicated);
 +                Set.of(fileRef.getAbsolutePath()));
          try {
              replicationManager.submitJob(job);
          } catch (IOException e) {
@@@ -551,23 -486,18 +558,24 @@@
          }
      }
  
 -    public synchronized void cleanup(int partition) throws HyracksDataException {
 -        datasetLifecycleManager.waitForIO(AllDatasetsReplicationStrategy.INSTANCE, partition);
 -        final Set<File> partitionIndexes = getPartitionIndexes(partition);
 +    public void cleanup(int partition) throws HyracksDataException {
 +        beforeReadAccess();
          try {
 -            for (File index : partitionIndexes) {
 -                deleteIndexMaskedFiles(index);
 -                if (isValidIndex(index)) {
 -                    deleteIndexInvalidComponents(index);
++            datasetLifecycleManager.waitForIO(AllDatasetsReplicationStrategy.INSTANCE, partition);
 +            final Set<FileReference> partitionIndexes = getPartitionIndexes(partition);
 +            try {
 +                for (FileReference index : partitionIndexes) {
 +                    deleteIndexMaskedFiles(index);
 +                    if (isValidIndex(index)) {
 +                        deleteIndexInvalidComponents(index);
 +                    }
                  }
 +            } catch (IOException | ParseException e) {
 +                throw HyracksDataException.create(e);
              }
 -        } catch (IOException | ParseException e) {
 -            throw HyracksDataException.create(e);
 +        } finally {
 +            clearResourcesCache();
 +            afterReadAccess();
          }
      }
  

Reply via email to