Repository: asterixdb
Updated Branches:
  refs/heads/master ab1013cb9 -> 3316bde18


[ASTERIXDB-2195][REPL] Clean Masked Files

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Clean invalid masked files before promoting
  a partition or sending partition files list
  to master.
- Let replica calculate component id instead
  of sending it from master.
- Add tests for:
  - Deleting masked component.
  - Deleting masked file.

Change-Id: Ib0f0159159faf87b9f5fd2eca3956dd90633bcfa
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2268
Sonar-Qube: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mb...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/3316bde1
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/3316bde1
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/3316bde1

Branch: refs/heads/master
Commit: 3316bde186e6447c05a0aa1afcf5c82e5df83f4f
Parents: ab1013c
Author: Murtadha Hubail <mhub...@apache.org>
Authored: Mon Jan 8 02:44:17 2018 +0300
Committer: Murtadha Hubail <mhub...@apache.org>
Committed: Mon Jan 8 10:03:28 2018 -0800

----------------------------------------------------------------------
 .../apache/asterix/app/nc/ReplicaManager.java   |  4 +
 .../org/apache/asterix/common/TestDataUtil.java | 10 ++
 .../PersistentLocalResourceRepositoryTest.java  | 99 ++++++++++++++++++++
 .../apache/asterix/test/txn/LogManagerTest.java | 10 +-
 .../asterix/common/utils/StorageConstants.java  |  1 +
 .../messaging/ComponentMaskTask.java            | 19 ++--
 .../messaging/PartitionResourcesListTask.java   |  2 +-
 .../replication/sync/IndexSynchronizer.java     | 13 +--
 .../PersistentLocalResourceRepository.java      | 63 +++++++++++++
 9 files changed, 188 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/3316bde1/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
index 155fa1d..4edae69 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
@@ -33,6 +33,7 @@ import org.apache.asterix.common.storage.IReplicaManager;
 import org.apache.asterix.common.storage.ReplicaIdentifier;
 import org.apache.asterix.common.transactions.IRecoveryManager;
 import org.apache.asterix.replication.api.PartitionReplica;
