This is an automated email from the ASF dual-hosted git repository.

mblow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git

commit 662f8a942b082eef8c951bd86d0ee1c4cc2d25c8
Author: Murtadha Hubail <[email protected]>
AuthorDate: Mon Mar 14 23:20:08 2022 +0300

    [NO ISSUE][STO] Limit flushes to impacted partitions
    
    - user model changes: no
    - storage format changes: no
    - interface changes: yes
    
    Details:
    
    - When requesting a flush, limit the indexes to be flushed
      to the impacted partitions.
    - Invalidate cached resources on replica promotion.
    - Invalidate cached resources on resource file deletion.
    
    Change-Id: I4c1408627c8e11240c3575c4b8f190d746588867
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/15683
    Integration-Tests: Jenkins <[email protected]>
    Tested-by: Jenkins <[email protected]>
    Reviewed-by: Murtadha Hubail <[email protected]>
    Reviewed-by: Ali Alsuliman <[email protected]>
---
 .../org/apache/asterix/app/nc/RecoveryManager.java |  2 +-
 .../org/apache/asterix/app/nc/ReplicaManager.java  |  4 +--
 .../common/api/IDatasetLifecycleManager.java       | 19 +++++++-----
 .../common/context/DatasetLifecycleManager.java    | 36 ++++++++++++----------
 .../replication/messaging/DeleteFileTask.java      |  7 +++++
 .../replication/sync/ReplicaSynchronizer.java      |  3 +-
 .../PersistentLocalResourceRepository.java         |  4 +++
 7 files changed, 47 insertions(+), 28 deletions(-)

diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
index 6736642..2ca3fbc 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
@@ -535,7 +535,7 @@ public class RecoveryManager implements IRecoveryManager, 
ILifeCycleComponent {
             }
             replayPartitionsLogs(partitions, logMgr.getLogReader(true), 
minLSN, false);
             if (flush) {
-                appCtx.getDatasetLifecycleManager().flushAllDatasets();
+                
appCtx.getDatasetLifecycleManager().flushAllDatasets(partitions::contains);
             }
             cleanUp(partitions);
         } catch (IOException | ACIDException e) {
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
index a159e09..1372016 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
@@ -130,6 +130,7 @@ public class ReplicaManager implements IReplicaManager {
         final PersistentLocalResourceRepository localResourceRepository =
                 (PersistentLocalResourceRepository) 
appCtx.getLocalResourceRepository();
         localResourceRepository.cleanup(partition);
+        localResourceRepository.clearResourcesCache();
         final IRecoveryManager recoveryManager = 
appCtx.getTransactionSubsystem().getRecoveryManager();
         
recoveryManager.replayReplicaPartitionLogs(Stream.of(partition).collect(Collectors.toSet()),
 true);
         partitions.put(partition, new Object());
@@ -169,8 +170,7 @@ public class ReplicaManager implements IReplicaManager {
 
     public void closePartitionResources(int partition) throws 
HyracksDataException {
         final IDatasetLifecycleManager datasetLifecycleManager = 
appCtx.getDatasetLifecycleManager();
-        //TODO(mhubail) we can flush only datasets of the requested partition
-        datasetLifecycleManager.flushAllDatasets();
+        datasetLifecycleManager.flushAllDatasets(p -> p == partition);
         final PersistentLocalResourceRepository resourceRepository =
                 (PersistentLocalResourceRepository) 
appCtx.getLocalResourceRepository();
         final Map<Long, LocalResource> partitionResources = 
resourceRepository.getPartitionResources(partition);
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
index 44b83d4..1c2a047 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
@@ -20,6 +20,7 @@ package org.apache.asterix.common.api;
 
 import java.util.List;
 import java.util.Set;
+import java.util.function.IntPredicate;
 import java.util.function.Predicate;
 
 import org.apache.asterix.common.context.DatasetInfo;
@@ -61,6 +62,14 @@ public interface IDatasetLifecycleManager extends 
IResourceLifecycleManager<IInd
     void flushAllDatasets() throws HyracksDataException;
 
     /**
+     * Flushes all open datasets synchronously, limited to the partitions accepted by {@code partitions}
+     *
+     * @param partitions predicate selecting the partitions whose open indexes are flushed
+     * @throws HyracksDataException
+     */
+    void flushAllDatasets(IntPredicate partitions) throws HyracksDataException;
+
+    /**
      * Schedules asynchronous flush on indexes matching the predicate {@code 
indexPredicate}
      *
      * @param indexPredicate
@@ -143,19 +152,13 @@ public interface IDatasetLifecycleManager extends 
IResourceLifecycleManager<IInd
     List<IndexInfo> getOpenIndexesInfo();
 
     /**
-     * Flushes and closes all user datasets (non-metadata datasets)
-     *
-     * @throws HyracksDataException
-     */
-    void closeUserDatasets() throws HyracksDataException;
-
-    /**
      * Flushes all opened datasets that are matching {@code 
replicationStrategy}.
      *
      * @param replicationStrategy
+     * @param partitions predicate selecting the partitions whose open indexes are flushed
      * @throws HyracksDataException
      */
-    void flushDataset(IReplicationStrategy replicationStrategy) throws 
HyracksDataException;
+    void flushDataset(IReplicationStrategy replicationStrategy, IntPredicate 
partitions) throws HyracksDataException;
 
     /**
      * Waits for all ongoing IO operations on all open datasets that are 
matching {@code replicationStrategy}.
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index c431dca..117b4fc 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.IntPredicate;
 import java.util.function.Predicate;
 
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
@@ -362,9 +363,14 @@ public class DatasetLifecycleManager implements 
IDatasetLifecycleManager, ILifeC
 
     @Override
     public synchronized void flushAllDatasets() throws HyracksDataException {
+        flushAllDatasets(partition -> true);
+    }
+
+    @Override
+    public synchronized void flushAllDatasets(IntPredicate partitions) throws 
HyracksDataException {
         for (DatasetResource dsr : datasets.values()) {
             if (dsr.getDatasetInfo().isOpen()) {
-                flushDatasetOpenIndexes(dsr, false);
+                flushDatasetOpenIndexes(dsr, partitions, false);
             }
         }
     }
@@ -373,7 +379,7 @@ public class DatasetLifecycleManager implements 
IDatasetLifecycleManager, ILifeC
     public synchronized void flushDataset(int datasetId, boolean asyncFlush) 
throws HyracksDataException {
         DatasetResource dsr = datasets.get(datasetId);
         if (dsr != null) {
-            flushDatasetOpenIndexes(dsr, asyncFlush);
+            flushDatasetOpenIndexes(dsr, p -> true, asyncFlush);
         }
     }
 
@@ -407,7 +413,8 @@ public class DatasetLifecycleManager implements 
IDatasetLifecycleManager, ILifeC
      * This method can only be called asynchronously safely if we're sure no 
modify operation
      * will take place until the flush is scheduled
      */
-    private void flushDatasetOpenIndexes(DatasetResource dsr, boolean 
asyncFlush) throws HyracksDataException {
+    private void flushDatasetOpenIndexes(DatasetResource dsr, IntPredicate 
partitions, boolean asyncFlush)
+            throws HyracksDataException {
         DatasetInfo dsInfo = dsr.getDatasetInfo();
         if (!dsInfo.isOpen()) {
             throw new IllegalStateException("flushDatasetOpenIndexes is called 
on a dataset that is closed");
@@ -419,6 +426,9 @@ public class DatasetLifecycleManager implements 
IDatasetLifecycleManager, ILifeC
         // ensure all in-flight flushes gets scheduled
         logManager.log(waitLog);
         for (PrimaryIndexOperationTracker primaryOpTracker : 
dsr.getOpTrackers()) {
+            if (!partitions.test(primaryOpTracker.getPartition())) {
+                continue;
+            }
             // flush each partition one by one
             int numActiveOperations = 
primaryOpTracker.getNumActiveOperations();
             if (numActiveOperations > 0) {
@@ -433,6 +443,9 @@ public class DatasetLifecycleManager implements 
IDatasetLifecycleManager, ILifeC
         if (!asyncFlush) {
             List<FlushOperation> flushes = new ArrayList<>();
             for (PrimaryIndexOperationTracker primaryOpTracker : 
dsr.getOpTrackers()) {
+                if (!partitions.test(primaryOpTracker.getPartition())) {
+                    continue;
+                }
                 flushes.addAll(primaryOpTracker.getScheduledFlushes());
             }
             LSMIndexUtil.waitFor(flushes);
@@ -443,7 +456,7 @@ public class DatasetLifecycleManager implements 
IDatasetLifecycleManager, ILifeC
         // First wait for any ongoing IO operations
         DatasetInfo dsInfo = dsr.getDatasetInfo();
         try {
-            flushDatasetOpenIndexes(dsr, false);
+            flushDatasetOpenIndexes(dsr, p -> true, false);
         } catch (Exception e) {
             throw HyracksDataException.create(e);
         }
@@ -480,16 +493,6 @@ public class DatasetLifecycleManager implements 
IDatasetLifecycleManager, ILifeC
     }
 
     @Override
-    public synchronized void closeUserDatasets() throws HyracksDataException {
-        ArrayList<DatasetResource> openDatasets = new 
ArrayList<>(datasets.values());
-        for (DatasetResource dsr : openDatasets) {
-            if (!dsr.isMetadataDataset()) {
-                closeDataset(dsr);
-            }
-        }
-    }
-
-    @Override
     public synchronized void stop(boolean dumpState, OutputStream 
outputStream) throws IOException {
         if (stopped) {
             return;
@@ -539,10 +542,11 @@ public class DatasetLifecycleManager implements 
IDatasetLifecycleManager, ILifeC
     }
 
     @Override
-    public void flushDataset(IReplicationStrategy replicationStrategy) throws 
HyracksDataException {
+    public void flushDataset(IReplicationStrategy replicationStrategy, 
IntPredicate partitions)
+            throws HyracksDataException {
         for (DatasetResource dsr : datasets.values()) {
             if (dsr.isOpen() && 
replicationStrategy.isMatch(dsr.getDatasetID())) {
-                flushDatasetOpenIndexes(dsr, false);
+                flushDatasetOpenIndexes(dsr, partitions, false);
             }
         }
     }
diff --git 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DeleteFileTask.java
 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DeleteFileTask.java
index 5eef84e..1e93228 100644
--- 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DeleteFileTask.java
+++ 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DeleteFileTask.java
@@ -27,8 +27,10 @@ import java.nio.file.Files;
 
 import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.exceptions.ReplicationException;
+import org.apache.asterix.common.storage.ResourceReference;
 import org.apache.asterix.replication.api.IReplicaTask;
 import org.apache.asterix.replication.api.IReplicationWorker;
+import 
org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.IIOManager;
 import org.apache.logging.log4j.LogManager;
@@ -53,6 +55,11 @@ public class DeleteFileTask implements IReplicaTask {
             final File localFile = ioManager.resolve(file).getFile();
             if (localFile.exists()) {
                 Files.delete(localFile.toPath());
+                ResourceReference replicaRes = 
ResourceReference.of(localFile.getAbsolutePath());
+                if (replicaRes.isMetadataResource()) {
+                    ((PersistentLocalResourceRepository) 
appCtx.getLocalResourceRepository())
+                            
.invalidateResource(replicaRes.getRelativePath().toString());
+                }
                 LOGGER.info(() -> "Deleted file: " + 
localFile.getAbsolutePath());
             } else {
                 LOGGER.warn(() -> "Requested to delete a non-existing file: " 
+ localFile.getAbsolutePath());
diff --git 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
index a8eee4f..0d0ef19 100644
--- 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
+++ 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
@@ -65,7 +65,8 @@ public class ReplicaSynchronizer {
         final ReplicaFilesSynchronizer fileSync = new 
ReplicaFilesSynchronizer(appCtx, replica, deltaRecovery);
         // flush replicated dataset to generate disk component for any 
remaining in-memory components
         final IReplicationStrategy replStrategy = 
appCtx.getReplicationManager().getReplicationStrategy();
-        appCtx.getDatasetLifecycleManager().flushDataset(replStrategy);
+        appCtx.getDatasetLifecycleManager().flushDataset(replStrategy,
+                p -> p == replica.getIdentifier().getPartition());
         waitForReplicatedDatasetsIO();
         fileSync.sync();
     }
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index cc396ad..27e753f 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -300,6 +300,10 @@ public class PersistentLocalResourceRepository implements 
ILocalResourceReposito
         resourceCache.invalidate(relativePath);
     }
 
+    public void clearResourcesCache() {
+        resourceCache.invalidateAll();
+    }
+
     private static String getFileName(String path) {
         return path.endsWith(File.separator) ? (path + 
StorageConstants.METADATA_FILE_NAME)
                 : (path + File.separator + 
StorageConstants.METADATA_FILE_NAME);

Reply via email to