steveloughran commented on a change in pull request #1359: HADOOP-16430. S3AFilesystem.delete to incrementally update s3guard with deletions
URL: https://github.com/apache/hadoop/pull/1359#discussion_r319117008
##########
File path: hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/DeleteOperation.java
##########
@@ -0,0 +1,452 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.amazonaws.services.s3.model.DeleteObjectsRequest;
+import com.amazonaws.services.s3.model.DeleteObjectsResult;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.s3a.Retries;
+import org.apache.hadoop.fs.s3a.S3AFileStatus;
+import org.apache.hadoop.fs.s3a.S3ALocatedFileStatus;
+import org.apache.hadoop.fs.s3a.Tristate;
+import org.apache.hadoop.fs.s3a.s3guard.BulkOperationState;
+import org.apache.hadoop.fs.s3a.s3guard.MetadataStore;
+import org.apache.hadoop.fs.s3a.s3guard.S3Guard;
+import org.apache.hadoop.util.DurationInfo;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.submit;
+import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.waitForCompletion;
+
+/**
+ * Implementation of the delete operation.
+ * For an authoritative S3Guarded store, after the list and delete of the
+ * combined store, we repeat against raw S3.
+ * This will correct for any situation where the authoritative listing is
+ * incomplete.
+ */
+public class DeleteOperation extends AbstractStoreOperation {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      DeleteOperation.class);
+
+  /**
+   * This is a switch to turn on when trying to debug
+   * deletion problems; it requests the results of
+   * the delete call from AWS then audits them.
+   */
+  private static final boolean AUDIT_DELETED_KEYS = true;
+
+  /**
+   * Used to stop any re-entrancy of the delete.
+   * This is an execute-once operation.
+   */
+  private final AtomicBoolean executed = new AtomicBoolean(false);
+
+  private final S3AFileStatus status;
+
+  private final boolean recursive;
+
+  private final OperationCallbacks callbacks;
+
+  private final int pageSize;
+
+  private final MetadataStore metadataStore;
+
+  private final ListeningExecutorService executor;
+
+  private List<DeleteObjectsRequest.KeyVersion> keys;
+
+  private List<Path> paths;
+
+  private CompletableFuture<Void> deleteFuture;
+
+  private long filesDeleted;
+  private long extraFilesDeleted;
+
+  /**
+   * Constructor.
+   * @param context store context
+   * @param status pre-fetched source status
+   * @param recursive recursive delete?
+   * @param callbacks callback provider
+   * @param pageSize number of entries in a page
+   */
+  public DeleteOperation(final StoreContext context,
+      final S3AFileStatus status,
+      final boolean recursive,
+      final OperationCallbacks callbacks, int pageSize) {
+
+    super(context);
+    this.status = status;
+    this.recursive = recursive;
+    this.callbacks = callbacks;
+    checkArgument(pageSize > 0
+        && pageSize <= InternalConstants.MAX_ENTRIES_TO_DELETE,
+        "page size out of range: %s", pageSize);
+    this.pageSize = pageSize;
+    metadataStore = context.getMetadataStore();
+    executor = context.createThrottledExecutor(2);
+  }
+
+  public long getFilesDeleted() {
+    return filesDeleted;
+  }
+
+  public long getExtraFilesDeleted() {
+    return extraFilesDeleted;
+  }
+
+  /**
+   * Delete a file or directory tree.
+   * This call does not create any fake parent directory; that is
+   * left to the caller.
+   * The actual delete call is done in a separate thread.
+   * Only one delete at a time is submitted, however, to reduce the
+   * complexity of recovering from failures.
+   * <p>
+   * The DynamoDB store deletes paths in parallel itself, so that
+   * potentially slow part of the process is somewhat sped up.
+   * The extra parallelization here is to list files from the store/DDB while
+   * that delete operation is in progress.
+   * <p>
+   * Note that DDB is not used for listing objects here, even if the
+   * store is marked as auth: that actually means that newly created files
+   * may not get found for the delete.
+   *
+   * @return true, except in the corner cases of root directory deletion
+   * @throws PathIsNotEmptyDirectoryException if the path is a dir and this
+   * is not a recursive delete.
+   * @throws IOException list failures or an inability to delete a file.
+   */
+  @Retries.RetryTranslated
+  public boolean execute() throws IOException {
+    Preconditions.checkState(
+        !executed.getAndSet(true),
+        "delete attempted twice");
+    StoreContext context = getStoreContext();
+    Path path = status.getPath();
+    LOG.debug("Delete path {} - recursive {}", path, recursive);
+    LOG.debug("Type = {}",
+        status.isFile() ? "File"
+            : (status.isEmptyDirectory() == Tristate.TRUE
+                ? "Empty Directory"
+                : "Directory"));
+
+    String key = context.pathToKey(path);
+    if (status.isDirectory()) {
+      LOG.debug("delete: Path is a directory: {}", path);
+      checkArgument(
+          status.isEmptyDirectory() != Tristate.UNKNOWN,
+          "File status must have directory emptiness computed");
+
+      if (!key.endsWith("/")) {
+        key = key + "/";
+      }
+
+      if ("/".equals(key)) {
+        LOG.error("S3A: Cannot delete the root directory."
+            + " Path: {}. Recursive: {}",
+            status.getPath(), recursive);
+        return false;
+      }
+
+      if (!recursive && status.isEmptyDirectory() == Tristate.FALSE) {
+        throw new PathIsNotEmptyDirectoryException(path.toString());
+      }
+      if (status.isEmptyDirectory() == Tristate.TRUE) {
+        LOG.debug("deleting empty directory {}", path);
+        deleteObjectAtPath(path, key, false);
+      } else {
+        deleteDirectory(path, key);
+      }
+
+    } else {
+      // simple file.
+      LOG.debug("deleting simple file {}", path);
+      deleteObjectAtPath(path, key, true);
+    }
+    LOG.debug("Deleted {} files", filesDeleted);
+    return true;
+  }
+
+  /**
+   * Directory delete: combine paginated list of files with single or
+   * multiple object delete calls.
+   *
+   * @param path directory path
+   * @param dirKey directory key
+   * @throws IOException failure
+   */
+  protected void deleteDirectory(final Path path,
+      final String dirKey) throws IOException {
+    // create an operation state so that the store can manage the bulk
+    // operation if it needs to
+    try (BulkOperationState operationState =
+             S3Guard.initiateBulkWrite(
+                 metadataStore,
+                 BulkOperationState.OperationType.Delete,
+                 path);
+         DurationInfo ignored =
+             new DurationInfo(LOG, false, "deleting %s", dirKey)) {
+
+      // init the lists of keys and paths to delete
+      resetDeleteList();
+      deleteFuture = null;
+
+      // list files including any under tombstones through S3Guard
+      LOG.debug("Getting objects for directory prefix {} to delete", dirKey);
+      final RemoteIterator<S3ALocatedFileStatus> locatedFiles =
+          callbacks.listFilesAndEmptyDirectories(path, status, false, true);
+
+      // iterate through and delete. The next() call will block when a new S3
+      // page is required; thus any active delete submitted to the executor
+      // will run in parallel with this.
+      while (locatedFiles.hasNext()) {
+        // get the next entry in the listing.
+        S3AFileStatus child = locatedFiles.next().toS3AFileStatus();
+        queueForDeletion(operationState, child);
+      }
+      LOG.debug("Deleting final batch of listed files");
+      deleteNextBatch(operationState);
+      maybeAwaitCompletion(deleteFuture);
+
+      // if s3guard is authoritative we follow up with a bulk list and
+      // delete process on S3; this helps recover from any situation where S3
+      // and S3Guard have become inconsistent.
+      // This is only needed for auth paths; by performing the previous listing
+      // without tombstone filtering, any files returned by the non-auth

Review comment:
   yes, we may as well skip tombstones; the cost is possibly some duplicate DELETE calls on files already deleted, but with bulk deletes that's a low cost.
