anmolanmol1234 commented on code in PR #7265: URL: https://github.com/apache/hadoop/pull/7265#discussion_r1916372687
########## hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/BlobRenameHandler.java: ########## @@ -0,0 +1,652 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.services; + +import java.net.HttpURLConnection; +import java.net.MalformedURLException; +import java.net.URISyntaxException; +import java.net.URL; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathIOException; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; +import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode; +import org.apache.hadoop.fs.azurebfs.enums.BlobCopyProgress; +import org.apache.hadoop.fs.azurebfs.utils.TracingContext; + +import static org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.extractEtagHeader; +import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.COPY_STATUS_ABORTED; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.COPY_STATUS_FAILED; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.COPY_STATUS_SUCCESS; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ROOT_PATH; +import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_COPY_ID; +import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_COPY_SOURCE; +import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_COPY_STATUS; +import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_COPY_STATUS_DESCRIPTION; +import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.COPY_BLOB_ABORTED; +import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.COPY_BLOB_FAILED; +import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.RENAME_DESTINATION_PARENT_PATH_NOT_FOUND; + +/** + * Orchestrator for rename over Blob endpoint. Handles both directory and file + * renames. Blob Endpoint does not expose rename API, this class is responsible + * for copying the blobs and deleting the source blobs. + * <p> + * For directory rename, it recursively lists the blobs in the source directory and + * copies them to the destination directory. + */ +public class BlobRenameHandler extends ListActionTaker { + + public static final Logger LOG = LoggerFactory.getLogger(AbfsClient.class); + + private final String srcEtag; + + private final Path src, dst; + + private final boolean isAtomicRename, isAtomicRenameRecovery; + + private final TracingContext tracingContext; + + private AbfsLease srcAbfsLease; + + private String srcLeaseId; + + private final List<AbfsLease> leases = new ArrayList<>(); + + private final AtomicInteger operatedBlobCount = new AtomicInteger(0); + + /** Constructor. 
+ * + * @param src source path + * @param dst destination path + * @param abfsClient AbfsBlobClient to use for the rename operation + * @param srcEtag eTag of the source path + * @param isAtomicRename true if the rename operation is atomic + * @param isAtomicRenameRecovery true if the rename operation is a recovery of a previous failed atomic rename operation + * @param tracingContext object of tracingContext used for the tracing of the server calls. + */ + public BlobRenameHandler(final String src, + final String dst, + final AbfsBlobClient abfsClient, + final String srcEtag, + final boolean isAtomicRename, + final boolean isAtomicRenameRecovery, + final TracingContext tracingContext) { + super(new Path(src), abfsClient, tracingContext); + this.srcEtag = srcEtag; + this.tracingContext = tracingContext; + this.src = new Path(src); + this.dst = new Path(dst); + this.isAtomicRename = isAtomicRename; + this.isAtomicRenameRecovery = isAtomicRenameRecovery; + } + + public BlobRenameHandler(final String src, + final String dst, + final AbfsBlobClient abfsClient, + final String srcEtag, + final boolean isAtomicRename, + final boolean isAtomicRenameRecovery, + final AbfsLease srcAbfsLease, + final TracingContext tracingContext) { + this(src, dst, abfsClient, srcEtag, isAtomicRename, isAtomicRenameRecovery, + tracingContext); + this.srcAbfsLease = srcAbfsLease; + } + + @Override + int getMaxConsumptionParallelism() { + return getAbfsClient().getAbfsConfiguration() + .getBlobRenameDirConsumptionParallelism(); + } + + /** + * Orchestrates the rename operation. 
+ * + * @return true if the rename operation succeeds, false otherwise + * @throws AzureBlobFileSystemException if server call fails + */ + public boolean execute() throws AzureBlobFileSystemException { + PathInformation pathInformation = new PathInformation(); + boolean result = false; + if (preCheck(src, dst, pathInformation)) { + RenameAtomicity renameAtomicity = null; + if (pathInformation.getIsDirectory() + && pathInformation.getIsImplicit()) { + AbfsRestOperation createMarkerOp = getAbfsClient().createPath(src.toUri().getPath(), + false, false, null, + false, null, null, tracingContext); + pathInformation.setETag(extractEtagHeader(createMarkerOp.getResult())); + } + try { + if (isAtomicRename) { + /* + * Conditionally get a lease on the source blob to prevent other writers + * from changing it. This is used for correctness in HBase when log files + * are renamed. When the HBase master renames a log file folder, the lease + * locks out other writers. This prevents a region server that the master + * thinks is dead, but is still alive, from committing additional updates. + * This is different than when HBase runs on HDFS, where the region server + * recovers the lease on a log file, to gain exclusive access to it, before + * it splits it. + */ + if (srcAbfsLease == null) { + srcAbfsLease = takeLease(src, srcEtag); + } + srcLeaseId = srcAbfsLease.getLeaseID(); + if (!isAtomicRenameRecovery && pathInformation.getIsDirectory()) { + /* + * if it is not a resume of a previous failed atomic rename operation, + * perform the pre-rename operation. 
+ */ + renameAtomicity = getRenameAtomicity(pathInformation); + renameAtomicity.preRename(); + } + } + if (pathInformation.getIsDirectory()) { + result = listRecursiveAndTakeAction() && finalSrcRename(); + } else { + result = renameInternal(src, dst); + } + } finally { + if (srcAbfsLease != null) { + // If the operation is successful, cancel the timer and no need to release + // the lease as delete on the blob-path has taken place. + if (result) { + srcAbfsLease.cancelTimer(); + } else { + srcAbfsLease.free(); + } + } + } + if (result && renameAtomicity != null) { + renameAtomicity.postRename(); + } + } + return result; + } + + /** Final rename operation after all the blobs have been copied. + * + * @return true if rename is successful + * @throws AzureBlobFileSystemException if server call fails + */ + private boolean finalSrcRename() throws AzureBlobFileSystemException { + tracingContext.setOperatedBlobCount(operatedBlobCount.get() + 1); + try { + return renameInternal(src, dst); + } finally { + tracingContext.setOperatedBlobCount(null); + } + } + + /** Gets the rename atomicity object. + * + * @param pathInformation object containing the path information of the source path + * + * @return RenameAtomicity object + */ + @VisibleForTesting + public RenameAtomicity getRenameAtomicity(final PathInformation pathInformation) { + return new RenameAtomicity(src, + dst, + new Path(src.getParent(), src.getName() + RenameAtomicity.SUFFIX), + tracingContext, + pathInformation.getETag(), + getAbfsClient()); + } + + /** Takes a lease on the path. 
+ * + * @param path path on which the lease is to be taken + * @param eTag eTag of the path + * + * @return object containing the lease information + * @throws AzureBlobFileSystemException if server call fails + */ + private AbfsLease takeLease(final Path path, final String eTag) + throws AzureBlobFileSystemException { + AbfsLease lease = new AbfsLease(getAbfsClient(), path.toUri().getPath(), false, + getAbfsClient().getAbfsConfiguration() + .getAtomicRenameLeaseRefreshDuration(), + eTag, tracingContext); + leases.add(lease); + return lease; + } + + /** Checks if the path contains a colon. + * + * @param p path to check + * + * @return true if the path contains a colon + */ + private boolean containsColon(Path p) { + return p.toUri().getPath().contains(":"); + } + + /** + * Since, server doesn't have a rename API and would not be able to check HDFS + * contracts, client would have to ensure that no HDFS contract is violated. + * + * @param src source path + * @param dst destination path + * @param pathInformation object in which path information of the source path would be stored + * + * @return true if the pre-checks pass + * @throws AzureBlobFileSystemException if server call fails or given paths are invalid. + */ + private boolean preCheck(final Path src, final Path dst, + final PathInformation pathInformation) + throws AzureBlobFileSystemException { + validateDestinationPath(src, dst); + + setSrcPathInformation(src, pathInformation); + validateSourcePath(pathInformation); + validateDestinationPathNotExist(src, dst, pathInformation); + validateDestinationParentExist(src, dst, pathInformation); + + return true; + } + + /** + * Validate if the format of the destination path is correct and if the destination + * path is not a sub-directory of the source path. 
+ * + * @param src source path + * @param dst destination path + * + * @throws AbfsRestOperationException if the destination path is invalid + */ + private void validateDestinationPath(final Path src, final Path dst) + throws AbfsRestOperationException { + if (containsColon(dst)) { + throw new AbfsRestOperationException( + HttpURLConnection.HTTP_BAD_REQUEST, + AzureServiceErrorCode.INVALID_RENAME_DESTINATION.getErrorCode(), null, + new PathIOException(dst.toUri().getPath(), + "Destination path contains colon")); + } + + validateDestinationIsNotSubDir(src, dst); + } + + /** + * Validate if the destination path is not a sub-directory of the source path. + * + * @param src source path + * @param dst destination path + */ + private void validateDestinationIsNotSubDir(final Path src, + final Path dst) throws AbfsRestOperationException { + LOG.debug("Check if the destination is subDirectory"); + Path nestedDstParent = dst.getParent(); + if (nestedDstParent != null && nestedDstParent.toUri() + .getPath() + .indexOf(src.toUri().getPath()) == 0) { + LOG.info("Rename src: {} dst: {} failed as dst is subDir of src", + src, dst); + throw new AbfsRestOperationException(HttpURLConnection.HTTP_CONFLICT, + AzureServiceErrorCode.INVALID_RENAME_SOURCE_PATH.getErrorCode(), + AzureServiceErrorCode.INVALID_RENAME_SOURCE_PATH.getErrorMessage(), + new Exception( + AzureServiceErrorCode.INVALID_RENAME_SOURCE_PATH.getErrorCode())); + } + } + + /** Set the path information of the source path. 
+ * + * @param src source path + * @param pathInformation object containing the path information of the source path + * + * @throws AzureBlobFileSystemException if server call fails + */ + private void setSrcPathInformation(final Path src, + final PathInformation pathInformation) + throws AzureBlobFileSystemException { + pathInformation.copy(getPathInformation(src, tracingContext)); + } + + /** + * Validate if the source path exists and if the client knows the ETag of the source path, + * then the ETag should match with the server. + * + * @param pathInformation object containing the path information of the source path + * + * @throws AbfsRestOperationException if the source path is not found or if the ETag of the source + * path does not match with the server. + */ + private void validateSourcePath(final PathInformation pathInformation) + throws AzureBlobFileSystemException { + if (!pathInformation.getPathExists()) { + throw new AbfsRestOperationException( + HttpURLConnection.HTTP_NOT_FOUND, + AzureServiceErrorCode.SOURCE_PATH_NOT_FOUND.getErrorCode(), null, + new Exception( + AzureServiceErrorCode.SOURCE_PATH_NOT_FOUND.getErrorCode())); + } + if (srcEtag != null && !srcEtag.equals(pathInformation.getETag())) { + throw new AbfsRestOperationException( + HttpURLConnection.HTTP_CONFLICT, + AzureServiceErrorCode.PATH_ALREADY_EXISTS.getErrorCode(), null, + new Exception( + AzureServiceErrorCode.PATH_ALREADY_EXISTS.getErrorCode())); + } + } + + /** Validate if the destination path does not exist. 
+ * + * @param src source path + * @param dst destination path + * @param pathInformation object containing the path information of the source path + * + * @throws AbfsRestOperationException if the destination path already exists + */ + private void validateDestinationPathNotExist(final Path src, + final Path dst, + final PathInformation pathInformation) + throws AzureBlobFileSystemException { + /* + * Destination path name can be same to that of source path name only in the + * case of a directory rename. + * + * In case the directory is being renamed to some other name, the destination + * check would happen on the AzureBlobFileSystem#rename method. + */ + if (pathInformation.getIsDirectory() && dst.getName() + .equals(src.getName())) { + PathInformation dstPathInformation = getPathInformation( + dst, + tracingContext); + if (dstPathInformation.getPathExists()) { + LOG.info( + "Rename src: {} dst: {} failed as qualifiedDst already exists", + src, dst); + throw new AbfsRestOperationException( + HttpURLConnection.HTTP_CONFLICT, + AzureServiceErrorCode.PATH_ALREADY_EXISTS.getErrorCode(), null, + null); + } + } + } + + /** Validate if the parent of the destination path exists. 
+ * + * @param src source path + * @param dst destination path + * @param pathInformation object containing the path information of the source path + * + * @throws AbfsRestOperationException if the parent of the destination path does not exist + */ + private void validateDestinationParentExist(final Path src, + final Path dst, + final PathInformation pathInformation) + throws AzureBlobFileSystemException { + final Path nestedDstParent = dst.getParent(); + if (!dst.isRoot() && nestedDstParent != null && !nestedDstParent.isRoot() + && ( + !pathInformation.getIsDirectory() || !dst.getName() + .equals(src.getName()))) { + PathInformation nestedDstInfo = getPathInformation( + nestedDstParent, + tracingContext); + if (!nestedDstInfo.getPathExists() || !nestedDstInfo.getIsDirectory()) { + throw new AbfsRestOperationException( + HttpURLConnection.HTTP_NOT_FOUND, + RENAME_DESTINATION_PARENT_PATH_NOT_FOUND.getErrorCode(), null, + new Exception( + RENAME_DESTINATION_PARENT_PATH_NOT_FOUND.getErrorCode())); + } + } + } + + /** {@inheritDoc} */ + @Override + boolean takeAction(final Path path) throws AzureBlobFileSystemException { + return renameInternal(path, + createDestinationPathForBlobPartOfRenameSrcDir(dst, path, src)); Review Comment: Method name can be shortened -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
