[ 
https://issues.apache.org/jira/browse/HADOOP-13600?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16164660#comment-16164660
 ] 

ASF GitHub Bot commented on HADOOP-13600:
-----------------------------------------

Github user steveloughran commented on a diff in the pull request:

    https://github.com/apache/hadoop/pull/157#discussion_r138620461
  
    --- Diff: 
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
 ---
    @@ -891,50 +902,123 @@ private boolean innerRename(Path source, Path dest)
           }
     
           List<DeleteObjectsRequest.KeyVersion> keysToDelete = new 
ArrayList<>();
    +      List<DeleteObjectsRequest.KeyVersion> dirKeysToDelete = new 
ArrayList<>();
           if (dstStatus != null && dstStatus.isEmptyDirectory() == 
Tristate.TRUE) {
             // delete unnecessary fake directory.
             keysToDelete.add(new DeleteObjectsRequest.KeyVersion(dstKey));
           }
     
    -      Path parentPath = keyToPath(srcKey);
    -      RemoteIterator<LocatedFileStatus> iterator = 
listFilesAndEmptyDirectories(
    -          parentPath, true);
    -      while (iterator.hasNext()) {
    -        LocatedFileStatus status = iterator.next();
    -        long length = status.getLen();
    -        String key = pathToKey(status.getPath());
    -        if (status.isDirectory() && !key.endsWith("/")) {
    -          key += "/";
    -        }
    -        keysToDelete
    -            .add(new DeleteObjectsRequest.KeyVersion(key));
    -        String newDstKey =
    -            dstKey + key.substring(srcKey.length());
    -        copyFile(key, newDstKey, length);
    -
    -        if (hasMetadataStore()) {
    -          // with a metadata store, the object entries need to be updated,
    -          // including, potentially, the ancestors
    -          Path childSrc = keyToQualifiedPath(key);
    -          Path childDst = keyToQualifiedPath(newDstKey);
    -          if (objectRepresentsDirectory(key, length)) {
    -            S3Guard.addMoveDir(metadataStore, srcPaths, dstMetas, childSrc,
    -                childDst, username);
    +      // A blocking queue that tracks all objects that need to be deleted
    +      BlockingQueue<Optional<DeleteObjectsRequest.KeyVersion>> deleteQueue 
= new ArrayBlockingQueue<>(
    +              (int) Math.round(MAX_ENTRIES_TO_DELETE * 1.5));
    +
    +      // Used to track if the delete thread was gracefully shutdown
    +      boolean deleteFutureComplete = false;
    +      FutureTask<Void> deleteFuture = null;
    +
    +      try {
    +        // Launch a thread that will read from the deleteQueue and batch 
delete any files that have already been copied
    +        deleteFuture = new FutureTask<>(() -> {
    +          while (true) {
    +            while (keysToDelete.size() < MAX_ENTRIES_TO_DELETE) {
    +              Optional<DeleteObjectsRequest.KeyVersion> key = 
deleteQueue.take();
    +
    +              // The thread runs until it is given an EOF message (an 
Optional#empty())
    +              if (key.isPresent()) {
    +                keysToDelete.add(key.get());
    +              } else {
    +
    +                // Delete any remaining keys and exit
    +                removeKeys(keysToDelete, true, false);
    +                return null;
    +              }
    +            }
    +            removeKeys(keysToDelete, true, false);
    +          }
    +        });
    +
    +        Thread deleteThread = new Thread(deleteFuture);
    +        deleteThread.setName("s3a-rename-delete-thread");
    +        deleteThread.start();
    +
    +        // Used to abort future copy tasks as soon as one copy task fails
    +        AtomicBoolean copyFailure = new AtomicBoolean(false);
    +        List<CopyContext> copies = new ArrayList<>();
    +
    +        Path parentPath = keyToPath(srcKey);
    +        RemoteIterator<LocatedFileStatus> iterator = 
listFilesAndEmptyDirectories(
    +                parentPath, true);
    +        while (iterator.hasNext()) {
    +          LocatedFileStatus status = iterator.next();
    +          long length = status.getLen();
    +          String key = pathToKey(status.getPath());
    +          if (status.isDirectory() && !key.endsWith("/")) {
    +            key += "/";
    +          }
    +          if (status.isDirectory()) {
    +            dirKeysToDelete.add(new DeleteObjectsRequest.KeyVersion(key));
    +          }
    +          String newDstKey =
    +                  dstKey + key.substring(srcKey.length());
    +
    +          // If no previous file hit a copy failure, copy this file
    +          if (!copyFailure.get()) {
    +            copies.add(new CopyContext(copyFileAsync(key, newDstKey,
    +                    new RenameProgressListener(this, srcStatus, 
status.isDirectory() ? null :
    +                            new DeleteObjectsRequest.KeyVersion(key), 
deleteQueue, copyFailure)),
    +                    key, newDstKey, length));
               } else {
    -            S3Guard.addMoveFile(metadataStore, srcPaths, dstMetas, 
childSrc,
    -                childDst, length, getDefaultBlockSize(childDst), username);
    +            // We got a copy failure, so don't bother going through the 
rest of the files
    +            break;
               }
    -          // Ancestor directories may not be listed, so we explicitly add 
them
    -          S3Guard.addMoveAncestors(metadataStore, srcPaths, dstMetas,
    -              keyToQualifiedPath(srcKey), childSrc, childDst, username);
             }
     
    -        if (keysToDelete.size() == MAX_ENTRIES_TO_DELETE) {
    -          removeKeys(keysToDelete, true, false);
    +        for (CopyContext copyContext : copies) {
    +          try {
    +            copyContext.getCopy().waitForCopyResult();
    +          } catch (InterruptedException e) {
    +            throw new RenameFailedException(copyContext.getSrcKey(), 
copyContext.getDstKey(), e);
    +          }
    +
    +          if (hasMetadataStore()) {
    +            // with a metadata store, the object entries need to be 
updated,
    +            // including, potentially, the ancestors
    +            Path childSrc = keyToQualifiedPath(copyContext.getSrcKey());
    +            Path childDst = keyToQualifiedPath(copyContext.getDstKey());
    +            if (objectRepresentsDirectory(copyContext.getSrcKey(), 
copyContext.getLength())) {
    +              S3Guard.addMoveDir(metadataStore, srcPaths, dstMetas, 
childSrc,
    +                      childDst, username);
    +            } else {
    +              S3Guard.addMoveFile(metadataStore, srcPaths, dstMetas, 
childSrc,
    +                      childDst, copyContext.getLength(), 
getDefaultBlockSize(childDst), username);
    +            }
    +            // Ancestor directories may not be listed, so we explicitly 
add them
    +            S3Guard.addMoveAncestors(metadataStore, srcPaths, dstMetas,
    +                    keyToQualifiedPath(srcKey), childSrc, childDst, 
username);
    +          }
    +        }
    +
    +        if (copyFailure.get()) {
    +          throw new RenameFailedException(srcKey, dstKey,
    +                  new IllegalStateException("Progress listener indicated a 
copy failure, but no exception was thrown"));
    +        }
    +
    +        try {
    +          for (DeleteObjectsRequest.KeyVersion dirKey : dirKeysToDelete) {
    +            deleteQueue.put(Optional.of(dirKey));
    +          }
    +          deleteQueue.put(Optional.empty());
    +          deleteFuture.get();
    +        } catch (ExecutionException | InterruptedException e) {
    +          throw new RenameFailedException(srcKey, dstKey, e);
    +        }
    +        deleteFutureComplete = true;
    +      } finally {
    +        if (!deleteFutureComplete) {
    +          if (deleteFuture != null && !deleteFuture.isDone() && 
!deleteFuture.isCancelled()) {
    +            deleteFuture.cancel(true);
    +          }
    --- End diff --
    
    mmm. I mostly concur, handing in either the S3aFS or the (expanded) 
WriteOperationsHelper


> S3a rename() to copy files in a directory in parallel
> -----------------------------------------------------
>
>                 Key: HADOOP-13600
>                 URL: https://issues.apache.org/jira/browse/HADOOP-13600
>             Project: Hadoop Common
>          Issue Type: Sub-task
>          Components: fs/s3
>    Affects Versions: 2.7.3
>            Reporter: Steve Loughran
>            Assignee: Sahil Takiar
>         Attachments: HADOOP-13600.001.patch
>
>
> Currently a directory rename does a one-by-one copy, making the request 
> O(files * data). If the copy operations were launched in parallel, the 
> duration of the copy may be reducible to the duration of the longest copy. 
> For a directory with many files, this will be significant.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

---------------------------------------------------------------------
To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-issues-h...@hadoop.apache.org

Reply via email to