This is an automated email from the ASF dual-hosted git repository.

mblow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit 662f8a942b082eef8c951bd86d0ee1c4cc2d25c8
Author: Murtadha Hubail <[email protected]>
AuthorDate: Mon Mar 14 23:20:08 2022 +0300

    [NO ISSUE][STO] Limit flushes to impacted partitions

    - user model changes: no
    - storage format changes: no
    - interface changes: yes

    Details:

    - When requesting a flush, limit the indexes to be flushed
      to the impacted partitions.
    - Invalidate cached resources on replica promotion.
    - Invalidate cached resources on resource file deletion.

    Change-Id: I4c1408627c8e11240c3575c4b8f190d746588867
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/15683
    Integration-Tests: Jenkins <[email protected]>
    Tested-by: Jenkins <[email protected]>
    Reviewed-by: Murtadha Hubail <[email protected]>
    Reviewed-by: Ali Alsuliman <[email protected]>
---
 .../org/apache/asterix/app/nc/RecoveryManager.java | 2 +-
 .../org/apache/asterix/app/nc/ReplicaManager.java | 4 +--
 .../common/api/IDatasetLifecycleManager.java | 19 +++++++-----
 .../common/context/DatasetLifecycleManager.java | 36 ++++++++++++----------
 .../replication/messaging/DeleteFileTask.java | 7 +++++
 .../replication/sync/ReplicaSynchronizer.java | 3 +-
 .../PersistentLocalResourceRepository.java | 4 +++
 7 files changed, 47 insertions(+), 28 deletions(-)

diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
index 6736642..2ca3fbc 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
@@ -535,7 +535,7 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
 } replayPartitionsLogs(partitions, logMgr.getLogReader(true), minLSN, false); if (flush) {
- appCtx.getDatasetLifecycleManager().flushAllDatasets();
+ appCtx.getDatasetLifecycleManager().flushAllDatasets(partitions::contains);
 } cleanUp(partitions); } catch 
