[ https://issues.apache.org/jira/browse/HADOOP-13600?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16164660#comment-16164660 ]
ASF GitHub Bot commented on HADOOP-13600: ----------------------------------------- Github user steveloughran commented on a diff in the pull request: https://github.com/apache/hadoop/pull/157#discussion_r138620461 --- Diff: hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java --- @@ -891,50 +902,123 @@ private boolean innerRename(Path source, Path dest) } List<DeleteObjectsRequest.KeyVersion> keysToDelete = new ArrayList<>(); + List<DeleteObjectsRequest.KeyVersion> dirKeysToDelete = new ArrayList<>(); if (dstStatus != null && dstStatus.isEmptyDirectory() == Tristate.TRUE) { // delete unnecessary fake directory. keysToDelete.add(new DeleteObjectsRequest.KeyVersion(dstKey)); } - Path parentPath = keyToPath(srcKey); - RemoteIterator<LocatedFileStatus> iterator = listFilesAndEmptyDirectories( - parentPath, true); - while (iterator.hasNext()) { - LocatedFileStatus status = iterator.next(); - long length = status.getLen(); - String key = pathToKey(status.getPath()); - if (status.isDirectory() && !key.endsWith("/")) { - key += "/"; - } - keysToDelete - .add(new DeleteObjectsRequest.KeyVersion(key)); - String newDstKey = - dstKey + key.substring(srcKey.length()); - copyFile(key, newDstKey, length); - - if (hasMetadataStore()) { - // with a metadata store, the object entries need to be updated, - // including, potentially, the ancestors - Path childSrc = keyToQualifiedPath(key); - Path childDst = keyToQualifiedPath(newDstKey); - if (objectRepresentsDirectory(key, length)) { - S3Guard.addMoveDir(metadataStore, srcPaths, dstMetas, childSrc, - childDst, username); + // A blocking queue that tracks all objects that need to be deleted + BlockingQueue<Optional<DeleteObjectsRequest.KeyVersion>> deleteQueue = new ArrayBlockingQueue<>( + (int) Math.round(MAX_ENTRIES_TO_DELETE * 1.5)); + + // Used to track if the delete thread was gracefully shutdown + boolean deleteFutureComplete = false; + FutureTask<Void> deleteFuture = null; + + try { + // Launch a thread that will read from the deleteQueue and batch delete any files that have already been copied + deleteFuture = new FutureTask<>(() -> { + while (true) { + while (keysToDelete.size() < MAX_ENTRIES_TO_DELETE) { + Optional<DeleteObjectsRequest.KeyVersion> key = deleteQueue.take(); + + // The thread runs until is is given an EOF message (an Optional#empty()) + if (key.isPresent()) { + keysToDelete.add(key.get()); + } else { + + // Delete any remaining keys and exit + removeKeys(keysToDelete, true, false); + return null; + } + } + removeKeys(keysToDelete, true, false); + } + }); + + Thread deleteThread = new Thread(deleteFuture); + deleteThread.setName("s3a-rename-delete-thread"); + deleteThread.start(); + + // Used to abort future copy tasks as soon as one copy task fails + AtomicBoolean copyFailure = new AtomicBoolean(false); + List<CopyContext> copies = new ArrayList<>(); + + Path parentPath = keyToPath(srcKey); + RemoteIterator<LocatedFileStatus> iterator = listFilesAndEmptyDirectories( + parentPath, true); + while (iterator.hasNext()) { + LocatedFileStatus status = iterator.next(); + long length = status.getLen(); + String key = pathToKey(status.getPath()); + if (status.isDirectory() && !key.endsWith("/")) { + key += "/"; + } + if (status.isDirectory()) { + dirKeysToDelete.add(new DeleteObjectsRequest.KeyVersion(key)); + } + String newDstKey = + dstKey + key.substring(srcKey.length()); + + // If no previous file hit a copy failure, copy this file + if (!copyFailure.get()) { + copies.add(new CopyContext(copyFileAsync(key, newDstKey, + new RenameProgressListener(this, srcStatus, status.isDirectory() ? null : + new DeleteObjectsRequest.KeyVersion(key), deleteQueue, copyFailure)), + key, newDstKey, length)); } else { - S3Guard.addMoveFile(metadataStore, srcPaths, dstMetas, childSrc, - childDst, length, getDefaultBlockSize(childDst), username); + // We got a copy failure, so don't bother going through the rest of the files + break; } - // Ancestor directories may not be listed, so we explicitly add them - S3Guard.addMoveAncestors(metadataStore, srcPaths, dstMetas, - keyToQualifiedPath(srcKey), childSrc, childDst, username); } - if (keysToDelete.size() == MAX_ENTRIES_TO_DELETE) { - removeKeys(keysToDelete, true, false); + for (CopyContext copyContext : copies) { + try { + copyContext.getCopy().waitForCopyResult(); + } catch (InterruptedException e) { + throw new RenameFailedException(copyContext.getSrcKey(), copyContext.getDstKey(), e); + } + + if (hasMetadataStore()) { + // with a metadata store, the object entries need to be updated, + // including, potentially, the ancestors + Path childSrc = keyToQualifiedPath(copyContext.getSrcKey()); + Path childDst = keyToQualifiedPath(copyContext.getDstKey()); + if (objectRepresentsDirectory(copyContext.getSrcKey(), copyContext.getLength())) { + S3Guard.addMoveDir(metadataStore, srcPaths, dstMetas, childSrc, + childDst, username); + } else { + S3Guard.addMoveFile(metadataStore, srcPaths, dstMetas, childSrc, + childDst, copyContext.getLength(), getDefaultBlockSize(childDst), username); + } + // Ancestor directories may not be listed, so we explicitly add them + S3Guard.addMoveAncestors(metadataStore, srcPaths, dstMetas, + keyToQualifiedPath(srcKey), childSrc, childDst, username); + } + } + + if (copyFailure.get()) { + throw new RenameFailedException(srcKey, dstKey, + new IllegalStateException("Progress listener indicated a copy failure, but no exception was thrown")); + } + + try { + for (DeleteObjectsRequest.KeyVersion dirKey : dirKeysToDelete) { + deleteQueue.put(Optional.of(dirKey)); + } + deleteQueue.put(Optional.empty()); + deleteFuture.get(); + } catch (ExecutionException | InterruptedException e) { + throw new RenameFailedException(srcKey, dstKey, e); + } + deleteFutureComplete = true; + } finally { + if (!deleteFutureComplete) { + if (deleteFuture != null && !deleteFuture.isDone() && !deleteFuture.isCancelled()) { + deleteFuture.cancel(true); + } --- End diff -- mmm. I mostly concur, handing in either the S3aFS or the (expanded) WriteOperationsHelper > S3a rename() to copy files in a directory in parallel > ----------------------------------------------------- > > Key: HADOOP-13600 > URL: https://issues.apache.org/jira/browse/HADOOP-13600 > Project: Hadoop Common > Issue Type: Sub-task > Components: fs/s3 > Affects Versions: 2.7.3 > Reporter: Steve Loughran > Assignee: Sahil Takiar > Attachments: HADOOP-13600.001.patch > > > Currently a directory rename does a one-by-one copy, making the request > O(files * data). If the copy operations were launched in parallel, the > duration of the copy may be reducable to the duration of the longest copy. > For a directory with many files, this will be significant -- This message was sent by Atlassian JIRA (v6.4.14#64029) --------------------------------------------------------------------- To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-issues-h...@hadoop.apache.org