steveloughran commented on a change in pull request #843: HADOOP-15183 S3Guard store becomes inconsistent after partial failure of rename
URL: https://github.com/apache/hadoop/pull/843#discussion_r292661204
 
 

 ##########
 File path: hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
 ##########
 @@ -1225,130 +1301,292 @@ private boolean innerRename(Path source, Path dest)
       }
     }
 
-    // If we have a MetadataStore, track deletions/creations.
-    Collection<Path> srcPaths = null;
-    List<PathMetadata> dstMetas = null;
-    if (hasMetadataStore()) {
-      srcPaths = new HashSet<>(); // srcPaths need fast look up before put
-      dstMetas = new ArrayList<>();
-    }
-    // TODO S3Guard HADOOP-13761: retries when source paths are not visible yet
+    // Validation completed: time to begin the operation.
+    // The store-specific rename operation is used to keep the store
+    // up to date with the in-progress operation.
+    // For the null store, these are all no-ops.
+    final RenameTracker renameTracker =
+        metadataStore.initiateRenameOperation(
+            createStoreContext(),
+            src, srcStatus, dest);
+    final AtomicLong bytesCopied = new AtomicLong();
+    int renameParallelLimit = RENAME_PARALLEL_LIMIT;
+    final List<CompletableFuture<Path>> activeCopies =
+        new ArrayList<>(renameParallelLimit);
+    // aggregate operation to wait for the copies to complete then reset
+    // the list.
+    final FunctionsRaisingIOE.FunctionRaisingIOE<String, Void>
+        completeActiveCopies = (String reason) -> {
+          LOG.debug("Waiting for {} active copies to complete: {}",
+              activeCopies.size(), reason);
+          waitForCompletion(activeCopies);
+          activeCopies.clear();
+          return null;
+        };
+
     // TODO S3Guard: performance: mark destination dirs as authoritative
 
     // Ok! Time to start
-    if (srcStatus.isFile()) {
-      LOG.debug("rename: renaming file {} to {}", src, dst);
-      long length = srcStatus.getLen();
-      S3ObjectAttributes objectAttributes =
-          createObjectAttributes(srcStatus.getPath(),
-              srcStatus.getETag(), srcStatus.getVersionId());
-      S3AReadOpContext readContext = createReadContext(srcStatus, inputPolicy,
-          changeDetectionPolicy, readAhead);
-      if (dstStatus != null && dstStatus.isDirectory()) {
-        String newDstKey = maybeAddTrailingSlash(dstKey);
-        String filename =
-            srcKey.substring(pathToKey(src.getParent()).length()+1);
-        newDstKey = newDstKey + filename;
-        CopyResult copyResult = copyFile(srcKey, newDstKey, length,
-            objectAttributes, readContext);
-        S3Guard.addMoveFile(metadataStore, srcPaths, dstMetas, src,
-            keyToQualifiedPath(newDstKey), length, getDefaultBlockSize(dst),
-            username, copyResult.getETag(), copyResult.getVersionId());
+    try {
+      if (srcStatus.isFile()) {
+        // the source is a file.
+        Path copyDestinationPath = dst;
+        String copyDestinationKey = dstKey;
+        S3ObjectAttributes sourceAttributes =
+            createObjectAttributes(srcStatus);
+        S3AReadOpContext readContext = createReadContext(srcStatus, inputPolicy,
+            changeDetectionPolicy, readAhead);
+        if (dstStatus != null && dstStatus.isDirectory()) {
+          // destination is a directory: build the final destination underneath
+          String newDstKey = maybeAddTrailingSlash(dstKey);
+          String filename =
+              srcKey.substring(pathToKey(src.getParent()).length() + 1);
+          newDstKey = newDstKey + filename;
+          copyDestinationKey = newDstKey;
+          copyDestinationPath = keyToQualifiedPath(newDstKey);
+        }
+        // destination either does not exist or is a file to overwrite.
+        LOG.debug("rename: renaming file {} to {}", src, copyDestinationPath);
+        copySourceAndUpdateTracker(renameTracker,
+            src,
+            srcKey,
+            sourceAttributes,
+            readContext,
+            copyDestinationPath,
+            copyDestinationKey,
+            false);
+        bytesCopied.addAndGet(srcStatus.getLen());
+        // delete the source
+        deleteObjectAtPath(src, srcKey, true);
+        // and update the tracker
+        renameTracker.sourceObjectsDeleted(Lists.newArrayList(src));
       } else {
-        CopyResult copyResult = copyFile(srcKey, dstKey, srcStatus.getLen(),
-            objectAttributes, readContext);
-        S3Guard.addMoveFile(metadataStore, srcPaths, dstMetas, src, dst,
-            length, getDefaultBlockSize(dst), username,
-            copyResult.getETag(), copyResult.getVersionId());
-      }
-      innerDelete(srcStatus, false);
-    } else {
-      LOG.debug("rename: renaming directory {} to {}", src, dst);
-
-      // This is a directory to directory copy
-      dstKey = maybeAddTrailingSlash(dstKey);
-      srcKey = maybeAddTrailingSlash(srcKey);
+        LOG.debug("rename: renaming directory {} to {}", src, dst);
 
-      //Verify dest is not a child of the source directory
-      if (dstKey.startsWith(srcKey)) {
-        throw new RenameFailedException(srcKey, dstKey,
-            "cannot rename a directory to a subdirectory of itself ");
-      }
+        // This is a directory-to-directory copy
+        dstKey = maybeAddTrailingSlash(dstKey);
+        srcKey = maybeAddTrailingSlash(srcKey);
 
-      List<DeleteObjectsRequest.KeyVersion> keysToDelete = new ArrayList<>();
-      if (dstStatus != null && dstStatus.isEmptyDirectory() == Tristate.TRUE) {
-        // delete unnecessary fake directory.
-        keysToDelete.add(new DeleteObjectsRequest.KeyVersion(dstKey));
-      }
+        //Verify dest is not a child of the source directory
+        if (dstKey.startsWith(srcKey)) {
+          throw new RenameFailedException(srcKey, dstKey,
+              "cannot rename a directory to a subdirectory of itself ");
+        }
 
-      Path parentPath = keyToQualifiedPath(srcKey);
-      RemoteIterator<S3ALocatedFileStatus> iterator =
-          listFilesAndEmptyDirectories(parentPath, true);
-      while (iterator.hasNext()) {
-        S3ALocatedFileStatus status = iterator.next();
-        long length = status.getLen();
-        String key = pathToKey(status.getPath());
-        if (status.isDirectory() && !key.endsWith("/")) {
-          key += "/";
+        // These are the lists of keys to delete and of their paths, the
+        // latter being used to update the rename tracker.
+        final List<DeleteObjectsRequest.KeyVersion> keysToDelete =
+            new ArrayList<>();
+        final List<Path> pathsToDelete = new ArrayList<>();
+        // to update the lists of keys and paths.
+        final BiFunction<Path, String, Void> queueToDelete =
+            (Path path, String key) -> {
+              pathsToDelete.add(path);
+              keysToDelete.add(new DeleteObjectsRequest.KeyVersion(key));
+              return null;
+            };
+
+        // a lambda-expression to block waiting for any active copies to finish,
+        // then delete all queued keys and paths.
+        final FunctionsRaisingIOE.FunctionRaisingIOE<String, Void>
+            completeActiveCopiesAndDeleteSources =
+                (String reason) -> {
+                  completeActiveCopies.apply(reason);
+                  removeSourceObjects(renameTracker,
+                      keysToDelete,
+                      pathsToDelete);
+                  // now reset the lists.
+                  keysToDelete.clear();
+                  pathsToDelete.clear();
+                  return null;
+                };
+
+        if (dstStatus != null
+            && dstStatus.isEmptyDirectory() == Tristate.TRUE) {
+          // delete unnecessary fake directory at the destination.
+          // this MUST be done before anything else so that
+          // rollback code doesn't get confused and insert a tombstone
+          // marker.
+          deleteObjectAtPath(dstStatus.getPath(), dstKey, false);
         }
-        keysToDelete
-            .add(new DeleteObjectsRequest.KeyVersion(key));
-        String newDstKey =
-            dstKey + key.substring(srcKey.length());
-        S3ObjectAttributes objectAttributes =
-            createObjectAttributes(status.getPath(),
-                status.getETag(), status.getVersionId());
-        S3AReadOpContext readContext = createReadContext(status, inputPolicy,
-            changeDetectionPolicy, readAhead);
-        CopyResult copyResult = copyFile(key, newDstKey, length,
-            objectAttributes, readContext);
-
-        if (hasMetadataStore()) {
-          // with a metadata store, the object entries need to be updated,
-          // including, potentially, the ancestors
-          Path childSrc = keyToQualifiedPath(key);
-          Path childDst = keyToQualifiedPath(newDstKey);
-          if (objectRepresentsDirectory(key, length)) {
-            S3Guard.addMoveDir(metadataStore, srcPaths, dstMetas, childSrc,
-                childDst, username);
-          } else {
-            S3Guard.addMoveFile(metadataStore, srcPaths, dstMetas, childSrc,
-                childDst, length, getDefaultBlockSize(childDst), username,
-                copyResult.getETag(), copyResult.getVersionId());
+
+        Path parentPath = keyToQualifiedPath(srcKey);
+        final RemoteIterator<S3ALocatedFileStatus> iterator =
+            listFilesAndEmptyDirectories(parentPath, true);
+        while (iterator.hasNext()) {
+          S3ALocatedFileStatus status = iterator.next();
+          String k = pathToKey(status.getPath());
+          String key = (status.isDirectory() && !k.endsWith("/"))
+              ? k + "/"
+              : k;
+          String newDstKey =
+              dstKey + key.substring(srcKey.length());
+          Path childSourcePath = keyToQualifiedPath(key);
+
+          queueToDelete.apply(childSourcePath, key);
+
+          Path childDestPath = keyToQualifiedPath(newDstKey);
+          S3ObjectAttributes sourceAttributes =
+              createObjectAttributes(
+                  status.getPath(),
+                  status.getETag(),
+                  status.getVersionId(),
+                  status.getLen());
+          S3AReadOpContext readContext = createReadContext(status, inputPolicy,
+              changeDetectionPolicy, readAhead);
+          // queue the copy operation for execution in the thread pool
+          CompletableFuture<Path> copy = submit(boundedThreadPool, () ->
+              copySourceAndUpdateTracker(
+                  renameTracker,
+                  childSourcePath,
+                  key,
+                  sourceAttributes,
+                  readContext,
+                  childDestPath,
+                  newDstKey,
+                  true));
+          bytesCopied.addAndGet(status.getLen());
+          activeCopies.add(copy);
+          if (activeCopies.size() == renameParallelLimit) {
+            // the limit of active copies has been reached;
+            // wait for completion or errors to surface.
+            LOG.debug("Waiting for active copies to complete");
+            completeActiveCopies.apply("batch threshold reached");
 
 Review comment:
  Gabor: This is my attempt to clean it up. The rename logic has all been pulled out of S3AFS into the trackers, and I'm defining a couple of lambda expressions in the method to avoid copy-and-paste duplication.
  
  The only way to do anything else would be to pull the entire block of code out "somewhere", but because of all the calls back into S3AFS, that will only make sense as part of a relayering of the operations: we'd have this work at the "object store + S3Guard" layer rather than through the FS APIs. That's a major change, and it isn't going to happen in a patch which is already as big as this one.
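  
  To make that lambda pattern concrete outside the diff, here is a minimal, self-contained sketch of the same idea: define a throwing function once, then apply it at every batch boundary. It is illustrative only: BatchedCopies, parallelLimit and the simulated copy tasks are stand-ins rather than names from the patch; only the FunctionRaisingIOE-style interface and the "drain the active copies at the batch threshold" step mirror the code above, where the real work submits copySourceAndUpdateTracker() calls to the bounded thread pool.
  
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    public class BatchedCopies {
    
      /** A function which may raise an IOException, in the style of FunctionsRaisingIOE. */
      @FunctionalInterface
      interface FunctionRaisingIOE<T, R> {
        R apply(T t) throws IOException;
      }
    
      public static void main(String[] args) throws IOException {
        final int parallelLimit = 4;       // stand-in for RENAME_PARALLEL_LIMIT
        final ExecutorService pool = Executors.newFixedThreadPool(parallelLimit);
        final List<CompletableFuture<String>> activeCopies =
            new ArrayList<>(parallelLimit);
    
        // Defined once, applied wherever the in-flight copies must be drained:
        // at the batch threshold, before bulk deletes, and at the end.
        final FunctionRaisingIOE<String, Void> completeActiveCopies = reason -> {
          System.out.printf("Waiting for %d active copies: %s%n",
              activeCopies.size(), reason);
          for (CompletableFuture<String> copy : activeCopies) {
            try {
              copy.join();                 // block; surfaces copy failures
            } catch (RuntimeException e) {
              throw new IOException("copy failed during: " + reason, e);
            }
          }
          activeCopies.clear();            // reset the list for the next batch
          return null;
        };
    
        // Simulated copy submissions standing in for the queued copy operations.
        for (int i = 0; i < 10; i++) {
          final int n = i;
          activeCopies.add(
              CompletableFuture.supplyAsync(() -> "copied object " + n, pool));
          if (activeCopies.size() == parallelLimit) {
            completeActiveCopies.apply("batch threshold reached");
          }
        }
        completeActiveCopies.apply("final batch");
        pool.shutdown();
      }
    }
  
  The point of keeping it as a local lambda rather than a private method is that it captures the mutable copy list and the tracker state of the one rename in progress, so the callers don't have to pass them around.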