(IOException | ACIDException e) { diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java index a159e09..1372016 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java @@ -130,6 +130,7 @@ public class ReplicaManager implements IReplicaManager { final PersistentLocalResourceRepository localResourceRepository = (PersistentLocalResourceRepository) appCtx.getLocalResourceRepository(); localResourceRepository.cleanup(partition); + localResourceRepository.clearResourcesCache(); final IRecoveryManager recoveryManager = appCtx.getTransactionSubsystem().getRecoveryManager(); recoveryManager.replayReplicaPartitionLogs(Stream.of(partition).collect(Collectors.toSet()), true); partitions.put(partition, new Object()); @@ -169,8 +170,7 @@ public class ReplicaManager implements IReplicaManager { public void closePartitionResources(int partition) throws HyracksDataException { final IDatasetLifecycleManager datasetLifecycleManager = appCtx.getDatasetLifecycleManager(); - //TODO(mhubail) we can flush only datasets of the requested partition - datasetLifecycleManager.flushAllDatasets(); + datasetLifecycleManager.flushAllDatasets(p -> p == partition); final PersistentLocalResourceRepository resourceRepository = (PersistentLocalResourceRepository) appCtx.getLocalResourceRepository(); final Map<Long, LocalResource> partitionResources = resourceRepository.getPartitionResources(partition); diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java index 44b83d4..1c2a047 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java +++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java @@ -20,6 +20,7 @@ package org.apache.asterix.common.api; import java.util.List; import java.util.Set; +import java.util.function.IntPredicate; import java.util.function.Predicate; import org.apache.asterix.common.context.DatasetInfo; @@ -61,6 +62,14 @@ public interface IDatasetLifecycleManager extends IResourceLifecycleManager<IInd void flushAllDatasets() throws HyracksDataException; /** + * Flushes all open datasets synchronously for partitions {@code partitions} + * + * @param partitions + * @throws HyracksDataException + */ + void flushAllDatasets(IntPredicate partitions) throws HyracksDataException; + + /** * Schedules asynchronous flush on indexes matching the predicate {@code indexPredicate} * * @param indexPredicate @@ -143,19 +152,13 @@ public interface IDatasetLifecycleManager extends IResourceLifecycleManager<IInd List<IndexInfo> getOpenIndexesInfo(); /** - * Flushes and closes all user datasets (non-metadata datasets) - * - * @throws HyracksDataException - */ - void closeUserDatasets() throws HyracksDataException; - - /** * Flushes all opened datasets that are matching {@code replicationStrategy}. * * @param replicationStrategy + * @param partitions * @throws HyracksDataException */ - void flushDataset(IReplicationStrategy replicationStrategy) throws HyracksDataException; + void flushDataset(IReplicationStrategy replicationStrategy, IntPredicate partitions) throws HyracksDataException; /** * Waits for all ongoing IO operations on all open datasets that are matching {@code replicationStrategy}. 
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java index c431dca..117b4fc 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.function.IntPredicate; import java.util.function.Predicate; import org.apache.asterix.common.api.IDatasetLifecycleManager; @@ -362,9 +363,14 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC @Override public synchronized void flushAllDatasets() throws HyracksDataException { + flushAllDatasets(partition -> true); + } + + @Override + public synchronized void flushAllDatasets(IntPredicate partitions) throws HyracksDataException { for (DatasetResource dsr : datasets.values()) { if (dsr.getDatasetInfo().isOpen()) { - flushDatasetOpenIndexes(dsr, false); + flushDatasetOpenIndexes(dsr, partitions, false); } } } @@ -373,7 +379,7 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC public synchronized void flushDataset(int datasetId, boolean asyncFlush) throws HyracksDataException { DatasetResource dsr = datasets.get(datasetId); if (dsr != null) { - flushDatasetOpenIndexes(dsr, asyncFlush); + flushDatasetOpenIndexes(dsr, p -> true, asyncFlush); } } @@ -407,7 +413,8 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC * This method can only be called asynchronously safely if we're sure no modify operation * will take place until the flush is scheduled */ - private void flushDatasetOpenIndexes(DatasetResource dsr, boolean asyncFlush) throws HyracksDataException { + private void 
flushDatasetOpenIndexes(DatasetResource dsr, IntPredicate partitions, boolean asyncFlush) + throws HyracksDataException { DatasetInfo dsInfo = dsr.getDatasetInfo(); if (!dsInfo.isOpen()) { throw new IllegalStateException("flushDatasetOpenIndexes is called on a dataset that is closed"); @@ -419,6 +426,9 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC // ensure all in-flight flushes gets scheduled logManager.log(waitLog); for (PrimaryIndexOperationTracker primaryOpTracker : dsr.getOpTrackers()) { + if (!partitions.test(primaryOpTracker.getPartition())) { + continue; + } // flush each partition one by one int numActiveOperations = primaryOpTracker.getNumActiveOperations(); if (numActiveOperations > 0) { @@ -433,6 +443,9 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC if (!asyncFlush) { List<FlushOperation> flushes = new ArrayList<>(); for (PrimaryIndexOperationTracker primaryOpTracker : dsr.getOpTrackers()) { + if (!partitions.test(primaryOpTracker.getPartition())) { + continue; + } flushes.addAll(primaryOpTracker.getScheduledFlushes()); } LSMIndexUtil.waitFor(flushes); @@ -443,7 +456,7 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC // First wait for any ongoing IO operations DatasetInfo dsInfo = dsr.getDatasetInfo(); try { - flushDatasetOpenIndexes(dsr, false); + flushDatasetOpenIndexes(dsr, p -> true, false); } catch (Exception e) { throw HyracksDataException.create(e); } @@ -480,16 +493,6 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC } @Override - public synchronized void closeUserDatasets() throws HyracksDataException { - ArrayList<DatasetResource> openDatasets = new ArrayList<>(datasets.values()); - for (DatasetResource dsr : openDatasets) { - if (!dsr.isMetadataDataset()) { - closeDataset(dsr); - } - } - } - - @Override public synchronized void stop(boolean dumpState, OutputStream outputStream) throws IOException { if 
(stopped) { return; @@ -539,10 +542,11 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC } @Override - public void flushDataset(IReplicationStrategy replicationStrategy) throws HyracksDataException { + public void flushDataset(IReplicationStrategy replicationStrategy, IntPredicate partitions) + throws HyracksDataException { for (DatasetResource dsr : datasets.values()) { if (dsr.isOpen() && replicationStrategy.isMatch(dsr.getDatasetID())) { - flushDatasetOpenIndexes(dsr, false); + flushDatasetOpenIndexes(dsr, partitions, false); } } } diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DeleteFileTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DeleteFileTask.java index 5eef84e..1e93228 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DeleteFileTask.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DeleteFileTask.java @@ -27,8 +27,10 @@ import java.nio.file.Files; import org.apache.asterix.common.api.INcApplicationContext; import org.apache.asterix.common.exceptions.ReplicationException; +import org.apache.asterix.common.storage.ResourceReference; import org.apache.asterix.replication.api.IReplicaTask; import org.apache.asterix.replication.api.IReplicationWorker; +import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.io.IIOManager; import org.apache.logging.log4j.LogManager; @@ -53,6 +55,11 @@ public class DeleteFileTask implements IReplicaTask { final File localFile = ioManager.resolve(file).getFile(); if (localFile.exists()) { Files.delete(localFile.toPath()); + ResourceReference replicaRes = ResourceReference.of(localFile.getAbsolutePath()); + if (replicaRes.isMetadataResource()) { + 
((PersistentLocalResourceRepository) appCtx.getLocalResourceRepository()) + .invalidateResource(replicaRes.getRelativePath().toString()); + } LOGGER.info(() -> "Deleted file: " + localFile.getAbsolutePath()); } else { LOGGER.warn(() -> "Requested to delete a non-existing file: " + localFile.getAbsolutePath()); diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java index a8eee4f..0d0ef19 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java @@ -65,7 +65,8 @@ public class ReplicaSynchronizer { final ReplicaFilesSynchronizer fileSync = new ReplicaFilesSynchronizer(appCtx, replica, deltaRecovery); // flush replicated dataset to generate disk component for any remaining in-memory components final IReplicationStrategy replStrategy = appCtx.getReplicationManager().getReplicationStrategy(); - appCtx.getDatasetLifecycleManager().flushDataset(replStrategy); + appCtx.getDatasetLifecycleManager().flushDataset(replStrategy, + p -> p == replica.getIdentifier().getPartition()); waitForReplicatedDatasetsIO(); fileSync.sync(); } diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java index cc396ad..27e753f 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java @@ -300,6 +300,10 @@ public 
class PersistentLocalResourceRepository implements ILocalResourceReposito resourceCache.invalidate(relativePath); } + public void clearResourcesCache() { + resourceCache.invalidateAll(); + } + private static String getFileName(String path) { return path.endsWith(File.separator) ? (path + StorageConstants.METADATA_FILE_NAME) : (path + File.separator + StorageConstants.METADATA_FILE_NAME);