+import 
org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public class ReplicaManager implements IReplicaManager {
@@ -85,6 +86,9 @@ public class ReplicaManager implements IReplicaManager {
 
     @Override
     public synchronized void promote(int partition) throws 
HyracksDataException {
+        final PersistentLocalResourceRepository localResourceRepository =
+                (PersistentLocalResourceRepository) 
appCtx.getLocalResourceRepository();
+        localResourceRepository.cleanup(partition);
         final IRecoveryManager recoveryManager = 
appCtx.getTransactionSubsystem().getRecoveryManager();
         
recoveryManager.replayReplicaPartitionLogs(Stream.of(partition).collect(Collectors.toSet()),
 true);
         partitions.add(partition);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/3316bde1/asterixdb/asterix-app/src/test/java/org/apache/asterix/common/TestDataUtil.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/common/TestDataUtil.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/common/TestDataUtil.java
index 128aee6..6d114c6 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/common/TestDataUtil.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/common/TestDataUtil.java
@@ -23,6 +23,7 @@ import java.rmi.RemoteException;
 import java.util.Arrays;
 import java.util.LinkedHashSet;
 import java.util.Map;
+import java.util.Optional;
 
 import org.apache.asterix.api.common.AsterixHyracksIntegrationUtil;
 import org.apache.asterix.app.active.ActiveNotificationHandler;
@@ -214,4 +215,13 @@ public class TestDataUtil {
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
         }
     }
+
+    public static String getIndexPath(AsterixHyracksIntegrationUtil 
integrationUtil, Dataset dataset, String nodeId)
+            throws Exception {
+        final FileSplit[] datasetSplits = 
TestDataUtil.getDatasetSplits(integrationUtil, dataset);
+        final Optional<FileSplit> nodeFileSplit =
+                Arrays.stream(datasetSplits).filter(s -> 
s.getNodeName().equals(nodeId)).findFirst();
+        Assert.assertTrue(nodeFileSplit.isPresent());
+        return nodeFileSplit.get().getPath();
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/3316bde1/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/PersistentLocalResourceRepositoryTest.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/PersistentLocalResourceRepositoryTest.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/PersistentLocalResourceRepositoryTest.java
new file mode 100644
index 0000000..6401d90
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/PersistentLocalResourceRepositoryTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.storage;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
+import org.apache.asterix.api.common.AsterixHyracksIntegrationUtil;
+import org.apache.asterix.common.TestDataUtil;
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.config.GlobalConfig;
+import org.apache.asterix.common.dataflow.DatasetLocalResource;
+import org.apache.asterix.common.utils.StorageConstants;
+import org.apache.asterix.metadata.entities.Dataset;
+import 
org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
+import org.apache.hyracks.api.io.FileReference;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class PersistentLocalResourceRepositoryTest {
+
+    protected static final String TEST_CONFIG_FILE_NAME = 
"src/main/resources/cc.conf";
+    private static final AsterixHyracksIntegrationUtil integrationUtil = new 
AsterixHyracksIntegrationUtil();
+
+    @Before
+    public void setUp() throws Exception {
+        System.setProperty(GlobalConfig.CONFIG_FILE_PROPERTY, 
TEST_CONFIG_FILE_NAME);
+        integrationUtil.init(true, TEST_CONFIG_FILE_NAME);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        integrationUtil.deinit(true);
+    }
+
+    @Test
+    public void deleteMaskedFiles() throws Exception {
+        final INcApplicationContext ncAppCtx = (INcApplicationContext) 
integrationUtil.ncs[0].getApplicationContext();
+        final String nodeId = ncAppCtx.getServiceContext().getNodeId();
+        final String datasetName = "ds";
+        TestDataUtil.createIdOnlyDataset(datasetName);
+        final Dataset dataset = TestDataUtil.getDataset(integrationUtil, 
datasetName);
+        final String indexPath = TestDataUtil.getIndexPath(integrationUtil, 
dataset, nodeId);
+        FileReference indexDirRef = ncAppCtx.getIoManager().resolve(indexPath);
+        // create masked component files
+        String indexDir = indexDirRef.getFile().getAbsolutePath();
+        String componentId = "12345_12345";
+        String btree = componentId + "_b";
+        String filter = componentId + "_f";
+        Path maskPath = Paths.get(indexDir, 
StorageConstants.COMPONENT_MASK_FILE_PREFIX + componentId);
+        Path btreePath = Paths.get(indexDir, btree);
+        Path filterPath = Paths.get(indexDir, filter);
+        Files.createFile(maskPath);
+        Files.createFile(btreePath);
+        Files.createFile(filterPath);
+        // clean up the dataset partition
+        PersistentLocalResourceRepository localResourceRepository =
+                (PersistentLocalResourceRepository) 
ncAppCtx.getLocalResourceRepository();
+        DatasetLocalResource lr = (DatasetLocalResource) 
localResourceRepository.get(indexPath).getResource();
+        int partition = lr.getPartition();
+        localResourceRepository.cleanup(partition);
+
+        // ensure all masked files and the mask were deleted
+        Assert.assertFalse(maskPath.toFile().exists());
+        Assert.assertFalse(btreePath.toFile().exists());
+        Assert.assertFalse(filterPath.toFile().exists());
+
+        // create single masked file
+        String fileName = "someFile";
+        maskPath = Paths.get(indexDir, StorageConstants.MASK_FILE_PREFIX + 
fileName);
+        Path filePath = Paths.get(indexDir, fileName);
+        Files.createFile(maskPath);
+        Files.createFile(filePath);
+        localResourceRepository.cleanup(partition);
+
+        // ensure the masked file and the mask were deleted
+        Assert.assertFalse(maskPath.toFile().exists());
+        Assert.assertFalse(filePath.toFile().exists());
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/3316bde1/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java
index f43f3ff..b14d70b 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java
@@ -79,7 +79,7 @@ public class LogManagerTest {
         final String datasetName = "ds";
         TestDataUtil.createIdOnlyDataset(datasetName);
         final Dataset dataset = TestDataUtil.getDataset(integrationUtil, 
datasetName);
-        final String indexPath = getIndexPath(dataset, nodeId);
+        final String indexPath = TestDataUtil.getIndexPath(integrationUtil, 
dataset, nodeId);
         final IDatasetLifecycleManager dclm = 
ncAppCtx.getDatasetLifecycleManager();
         dclm.open(indexPath);
         final ILSMIndex index = (ILSMIndex) dclm.get(indexPath);
@@ -185,14 +185,6 @@ public class LogManagerTest {
         interruptedLogPageSwitch();
     }
 
-    private static String getIndexPath(Dataset dataset, String nodeId) throws 
Exception {
-        final FileSplit[] datasetSplits = 
TestDataUtil.getDatasetSplits(integrationUtil, dataset);
-        final Optional<FileSplit> nodeFileSplit =
-                Arrays.stream(datasetSplits).filter(s -> 
s.getNodeName().equals(nodeId)).findFirst();
-        Assert.assertTrue(nodeFileSplit.isPresent());
-        return nodeFileSplit.get().getPath();
-    }
-
     private static ITransactionContext beingTransaction(INcApplicationContext 
ncAppCtx, ILSMIndex index,
             long resourceId) {
         final TxnId txnId = new TxnId(1);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/3316bde1/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
index f59914d..265c9fd 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
@@ -36,6 +36,7 @@ public class StorageConstants {
     public static final String INDEX_CHECKPOINT_FILE_PREFIX = 
".idx_checkpoint_";
     public static final String METADATA_FILE_NAME = ".metadata";
     public static final String MASK_FILE_PREFIX = ".mask_";
+    public static final String COMPONENT_MASK_FILE_PREFIX = MASK_FILE_PREFIX + 
"C_";
     public static final String LEGACY_DATASET_INDEX_NAME_SEPARATOR = "_idx_";
 
     /**

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/3316bde1/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ComponentMaskTask.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ComponentMaskTask.java
 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ComponentMaskTask.java
index 26c9577..d5dc51d 100644
--- 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ComponentMaskTask.java
+++ 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ComponentMaskTask.java
@@ -28,9 +28,10 @@ import java.nio.file.Paths;
 
 import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.exceptions.ReplicationException;
-import org.apache.asterix.replication.api.IReplicationWorker;
 import org.apache.asterix.common.utils.StorageConstants;
 import org.apache.asterix.replication.api.IReplicaTask;
+import org.apache.asterix.replication.api.IReplicationWorker;
+import 
org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.api.io.IIOManager;
@@ -40,13 +41,10 @@ import org.apache.hyracks.api.io.IIOManager;
  */
 public class ComponentMaskTask implements IReplicaTask {
 
-    private static final String COMPONENT_MASK_FILE_PREFIX = 
StorageConstants.MASK_FILE_PREFIX + "C_";
     private final String file;
-    private final String componentId;
 
-    public ComponentMaskTask(String file, String componentId) {
+    public ComponentMaskTask(String file) {
         this.file = file;
-        this.componentId = componentId;
     }
 
     @Override
@@ -61,11 +59,12 @@ public class ComponentMaskTask implements IReplicaTask {
         }
     }
 
-    public static Path getComponentMaskPath(INcApplicationContext appCtx, 
String file) throws IOException {
+    public static Path getComponentMaskPath(INcApplicationContext appCtx, 
String componentFile) throws IOException {
         final IIOManager ioManager = appCtx.getIoManager();
-        final FileReference localPath = ioManager.resolve(file);
+        final FileReference localPath = ioManager.resolve(componentFile);
         final Path resourceDir = 
Files.createDirectories(localPath.getFile().getParentFile().toPath());
-        return Paths.get(resourceDir.toString(), COMPONENT_MASK_FILE_PREFIX + 
localPath.getFile().getName());
+        final String componentId = 
PersistentLocalResourceRepository.getComponentId(componentFile);
+        return Paths.get(resourceDir.toString(), 
StorageConstants.COMPONENT_MASK_FILE_PREFIX + componentId);
     }
 
     @Override
@@ -78,7 +77,6 @@ public class ComponentMaskTask implements IReplicaTask {
         try {
             final DataOutputStream dos = new DataOutputStream(out);
             dos.writeUTF(file);
-            dos.writeUTF(componentId);
         } catch (IOException e) {
             throw HyracksDataException.create(e);
         }
@@ -86,7 +84,6 @@ public class ComponentMaskTask implements IReplicaTask {
 
     public static ComponentMaskTask create(DataInput input) throws IOException 
{
         String indexFile = input.readUTF();
-        String componentId = input.readUTF();
-        return new ComponentMaskTask(indexFile, componentId);
+        return new ComponentMaskTask(indexFile);
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/3316bde1/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java
 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java
index b972f32..54d3a02 100644
--- 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java
+++ 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java
@@ -45,9 +45,9 @@ public class PartitionResourcesListTask implements 
IReplicaTask {
 
     @Override
     public void perform(INcApplicationContext appCtx, IReplicationWorker 
worker) throws HyracksDataException {
-        //TODO delete any invalid files with masks
         final PersistentLocalResourceRepository localResourceRepository =
                 (PersistentLocalResourceRepository) 
appCtx.getLocalResourceRepository();
+        localResourceRepository.cleanup(partition);
         final List<String> partitionResources = 
localResourceRepository.getPartitionIndexesFiles(partition).stream()
                 
.map(StoragePathUtil::getFileRelativePath).collect(Collectors.toList());
         final PartitionResourcesListResponse response =

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/3316bde1/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/IndexSynchronizer.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/IndexSynchronizer.java
 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/IndexSynchronizer.java
index 74f38e2..95ae690 100644
--- 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/IndexSynchronizer.java
+++ 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/IndexSynchronizer.java
@@ -22,11 +22,9 @@ import static 
org.apache.hyracks.api.replication.IReplicationJob.ReplicationOper
 import static 
org.apache.hyracks.api.replication.IReplicationJob.ReplicationOperation.REPLICATE;
 
 import java.io.IOException;
-import java.nio.file.Paths;
 
 import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
-import org.apache.asterix.common.storage.ResourceReference;
 import org.apache.asterix.common.utils.StoragePathUtil;
 import org.apache.asterix.replication.api.PartitionReplica;
 import org.apache.asterix.replication.messaging.ComponentMaskTask;
@@ -39,7 +37,6 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexReplicationJob;
 import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
-import 
org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -86,9 +83,8 @@ public class IndexSynchronizer {
     private void replicateComponent(PartitionReplica replica) throws 
IOException {
         // send component header
         final String anyFile = job.getAnyFile();
-        final String lsmComponentID = getComponentId(anyFile);
         final String indexFile = StoragePathUtil.getFileRelativePath(anyFile);
-        final ComponentMaskTask maskTask = new ComponentMaskTask(indexFile, 
lsmComponentID);
+        final ComponentMaskTask maskTask = new ComponentMaskTask(indexFile);
         ReplicationProtocol.sendTo(replica, maskTask);
         ReplicationProtocol.waitForAck(replica);
         // send component files
@@ -130,11 +126,4 @@ public class IndexSynchronizer {
         return ((AbstractLSMIOOperationCallback) 
lsmIndex.getIOOperationCallback())
                 .getComponentLSN(ctx.getComponentsToBeReplicated());
     }
-
-    private static String getComponentId(String filePath) {
-        final ResourceReference ref = ResourceReference.of(filePath);
-        final String fileUniqueTimestamp =
-                ref.getName().substring(0, 
ref.getName().lastIndexOf(AbstractLSMIndexFileManager.DELIMITER));
-        return Paths.get(ref.getRelativePath().toString(), 
fileUniqueTimestamp).toString();
-    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/3316bde1/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index 54d6268..6ffeb28 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -49,6 +49,7 @@ import 
org.apache.asterix.common.replication.IReplicationManager;
 import org.apache.asterix.common.replication.ReplicationJob;
 import org.apache.asterix.common.storage.DatasetResourceReference;
 import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
+import org.apache.asterix.common.storage.ResourceReference;
 import org.apache.asterix.common.utils.StorageConstants;
 import org.apache.asterix.common.utils.StoragePathUtil;
 import org.apache.commons.io.FileUtils;
@@ -62,8 +63,11 @@ import 
org.apache.hyracks.api.replication.IReplicationJob.ReplicationJobType;
 import org.apache.hyracks.api.replication.IReplicationJob.ReplicationOperation;
 import org.apache.hyracks.api.util.IoUtil;
 import org.apache.hyracks.storage.am.common.api.ITreeIndexFrame;
+import 
org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager;
 import org.apache.hyracks.storage.common.ILocalResourceRepository;
 import org.apache.hyracks.storage.common.LocalResource;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
@@ -71,8 +75,11 @@ import com.google.common.cache.CacheBuilder;
 public class PersistentLocalResourceRepository implements 
ILocalResourceRepository {
 
     public static final Predicate<Path> INDEX_COMPONENTS = path -> 
!path.endsWith(StorageConstants.METADATA_FILE_NAME);
+    private static final Logger LOGGER = LogManager.getLogger();
     private static final FilenameFilter LSM_INDEX_FILES_FILTER =
             (dir, name) -> !name.startsWith(INDEX_CHECKPOINT_FILE_PREFIX);
+    private static final FilenameFilter MASK_FILES_FILTER =
+            (dir, name) -> name.startsWith(StorageConstants.MASK_FILE_PREFIX);
     private static final int MAX_CACHED_RESOURCES = 1000;
     private static final IOFileFilter METADATA_FILES_FILTER = new 
IOFileFilter() {
         @Override
@@ -349,4 +356,60 @@ public class PersistentLocalResourceRepository implements 
ILocalResourceReposito
             }
         }
     }
+
+    public void cleanup(int partition) throws HyracksDataException {
+        final Set<File> partitionIndexes = getPartitionIndexes(partition);
+        // find masks
+        for (File index : partitionIndexes) {
+            File[] masks = index.listFiles(MASK_FILES_FILTER);
+            if (masks != null) {
+                try {
+                    for (File mask : masks) {
+                        deleteIndexMaskedFiles(index, mask);
+                        // delete the mask itself
+                        Files.delete(mask.toPath());
+                    }
+                } catch (IOException e) {
+                    throw HyracksDataException.create(e);
+                }
+            }
+        }
+    }
+
+    private void deleteIndexMaskedFiles(File index, File mask) throws 
IOException {
+        if (!mask.getName().startsWith(StorageConstants.MASK_FILE_PREFIX)) {
+            throw new IllegalArgumentException("Unrecognized mask file: " + 
mask);
+        }
+        File[] maskedFiles;
+        if (isComponentMask(mask)) {
+            final String componentId = 
mask.getName().substring(StorageConstants.COMPONENT_MASK_FILE_PREFIX.length());
+            maskedFiles = index.listFiles((dir, name) -> 
name.startsWith(componentId));
+        } else {
+            final String maskedFileName = 
mask.getName().substring(StorageConstants.MASK_FILE_PREFIX.length());
+            maskedFiles = index.listFiles((dir, name) -> 
name.equals(maskedFileName));
+        }
+        if (maskedFiles != null) {
+            for (File maskedFile : maskedFiles) {
+                LOGGER.info(() -> "deleting masked file: " + 
maskedFile.getAbsolutePath());
+                Files.delete(maskedFile.toPath());
+            }
+        }
+    }
+
+    /**
+     * Gets a component id based on its unique timestamp.
+     * e.g. a component file 2018-01-08-01-08-50-439_2018-01-08-01-08-50-439_b
+     * will return a component id 
2018-01-08-01-08-50-439_2018-01-08-01-08-50-439
+     *
+     * @param componentFile any component file
+     * @return The component id
+     */
+    public static String getComponentId(String componentFile) {
+        final ResourceReference ref = ResourceReference.of(componentFile);
+        return ref.getName().substring(0, 
ref.getName().lastIndexOf(AbstractLSMIndexFileManager.DELIMITER));
+    }
+
+    private static boolean isComponentMask(File mask) {
+        return 
mask.getName().startsWith(StorageConstants.COMPONENT_MASK_FILE_PREFIX);
+    }
 }

Reply via email to