This is an automated email from the ASF dual-hosted git repository.
wyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git
The following commit(s) were added to refs/heads/master by this push:
new c3e9fdae7a [ASTERIXDB-3218][*DB] Support bulk deletes of files
c3e9fdae7a is described below
commit c3e9fdae7a696f90e71c57abfbe5383c1b1c4398
Author: Wail Alkowaileet <[email protected]>
AuthorDate: Wed Jul 12 16:21:09 2023 -0700
[ASTERIXDB-3218][*DB] Support bulk deletes of files
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
- Extend IIOManager to bulk-delete multiple files.
- Extract list from S3CloudClient#deleteObjects()
- Only do list when deleting uncached
files or when deleting directories.
- Unify IIOManager#delete() to delete files
and directories
- Make S3CloudClient#deleteObjects() delete at most
1000 files per request -- an S3 limit
- Other misc. optimizations
Change-Id: Ic53a45ef4cc0911ee2b07c73c267d492600bc69f
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17630
Reviewed-by: Ali Alsuliman <[email protected]>
Integration-Tests: Jenkins <[email protected]>
Tested-by: Jenkins <[email protected]>
---
.../asterix/app/nc/IndexCheckpointManager.java | 10 +-
.../asterix/cloud/AbstractCloudIOManager.java | 29 ++--
.../cloud/bulk/DeleteBulkCloudOperation.java | 52 ++++++
.../apache/asterix/cloud/clients/ICloudClient.java | 31 ++--
.../cloud/clients/aws/s3/S3CloudClient.java | 27 ++-
.../test/java/org/apach/asterix/cloud/LSMTest.java | 3 +-
.../replication/messaging/DropIndexTask.java | 2 +-
.../PersistentLocalResourceRepository.java | 45 ++---
.../apache/hyracks/api/io/IIOBulkOperation.java | 33 ++++
.../java/org/apache/hyracks/api/io/IIOManager.java | 190 ++++++++++++++++++---
.../apache/hyracks/control/nc/io/IOManager.java | 22 ++-
.../control/nc/io/bulk/AbstractBulkOperation.java | 44 +++++
.../control/nc/io/bulk/DeleteBulkOperation.java | 36 ++++
.../common/impls/AbstractLSMIndexFileManager.java | 2 +-
14 files changed, 430 insertions(+), 96 deletions(-)
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
index 9383f063ff..2e43397c03 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
@@ -31,6 +31,7 @@ import org.apache.asterix.common.storage.IndexCheckpoint;
import org.apache.asterix.common.utils.StorageConstants;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IIOBulkOperation;
import org.apache.hyracks.api.io.IIOManager;
import org.apache.hyracks.api.util.IoUtil;
import
org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager;
@@ -201,10 +202,7 @@ public class IndexCheckpointManager implements
IIndexCheckpointManager {
final FileReference checkpointPath = getCheckpointPath(checkpoint);
for (int i = 1; i <= MAX_CHECKPOINT_WRITE_ATTEMPTS; i++) {
try {
- // clean up from previous write failure
- if (ioManager.exists(checkpointPath)) {
- ioManager.delete(checkpointPath);
- }
+ // Overwrite will clean up from previous write failure (if any)
ioManager.overwrite(checkpointPath,
checkpoint.asJson().getBytes());
// ensure it was written correctly by reading it
read(checkpointPath);
@@ -247,11 +245,13 @@ public class IndexCheckpointManager implements
IIndexCheckpointManager {
try {
final Collection<FileReference> checkpointFiles =
ioManager.list(indexPath, CHECKPOINT_FILE_FILTER);
if (!checkpointFiles.isEmpty()) {
+ IIOBulkOperation deleteBulk =
ioManager.createDeleteBulkOperation();
for (FileReference checkpointFile : checkpointFiles) {
if (getCheckpointIdFromFileName(checkpointFile) <
(latestId - historyToKeep)) {
- ioManager.delete(checkpointFile);
+ deleteBulk.add(checkpointFile);
}
}
+ ioManager.performBulkOperation(deleteBulk);
}
} catch (Exception e) {
LOGGER.warn(() -> "Couldn't delete history checkpoints at " +
indexPath, e);
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
index 1c069199b3..49f71f49d2 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
@@ -26,10 +26,13 @@ import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.asterix.cloud.bulk.DeleteBulkCloudOperation;
import org.apache.asterix.cloud.clients.CloudClientProvider;
import org.apache.asterix.cloud.clients.ICloudClient;
import org.apache.asterix.cloud.util.CloudFileUtil;
@@ -40,6 +43,7 @@ import org.apache.asterix.common.utils.StoragePathUtil;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.api.io.IFileHandle;
+import org.apache.hyracks.api.io.IIOBulkOperation;
import org.apache.hyracks.api.util.IoUtil;
import org.apache.hyracks.control.nc.io.IOManager;
import org.apache.hyracks.util.file.FileUtil;
@@ -117,7 +121,7 @@ public abstract class AbstractCloudIOManager extends
IOManager implements IParti
if (!partitions.contains(partitionNum)) {
LOGGER.warn("Deleting storage partition {} as it does not
belong to the current storage partitions {}",
partitionNum, partitions);
- localIoManager.deleteDirectory(partitionDir);
+ localIoManager.delete(partitionDir);
}
}
}
@@ -201,13 +205,23 @@ public abstract class AbstractCloudIOManager extends
IOManager implements IParti
@Override
public final void delete(FileReference fileRef) throws
HyracksDataException {
+ // Never delete the storage dir in cloud storage
if
(!STORAGE_ROOT_DIR_NAME.equals(IoUtil.getFileNameFromPath(fileRef.getAbsolutePath())))
{
- // Never delete the storage dir in cloud storage
- cloudClient.deleteObject(bucket, fileRef.getRelativePath());
+ File localFile = fileRef.getFile();
+ // if file reference exists,and it is a file, then list is not
required
+ Set<String> paths =
+ localFile.exists() && localFile.isFile() ?
Collections.singleton(fileRef.getRelativePath())
+ :
list(fileRef).stream().map(FileReference::getRelativePath).collect(Collectors.toSet());
+ cloudClient.deleteObjects(bucket, paths);
}
localIoManager.delete(fileRef);
}
+ @Override
+ public IIOBulkOperation createDeleteBulkOperation() {
+ return new DeleteBulkCloudOperation(localIoManager, bucket,
cloudClient);
+ }
+
@Override
public final void close(IFileHandle fHandle) throws HyracksDataException {
try {
@@ -251,15 +265,6 @@ public abstract class AbstractCloudIOManager extends
IOManager implements IParti
localIoManager.overwrite(fileRef, bytes);
}
- @Override
- public final void deleteDirectory(FileReference fileRef) throws
HyracksDataException {
- if
(!STORAGE_ROOT_DIR_NAME.equals(IoUtil.getFileNameFromPath(fileRef.getAbsolutePath())))
{
- // Never delete the storage dir in cloud storage
- cloudClient.deleteObject(bucket, fileRef.getRelativePath());
- }
- localIoManager.deleteDirectory(fileRef);
- }
-
@Override
public final void create(FileReference fileRef) throws
HyracksDataException {
// We need to delete the local file on create as the cloud storage
didn't complete the upload
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/bulk/DeleteBulkCloudOperation.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/bulk/DeleteBulkCloudOperation.java
new file mode 100644
index 0000000000..c7cea902ed
--- /dev/null
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/bulk/DeleteBulkCloudOperation.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.cloud.bulk;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.apache.asterix.cloud.clients.ICloudClient;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IIOManager;
+import org.apache.hyracks.control.nc.io.bulk.DeleteBulkOperation;
+
+public class DeleteBulkCloudOperation extends DeleteBulkOperation {
+ private final String bucket;
+ private final ICloudClient cloudClient;
+
+ public DeleteBulkCloudOperation(IIOManager ioManager, String bucket,
ICloudClient cloudClient) {
+ super(ioManager);
+ this.bucket = bucket;
+ this.cloudClient = cloudClient;
+ }
+
+ @Override
+ public void performOperation() throws HyracksDataException {
+ /*
+ * TODO What about deleting multiple directories?
+ * Actually, is there a case where we delete multiple directories
from the cloud?
+ */
+ List<String> paths =
fileReferences.stream().map(FileReference::getRelativePath).collect(Collectors.toList());
+ cloudClient.deleteObjects(bucket, paths);
+
+ // Bulk delete locally as well
+ super.performOperation();
+ }
+}
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java
index 4f02df208d..ff26915bc3 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java
@@ -21,6 +21,7 @@ package org.apache.asterix.cloud.clients;
import java.io.FilenameFilter;
import java.io.InputStream;
import java.nio.ByteBuffer;
+import java.util.Collection;
import java.util.Map;
import java.util.Set;
@@ -36,7 +37,7 @@ public interface ICloudClient {
* Creates a cloud buffered writer
*
* @param bucket bucket to write to
- * @param path path to write to
+ * @param path path to write to
* @return buffered writer
*/
ICloudBufferedWriter createBufferedWriter(String bucket, String path);
@@ -45,7 +46,7 @@ public interface ICloudClient {
* Lists objects at the specified bucket and path, and applies the file
name filter on the returned objects
*
* @param bucket bucket to list from
- * @param path path to list from
+ * @param path path to list from
* @param filter filter to apply
* @return file names returned after applying the file name filter
*/
@@ -56,7 +57,7 @@ public interface ICloudClient {
* buffer.remaining()
*
* @param bucket bucket
- * @param path path
+ * @param path path
* @param offset offset
* @param buffer buffer
* @return returns the buffer position
@@ -67,7 +68,7 @@ public interface ICloudClient {
* Reads all bytes of an object at the specified bucket and path
*
* @param bucket bucket
- * @param path path
+ * @param path path
* @return bytes
* @throws HyracksDataException HyracksDataException
*/
@@ -77,7 +78,7 @@ public interface ICloudClient {
* Returns the {@code InputStream} of an object at the specified bucket
and path
*
* @param bucket bucket
- * @param path path
+ * @param path path
* @return inputstream
*/
InputStream getObjectStream(String bucket, String path);
@@ -86,33 +87,33 @@ public interface ICloudClient {
* Writes the content of the byte array into the bucket at the specified
path
*
* @param bucket bucket
- * @param path path
- * @param data data
+ * @param path path
+ * @param data data
*/
void write(String bucket, String path, byte[] data);
/**
* Copies an object from the source path to the destination path
*
- * @param bucket bucket
- * @param srcPath source path
+ * @param bucket bucket
+ * @param srcPath source path
* @param destPath destination path
*/
void copy(String bucket, String srcPath, FileReference destPath);
/**
- * Deletes an object at the specified bucket and path
+ * Deletes all objects at the specified bucket and paths
*
* @param bucket bucket
- * @param path path
+ * @param paths paths of all objects to be deleted
*/
- void deleteObject(String bucket, String path);
+ void deleteObjects(String bucket, Collection<String> paths);
/**
* Returns the size of the object at the specified path
*
* @param bucket bucket
- * @param path path
+ * @param path path
* @return size
*/
long getObjectSize(String bucket, String path) throws HyracksDataException;
@@ -121,7 +122,7 @@ public interface ICloudClient {
* Checks if an object exists at the specified path
*
* @param bucket bucket
- * @param path path
+ * @param path path
* @return {@code true} if the object exists, {@code false} otherwise
*/
boolean exists(String bucket, String path) throws HyracksDataException;
@@ -129,7 +130,7 @@ public interface ICloudClient {
/**
* Syncs files by downloading them from cloud storage to local storage
*
- * @param bucket bucket to sync from
+ * @param bucket bucket to sync from
* @param cloudToLocalStoragePaths map of cloud storage partition to local
storage path
* @throws HyracksDataException HyracksDataException
*/
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
index c51f84a96e..5619fc8ca8 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java
@@ -31,8 +31,10 @@ import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.nio.file.Paths;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -78,6 +80,8 @@ public class S3CloudClient implements ICloudClient {
private static final Logger LOGGER = LogManager.getLogger();
// TODO(htowaileb): Temporary variables, can we get this from the used
instance?
private static final double MAX_HOST_BANDWIDTH = 10.0; // in Gbps
+ // The maximum number of file that can be deleted (AWS restriction)
+ private static final int DELETE_BATCH_SIZE = 1000;
private final S3ClientConfig config;
private final S3Client s3Client;
@@ -203,20 +207,25 @@ public class S3CloudClient implements ICloudClient {
}
@Override
- public void deleteObject(String bucket, String path) {
- Set<String> fileList = listObjects(bucket, path, IoUtil.NO_OP_FILTER);
- if (fileList.isEmpty()) {
+ public void deleteObjects(String bucket, Collection<String> paths) {
+ if (paths.isEmpty()) {
return;
}
- profiler.objectDelete();
List<ObjectIdentifier> objectIdentifiers = new ArrayList<>();
- for (String file : fileList) {
-
objectIdentifiers.add(ObjectIdentifier.builder().key(file).build());
+ Iterator<String> pathIter = paths.iterator();
+ ObjectIdentifier.Builder builder = ObjectIdentifier.builder();
+ while (pathIter.hasNext()) {
+ objectIdentifiers.clear();
+ for (int i = 0; pathIter.hasNext() && i < DELETE_BATCH_SIZE; i++) {
+ objectIdentifiers.add(builder.key(pathIter.next()).build());
+ }
+
+ Delete delete =
Delete.builder().objects(objectIdentifiers).build();
+ DeleteObjectsRequest deleteReq =
DeleteObjectsRequest.builder().bucket(bucket).delete(delete).build();
+ s3Client.deleteObjects(deleteReq);
+ profiler.objectDelete();
}
- Delete delete = Delete.builder().objects(objectIdentifiers).build();
- DeleteObjectsRequest deleteReq =
DeleteObjectsRequest.builder().bucket(bucket).delete(delete).build();
- s3Client.deleteObjects(deleteReq);
}
@Override
diff --git
a/asterixdb/asterix-cloud/src/test/java/org/apach/asterix/cloud/LSMTest.java
b/asterixdb/asterix-cloud/src/test/java/org/apach/asterix/cloud/LSMTest.java
index bd0ab285c4..612aa1d746 100644
--- a/asterixdb/asterix-cloud/src/test/java/org/apach/asterix/cloud/LSMTest.java
+++ b/asterixdb/asterix-cloud/src/test/java/org/apach/asterix/cloud/LSMTest.java
@@ -21,6 +21,7 @@ package org.apach.asterix.cloud;
import java.io.FilenameFilter;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.Collections;
import org.apache.asterix.cloud.CloudResettableInputStream;
import org.apache.asterix.cloud.WriteBufferProvider;
@@ -46,7 +47,7 @@ public abstract class LSMTest {
@Test
public void a4deleteTest() {
try {
- CLOUD_CLIENT.deleteObject(PLAYGROUND_CONTAINER,
BUCKET_STORAGE_ROOT);
+ CLOUD_CLIENT.deleteObjects(PLAYGROUND_CONTAINER,
Collections.singleton(BUCKET_STORAGE_ROOT));
} catch (Exception ex) {
ex.printStackTrace();
}
diff --git
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DropIndexTask.java
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DropIndexTask.java
index 2b0e2d8cf5..7d682e70ec 100644
---
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DropIndexTask.java
+++
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/DropIndexTask.java
@@ -54,7 +54,7 @@ public class DropIndexTask implements IReplicaTask {
final FileReference indexFile = ioManager.resolve(file);
if (ioManager.exists(indexFile)) {
FileReference indexDir = indexFile.getParent();
- ioManager.deleteDirectory(indexDir);
+ ioManager.delete(indexDir);
((PersistentLocalResourceRepository)
appCtx.getLocalResourceRepository())
.invalidateResource(ResourceReference.of(file).getRelativePath().toString());
LOGGER.info(() -> "Deleted index: " +
indexFile.getAbsolutePath());
diff --git
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index 4e71b1c629..1eef182c06 100644
---
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -57,6 +57,7 @@ import org.apache.asterix.common.utils.StorageConstants;
import org.apache.asterix.common.utils.StoragePathUtil;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IIOBulkOperation;
import org.apache.hyracks.api.io.IIOManager;
import org.apache.hyracks.api.io.IODeviceHandle;
import org.apache.hyracks.api.io.IPersistedResourceRegistry;
@@ -274,6 +275,7 @@ public class PersistentLocalResourceRepository implements
ILocalResourceReposito
}
public synchronized void deleteInvalidIndexes(Predicate<LocalResource>
filter) throws HyracksDataException {
+ IIOBulkOperation bulkDelete = ioManager.createDeleteBulkOperation();
for (FileReference root : storageRoots) {
final Collection<FileReference> files = ioManager.list(root,
METADATA_FILES_FILTER);
try {
@@ -282,13 +284,14 @@ public class PersistentLocalResourceRepository implements
ILocalResourceReposito
if (localResource != null && filter.test(localResource)) {
FileReference parent = file.getParent();
LOGGER.warn("deleting invalid metadata index {}",
parent);
- ioManager.delete(parent);
+ bulkDelete.add(parent);
}
}
} catch (IOException e) {
throw HyracksDataException.create(e);
}
}
+ ioManager.performBulkOperation(bulkDelete);
resourceCache.invalidateAll();
}
@@ -361,9 +364,11 @@ public class PersistentLocalResourceRepository implements
ILocalResourceReposito
* Deletes physical files of all data verses.
*/
public synchronized void deleteStorageData() throws HyracksDataException {
+ IIOBulkOperation bulkDelete = ioManager.createDeleteBulkOperation();
for (FileReference root : storageRoots) {
- ioManager.deleteDirectory(root);
+ bulkDelete.add(root);
}
+ ioManager.performBulkOperation(bulkDelete);
createStorageRoots();
}
@@ -492,23 +497,27 @@ public class PersistentLocalResourceRepository implements
ILocalResourceReposito
}
public synchronized void deleteCorruptedResources() throws
HyracksDataException {
+ IIOBulkOperation bulkDelete = ioManager.createDeleteBulkOperation();
for (FileReference root : storageRoots) {
final Collection<FileReference> metadataMaskFiles =
ioManager.list(root, METADATA_MASK_FILES_FILTER);
for (FileReference metadataMaskFile : metadataMaskFiles) {
final FileReference resourceFile =
metadataMaskFile.getParent().getChild(METADATA_FILE_NAME);
- ioManager.delete(resourceFile);
- ioManager.delete(metadataMaskFile);
+ bulkDelete.add(resourceFile);
+ bulkDelete.add(metadataMaskFile);
}
}
+ ioManager.performBulkOperation(bulkDelete);
}
private void deleteIndexMaskedFiles(FileReference index) throws
IOException {
+ IIOBulkOperation bulkDelete = ioManager.createDeleteBulkOperation();
Collection<FileReference> masks = ioManager.list(index,
MASK_FILES_FILTER);
for (FileReference mask : masks) {
deleteIndexMaskedFiles(index, mask);
// delete the mask itself
- ioManager.delete(mask);
+ bulkDelete.add(mask);
}
+ ioManager.performBulkOperation(bulkDelete);
}
private boolean isValidIndex(FileReference index) throws IOException {
@@ -524,6 +533,7 @@ public class PersistentLocalResourceRepository implements
ILocalResourceReposito
if (indexComponentFiles == null) {
throw new IOException(index + " doesn't exist or an IO error
occurred");
}
+ IIOBulkOperation bulkDelete = ioManager.createDeleteBulkOperation();
final long validComponentSequence =
getIndexCheckpointManager(index).getValidComponentSequence();
for (FileReference componentFileRef : indexComponentFiles) {
// delete any file with start or end sequence > valid component
sequence
@@ -532,9 +542,10 @@ public class PersistentLocalResourceRepository implements
ILocalResourceReposito
if (fileStart > validComponentSequence || fileEnd >
validComponentSequence) {
LOGGER.warn(() -> "Deleting invalid component file " +
componentFileRef.getAbsolutePath()
+ " based on valid sequence " +
validComponentSequence);
- ioManager.delete(componentFileRef);
+ bulkDelete.add(componentFileRef);
}
}
+ ioManager.performBulkOperation(bulkDelete);
}
private IIndexCheckpointManager getIndexCheckpointManager(FileReference
index) throws HyracksDataException {
@@ -556,10 +567,12 @@ public class PersistentLocalResourceRepository implements
ILocalResourceReposito
maskedFiles = ioManager.list(index, (dir, name) ->
name.equals(maskedFileName));
}
if (maskedFiles != null) {
+ IIOBulkOperation bulkDelete =
ioManager.createDeleteBulkOperation();
for (FileReference maskedFile : maskedFiles) {
LOGGER.info(() -> "deleting masked file: " +
maskedFile.getAbsolutePath());
- ioManager.delete(maskedFile);
+ bulkDelete.add(maskedFile);
}
+ ioManager.performBulkOperation(bulkDelete);
}
}
@@ -631,18 +644,6 @@ public class PersistentLocalResourceRepository implements
ILocalResourceReposito
return COMPONENT_FILES_FILTER.accept(indexDir.getFile(), fileName);
}
- public synchronized void keepPartitions(Set<Integer> keepPartitions)
throws HyracksDataException {
- List<FileReference> onDiskPartitions = getOnDiskPartitions();
- for (FileReference onDiskPartition : onDiskPartitions) {
- int partitionNum =
StoragePathUtil.getPartitionNumFromRelativePath(onDiskPartition.getAbsolutePath());
- if (!keepPartitions.contains(partitionNum)) {
- LOGGER.warn("deleting partition {} since it is not on
partitions to keep {}", partitionNum,
- keepPartitions);
- ioManager.delete(onDiskPartition);
- }
- }
- }
-
public synchronized List<FileReference> getOnDiskPartitions() {
List<FileReference> onDiskPartitions = new ArrayList<>();
for (FileReference root : storageRoots) {
@@ -660,13 +661,15 @@ public class PersistentLocalResourceRepository implements
ILocalResourceReposito
public void deletePartition(int partitionId) throws HyracksDataException {
Collection<FileReference> onDiskPartitions = getOnDiskPartitions();
+ IIOBulkOperation bulkDelete = ioManager.createDeleteBulkOperation();
for (FileReference onDiskPartition : onDiskPartitions) {
int partitionNum =
StoragePathUtil.getPartitionNumFromRelativePath(onDiskPartition.getAbsolutePath());
if (partitionNum == partitionId) {
LOGGER.warn("deleting partition {}", partitionNum);
- ioManager.delete(onDiskPartition);
- return;
+ bulkDelete.add(onDiskPartition);
+ break;
}
}
+ ioManager.performBulkOperation(bulkDelete);
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOBulkOperation.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOBulkOperation.java
new file mode 100644
index 0000000000..c5e84fd7a9
--- /dev/null
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOBulkOperation.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.io;
+
+/**
+ * A descriptor for a bulk operation
+ */
+public interface IIOBulkOperation {
+ /**
+ * Add file reference to perform the required operation (e.g., delete)
+ *
+ * @param fileReference that should be included in this bulk operation
+ * @see IIOManager#performBulkOperation(IIOBulkOperation)
+ * @see IIOManager#createDeleteBulkOperation()
+ */
+ void add(FileReference fileReference);
+}
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOManager.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOManager.java
index 95dcb2f7a3..a6520c63ce 100644
---
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOManager.java
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/io/IIOManager.java
@@ -21,7 +21,6 @@ package org.apache.hyracks.api.io;
import java.io.Closeable;
import java.io.FilenameFilter;
import java.nio.ByteBuffer;
-import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.WritableByteChannel;
import java.util.List;
import java.util.Set;
@@ -41,40 +40,136 @@ public interface IIOManager extends Closeable {
METADATA_ASYNC_DATA_ASYNC
}
+ /**
+ * @return IO devices in an NC
+ */
List<IODeviceHandle> getIODevices();
+ /**
+ * Open file
+ *
+ * @param fileRef file reference
+ * @param rwMode r/w mode
+ * @param syncMode sync mode
+ * @return file handle for the opened file
+ */
IFileHandle open(FileReference fileRef, FileReadWriteMode rwMode,
FileSyncMode syncMode)
throws HyracksDataException;
+ /**
+ * Do sync write (utilizes {@link IAsyncRequest})
+ *
+ * @param fHandle handle of the opened file
+ * @param offset start offset
+ * @param data buffer to write
+ * @return number of written bytes
+ */
int syncWrite(IFileHandle fHandle, long offset, ByteBuffer data) throws
HyracksDataException;
+ /**
+ * Do sync write (utilizes {@link IAsyncRequest})
+ *
+ * @param fHandle file handle of the opened file
+ * @param offset start offset
+ * @param dataArray buffers to write
+ * @return number of written bytes
+ */
long syncWrite(IFileHandle fHandle, long offset, ByteBuffer[] dataArray)
throws HyracksDataException;
+ /**
+ * Do sync read (utilizes {@link IAsyncRequest})
+ *
+ * @param fHandle handle of the opened file
+ * @param offset start offset
+ * @param data destination buffer for the requested content
+ * @return number of read bytes
+ */
int syncRead(IFileHandle fHandle, long offset, ByteBuffer data) throws
HyracksDataException;
+ /**
+ * Do async write
+ *
+ * @param fHandle handle of the opened file
+ * @param offset start offset
+ * @param data buffer to write
+ * @return IAsyncRequest which allows to wait for the write request
+ */
IAsyncRequest asyncWrite(IFileHandle fHandle, long offset, ByteBuffer
data) throws HyracksDataException;
+ /**
+ * Do async write
+ *
+ * @param fHandle handle of the opened file
+ * @param offset start offset
+ * @param dataArray buffers to write
+ * @return IAsyncRequest which allows to wait for the write request
+ */
IAsyncRequest asyncWrite(IFileHandle fHandle, long offset, ByteBuffer[]
dataArray) throws HyracksDataException;
+ /**
+ * Do async read
+ *
+ * @param fHandle handle of the opened file
+ * @param offset start offset
+ * @param data destination buffer for the requested content
+ * @return IAsyncRequest which allows to wait for the read request
+ */
IAsyncRequest asyncRead(IFileHandle fHandle, long offset, ByteBuffer data)
throws HyracksDataException;
+ /**
+ * Close file
+ *
+ * @param fHandle handle of the opened file
+ */
void close(IFileHandle fHandle) throws HyracksDataException;
+ /**
+ * Forces the content of the opened file
+ *
+ * @param fileHandle handle of the opened file
+ * @param metadata force the file metadata as well
+ */
void sync(IFileHandle fileHandle, boolean metadata) throws
HyracksDataException;
+ /**
+ * Truncates the opened file to the requested size
+ *
+ * @param fileHandle handle of the opened file
+ * @param size required size
+ */
void truncate(IFileHandle fileHandle, long size) throws
HyracksDataException;
+ /**
+ * Gets the size of an opened file
+ *
+ * @param fileHandle handle of the opened file
+ * @return file size
+ */
long getSize(IFileHandle fileHandle) throws HyracksDataException;
+ /**
+ * Gets the size of a file
+ *
+ * @param fileReference file reference
+ * @return file size
+ */
long getSize(FileReference fileReference) throws HyracksDataException;
+ /**
+ * Returns a new write channel
+ *
+ * @param fileHandle handle of the opened file
+ * @return a new write channel
+ */
WritableByteChannel newWritableChannel(IFileHandle fileHandle);
void deleteWorkspaceFiles() throws HyracksDataException;
/**
- * @param ioDeviceId
- * @param path
+ * Returns a file reference of a file
+ *
+ * @param ioDeviceId device Id
+ * @param path relative path
* @return A file reference based on the mounting point of {@code
ioDeviceId} and the passed {@code relativePath}
*/
FileReference getFileReference(int ioDeviceId, String path);
@@ -82,18 +177,16 @@ public interface IIOManager extends Closeable {
/**
* determine which IO device holds the path and returns a FileReference
based on that
*
- * @param path
+ * @param path relative path
* @return A file reference based on the mounting point of {@code
ioDeviceId} and the passed {@code Path}
- * @throws HyracksDataException
*/
FileReference resolve(String path) throws HyracksDataException;
/**
* Gets a file reference from an absolute path
*
- * @param path
+ * @param path absolute path
* @return A file reference based on the mounting point of {@code
ioDeviceId} and the passed {@code relativePath}
- * @throws HyracksDataException
* @deprecated use getFileRef(int ioDeviceId, String path) instead
*/
@Deprecated
@@ -102,9 +195,8 @@ public interface IIOManager extends Closeable {
/**
* Create a workspace file with the given prefix
*
- * @param prefix
+ * @param prefix of workspace file
* @return A FileReference for the created workspace file
- * @throws HyracksDataException
*/
FileReference createWorkspaceFile(String prefix) throws
HyracksDataException;
@@ -118,36 +210,90 @@ public interface IIOManager extends Closeable {
/**
* Delete any additional artifacts associated with the file reference
*
- * @param fileRef
+ * @param fileRef file/directory to delete
*/
void delete(FileReference fileRef) throws HyracksDataException;
+ /**
+ * @return bulk-delete operation (for either files or directories)
+ */
+ IIOBulkOperation createDeleteBulkOperation();
+
+ /**
+ * List of files in the directory and its subdirectories
+ *
+ * @param dir directory to list
+ * @return a set of all files in the directory and its subdirectories
+ */
Set<FileReference> list(FileReference dir) throws HyracksDataException;
/**
- * Lists the files matching {@code filter} recursively starting from
{@code dir}
+ * List of files in the directory and its subdirectories that satisfy the
provided filter
*
- * @param dir
- * @param filter
- * @return the matching files
- * @throws HyracksDataException
+ * @param dir directory to list
+ * @param filter to test if a file reference satisfies a certain predicate
+ * @return a set of all files in the directory and its subdirectories that
satisfy the provided filter
*/
Set<FileReference> list(FileReference dir, FilenameFilter filter) throws
HyracksDataException;
- void overwrite(FileReference fileRef, byte[] bytes) throws
ClosedByInterruptException, HyracksDataException;
+ /**
+ * Overwrites (or writes) a file
+ *
+ * @param fileRef file reference of the file to overwrite
+ * @param bytes content
+ */
+ void overwrite(FileReference fileRef, byte[] bytes) throws
HyracksDataException;
+ /**
+ * Reads the entire content of a file
+ *
+ * @param fileRef file reference
+ * @return a byte array of the content
+ */
byte[] readAllBytes(FileReference fileRef) throws HyracksDataException;
- void copyDirectory(FileReference srcMetadataScopePath, FileReference
targetMetadataScopePath)
- throws HyracksDataException;
-
- void deleteDirectory(FileReference root) throws HyracksDataException;
+ /**
+ * Copy the content of one directory to another
+ *
+ * @param srcDir source directory
+ * @param destDir destination directory
+ */
+ void copyDirectory(FileReference srcDir, FileReference destDir) throws
HyracksDataException;
+ /**
+ * Checks whether a file exists
+ *
+ * @param fileRef file reference
+ * @return true if the file exists, false otherwise
+ */
boolean exists(FileReference fileRef) throws HyracksDataException;
+ /**
+ * Creates a file
+ *
+ * @param fileRef file reference
+ */
void create(FileReference fileRef) throws HyracksDataException;
- boolean makeDirectories(FileReference resourceDir);
+ /**
+ * Make a directory and all of its parent directories
+ *
+ * @param dir directory to create
+ * @return true if it was created
+ */
+ boolean makeDirectories(FileReference dir);
+
+ /**
+ * Deletes the content of a directory (files and subdirectories)
+ *
+ * @param dir directory reference
+ */
+ void cleanDirectory(FileReference dir) throws HyracksDataException;
- void cleanDirectory(FileReference resourceDir) throws HyracksDataException;
+ /**
+ * Performs a bulk operation
+ *
+ * @param bulkOperation the operation to perform
+ */
+ void performBulkOperation(IIOBulkOperation bulkOperation) throws
HyracksDataException;
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
index 0e6578fd66..25a67ff0e7 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/IOManager.java
@@ -47,11 +47,14 @@ import
org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.api.io.IFileDeviceResolver;
import org.apache.hyracks.api.io.IFileHandle;
+import org.apache.hyracks.api.io.IIOBulkOperation;
import org.apache.hyracks.api.io.IIOManager;
import org.apache.hyracks.api.io.IODeviceHandle;
import org.apache.hyracks.api.util.InvokeUtil;
import org.apache.hyracks.api.util.IoUtil;
import org.apache.hyracks.control.nc.io.IoRequest.State;
+import org.apache.hyracks.control.nc.io.bulk.AbstractBulkOperation;
+import org.apache.hyracks.control.nc.io.bulk.DeleteBulkOperation;
import org.apache.hyracks.util.file.FileUtil;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
@@ -506,6 +509,11 @@ public class IOManager implements IIOManager {
}
}
+ @Override
+ public IIOBulkOperation createDeleteBulkOperation() {
+ return new DeleteBulkOperation(this);
+ }
+
@Override
public Set<FileReference> list(FileReference dir) throws
HyracksDataException {
return list(dir, IoUtil.NO_OP_FILTER);
@@ -551,15 +559,6 @@ public class IOManager implements IIOManager {
}
}
- @Override
- public void deleteDirectory(FileReference root) throws
HyracksDataException {
- try {
- FileUtils.deleteDirectory(root.getFile());
- } catch (IOException e) {
- throw HyracksDataException.create(e);
- }
- }
-
@Override
public boolean exists(FileReference fileRef) throws HyracksDataException {
return fileRef.getFile().exists();
@@ -592,4 +591,9 @@ public class IOManager implements IIOManager {
throw HyracksDataException.create(e);
}
}
+
+ @Override
+ public void performBulkOperation(IIOBulkOperation bulkOperation) throws
HyracksDataException {
+ ((AbstractBulkOperation) bulkOperation).performOperation();
+ }
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/bulk/AbstractBulkOperation.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/bulk/AbstractBulkOperation.java
new file mode 100644
index 0000000000..4ca7b9c77d
--- /dev/null
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/bulk/AbstractBulkOperation.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.nc.io.bulk;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IIOBulkOperation;
+import org.apache.hyracks.api.io.IIOManager;
+
+public abstract class AbstractBulkOperation implements IIOBulkOperation {
+ protected final IIOManager ioManager;
+ protected final List<FileReference> fileReferences;
+
+ AbstractBulkOperation(IIOManager ioManager) {
+ this.ioManager = ioManager;
+ fileReferences = new ArrayList<>();
+ }
+
+ @Override
+ public final void add(FileReference fileReference) {
+ fileReferences.add(fileReference);
+ }
+
+ public abstract void performOperation() throws HyracksDataException;
+}
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/bulk/DeleteBulkOperation.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/bulk/DeleteBulkOperation.java
new file mode 100644
index 0000000000..5ccfdd6073
--- /dev/null
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/io/bulk/DeleteBulkOperation.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.control.nc.io.bulk;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IIOManager;
+
+public class DeleteBulkOperation extends AbstractBulkOperation {
+ public DeleteBulkOperation(IIOManager ioManager) {
+ super(ioManager);
+ }
+
+ @Override
+ public void performOperation() throws HyracksDataException {
+ for (FileReference fileReference : fileReferences) {
+ ioManager.delete(fileReference);
+ }
+ }
+}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java
index 6194854bbc..cff54a0980 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndexFileManager.java
@@ -179,7 +179,7 @@ public abstract class AbstractLSMIndexFileManager
implements ILSMIndexFileManage
@Override
public void deleteDirs() throws HyracksDataException {
- ioManager.deleteDirectory(baseDir);
+ ioManager.delete(baseDir);
}
@Override