This is an automated email from the ASF dual-hosted git repository. imaxon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit 0e7e4bdb2c514fe7469a775c2ffc3d9e78e6a317 Author: Murtadha Hubail <[email protected]> AuthorDate: Wed May 12 16:41:02 2021 +0300 [NO ISSUE][STO] Close datasets of flushed indexes after recovery - user model changes: no - storage format changes: no - interface changes: yes Details: - After performing redo of a flush log on any index, close its dataset to ensure any cached state that might have been changed during recovery is cleared (e.g. the component id generator). - Fix LSMFlushRecoveryTest total number of records to be inserted. - Update LSMFlushRecoveryTest to check for duplicate component ids. Change-Id: I29072f475cc7b4d7d6efde415be0329fc568443e Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/11423 Integration-Tests: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> Reviewed-by: Murtadha Hubail <[email protected]> Reviewed-by: Michael Blow <[email protected]> --- .../org/apache/asterix/app/nc/RecoveryManager.java | 20 +++++++++++++------- .../asterix/test/dataflow/LSMFlushRecoveryTest.java | 12 ++++++++++-- .../asterix/common/api/IDatasetLifecycleManager.java | 9 +++++++++ .../common/context/DatasetLifecycleManager.java | 11 +++++++++++ .../common/transactions/IRecoveryManager.java | 11 ----------- 5 files changed, 43 insertions(+), 20 deletions(-) diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java index 65cb36a..0359cf1 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java @@ -173,15 +173,14 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent { deleteRecoveryTemporaryFiles(); //get active partitions on this node - replayPartitionsLogs(partitions, logMgr.getLogReader(true), lowWaterMarkLSN); + replayPartitionsLogs(partitions, logMgr.getLogReader(true), lowWaterMarkLSN, true); } - @Override - public synchronized void replayPartitionsLogs(Set<Integer> partitions, ILogReader logReader, long lowWaterMarkLSN) - throws IOException, ACIDException { + public synchronized void replayPartitionsLogs(Set<Integer> partitions, ILogReader logReader, long lowWaterMarkLSN, + boolean closeOnFlushRedo) throws IOException, ACIDException { try { Set<Long> winnerJobSet = startRecoverysAnalysisPhase(partitions, logReader, lowWaterMarkLSN); - startRecoveryRedoPhase(partitions, logReader, lowWaterMarkLSN, winnerJobSet); + startRecoveryRedoPhase(partitions, logReader, lowWaterMarkLSN, winnerJobSet, closeOnFlushRedo); } finally { logReader.close(); deleteRecoveryTemporaryFiles(); @@ -277,7 +276,7 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent { } private synchronized void startRecoveryRedoPhase(Set<Integer> partitions, ILogReader logReader, - long lowWaterMarkLSN, Set<Long> winnerTxnSet) throws IOException, ACIDException { + long lowWaterMarkLSN, Set<Long> winnerTxnSet, boolean closeOnFlushRedo) throws IOException, ACIDException { int redoCount = 0; long txnId = 0; @@ -299,6 +298,7 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent { TxnEntityId tempKeyTxnEntityId = new TxnEntityId(-1, -1, -1, null, -1, false); ILogRecord logRecord = null; + Set<Integer> flushRedoDatasets = new HashSet<>(); try { logReader.setPosition(lowWaterMarkLSN); logRecord = logReader.next(); @@ -409,6 +409,7 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent { && !index.isCurrentMutableComponentEmpty()) { // schedule flush redoFlush(index, logRecord); + flushRedoDatasets.add(datasetId); redoCount++; } else { // TODO: update checkpoint file? @@ -441,6 +442,11 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent { for (long r : resourceIdList) { datasetLifecycleManager.close(resourcesMap.get(r).getPath()); } + if (closeOnFlushRedo) { + // close datasets of indexes to ensure any cached state that might've been changed by recovery is cleared + // e.g. when redoing a flush, the component id generator needs to be reinitialized + datasetLifecycleManager.closeDatasets(flushRedoDatasets); + } } } @@ -525,7 +531,7 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent { if (minLSN < readableSmallestLSN) { minLSN = readableSmallestLSN; } - replayPartitionsLogs(partitions, logMgr.getLogReader(true), minLSN); + replayPartitionsLogs(partitions, logMgr.getLogReader(true), minLSN, false); if (flush) { appCtx.getDatasetLifecycleManager().flushAllDatasets(); } diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LSMFlushRecoveryTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LSMFlushRecoveryTest.java index 9c6e95e..2aad416 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LSMFlushRecoveryTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LSMFlushRecoveryTest.java @@ -23,7 +23,9 @@ import java.lang.reflect.Field; import java.rmi.RemoteException; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.concurrent.Semaphore; import org.apache.asterix.app.bootstrap.TestNodeController; @@ -62,6 +64,7 @@ import org.apache.hyracks.storage.am.lsm.btree.impl.AllowTestOpCallback; import org.apache.hyracks.storage.am.lsm.btree.impl.ITestOpCallback; import org.apache.hyracks.storage.am.lsm.btree.impl.TestLsmBtree; import org.apache.hyracks.storage.am.lsm.common.api.IIoOperationFailedCallback; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId; import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler; @@ -143,8 +146,7 @@ public class LSMFlushRecoveryTest { checkComponentIds(); // insert more records createInsertOps(); - insertRecords(PARTITION_0, StorageTestUtils.RECORDS_PER_COMPONENT, StorageTestUtils.RECORDS_PER_COMPONENT, - true); + insertRecords(PARTITION_0, StorageTestUtils.TOTAL_NUM_OF_RECORDS, StorageTestUtils.RECORDS_PER_COMPONENT, true); dsInfo.waitForIO(); checkComponentIds(); @@ -487,8 +489,14 @@ public class LSMFlushRecoveryTest { List<ILSMDiskComponent> secondaryDiskComponents = secondaryIndexes[partitionIndex].getDiskComponents(); Assert.assertEquals(primaryDiskComponents.size(), secondaryDiskComponents.size()); + Set<ILSMComponentId> uniqueIds = new HashSet<>(); for (int i = 0; i < primaryDiskComponents.size(); i++) { Assert.assertEquals(primaryDiskComponents.get(i).getId(), secondaryDiskComponents.get(i).getId()); + ILSMComponentId id = primaryDiskComponents.get(i).getId(); + boolean added = uniqueIds.add(id); + if (!added) { + throw new IllegalStateException("found duplicate component ids: " + id); + } } } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java index 7b737a0..b03af55 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java @@ -19,6 +19,7 @@ package org.apache.asterix.common.api; import java.util.List; +import java.util.Set; import java.util.function.Predicate; import org.apache.asterix.common.context.DatasetInfo; @@ -124,6 +125,14 @@ public interface IDatasetLifecycleManager extends IResourceLifecycleManager<IInd List<IVirtualBufferCache> getVirtualBufferCaches(int datasetId, int ioDeviceNum); /** + * Attempts to close the datasets in {@code datasetsToClose} + * + * @param datasetsToClose + * @throws HyracksDataException + */ + void closeDatasets(Set<Integer> datasetsToClose) throws HyracksDataException; + + /** * Flushes then closes all open datasets */ void closeAllDatasets() throws HyracksDataException; diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java index b26220d..b2f4034 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java @@ -25,6 +25,7 @@ import java.io.OutputStream; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.function.Predicate; @@ -452,6 +453,16 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC } @Override + public synchronized void closeDatasets(Set<Integer> datasetsToClose) throws HyracksDataException { + ArrayList<DatasetResource> openDatasets = new ArrayList<>(datasets.values()); + for (DatasetResource dsr : openDatasets) { + if (dsr.isOpen() && datasetsToClose.contains(dsr.getDatasetID())) { + closeDataset(dsr); + } + } + } + + @Override public synchronized void closeAllDatasets() throws HyracksDataException { ArrayList<DatasetResource> openDatasets = new ArrayList<>(datasets.values()); for (DatasetResource dsr : openDatasets) { diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java index bfe7963..8a5f34e 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManager.java @@ -85,17 +85,6 @@ public interface IRecoveryManager { long getLocalMinFirstLSN() throws HyracksDataException; /** - * Replay the logs that belong to the passed {@code partitions} starting from the {@code lowWaterMarkLSN} - * - * @param partitions - * @param lowWaterMarkLSN - * @throws IOException - * @throws ACIDException - */ - void replayPartitionsLogs(Set<Integer> partitions, ILogReader logReader, long lowWaterMarkLSN) - throws IOException, ACIDException; - - /** * Creates a temporary file to be used during recovery * * @param txnId
