This is an automated email from the ASF dual-hosted git repository.

mhubail pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new fa2e379  [NO ISSUE][STO] Clean up invalid resources on bootstrap
fa2e379 is described below

commit fa2e379621640222f58902d6815c79307463d543
Author: Murtadha Hubail <murtadha.hub...@couchbase.com>
AuthorDate: Wed Aug 11 02:31:13 2021 +0300

    [NO ISSUE][STO] Clean up invalid resources on bootstrap
    
    - user model changes: no
    - storage format changes: no
    - interface changes: no
    
    Details:
    
    - Add new node startup tasks to delete invalid resources.
    - Ensure index max disk LSN is initialized during local
      recovery even if the index is already registered.
    
    Change-Id: Ia13182e96ec65951e837f71ffc4db3e92a43a7b0
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/12766
    Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
    Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
    Reviewed-by: Murtadha Hubail <mhub...@apache.org>
    Reviewed-by: Ali Alsuliman <ali.al.solai...@gmail.com>
---
 .../org/apache/asterix/app/nc/RecoveryManager.java | 28 ++++++++---
 .../org/apache/asterix/app/nc/ReplicaManager.java  |  6 +--
 .../app/nc/task/LocalStorageCleanupTask.java       | 55 ++++++++++++++++++++++
 .../app/replication/NcLifecycleCoordinator.java    |  5 +-
 .../PersistentLocalResourceRepository.java         | 18 +++++++
 5 files changed, 102 insertions(+), 10 deletions(-)

diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
index 0359cf1..ffde7d0 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
@@ -160,7 +160,7 @@ public class RecoveryManager implements IRecoveryManager, 
ILifeCycleComponent {
     @Override
     public void startLocalRecovery(Set<Integer> partitions) throws 
IOException, ACIDException {
         state = SystemState.RECOVERING;
-        LOGGER.info("starting recovery ...");
+        LOGGER.info("starting recovery for partitions {}", partitions);
 
         long readableSmallestLSN = logMgr.getReadableSmallestLSN();
         Checkpoint checkpointObject = checkpointManager.getLatest();
@@ -363,10 +363,8 @@ public class RecoveryManager implements IRecoveryManager, 
ILifeCycleComponent {
                                 
datasetLifecycleManager.register(localResource.getPath(), index);
                                 
datasetLifecycleManager.open(localResource.getPath());
                                 try {
-                                    final DatasetResourceReference 
resourceReference =
-                                            
DatasetResourceReference.of(localResource);
-                                    maxDiskLastLsn =
-                                            
indexCheckpointManagerProvider.get(resourceReference).getLowWatermark();
+                                    maxDiskLastLsn = 
getResourceLowWaterMark(localResource, datasetLifecycleManager,
+                                            indexCheckpointManagerProvider);
                                 } catch (HyracksDataException e) {
                                     
datasetLifecycleManager.close(localResource.getPath());
                                     throw e;
@@ -374,7 +372,12 @@ public class RecoveryManager implements IRecoveryManager, 
ILifeCycleComponent {
                                 //#. set resourceId and maxDiskLastLSN to the 
map
                                 resourceId2MaxLSNMap.put(resourceId, 
maxDiskLastLsn);
                             } else {
-                                maxDiskLastLsn = 
resourceId2MaxLSNMap.get(resourceId);
+                                if 
(!resourceId2MaxLSNMap.containsKey(resourceId)) {
+                                    maxDiskLastLsn = 
getResourceLowWaterMark(localResource, datasetLifecycleManager,
+                                            indexCheckpointManagerProvider);
+                                } else {
+                                    maxDiskLastLsn = 
resourceId2MaxLSNMap.get(resourceId);
+                                }
                             }
                             // lsn @ maxDiskLastLsn is either a flush log or a 
master replica log
                             if (lsn >= maxDiskLastLsn) {
@@ -858,6 +861,19 @@ public class RecoveryManager implements IRecoveryManager, 
ILifeCycleComponent {
         index.resetCurrentComponentIndex();
     }
 
+    private long getResourceLowWaterMark(LocalResource localResource, 
IDatasetLifecycleManager datasetLifecycleManager,
+            IIndexCheckpointManagerProvider indexCheckpointManagerProvider) 
throws HyracksDataException {
+        long maxDiskLastLsn;
+        try {
+            final DatasetResourceReference resourceReference = 
DatasetResourceReference.of(localResource);
+            maxDiskLastLsn = 
indexCheckpointManagerProvider.get(resourceReference).getLowWatermark();
+        } catch (HyracksDataException e) {
+            datasetLifecycleManager.close(localResource.getPath());
+            throw e;
+        }
+        return maxDiskLastLsn;
+    }
+
     private class JobEntityCommits {
         private static final String PARTITION_FILE_NAME_SEPARATOR = "_";
         private final long txnId;
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
index dd1b6e7..655f9da 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
@@ -123,8 +123,6 @@ public class ReplicaManager implements IReplicaManager {
         if (!partitions.contains(partition)) {
             return;
         }
-        final IDatasetLifecycleManager datasetLifecycleManager = 
appCtx.getDatasetLifecycleManager();
-        
datasetLifecycleManager.flushDataset(appCtx.getReplicationManager().getReplicationStrategy());
         closePartitionResources(partition);
         final List<IPartitionReplica> partitionReplicas = 
getReplicas(partition);
         for (IPartitionReplica replica : partitionReplicas) {
@@ -139,10 +137,12 @@ public class ReplicaManager implements IReplicaManager {
     }
 
     public void closePartitionResources(int partition) throws 
HyracksDataException {
+        final IDatasetLifecycleManager datasetLifecycleManager = 
appCtx.getDatasetLifecycleManager();
+        //TODO(mhubail) we can flush only datasets of the requested partition
+        datasetLifecycleManager.flushAllDatasets();
         final PersistentLocalResourceRepository resourceRepository =
                 (PersistentLocalResourceRepository) 
appCtx.getLocalResourceRepository();
         final Map<Long, LocalResource> partitionResources = 
resourceRepository.getPartitionResources(partition);
-        final IDatasetLifecycleManager datasetLifecycleManager = 
appCtx.getDatasetLifecycleManager();
         for (LocalResource resource : partitionResources.values()) {
             datasetLifecycleManager.closeIfOpen(resource.getPath());
         }
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/LocalStorageCleanupTask.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/LocalStorageCleanupTask.java
new file mode 100644
index 0000000..5471fb8
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/task/LocalStorageCleanupTask.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.nc.task;
+
+import org.apache.asterix.common.api.INCLifecycleTask;
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.dataflow.DatasetLocalResource;
+import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
+import 
org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
+import org.apache.hyracks.api.control.CcId;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.service.IControllerService;
+
+public class LocalStorageCleanupTask implements INCLifecycleTask {
+
+    private static final long serialVersionUID = 1L;
+    private final int metadataPartitionId;
+
+    public LocalStorageCleanupTask(int metadataPartitionId) {
+        this.metadataPartitionId = metadataPartitionId;
+    }
+
+    @Override
+    public void perform(CcId ccId, IControllerService cs) throws 
HyracksDataException {
+        INcApplicationContext appContext = (INcApplicationContext) 
cs.getApplicationContext();
+        PersistentLocalResourceRepository localResourceRepository =
+                (PersistentLocalResourceRepository) 
appContext.getLocalResourceRepository();
+        localResourceRepository.deleteInvalidIndexes(r -> {
+            DatasetLocalResource lr = (DatasetLocalResource) r.getResource();
+            return 
MetadataIndexImmutableProperties.isMetadataDataset(lr.getDatasetId())
+                    && lr.getPartition() != metadataPartitionId;
+        });
+    }
+
+    @Override
+    public String toString() {
+        return "LocalStorageCleanupTask{" + "metadataPartitionId=" + 
metadataPartitionId + '}';
+    }
+}
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
index f164773..97e1a56 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/NcLifecycleCoordinator.java
@@ -37,6 +37,7 @@ import org.apache.asterix.app.nc.task.BindMetadataNodeTask;
 import org.apache.asterix.app.nc.task.CheckpointTask;
 import org.apache.asterix.app.nc.task.ExportMetadataNodeTask;
 import org.apache.asterix.app.nc.task.LocalRecoveryTask;
+import org.apache.asterix.app.nc.task.LocalStorageCleanupTask;
 import org.apache.asterix.app.nc.task.MetadataBootstrapTask;
 import org.apache.asterix.app.nc.task.RetrieveLibrariesTask;
 import org.apache.asterix.app.nc.task.StartLifecycleComponentsTask;
@@ -213,6 +214,8 @@ public class NcLifecycleCoordinator implements 
INcLifecycleCoordinator {
             Set<Integer> activePartitions) {
         final List<INCLifecycleTask> tasks = new ArrayList<>();
         tasks.add(new UpdateNodeStatusTask(NodeStatus.BOOTING));
+        int metadataPartitionId = 
clusterManager.getMetadataPartition().getPartitionId();
+        tasks.add(new LocalStorageCleanupTask(metadataPartitionId));
         if (state == SystemState.CORRUPTED) {
             // need to perform local recovery for node active partitions
             LocalRecoveryTask rt = new LocalRecoveryTask(activePartitions);
@@ -222,7 +225,7 @@ public class NcLifecycleCoordinator implements 
INcLifecycleCoordinator {
             tasks.add(new StartReplicationServiceTask());
         }
         if (metadataNode) {
-            tasks.add(new 
MetadataBootstrapTask(clusterManager.getMetadataPartition().getPartitionId()));
+            tasks.add(new MetadataBootstrapTask(metadataPartitionId));
         }
         tasks.add(new CheckpointTask());
         tasks.add(new StartLifecycleComponentsTask());
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index 556ab6a..f753868 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -273,6 +273,24 @@ public class PersistentLocalResourceRepository implements 
ILocalResourceReposito
         return resourcesMap;
     }
 
+    public synchronized void deleteInvalidIndexes(Predicate<LocalResource> 
filter) throws HyracksDataException {
+        for (Path root : storageRoots) {
+            final Collection<File> files = FileUtils.listFiles(root.toFile(), 
METADATA_FILES_FILTER, ALL_DIR_FILTER);
+            try {
+                for (File file : files) {
+                    final LocalResource localResource = 
readLocalResource(file);
+                    if (filter.test(localResource)) {
+                        LOGGER.warn("deleting invalid metadata index {}", 
file.getParentFile());
+                        IoUtil.delete(file.getParentFile());
+                    }
+                }
+            } catch (IOException e) {
+                throw HyracksDataException.create(e);
+            }
+        }
+        resourceCache.invalidateAll();
+    }
+
     public Map<Long, LocalResource> loadAndGetAllResources() throws 
HyracksDataException {
         return getResources(p -> true);
     }

Reply via email to