This is an automated email from the ASF dual-hosted git repository. mblow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit fe0f6fa181b03195cda8ff3585734ce10f466467 Author: Murtadha Hubail <[email protected]> AuthorDate: Thu May 21 21:29:46 2020 +0300 [ASTERIXDB-2738][STO] Create Mask File Before Merge Operations - user model changes: no - storage format changes: no - interface changes: no Details: - Before starting a merge operation, create a mask file (.mask_C_startSeq_endSeq) for the merged component to indicate that this component isn't valid yet. - On successful completion of the merge operation, delete the merged component mask file. - In the case of any unexpected failure during the merge operation, all files of the failed merged component will be deleted on node startup/shutdown, including the mask file. - Halt on any IO operation failure. - Add a test case that ensures only masked merged components are deleted but not the original components that were supposed to be merged. Change-Id: I476dd3be5e75468e83044b3aaf0f6c2d8beadf1c Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/6425 Integration-Tests: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> Reviewed-by: Murtadha Hubail <[email protected]> Reviewed-by: Luo Chen <[email protected]> --- .../org/apache/asterix/app/nc/HaltCallback.java | 5 +- .../PersistentLocalResourceRepositoryTest.java | 53 ++++++++++++++++++++++ .../ioopcallbacks/LSMIOOperationCallback.java | 32 ++++++++++++- .../asterix/common/storage/ResourceReference.java | 14 ++++++ .../replication/messaging/ComponentMaskTask.java | 4 +- .../PersistentLocalResourceRepository.java | 14 +----- .../java/org/apache/hyracks/util/ExitUtil.java | 2 +- 7 files changed, 103 insertions(+), 21 deletions(-) diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/HaltCallback.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/HaltCallback.java index 17a4f46..9802001 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/HaltCallback.java +++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/HaltCallback.java @@ -20,7 +20,6 @@ package org.apache.asterix.app.nc; import org.apache.hyracks.storage.am.lsm.common.api.IIoOperationFailedCallback; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler; import org.apache.hyracks.util.ExitUtil; import org.apache.logging.log4j.LogManager; @@ -42,8 +41,6 @@ public class HaltCallback implements IIoOperationFailedCallback { @Override public void operationFailed(ILSMIOOperation operation, Throwable t) { LOGGER.error("Operation {} has failed", operation, t); - if (operation.getIOOpertionType() == LSMIOOperationType.FLUSH) { - ExitUtil.halt(ExitUtil.EC_FLUSH_FAILED); - } + ExitUtil.halt(ExitUtil.EC_IO_OPERATION_FAILED); } } diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/PersistentLocalResourceRepositoryTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/PersistentLocalResourceRepositoryTest.java index 00d2d3d..50118fc 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/PersistentLocalResourceRepositoryTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/PersistentLocalResourceRepositoryTest.java @@ -171,6 +171,59 @@ public class PersistentLocalResourceRepositoryTest { Assert.assertFalse(indexMetadataMaskFile.exists()); } + @Test + public void deleteMaskedMergedFiles() throws Exception { + final INcApplicationContext ncAppCtx = (INcApplicationContext) integrationUtil.ncs[0].getApplicationContext(); + final String nodeId = ncAppCtx.getServiceContext().getNodeId(); + final String datasetName = "ds"; + TestDataUtil.createIdOnlyDataset(datasetName); + final Dataset dataset = TestDataUtil.getDataset(integrationUtil, datasetName); + final String indexPath = 
TestDataUtil.getIndexPath(integrationUtil, dataset, nodeId); + FileReference indexDirRef = ncAppCtx.getIoManager().resolve(indexPath); + int compSeqStart = 100; + int validComponentSequence = 103; + // advance valid component seq in checkpoint + PersistentLocalResourceRepository localResourceRepository = + (PersistentLocalResourceRepository) ncAppCtx.getLocalResourceRepository(); + LocalResource localResource = localResourceRepository.get(indexPath); + DatasetResourceReference drr = DatasetResourceReference.of(localResource); + IIndexCheckpointManagerProvider indexCheckpointManagerProvider = ncAppCtx.getIndexCheckpointManagerProvider(); + IIndexCheckpointManager indexCheckpointManager = indexCheckpointManagerProvider.get(drr); + indexCheckpointManager.advanceValidComponentSequence(validComponentSequence); + // create components to be merged + String btree = "_b"; + String filter = "_f"; + String indexDir = indexDirRef.getFile().getAbsolutePath(); + for (int i = compSeqStart; i <= validComponentSequence; i++) { + String componentId = i + "_" + i; + Path btreePath = Paths.get(indexDir, componentId + btree); + Path filterPath = Paths.get(indexDir, componentId + filter); + Files.createFile(btreePath); + Files.createFile(filterPath); + } + // create masked merged component + String mergedComponentId = compSeqStart + "_" + validComponentSequence; + Path mergedBtreePath = Paths.get(indexDir, mergedComponentId + btree); + Path mergedFilterPath = Paths.get(indexDir, mergedComponentId + filter); + Path mergeMaskPath = Paths.get(indexDir, StorageConstants.COMPONENT_MASK_FILE_PREFIX + mergedComponentId); + Files.createFile(mergedBtreePath); + Files.createFile(mergedFilterPath); + Files.createFile(mergeMaskPath); + // cleanup storage and ensure merged component files were deleted while individual files still exist + DatasetLocalResource lr = (DatasetLocalResource) localResourceRepository.get(indexPath).getResource(); + localResourceRepository.cleanup(lr.getPartition()); + 
Assert.assertFalse(mergedBtreePath.toFile().exists()); + Assert.assertFalse(mergedFilterPath.toFile().exists()); + Assert.assertFalse(mergeMaskPath.toFile().exists()); + for (int i = compSeqStart; i <= validComponentSequence; i++) { + String componentId = i + "_" + i; + Path btreePath = Paths.get(indexDir, componentId + btree); + Path filterPath = Paths.get(indexDir, componentId + filter); + Assert.assertTrue(btreePath.toFile().exists()); + Assert.assertTrue(filterPath.toFile().exists()); + } + } + private void ensureInvalidComponentDeleted(String indexDir, String componentSeq, PersistentLocalResourceRepository localResourceRepository, DatasetLocalResource lr) throws IOException { Path btreePath = Paths.get(indexDir, diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java index 0aa46a8..c3737da 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java @@ -19,6 +19,10 @@ package org.apache.asterix.common.ioopcallbacks; +import static org.apache.asterix.common.storage.ResourceReference.getComponentSequence; + +import java.nio.file.Path; +import java.nio.file.Paths; import java.util.ArrayDeque; import java.util.Collection; import java.util.Deque; @@ -28,8 +32,10 @@ import java.util.Map; import org.apache.asterix.common.context.DatasetInfo; import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider; import org.apache.asterix.common.storage.ResourceReference; +import org.apache.asterix.common.utils.StorageConstants; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.io.FileReference; +import org.apache.hyracks.api.util.IoUtil; import 
org.apache.hyracks.data.std.primitive.LongPointable; import org.apache.hyracks.data.std.util.ArrayBackedValueStorage; import org.apache.hyracks.storage.am.common.api.IMetadataPageManager; @@ -83,7 +89,15 @@ public class LSMIOOperationCallback implements ILSMIOOperationCallback { @Override public void beforeOperation(ILSMIOOperation operation) throws HyracksDataException { - // No Op + if (isMerge(operation)) { + FileReference operationMaskFilePath = getOperationMaskFilePath(operation); + // if a merge operation is attempted after a failure, its mask file may already exists + if (!operationMaskFilePath.getFile().exists()) { + IoUtil.create(operationMaskFilePath); + } else { + LOGGER.warn("merge operation mask file {} already exists", operationMaskFilePath); + } + } } @Override @@ -121,6 +135,8 @@ public class LSMIOOperationCallback implements ILSMIOOperationCallback { } else if (operation.getIOOpertionType() == LSMIOOperationType.FLUSH || operation.getIOOpertionType() == LSMIOOperationType.LOAD) { addComponentToCheckpoint(operation); + } else if (isMerge(operation)) { + IoUtil.delete(getOperationMaskFilePath(operation)); } } @@ -277,4 +293,18 @@ public class LSMIOOperationCallback implements ILSMIOOperationCallback { public void allocated(ILSMMemoryComponent component) throws HyracksDataException { // no op } + + private boolean isMerge(ILSMIOOperation operation) { + return operation.getIOOpertionType() == LSMIOOperationType.MERGE + && operation.getAccessor().getOpContext().getOperation() != IndexOperation.DELETE_COMPONENTS; + } + + private static FileReference getOperationMaskFilePath(ILSMIOOperation operation) { + FileReference target = operation.getTarget(); + final String componentSequence = getComponentSequence(target.getFile().getAbsolutePath()); + Path idxRelPath = Paths.get(target.getRelativePath()).getParent(); + Path maskFileRelPath = + Paths.get(idxRelPath.toString(), StorageConstants.COMPONENT_MASK_FILE_PREFIX + componentSequence); + return new 
FileReference(target.getDeviceHandle(), maskFileRelPath.toString()); + } } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ResourceReference.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ResourceReference.java index ae949fe..a0de153 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ResourceReference.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ResourceReference.java @@ -24,6 +24,7 @@ import java.nio.file.Paths; import org.apache.asterix.common.utils.StorageConstants; import org.apache.commons.lang3.StringUtils; +import org.apache.hyracks.storage.am.lsm.common.impls.IndexComponentFileReference; public class ResourceReference { @@ -130,4 +131,17 @@ public class ResourceReference { public String toString() { return getRelativePath().toString(); } + + /** + * Gets a component sequence based on its unique timestamp. + * e.g. a component file 1_3_b + * will return a component sequence 1_3 + * + * @param componentFile any component file + * @return The component sequence + */ + public static String getComponentSequence(String componentFile) { + final ResourceReference ref = of(componentFile); + return IndexComponentFileReference.of(ref.getName()).getSequence(); + } } diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ComponentMaskTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ComponentMaskTask.java index 181a75b..3f04bd2 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ComponentMaskTask.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ComponentMaskTask.java @@ -28,10 +28,10 @@ import java.nio.file.Paths; import org.apache.asterix.common.api.INcApplicationContext; import org.apache.asterix.common.exceptions.ReplicationException; +import 
org.apache.asterix.common.storage.ResourceReference; import org.apache.asterix.common.utils.StorageConstants; import org.apache.asterix.replication.api.IReplicaTask; import org.apache.asterix.replication.api.IReplicationWorker; -import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.io.FileReference; import org.apache.hyracks.api.io.IIOManager; @@ -63,7 +63,7 @@ public class ComponentMaskTask implements IReplicaTask { final IIOManager ioManager = appCtx.getIoManager(); final FileReference localPath = ioManager.resolve(componentFile); final Path resourceDir = Files.createDirectories(localPath.getFile().getParentFile().toPath()); - final String componentSequence = PersistentLocalResourceRepository.getComponentSequence(componentFile); + final String componentSequence = ResourceReference.getComponentSequence(componentFile); return Paths.get(resourceDir.toString(), StorageConstants.COMPONENT_MASK_FILE_PREFIX + componentSequence); } diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java index 31f5171..145be86 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java @@ -18,6 +18,7 @@ */ package org.apache.asterix.transaction.management.resource; +import static org.apache.asterix.common.storage.ResourceReference.getComponentSequence; import static org.apache.asterix.common.utils.StorageConstants.INDEX_CHECKPOINT_FILE_PREFIX; import static 
org.apache.asterix.common.utils.StorageConstants.METADATA_FILE_NAME; import static org.apache.hyracks.api.exceptions.ErrorCode.CANNOT_CREATE_FILE; @@ -593,19 +594,6 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito return Paths.get(resourceFile.getFile().getParentFile().getAbsolutePath(), METADATA_FILE_MASK_NAME); } - /** - * Gets a component sequence based on its unique timestamp. - * e.g. a component file 1_3_b - * will return a component sequence 1_3 - * - * @param componentFile any component file - * @return The component sequence - */ - public static String getComponentSequence(String componentFile) { - final ResourceReference ref = ResourceReference.of(componentFile); - return IndexComponentFileReference.of(ref.getName()).getSequence(); - } - private static boolean isComponentMask(File mask) { return mask.getName().startsWith(StorageConstants.COMPONENT_MASK_FILE_PREFIX); } diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java index f8bc9f9..4991f86 100644 --- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java +++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java @@ -58,7 +58,7 @@ public class ExitUtil { public static final int EC_IO_SCHEDULER_FAILED = 55; public static final int EC_HALT_SHUTDOWN_TIMED_OUT = 66; public static final int EC_HALT_WATCHDOG_FAILED = 77; - public static final int EC_FLUSH_FAILED = 88; + public static final int EC_IO_OPERATION_FAILED = 88; public static final int EC_TERMINATE_NC_SERVICE_DIRECTIVE = 99; private static final ExitThread exitThread = new ExitThread(); private static final ShutdownWatchdog watchdogThread = new ShutdownWatchdog();
