This is an automated email from the ASF dual-hosted git repository.

wyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new c3e9fdae7a [ASTERIXDB-3218][*DB] Support bulk deletes of files
c3e9fdae7a is described below

commit c3e9fdae7a696f90e71c57abfbe5383c1b1c4398
Author: Wail Alkowaileet <[email protected]>
AuthorDate: Wed Jul 12 16:21:09 2023 -0700

    [ASTERIXDB-3218][*DB] Support bulk deletes of files
    
    - user model changes: no
    - storage format changes: no
    - interface changes: yes
    
    Details:
    - Extend IIOManager to bulk-delete multiple files.
    - Extract list from S3CloudClient#deleteObjects()
      - Only do list when deleting uncached
        files or when deleting directories.
    - Unify IIOManager#delete() to delete files
      and directories
    - Make S3CloudClient#deleteObjects() delete at most
      1000 files per request -- an S3 limit
    - Other misc. optimizations
    
    Change-Id: Ic53a45ef4cc0911ee2b07c73c267d492600bc69f
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17630
    Reviewed-by: Ali Alsuliman <[email protected]>
    Integration-Tests: Jenkins <[email protected]>
    Tested-by: Jenkins <[email protected]>
---
 .../asterix/app/nc/IndexCheckpointManager.java     |  10 +-
 .../asterix/cloud/AbstractCloudIOManager.java      |  29 ++--
 .../cloud/bulk/DeleteBulkCloudOperation.java       |  52 ++++++
 .../apache/asterix/cloud/clients/ICloudClient.java |  31 ++--
 .../cloud/clients/aws/s3/S3CloudClient.java        |  27 ++-
 .../test/java/org/apach/asterix/cloud/LSMTest.java |   3 +-
 .../replication/messaging/DropIndexTask.java       |   2 +-
 .../PersistentLocalResourceRepository.java         |  45 ++---
 .../apache/hyracks/api/io/IIOBulkOperation.java    |  33 ++++
 .../java/org/apache/hyracks/api/io/IIOManager.java | 190 ++++++++++++++++++---
 .../apache/hyracks/control/nc/io/IOManager.java    |  22 ++-
 .../control/nc/io/bulk/AbstractBulkOperation.java  |  44 +++++
 .../control/nc/io/bulk/DeleteBulkOperation.java    |  36 ++++
 .../common/impls/AbstractLSMIndexFileManager.java  |   2 +-
 14 files changed, 430 insertions(+), 96 deletions(-)

diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
index 9383f063ff..2e43397c03 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
@@ -31,6 +31,7 @@ import org.apache.asterix.common.storage.IndexCheckpoint;
 import org.apache.asterix.common.utils.StorageConstants;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IIOBulkOperation;
 import org.apache.hyracks.api.io.IIOManager;
 import org.apache.hyracks.api.util.IoUtil;
 import 
