This is an automated email from the ASF dual-hosted git repository.
mhubail pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git
The following commit(s) were added to refs/heads/master by this push:
new fa2e379 [NO ISSUE][STO] Clean up invalid resources on bootstrap
fa2e379 is described below
commit fa2e379621640222f58902d6815c79307463d543
Author: Murtadha Hubail <[email protected]>
AuthorDate: Wed Aug 11 02:31:13 2021 +0300
[NO ISSUE][STO] Clean up invalid resources on bootstrap
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
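- Add a new node startup task (LocalStorageCleanupTask) that deletes
invalid resources, i.e. metadata dataset indexes found on any
partition other than the metadata partition.
- Ensure index max disk LSN is initialized during local
recovery even if the index is already registered.
- Flush all datasets before closing the resources of a partition.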
Change-Id: Ia13182e96ec65951e837f71ffc4db3e92a43a7b0
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/12766
Tested-by: Jenkins <[email protected]>
Integration-Tests: Jenkins <[email protected]>
Reviewed-by: Murtadha Hubail <[email protected]>
Reviewed-by: Ali Alsuliman <[email protected]>
---
.../org/apache/asterix/app/nc/RecoveryManager.java | 28 ++++++++---
.../org/apache/asterix/app/nc/ReplicaManager.java | 6 +--
.../app/nc/task/LocalStorageCleanupTask.java | 55 ++++++++++++++++++++++
.../app/replication/NcLifecycleCoordinator.java | 5 +-
.../PersistentLocalResourceRepository.java | 18 +++++++
5 files changed, 102 insertions(+), 10 deletions(-)
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
index 0359cf1..ffde7d0 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
@@ -160,7 +160,7 @@ public class RecoveryManager implements IRecoveryManager,
ILifeCycleComponent {
@Override
public void startLocalRecovery(Set<Integer> partitions) throws
IOException, ACIDException {
state = SystemState.RECOVERING;
- LOGGER.info("starting recovery ...");
+ LOGGER.info("starting recovery for partitions {}", partitions);
long readableSmallestLSN = logMgr.getReadableSmallestLSN();
Checkpoint checkpointObject = checkpointManager.getLatest();
@@ -363,10 +363,8 @@ public class RecoveryManager implements IRecoveryManager,
ILifeCycleComponent {
datasetLifecycleManager.register(localResource.getPath(), index);
datasetLifecycleManager.open(localResource.getPath());
try {
- final DatasetResourceReference
resourceReference =
-
DatasetResourceReference.of(localResource);
- maxDiskLastLsn =
-
indexCheckpointManagerProvider.get(resourceReference).getLowWatermark();
+ maxDiskLastLsn =
getResourceLowWaterMark(localResource, datasetLifecycleManager,
+ indexCheckpointManagerProvider);
} catch (HyracksDataException e) {
datasetLifecycleManager.close(localResource.getPath());
throw e;
@@ -374,7 +372,12 @@ public class RecoveryManager implements IRecoveryManager,
ILifeCycleComponent {
//#. set resourceId and maxDiskLastLSN to the
map
resourceId2MaxLSNMap.put(resourceId,
maxDiskLastLsn);
} else {
- maxDiskLastLsn =
resourceId2MaxLSNMap.get(resourceId);
+ if
(!resourceId2MaxLSNMap.containsKey(resourceId)) {
+ maxDiskLastLsn =
getResourceLowWaterMark(localResource, datasetLifecycleManager,
+ indexCheckpointManagerProvider);
+ } else {
+ maxDiskLastLsn =
resourceId2MaxLSNMap.get(resourceId);
+ }
}
// lsn @ maxDiskLastLsn is either a flush log or a
master replica log
if (lsn >= maxDiskLastLsn) {
@@ -858,6 +861,19 @@ public class RecoveryManager implements IRecoveryManager,
ILifeCycleComponent {
index.resetCurrentComponentIndex();
}
+ private long getResourceLowWaterMark(LocalResource localResource,
IDatasetLifecycleManager datasetLifecycleManager,
+ IIndexCheckpointManagerProvider indexCheckpointManagerProvider)
throws HyracksDataException {
+ long maxDiskLastLsn;
+ try {
+ final DatasetResourceReference resourceReference =
DatasetResourceReference.of(localResource);
+ maxDiskLastLsn =
indexCheckpointManagerProvider.get(resourceReference).getLowWatermark();
+ } catch (HyracksDataException e) {
+ datasetLifecycleManager.close(localResource.getPath());
+ throw e;
+ }
+ return maxDiskLastLsn;
+ }
+
private class JobEntityCommits {
private static final String PARTITION_FILE_NAME_SEPARATOR = "_";
private final long txnId;
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
index dd1b6e7..655f9da 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
@@ -123,8 +123,6 @@ public class ReplicaManager implements IReplicaManager {
if (!partitions.contains(partition)) {
return;
}
- final IDatasetLifecycleManager datasetLifecycleManager =
appCtx.getDatasetLifecycleManager();
-
datasetLifecycleManager.flushDataset(appCtx.getReplicationManager().getReplicationStrategy());
closePartitionResources(partition);
final List<IPartitionReplica> partitionReplicas =
getReplicas(partition);
for (IPartitionReplica replica : partitionReplicas) {
@@ -139,10 +137,12 @@ public class ReplicaManager implements IReplicaManager {
}
public void closePartitionResources(int partition) throws
HyracksDataException {
+ final IDatasetLifecycleManager datasetLifecycleManager =
appCtx.getDatasetLifecycleManager();
+ //TODO(mhubail) we can flush only datasets of the requested partition
+ datasetLifecycleManager.flushAllDatasets();
final PersistentLocalResourceRepository resourceRepository =
(PersistentLocalResourceRepository)
appCtx.getLocalResourceRepository();
final Map<Long, LocalResource> partitionResources =
resourceRepository.getPartitionResources(partition);
- final IDatasetLifecycleManager datasetLifecycleManager =
appCtx.getDatasetLifecycleManager();
for (LocalResource resource : partitionResources.values()) {
datasetLifecycleManager.closeIfOpen(resource.getPath());
}
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/LocalStorageCleanupTask.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/LocalStorageCleanupTask.java
new file mode 100644
index 0000000..5471fb8
--- /dev/null
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/LocalStorageCleanupTask.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.nc.task;
+
+import org.apache.asterix.common.api.INCLifecycleTask;
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.dataflow.DatasetLocalResource;
+import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
+import
org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
+import org.apache.hyracks.api.control.CcId;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.service.IControllerService;
+
+public class LocalStorageCleanupTask implements INCLifecycleTask {
+
+ private static final long serialVersionUID = 1L;
+ private final int metadataPartitionId;
+
+ public LocalStorageCleanupTask(int metadataPartitionId) {
+ this.metadataPartitionId = metadataPartitionId;
+ }
+
+ @Override
+ public void perform(CcId ccId, IControllerService cs) throws
HyracksDataException {
+ INcApplicationContext appContext = (INcApplicationContext)
cs.getApplicationContext();
+ PersistentLocalResourceRepository localResourceRepository =
+ (PersistentLocalResourceRepository)
appContext.getLocalResourceRepository();
+ localResourceRepository.deleteInvalidIndexes(r -> {
+ DatasetLocalResource lr = (DatasetLocalResource) r.getResource();
+ return
MetadataIndexImmutableProperties.isMetadataDataset(lr.getDatasetId())
+ && lr.getPartition() != metadataPartitionId;
+ });
+ }
+
+ @Override
+ public String toString() {
+ return "LocalStorageCleanupTask{" + "metadataPartitionId=" +
metadataPartitionId + '}';
+ }
+}
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
index f164773..97e1a56 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
@@ -37,6 +37,7 @@ import org.apache.asterix.app.nc.task.BindMetadataNodeTask;
import org.apache.asterix.app.nc.task.CheckpointTask;
import org.apache.asterix.app.nc.task.ExportMetadataNodeTask;
import org.apache.asterix.app.nc.task.LocalRecoveryTask;
+import org.apache.asterix.app.nc.task.LocalStorageCleanupTask;
import org.apache.asterix.app.nc.task.MetadataBootstrapTask;
import org.apache.asterix.app.nc.task.RetrieveLibrariesTask;
import org.apache.asterix.app.nc.task.StartLifecycleComponentsTask;
@@ -213,6 +214,8 @@ public class NcLifecycleCoordinator implements
INcLifecycleCoordinator {
Set<Integer> activePartitions) {
final List<INCLifecycleTask> tasks = new ArrayList<>();
tasks.add(new UpdateNodeStatusTask(NodeStatus.BOOTING));
+ int metadataPartitionId =
clusterManager.getMetadataPartition().getPartitionId();
+ tasks.add(new LocalStorageCleanupTask(metadataPartitionId));
if (state == SystemState.CORRUPTED) {
// need to perform local recovery for node active partitions
LocalRecoveryTask rt = new LocalRecoveryTask(activePartitions);
@@ -222,7 +225,7 @@ public class NcLifecycleCoordinator implements
INcLifecycleCoordinator {
tasks.add(new StartReplicationServiceTask());
}
if (metadataNode) {
- tasks.add(new
MetadataBootstrapTask(clusterManager.getMetadataPartition().getPartitionId()));
+ tasks.add(new MetadataBootstrapTask(metadataPartitionId));
}
tasks.add(new CheckpointTask());
tasks.add(new StartLifecycleComponentsTask());
diff --git
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index 556ab6a..f753868 100644
---
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -273,6 +273,24 @@ public class PersistentLocalResourceRepository implements
ILocalResourceReposito
return resourcesMap;
}
+ public synchronized void deleteInvalidIndexes(Predicate<LocalResource>
filter) throws HyracksDataException {
+ for (Path root : storageRoots) {
+ final Collection<File> files = FileUtils.listFiles(root.toFile(),
METADATA_FILES_FILTER, ALL_DIR_FILTER);
+ try {
+ for (File file : files) {
+ final LocalResource localResource =
readLocalResource(file);
+ if (filter.test(localResource)) {
+ LOGGER.warn("deleting invalid metadata index {}",
file.getParentFile());
+ IoUtil.delete(file.getParentFile());
+ }
+ }
+ } catch (IOException e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+ resourceCache.invalidateAll();
+ }
+
public Map<Long, LocalResource> loadAndGetAllResources() throws
HyracksDataException {
return getResources(p -> true);
}