This is an automated email from the ASF dual-hosted git repository. mhubail pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
The following commit(s) were added to refs/heads/master by this push: new fa2e379 [NO ISSUE][STO] Clean up invalid resources on bootstrap fa2e379 is described below commit fa2e379621640222f58902d6815c79307463d543 Author: Murtadha Hubail <murtadha.hub...@couchbase.com> AuthorDate: Wed Aug 11 02:31:13 2021 +0300 [NO ISSUE][STO] Clean up invalid resources on bootstrap - user model changes: no - storage format changes: no - interface changes: no Details: - Add new node startup tasks to delete invalid resources. - Ensure index max disk LSN is initialized during local recovery even if the index is already registered. Change-Id: Ia13182e96ec65951e837f71ffc4db3e92a43a7b0 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/12766 Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Reviewed-by: Murtadha Hubail <mhub...@apache.org> Reviewed-by: Ali Alsuliman <ali.al.solai...@gmail.com> --- .../org/apache/asterix/app/nc/RecoveryManager.java | 28 ++++++++--- .../org/apache/asterix/app/nc/ReplicaManager.java | 6 +-- .../app/nc/task/LocalStorageCleanupTask.java | 55 ++++++++++++++++++++++ .../app/replication/NcLifecycleCoordinator.java | 5 +- .../PersistentLocalResourceRepository.java | 18 +++++++ 5 files changed, 102 insertions(+), 10 deletions(-) diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java index 0359cf1..ffde7d0 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java @@ -160,7 +160,7 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent { @Override public void startLocalRecovery(Set<Integer> partitions) throws IOException, ACIDException { state = SystemState.RECOVERING; - LOGGER.info("starting recovery ..."); + 
LOGGER.info("starting recovery for partitions {}", partitions); long readableSmallestLSN = logMgr.getReadableSmallestLSN(); Checkpoint checkpointObject = checkpointManager.getLatest(); @@ -363,10 +363,8 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent { datasetLifecycleManager.register(localResource.getPath(), index); datasetLifecycleManager.open(localResource.getPath()); try { - final DatasetResourceReference resourceReference = - DatasetResourceReference.of(localResource); - maxDiskLastLsn = - indexCheckpointManagerProvider.get(resourceReference).getLowWatermark(); + maxDiskLastLsn = getResourceLowWaterMark(localResource, datasetLifecycleManager, + indexCheckpointManagerProvider); } catch (HyracksDataException e) { datasetLifecycleManager.close(localResource.getPath()); throw e; @@ -374,7 +372,12 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent { //#. set resourceId and maxDiskLastLSN to the map resourceId2MaxLSNMap.put(resourceId, maxDiskLastLsn); } else { - maxDiskLastLsn = resourceId2MaxLSNMap.get(resourceId); + if (!resourceId2MaxLSNMap.containsKey(resourceId)) { + maxDiskLastLsn = getResourceLowWaterMark(localResource, datasetLifecycleManager, + indexCheckpointManagerProvider); + } else { + maxDiskLastLsn = resourceId2MaxLSNMap.get(resourceId); + } } // lsn @ maxDiskLastLsn is either a flush log or a master replica log if (lsn >= maxDiskLastLsn) { @@ -858,6 +861,19 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent { index.resetCurrentComponentIndex(); } + private long getResourceLowWaterMark(LocalResource localResource, IDatasetLifecycleManager datasetLifecycleManager, + IIndexCheckpointManagerProvider indexCheckpointManagerProvider) throws HyracksDataException { + long maxDiskLastLsn; + try { + final DatasetResourceReference resourceReference = DatasetResourceReference.of(localResource); + maxDiskLastLsn = 
indexCheckpointManagerProvider.get(resourceReference).getLowWatermark(); + } catch (HyracksDataException e) { + datasetLifecycleManager.close(localResource.getPath()); + throw e; + } + return maxDiskLastLsn; + } + private class JobEntityCommits { private static final String PARTITION_FILE_NAME_SEPARATOR = "_"; private final long txnId; diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java index dd1b6e7..655f9da 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java @@ -123,8 +123,6 @@ public class ReplicaManager implements IReplicaManager { if (!partitions.contains(partition)) { return; } - final IDatasetLifecycleManager datasetLifecycleManager = appCtx.getDatasetLifecycleManager(); - datasetLifecycleManager.flushDataset(appCtx.getReplicationManager().getReplicationStrategy()); closePartitionResources(partition); final List<IPartitionReplica> partitionReplicas = getReplicas(partition); for (IPartitionReplica replica : partitionReplicas) { @@ -139,10 +137,12 @@ public class ReplicaManager implements IReplicaManager { } public void closePartitionResources(int partition) throws HyracksDataException { + final IDatasetLifecycleManager datasetLifecycleManager = appCtx.getDatasetLifecycleManager(); + //TODO(mhubail) we can flush only datasets of the requested partition + datasetLifecycleManager.flushAllDatasets(); final PersistentLocalResourceRepository resourceRepository = (PersistentLocalResourceRepository) appCtx.getLocalResourceRepository(); final Map<Long, LocalResource> partitionResources = resourceRepository.getPartitionResources(partition); - final IDatasetLifecycleManager datasetLifecycleManager = appCtx.getDatasetLifecycleManager(); for (LocalResource resource : partitionResources.values()) { 
datasetLifecycleManager.closeIfOpen(resource.getPath()); } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/LocalStorageCleanupTask.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/LocalStorageCleanupTask.java new file mode 100644 index 0000000..5471fb8 --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/LocalStorageCleanupTask.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.app.nc.task; + +import org.apache.asterix.common.api.INCLifecycleTask; +import org.apache.asterix.common.api.INcApplicationContext; +import org.apache.asterix.common.dataflow.DatasetLocalResource; +import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties; +import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository; +import org.apache.hyracks.api.control.CcId; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.service.IControllerService; + +public class LocalStorageCleanupTask implements INCLifecycleTask { + + private static final long serialVersionUID = 1L; + private final int metadataPartitionId; + + public LocalStorageCleanupTask(int metadataPartitionId) { + this.metadataPartitionId = metadataPartitionId; + } + + @Override + public void perform(CcId ccId, IControllerService cs) throws HyracksDataException { + INcApplicationContext appContext = (INcApplicationContext) cs.getApplicationContext(); + PersistentLocalResourceRepository localResourceRepository = + (PersistentLocalResourceRepository) appContext.getLocalResourceRepository(); + localResourceRepository.deleteInvalidIndexes(r -> { + DatasetLocalResource lr = (DatasetLocalResource) r.getResource(); + return MetadataIndexImmutableProperties.isMetadataDataset(lr.getDatasetId()) + && lr.getPartition() != metadataPartitionId; + }); + } + + @Override + public String toString() { + return "LocalStorageCleanupTask{" + "metadataPartitionId=" + metadataPartitionId + '}'; + } +} diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java index f164773..97e1a56 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java +++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java @@ -37,6 +37,7 @@ import org.apache.asterix.app.nc.task.BindMetadataNodeTask; import org.apache.asterix.app.nc.task.CheckpointTask; import org.apache.asterix.app.nc.task.ExportMetadataNodeTask; import org.apache.asterix.app.nc.task.LocalRecoveryTask; +import org.apache.asterix.app.nc.task.LocalStorageCleanupTask; import org.apache.asterix.app.nc.task.MetadataBootstrapTask; import org.apache.asterix.app.nc.task.RetrieveLibrariesTask; import org.apache.asterix.app.nc.task.StartLifecycleComponentsTask; @@ -213,6 +214,8 @@ public class NcLifecycleCoordinator implements INcLifecycleCoordinator { Set<Integer> activePartitions) { final List<INCLifecycleTask> tasks = new ArrayList<>(); tasks.add(new UpdateNodeStatusTask(NodeStatus.BOOTING)); + int metadataPartitionId = clusterManager.getMetadataPartition().getPartitionId(); + tasks.add(new LocalStorageCleanupTask(metadataPartitionId)); if (state == SystemState.CORRUPTED) { // need to perform local recovery for node active partitions LocalRecoveryTask rt = new LocalRecoveryTask(activePartitions); @@ -222,7 +225,7 @@ public class NcLifecycleCoordinator implements INcLifecycleCoordinator { tasks.add(new StartReplicationServiceTask()); } if (metadataNode) { - tasks.add(new MetadataBootstrapTask(clusterManager.getMetadataPartition().getPartitionId())); + tasks.add(new MetadataBootstrapTask(metadataPartitionId)); } tasks.add(new CheckpointTask()); tasks.add(new StartLifecycleComponentsTask()); diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java index 556ab6a..f753868 100644 --- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java @@ -273,6 +273,24 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito return resourcesMap; } + public synchronized void deleteInvalidIndexes(Predicate<LocalResource> filter) throws HyracksDataException { + for (Path root : storageRoots) { + final Collection<File> files = FileUtils.listFiles(root.toFile(), METADATA_FILES_FILTER, ALL_DIR_FILTER); + try { + for (File file : files) { + final LocalResource localResource = readLocalResource(file); + if (filter.test(localResource)) { + LOGGER.warn("deleting invalid metadata index {}", file.getParentFile()); + IoUtil.delete(file.getParentFile()); + } + } + } catch (IOException e) { + throw HyracksDataException.create(e); + } + } + resourceCache.invalidateAll(); + } + public Map<Long, LocalResource> loadAndGetAllResources() throws HyracksDataException { return getResources(p -> true); }