org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager;
@@ -201,10 +202,7 @@ public class IndexCheckpointManager implements 
IIndexCheckpointManager {
         final FileReference checkpointPath = getCheckpointPath(checkpoint);
         for (int i = 1; i <= MAX_CHECKPOINT_WRITE_ATTEMPTS; i++) {
             try {
-                // clean up from previous write failure
-                if (ioManager.exists(checkpointPath)) {
-                    ioManager.delete(checkpointPath);
-                }
+                // Overwrite will clean up from previous write failure (if any)
                 ioManager.overwrite(checkpointPath, 
checkpoint.asJson().getBytes());
                 // ensure it was written correctly by reading it
                 read(checkpointPath);
@@ -247,11 +245,13 @@ public class IndexCheckpointManager implements 
IIndexCheckpointManager {
         try {
             final Collection<FileReference> checkpointFiles = 
ioManager.list(indexPath, CHECKPOINT_FILE_FILTER);
             if (!checkpointFiles.isEmpty()) {
+                IIOBulkOperation deleteBulk = 
ioManager.createDeleteBulkOperation();
                 for (FileReference checkpointFile : checkpointFiles) {
                     if (getCheckpointIdFromFileName(checkpointFile) < 
(latestId - historyToKeep)) {
-                        ioManager.delete(checkpointFile);
+                        deleteBulk.add(checkpointFile);
                     }
                 }
+                ioManager.performBulkOperation(deleteBulk);
             }
         } catch (Exception e) {
             LOGGER.warn(() -> "Couldn't delete history checkpoints at " + 
indexPath, e);
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
index 1c069199b3..49f71f49d2 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
@@ -26,10 +26,13 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.stream.Collectors;
 
+import org.apache.asterix.cloud.bulk.DeleteBulkCloudOperation;
 import org.apache.asterix.cloud.clients.CloudClientProvider;
 import org.apache.asterix.cloud.clients.ICloudClient;
 import org.apache.asterix.cloud.util.CloudFileUtil;
@@ -40,6 +43,7 @@ import org.apache.asterix.common.utils.StoragePathUtil;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.api.io.IFileHandle;
+import org.apache.hyracks.api.io.IIOBulkOperation;
 import org.apache.hyracks.api.util.IoUtil;
 import org.apache.hyracks.control.nc.io.IOManager;
 import org.apache.hyracks.util.file.FileUtil;
@@ -117,7 +121,7 @@ public abstract class AbstractCloudIOManager extends 
IOManager implements IParti
             if (!partitions.contains(partitionNum)) {
                 LOGGER.warn("Deleting storage partition {} as it does not 
belong to the current storage partitions {}",
                         partitionNum, partitions);
-                localIoManager.deleteDirectory(partitionDir);
+                localIoManager.delete(partitionDir);
             }
         }
     }
@@ -201,13 +205,23 @@ public abstract class AbstractCloudIOManager extends 
IOManager implements IParti
 
     @Override
     public final void delete(FileReference fileRef) throws 
HyracksDataException {
+        // Never delete the storage dir in cloud storage
         if 
(!STORAGE_ROOT_DIR_NAME.equals(IoUtil.getFileNameFromPath(fileRef.getAbsolutePath())))
 {
-            // Never delete the storage dir in cloud storage
-            cloudClient.deleteObject(bucket, fileRef.getRelativePath());
+            File localFile = fileRef.getFile();
+            // if file reference exists, and it is a file, then list is not 
required
+            Set<String> paths =
+                    localFile.exists() && localFile.isFile() ? 
Collections.singleton(fileRef.getRelativePath())
+                            : 
list(fileRef).stream().map(FileReference::getRelativePath).collect(Collectors.toSet());
+            cloudClient.deleteObjects(bucket, paths);
         }
         localIoManager.delete(fileRef);
     }
 
+    @Override
+    public IIOBulkOperation createDeleteBulkOperation() {
+        return new DeleteBulkCloudOperation(localIoManager, bucket, 
cloudClient);
+    }
+
     @Override
     public final void close(IFileHandle fHandle) throws HyracksDataException {
         try {
@@ -251,15 +265,6 @@ public abstract class AbstractCloudIOManager extends 
IOManager implements IParti
         localIoManager.overwrite(fileRef, bytes);
     }
 
-    @Override
-    public final void deleteDirectory(FileReference fileRef) throws 
HyracksDataException {
-        if 
(!STORAGE_ROOT_DIR_NAME.equals(IoUtil.getFileNameFromPath(fileRef.getAbsolutePath())))
 {
-            // Never delete the storage dir in cloud storage
-            cloudClient.deleteObject(bucket, fileRef.getRelativePath());
-        }
-        localIoManager.deleteDirectory(fileRef);
-    }
-
     @Override
     public final void create(FileReference fileRef) throws 
HyracksDataException {
         // We need to delete the local file on create as the cloud storage 
didn't complete the upload
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/bulk/DeleteBulkCloudOperation.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/bulk/DeleteBulkCloudOperation.java
new file mode 100644
index 0000000000..c7cea902ed
--- /dev/null
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/bulk/DeleteBulkCloudOperation.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.bulk;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.apache.asterix.cloud.clients.ICloudClient;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IIOManager;
+import org.apache.hyracks.control.nc.io.bulk.DeleteBulkOperation;
+
+public class DeleteBulkCloudOperation extends DeleteBulkOperation {
+    private final String bucket;
+    private final ICloudClient cloudClient;
+
+    public DeleteBulkCloudOperation(IIOManager ioManager, String bucket, 
ICloudClient cloudClient) {
+        super(ioManager);
+        this.bucket = bucket;
+        this.cloudClient = cloudClient;
+    }
+
+    @Override
+    public void performOperation() throws HyracksDataException {
+        /*
+         * TODO What about deleting multiple directories?
+         *      Actually, is there a case where we delete multiple directories 
from the cloud?
+         */
+        List<String> paths = 
fileReferences.stream().map(FileReference::getRelativePath).collect(Collectors.toList());
+        cloudClient.deleteObjects(bucket, paths);
+
+        // Bulk delete locally as well
+        super.performOperation();
+    }
+}
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java
index 4f02df208d..ff26915bc3 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java
@@ -21,6 +21,7 @@ package org.apache.asterix.cloud.clients;
 import java.io.FilenameFilter;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
+import java.util.Collection;
 import java.util.Map;
 import java.util.Set;
 
@@ -36,7 +37,7 @@ public interface ICloudClient {
      * Creates a cloud buffered writer
      *
      * @param bucket bucket to write to
-     * @param path path to write to
+     * @param path   path to write to
      * @return buffered writer
      */
     ICloudBufferedWriter createBufferedWriter(String bucket, String path);
@@ -45,7 +46,7 @@ public interface ICloudClient {
      * Lists objects at the specified bucket and path, and applies the file 
name filter on the returned objects
      *
      * @param bucket bucket to list from
-     * @param path path to list from
+     * @param path   path to list from
      * @param filter filter to apply
      * @return file names returned after applying the file name filter
      */
@@ -56,7 +57,7 @@ public interface ICloudClient {
      * buffer.remaining()
      *
      * @param bucket bucket
-     * @param path path
+     * @param path   path
      * @param offset offset
      * @param buffer buffer
      * @return returns the buffer position
@@ -67,7 +68,7 @@ public interface ICloudClient {
      * Reads all bytes of an object at the specified bucket and path
      *
      * @param bucket bucket
-     * @param path path
+     * @param path   path
      * @return bytes
      * @throws HyracksDataException HyracksDataException
      */
@@ -77,7 +78,7 @@ public interface ICloudClient {
      * Returns the {@code InputStream} of an object at the specified bucket 
and path
      *
      * @param bucket bucket
-     * @param path path
+     * @param path   path
      * @return inputstream
      */
     InputStream getObjectStream(String bucket, String path);
@@ -86,33 +87,33 @@ public interface ICloudClient {
      * Writes the content of the byte array into the bucket at the specified 
path
      *
      * @param bucket bucket
-     * @param path path
-     * @param data data
+     * @param path   path
+     * @param data   data
      */
     void write(String bucket, String path, byte[] data);
 
     /**
      * Copies an object from the source path to the destination path
      *
-     * @param bucket bucket
-     * @param srcPath source path
+     * @param bucket   bucket
+     * @param srcPath  source path
      * @param destPath destination path
      */
     void copy(String bucket, String srcPath, FileReference destPath);
 
     /**
-     * Deletes an object at the specified bucket and path
+     * Deletes all objects at the specified bucket and paths
      *
      * @param bucket bucket
-     * @param path path
+     * @param paths  paths of all objects to be deleted
      */
-    void deleteObject(String bucket, String path);
+    void deleteObjects(String bucket, Collection<String> paths);
 
     /**
      * Returns the size of the object at the specified path
      *
      * @param bucket bucket
-     * @param path path
+     * @param path   path
      * @return size
      */
     long getObjectSize(String bucket, String path) throws HyracksDataException;
@@ -121,7 +122,7 @@ public interface ICloudClient {
      * Checks if an object exists at the specified path
      *
      * @param bucket bucket
-     * @param path path
+     * @param path   path
      * @return {@code true} if the object exists, {@code false} otherwise
      */
     boolean exists(String bucket, String path) throws HyracksDataException;
@@ -129,7 +130,7 @@ public interface ICloudClient {
     /**
      * Syncs files by downloading them from cloud storage to local storage
      *
-     * @param bucket bucket to sync from
+     * @param bucket                   bucket to sync from
      * @param cloudToLocalStoragePaths map of cloud storage partition to local 
storage path
      * @throws HyracksDataException HyracksDataException
      */
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
index c51f84a96e..5619fc8ca8 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
@@ -31,8 +31,10 @@ import java.net.URISyntaxException;
 import java.nio.ByteBuffer;
 import java.nio.file.Paths;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -78,6 +80,8 @@ public class S3CloudClient implements ICloudClient {
     private static final Logger LOGGER = LogManager.getLogger();
     // TODO(htowaileb): Temporary variables, can we get this from the used 
instance?
     private static final double MAX_HOST_BANDWIDTH = 10.0; // in Gbps
+    // The maximum number of files that can be deleted per request (AWS restriction)
+    private static final int DELETE_BATCH_SIZE = 1000;
 
     private final S3ClientConfig config;
     private final S3Client s3Client;
@@ -203,20 +207,25 @@ public class S3CloudClient implements ICloudClient {
     }
 
     @Override
-    public void deleteObject(String bucket, String path) {
-        Set<String> fileList = listObjects(bucket, path, IoUtil.NO_OP_FILTER);
-        if (fileList.isEmpty()) {
+    public void deleteObjects(String bucket, Collection<String> paths) {
+        if (paths.isEmpty()) {
             return;
         }
 
-        profiler.objectDelete();
         List<ObjectIdentifier> objectIdentifiers = new ArrayList<>();
-        for (String file : fileList) {
-            
objectIdentifiers.add(ObjectIdentifier.builder().key(file).build());
+        Iterator<String> pathIter = paths.iterator();
+        ObjectIdentifier.Builder builder = ObjectIdentifier.builder();
+        while (pathIter.hasNext()) {
+            objectIdentifiers.clear();
+            for (int i = 0; pathIter.hasNext() && i < DELETE_BATCH_SIZE; i++) {
+                objectIdentifiers.add(builder.key(pathIter.next()).build());
+            }
+
+            Delete delete = 
Delete.builder().objects(objectIdentifiers).build();
+            DeleteObjectsRequest deleteReq = 
DeleteObjectsRequest.builder().bucket(bucket).delete(delete).build();
+            s3Client.deleteObjects(deleteReq);
+            profiler.objectDelete();
         }
-        Delete delete = Delete.builder().objects(objectIdentifiers).build();
-        DeleteObjectsRequest deleteReq = 
DeleteObjectsRequest.builder().bucket(bucket).delete(delete).build();
-        s3Client.deleteObjects(deleteReq);
     }
 
     @Override
diff --git 
a/asterixdb/asterix-cloud/src/test/java/org/apach/asterix/cloud/LSMTest.java 
b/asterixdb/asterix-cloud/src/test/java/org/apach/asterix/cloud/LSMTest.java
index bd0ab285c4..612aa1d746 100644
--- a/asterixdb/asterix-cloud/src/test/java/org/apach/asterix/cloud/LSMTest.java
+++ b/asterixdb/asterix-cloud/src/test/java/org/apach/asterix/cloud/LSMTest.java
@@ -21,6 +21,7 @@ package org.apach.asterix.cloud;
 import java.io.FilenameFilter;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Collections;
 
 import org.apache.asterix.cloud.CloudResettableInputStream;
 import org.apache.asterix.cloud.WriteBufferProvider;
@@ -46,7 +47,7 @@ public abstract class LSMTest {
     @Test
     public void a4deleteTest() {
         try {
-            CLOUD_CLIENT.deleteObject(PLAYGROUND_CONTAINER, 
BUCKET_STORAGE_ROOT);
+            CLOUD_CLIENT.deleteObjects(PLAYGROUND_CONTAINER, 
Collections.singleton(BUCKET_STORAGE_ROOT));
         } catch (Exception ex) {
             ex.printStackTrace();
         }
diff --git 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DropIndexTask.java
 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DropIndexTask.java
index 2b0e2d8cf5..7d682e70ec 100644
--- 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DropIndexTask.java
+++ 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DropIndexTask.java
@@ -54,7 +54,7 @@ public class DropIndexTask implements IReplicaTask {
             final FileReference indexFile = ioManager.resolve(file);
             if (ioManager.exists(indexFile)) {
                 FileReference indexDir = indexFile.getParent();
-                ioManager.deleteDirectory(indexDir);
+                ioManager.delete(indexDir);
                 ((PersistentLocalResourceRepository) 
appCtx.getLocalResourceRepository())
                         
.invalidateResource(ResourceReference.of(file).getRelativePath().toString());
                 LOGGER.info(() -> "Deleted index: " + 
indexFile.getAbsolutePath());
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index 4e71b1c629..1eef182c06 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -57,6 +57,7 @@ import org.apache.asterix.common.utils.StorageConstants;
 import org.apache.asterix.common.utils.StoragePathUtil;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IIOBulkOperation;
 import org.apache.hyracks.api.io.IIOManager;
 import org.apache.hyracks.api.io.IODeviceHandle;
 import org.apache.hyracks.api.io.IPersistedResourceRegistry;
@@ -274,6 +275,7 @@ public class PersistentLocalResourceRepository implements 
ILocalResourceReposito
     }
 
     public synchronized void deleteInvalidIndexes(Predicate<LocalResource> 
filter) throws HyracksDataException {
+        IIOBulkOperation bulkDelete = ioManager.createDeleteBulkOperation();
         for (FileReference root : storageRoots) {
             final Collection<FileReference> files = ioManager.list(root, 
METADATA_FILES_FILTER);
             try {
@@ -282,13 +284,14 @@ public class PersistentLocalResourceRepository implements 
ILocalResourceReposito
                     if (localResource != null && filter.test(localResource)) {
                         FileReference parent = file.getParent();
                         LOGGER.warn("deleting invalid metadata index {}", 
parent);
-                        ioManager.delete(parent);
+                        bulkDelete.add(parent);
                     }
                 }
             } catch (IOException e) {
                 throw HyracksDataException.create(e);
             }
         }
+        ioManager.performBulkOperation(bulkDelete);
         resourceCache.invalidateAll();
     }
 
@@ -361,9 +364,11 @@ public class PersistentLocalResourceRepository implements 
ILocalResourceReposito
      * Deletes physical files of all data verses.
      */
     public synchronized void deleteStorageData() throws HyracksDataException {
+        IIOBulkOperation bulkDelete = ioManager.createDeleteBulkOperation();
         for (FileReference root : storageRoots) {
-            ioManager.deleteDirectory(root);
+            bulkDelete.add(root);
         }
+        ioManager.performBulkOperation(bulkDelete);
         createStorageRoots();
     }
 
@@ -492,23 +497,27 @@ public class PersistentLocalResourceRepository implements 
ILocalResourceReposito
     }
 
     public synchronized void deleteCorruptedResources() throws 
HyracksDataException {
+        IIOBulkOperation bulkDelete = ioManager.createDeleteBulkOperation();
         for (FileReference root : storageRoots) {
             final Collection<FileReference> metadataMaskFiles = 
ioManager.list(root, METADATA_MASK_FILES_FILTER);
             for (FileReference metadataMaskFile : metadataMaskFiles) {
                 final FileReference resourceFile = 
metadataMaskFile.getParent().getChild(METADATA_FILE_NAME);
-                ioManager.delete(resourceFile);
-                ioManager.delete(metadataMaskFile);
+                bulkDelete.add(resourceFile);
+                bulkDelete.add(metadataMaskFile);
             }
         }
+        ioManager.performBulkOperation(bulkDelete);
     }
 
     private void deleteIndexMaskedFiles(FileReference index) throws 
IOException {
+        IIOBulkOperation bulkDelete = ioManager.createDeleteBulkOperation();
         Collection<FileReference> masks = ioManager.list(index, 
MASK_FILES_FILTER);
         for (FileReference mask : masks) {
             deleteIndexMaskedFiles(index, mask);
             // delete the mask itself
-            ioManager.delete(mask);
+            bulkDelete.add(mask);
         }
+        ioManager.performBulkOperation(bulkDelete);
     }
 
     private boolean isValidIndex(FileReference index) throws IOException {
@@ -524,6 +533,7 @@ public class PersistentLocalResourceRepository implements 
ILocalResourceReposito
         if (indexComponentFiles == null) {
             throw new IOException(index + " doesn't exist or an IO error 
occurred");
         }
+        IIOBulkOperation bulkDelete = ioManager.createDeleteBulkOperation();
         final long validComponentSequence = 
getIndexCheckpointManager(index).getValidComponentSequence();
         for (FileReference componentFileRef : indexComponentFiles) {
             // delete any file with start or end sequence > valid component 
sequence
@@ -532,9 +542,10 @@ public class PersistentLocalResourceRepository implements 
ILocalResourceReposito
             if (fileStart > validComponentSequence || fileEnd > 
validComponentSequence) {
                 LOGGER.warn(() -> "Deleting invalid component file " + 
componentFileRef.getAbsolutePath()
                         + " based on valid sequence " + 
validComponentSequence);
-                ioManager.delete(componentFileRef);
+                bulkDelete.add(componentFileRef);
             }
         }
+        ioManager.performBulkOperation(bulkDelete);
     }
 
     private IIndexCheckpointManager getIndexCheckpointManager(FileReference 
index) throws HyracksDataException {
@@ -556,10 +567,12 @@ public class PersistentLocalResourceRepository implements 
ILocalResourceReposito
             maskedFiles = ioManager.list(index, (dir, name) -> 
name.equals(maskedFileName));
         }
         if (maskedFiles != null) {
+            IIOBulkOperation bulkDelete = 
ioManager.createDeleteBulkOperation();
             for (FileReference maskedFile : maskedFiles) {
                 LOGGER.info(() -> "deleting masked file: " + 
maskedFile.getAbsolutePath());
-                ioManager.delete(maskedFile);
+                bulkDelete.add(maskedFile);
             }
+            ioManager.performBulkOperation(bulkDelete);
         }
     }
 
@@ -631,18 +644,6 @@ public class PersistentLocalResourceRepository implements 
ILocalResourceReposito
         return COMPONENT_FILES_FILTER.accept(indexDir.getFile(), fileName);
     }
 
-    public synchronized void keepPartitions(Set<Integer> keepPartitions) 
throws HyracksDataException {
-        List<FileReference> onDiskPartitions = getOnDiskPartitions();
-        for (FileReference onDiskPartition : onDiskPartitions) {
-            int partitionNum = 
StoragePathUtil.getPartitionNumFromRelativePath(onDiskPartition.getAbsolutePath());
-            if (!keepPartitions.contains(partitionNum)) {
-                LOGGER.warn("deleting partition {} since it is not on 
partitions to keep {}", partitionNum,
-                        keepPartitions);
-                ioManager.delete(onDiskPartition);
-            }
-        }
-    }
-
     public synchronized List<FileReference> getOnDiskPartitions() {
         List<FileReference> onDiskPartitions = new ArrayList<>();
         for (FileReference root : storageRoots) {
@@ -660,13 +661,15 @@ public class PersistentLocalResourceRepository implements 
ILocalResourceReposito
 
     public void deletePartition(int partitionId) throws HyracksDataException {
         Collection<FileReference> onDiskPartitions = getOnDiskPartitions();
+        IIOBulkOperation bulkDelete = ioManager.createDeleteBulkOperation();
         for (FileReference onDiskPartition : onDiskPartitions) {
             int partitionNum = 
StoragePathUtil.getPartitionNumFromRelativePath(onDiskPartition.getAbsolutePath());
             if (partitionNum == partitionId) {
                 LOGGER.warn("deleting partition {}", partitionNum);
-                ioManager.delete(onDiskPartition);
-                return;
+                bulkDelete.add(onDiskPartition);
+                break;
             }
         }
+        ioManager.performBulkOperation(bulkDelete);
     }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOBulkOperation.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOBulkOperation.java
new file mode 100644
index 0000000000..c5e84fd7a9
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOBulkOperation.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.io;
+
+/**
+ * A descriptor for a bulk operation
+ */
+public interface IIOBulkOperation {
+    /**
+     * Add file reference to perform the required operation (e.g., delete)
+     *
+     * @param fileReference that should be included in this bulk operation
+     * @see IIOManager#performBulkOperation(IIOBulkOperation)
+     * @see IIOManager#createDeleteBulkOperation()
+     */
+    void add(FileReference fileReference);
+}
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOManager.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOManager.java
index 95dcb2f7a3..a6520c63ce 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOManager.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOManager.java
@@ -21,7 +21,6 @@ package org.apache.hyracks.api.io;
 import java.io.Closeable;
 import java.io.FilenameFilter;
 import java.nio.ByteBuffer;
-import java.nio.channels.ClosedByInterruptException;
 import java.nio.channels.WritableByteChannel;
 import java.util.List;
 import java.util.Set;
@@ -41,40 +40,136 @@ public interface IIOManager extends Closeable {
         METADATA_ASYNC_DATA_ASYNC
     }
 
+    /**
+     * @return IO devices in an NC
+     */
     List<IODeviceHandle> getIODevices();
 
+    /**
+     * Open file
+     *
+     * @param fileRef  file reference
+     * @param rwMode   r/w mode
+     * @param syncMode sync mode
+     * @return file handle for the opened file
+     */
     IFileHandle open(FileReference fileRef, FileReadWriteMode rwMode, 
FileSyncMode syncMode)
             throws HyracksDataException;
 
+    /**
+     * Do sync write (utilizes {@link IAsyncRequest})
+     *
+     * @param fHandle handle of the opened file
+     * @param offset  start offset
+     * @param data    buffer to write
+     * @return number of written bytes
+     */
     int syncWrite(IFileHandle fHandle, long offset, ByteBuffer data) throws 
HyracksDataException;
 
+    /**
+     * Do sync write (utilizes {@link IAsyncRequest})
+     *
+     * @param fHandle   file handle of the opened file
+     * @param offset    start offset
+     * @param dataArray buffers to write
+     * @return number of written bytes
+     */
     long syncWrite(IFileHandle fHandle, long offset, ByteBuffer[] dataArray) 
throws HyracksDataException;
 
+    /**
+     * Do sync read (utilizes {@link IAsyncRequest})
+     *
+     * @param fHandle handle of the opened file
+     * @param offset  start offset
+     * @param data    destination buffer for the requested content
+     * @return number of read bytes
+     */
     int syncRead(IFileHandle fHandle, long offset, ByteBuffer data) throws 
HyracksDataException;
 
+    /**
+     * Do async write
+     *
+     * @param fHandle handle of the opened file
+     * @param offset  start offset
+     * @param data    buffer to write
+     * @return IAsyncRequest which allows to wait for the write request
+     */
     IAsyncRequest asyncWrite(IFileHandle fHandle, long offset, ByteBuffer 
data) throws HyracksDataException;
 
+    /**
+     * Do async write
+     *
+     * @param fHandle   handle of the opened file
+     * @param offset    start offset
+     * @param dataArray buffers to write
+     * @return IAsyncRequest which allows to wait for the write request
+     */
     IAsyncRequest asyncWrite(IFileHandle fHandle, long offset, ByteBuffer[] 
dataArray) throws HyracksDataException;
 
+    /**
+     * Do async read
+     *
+     * @param fHandle handle of the opened file
+     * @param offset  start offset
+     * @param data    destination buffer for the requested content
+     * @return IAsyncRequest which allows to wait for the read request
+     */
     IAsyncRequest asyncRead(IFileHandle fHandle, long offset, ByteBuffer data) 
throws HyracksDataException;
 
+    /**
+     * Close file
+     *
+     * @param fHandle handle of the opened file
+     */
     void close(IFileHandle fHandle) throws HyracksDataException;
 
+    /**
+     * Forces the content of the opened file
+     *
+     * @param fileHandle handle of the opened file
+     * @param metadata   force the file metadata as well
+     */
     void sync(IFileHandle fileHandle, boolean metadata) throws 
HyracksDataException;
 
+    /**
+     * Truncates the opened file to the requested size
+     *
+     * @param fileHandle handle of the opened file
+     * @param size       required size
+     */
     void truncate(IFileHandle fileHandle, long size) throws 
HyracksDataException;
 
+    /**
+     * Gets the size of an opened file
+     *
+     * @param fileHandle handle of the opened file
+     * @return file size
+     */
     long getSize(IFileHandle fileHandle) throws HyracksDataException;
 
+    /**
+     * Gets the size of a file
+     *
+     * @param fileReference file reference
+     * @return file size
+     */
     long getSize(FileReference fileReference) throws HyracksDataException;
 
+    /**
+     * Returns a new write channel
+     *
+     * @param fileHandle handle of the opened file
+     * @return a new write channel
+     */
     WritableByteChannel newWritableChannel(IFileHandle fileHandle);
 
     void deleteWorkspaceFiles() throws HyracksDataException;
 
     /**
-     * @param ioDeviceId
-     * @param path
+     * Returns a file reference of a file
+     *
+     * @param ioDeviceId device Id
+     * @param path       relative path
      * @return A file reference based on the mounting point of {@code 
ioDeviceId} and the passed {@code relativePath}
      */
     FileReference getFileReference(int ioDeviceId, String path);
@@ -82,18 +177,16 @@ public interface IIOManager extends Closeable {
     /**
      * determine which IO device holds the path and returns a FileReference 
based on that
      *
-     * @param path
+     * @param path relative path
      * @return A file reference based on the mounting point of {@code 
ioDeviceId} and the passed {@code Path}
-     * @throws HyracksDataException
      */
     FileReference resolve(String path) throws HyracksDataException;
 
     /**
      * Gets a file reference from an absolute path
      *
-     * @param path
+     * @param path absolute path
      * @return A file reference based on the mounting point of {@code 
ioDeviceId} and the passed {@code relativePath}
-     * @throws HyracksDataException
      * @deprecated use getFileRef(int ioDeviceId, String path) instead
      */
     @Deprecated
@@ -102,9 +195,8 @@ public interface IIOManager extends Closeable {
     /**
      * Create a workspace file with the given prefix
      *
-     * @param prefix
+     * @param prefix of workspace file
      * @return A FileReference for the created workspace file
-     * @throws HyracksDataException
      */
     FileReference createWorkspaceFile(String prefix) throws 
HyracksDataException;
 
@@ -118,36 +210,90 @@ public interface IIOManager extends Closeable {
     /**
      * Delete any additional artifacts associated with the file reference
      *
-     * @param fileRef
+     * @param fileRef file/directory to delete
      */
     void delete(FileReference fileRef) throws HyracksDataException;
 
+    /**
+     * @return bulk-delete operation (for either files or directories)
+     */
+    IIOBulkOperation createDeleteBulkOperation();
+
+    /**
+     * List of files in the directory and its subdirectories
+     *
+     * @param dir directory to list
+     * @return a set of all files in the directory and its subdirectories
+     */
     Set<FileReference> list(FileReference dir) throws HyracksDataException;
 
     /**
-     * Lists the files matching {@code filter} recursively starting from 
{@code dir}
+     * List of files in the directory and its subdirectories that satisfy the 
provided filter
      *
-     * @param dir
-     * @param filter
-     * @return the matching files
-     * @throws HyracksDataException
+     * @param dir    directory to list
+     * @param filter to test if a file reference satisfies a certain predicate
+     * @return a set of all files in the directory and its subdirectories that 
satisfy the provided filter
      */
     Set<FileReference> list(FileReference dir, FilenameFilter filter) throws 
HyracksDataException;
 
-    void overwrite(FileReference fileRef, byte[] bytes) throws 
ClosedByInterruptException, HyracksDataException;
+    /**
+     * Overwrites (or writes) a file
+     *
+     * @param fileRef file reference of the file to overwrite
+     * @param bytes   content
+     */
+    void overwrite(FileReference fileRef, byte[] bytes) throws 
HyracksDataException;
 
+    /**
+     * Reads the entire content of a file
+     *
+     * @param fileRef file reference
+     * @return a byte array of the content
+     */
     byte[] readAllBytes(FileReference fileRef) throws HyracksDataException;
 
-    void copyDirectory(FileReference srcMetadataScopePath, FileReference 
targetMetadataScopePath)
-            throws HyracksDataException;
-
-    void deleteDirectory(FileReference root) throws HyracksDataException;
+    /**
+     * Copy the content of one directory to another
+     *
+     * @param srcDir  source directory
+     * @param destDir destination directory
+     */
+    void copyDirectory(FileReference srcDir, FileReference destDir) throws 
HyracksDataException;
 
+    /**
+     * Checks whether a file exists
+     *
+     * @param fileRef file reference
+     * @return true if the file exists, false otherwise
+     */
     boolean exists(FileReference fileRef) throws HyracksDataException;
 
+    /**
+     * Creates a file
+     *
+     * @param fileRef file reference
+     */
     void create(FileReference fileRef) throws HyracksDataException;
 
-    boolean makeDirectories(FileReference resourceDir);
+    /**
+     * Make a directory and all of its parent directories
+     *
+     * @param dir directory to create
+     * @return true if it was created
+     */
+    boolean makeDirectories(FileReference dir);
+
+    /**
+     * Deletes the content of a directory (files and subdirectories)
+     *
+     * @param dir directory reference
+     */
+    void cleanDirectory(FileReference dir) throws HyracksDataException;
 
-    void cleanDirectory(FileReference resourceDir) throws HyracksDataException;
+    /**
+     * Performs a bulk operation
+     *
+     * @param bulkOperation the operation to perform
+     */
+    void performBulkOperation(IIOBulkOperation bulkOperation) throws 
HyracksDataException;
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
index 0e6578fd66..25a67ff0e7 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
@@ -47,11 +47,14 @@ import 
org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileReference;
 import org.apache.hyracks.api.io.IFileDeviceResolver;
 import org.apache.hyracks.api.io.IFileHandle;
+import org.apache.hyracks.api.io.IIOBulkOperation;
 import org.apache.hyracks.api.io.IIOManager;
 import org.apache.hyracks.api.io.IODeviceHandle;
 import org.apache.hyracks.api.util.InvokeUtil;
 import org.apache.hyracks.api.util.IoUtil;
 import org.apache.hyracks.control.nc.io.IoRequest.State;
+import org.apache.hyracks.control.nc.io.bulk.AbstractBulkOperation;
+import org.apache.hyracks.control.nc.io.bulk.DeleteBulkOperation;
 import org.apache.hyracks.util.file.FileUtil;
 import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
@@ -506,6 +509,11 @@ public class IOManager implements IIOManager {
         }
     }
 
+    @Override
+    public IIOBulkOperation createDeleteBulkOperation() {
+        return new DeleteBulkOperation(this);
+    }
+
     @Override
     public Set<FileReference> list(FileReference dir) throws 
HyracksDataException {
         return list(dir, IoUtil.NO_OP_FILTER);
@@ -551,15 +559,6 @@ public class IOManager implements IIOManager {
         }
     }
 
-    @Override
-    public void deleteDirectory(FileReference root) throws 
HyracksDataException {
-        try {
-            FileUtils.deleteDirectory(root.getFile());
-        } catch (IOException e) {
-            throw HyracksDataException.create(e);
-        }
-    }
-
     @Override
     public boolean exists(FileReference fileRef) throws HyracksDataException {
         return fileRef.getFile().exists();
@@ -592,4 +591,9 @@ public class IOManager implements IIOManager {
             throw HyracksDataException.create(e);
         }
     }
+
+    @Override
+    public void performBulkOperation(IIOBulkOperation bulkOperation) throws 
HyracksDataException {
+        ((AbstractBulkOperation) bulkOperation).performOperation();
+    }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/bulk/AbstractBulkOperation.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/bulk/AbstractBulkOperation.java
new file mode 100644
index 0000000000..4ca7b9c77d
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/bulk/AbstractBulkOperation.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.nc.io.bulk;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IIOBulkOperation;
+import org.apache.hyracks.api.io.IIOManager;
+
+public abstract class AbstractBulkOperation implements IIOBulkOperation {
+    protected final IIOManager ioManager;
+    protected final List<FileReference> fileReferences;
+
+    AbstractBulkOperation(IIOManager ioManager) {
+        this.ioManager = ioManager;
+        fileReferences = new ArrayList<>();
+    }
+
+    @Override
+    public final void add(FileReference fileReference) {
+        fileReferences.add(fileReference);
+    }
+
+    public abstract void performOperation() throws HyracksDataException;
+}
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/bulk/DeleteBulkOperation.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/bulk/DeleteBulkOperation.java
new file mode 100644
index 0000000000..5ccfdd6073
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/bulk/DeleteBulkOperation.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.nc.io.bulk;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IIOManager;
+
+public class DeleteBulkOperation extends AbstractBulkOperation {
+    public DeleteBulkOperation(IIOManager ioManager) {
+        super(ioManager);
+    }
+
+    @Override
+    public void performOperation() throws HyracksDataException {
+        for (FileReference fileReference : fileReferences) {
+            ioManager.delete(fileReference);
+        }
+    }
+}
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java
index 6194854bbc..cff54a0980 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java
@@ -179,7 +179,7 @@ public abstract class AbstractLSMIndexFileManager 
implements ILSMIndexFileManage
 
     @Override
     public void deleteDirs() throws HyracksDataException {
-        ioManager.deleteDirectory(baseDir);
+        ioManager.delete(baseDir);
     }
 
     @Override


Reply via email to