anmolanmol1234 commented on code in PR #7265: URL: https://github.com/apache/hadoop/pull/7265#discussion_r1916365565
########## hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/BlobRenameHandler.java: ########## @@ -0,0 +1,652 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.services; + +import java.net.HttpURLConnection; +import java.net.MalformedURLException; +import java.net.URISyntaxException; +import java.net.URL; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathIOException; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; +import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode; +import org.apache.hadoop.fs.azurebfs.enums.BlobCopyProgress; +import org.apache.hadoop.fs.azurebfs.utils.TracingContext; + +import static org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.extractEtagHeader; +import static 
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.COPY_STATUS_ABORTED; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.COPY_STATUS_FAILED; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.COPY_STATUS_SUCCESS; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ROOT_PATH; +import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_COPY_ID; +import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_COPY_SOURCE; +import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_COPY_STATUS; +import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_COPY_STATUS_DESCRIPTION; +import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.COPY_BLOB_ABORTED; +import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.COPY_BLOB_FAILED; +import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.RENAME_DESTINATION_PARENT_PATH_NOT_FOUND; + +/** + * Orchestrator for rename over Blob endpoint. Handles both directory and file + * renames. Blob Endpoint does not expose rename API, this class is responsible + * for copying the blobs and deleting the source blobs. + * <p> + * For directory rename, it recursively lists the blobs in the source directory and + * copies them to the destination directory. + */ +public class BlobRenameHandler extends ListActionTaker { + + public static final Logger LOG = LoggerFactory.getLogger(AbfsClient.class); + + private final String srcEtag; + + private final Path src, dst; + + private final boolean isAtomicRename, isAtomicRenameRecovery; + + private final TracingContext tracingContext; + + private AbfsLease srcAbfsLease; + + private String srcLeaseId; + + private final List<AbfsLease> leases = new ArrayList<>(); + + private final AtomicInteger operatedBlobCount = new AtomicInteger(0); + + /** Constructor. 
+ * + * @param src source path + * @param dst destination path + * @param abfsClient AbfsBlobClient to use for the rename operation + * @param srcEtag eTag of the source path + * @param isAtomicRename true if the rename operation is atomic + * @param isAtomicRenameRecovery true if the rename operation is a recovery of a previous failed atomic rename operation + * @param tracingContext object of tracingContext used for the tracing of the server calls. + */ + public BlobRenameHandler(final String src, + final String dst, + final AbfsBlobClient abfsClient, + final String srcEtag, + final boolean isAtomicRename, + final boolean isAtomicRenameRecovery, + final TracingContext tracingContext) { + super(new Path(src), abfsClient, tracingContext); + this.srcEtag = srcEtag; + this.tracingContext = tracingContext; + this.src = new Path(src); + this.dst = new Path(dst); + this.isAtomicRename = isAtomicRename; + this.isAtomicRenameRecovery = isAtomicRenameRecovery; + } + + public BlobRenameHandler(final String src, + final String dst, + final AbfsBlobClient abfsClient, + final String srcEtag, + final boolean isAtomicRename, + final boolean isAtomicRenameRecovery, + final AbfsLease srcAbfsLease, + final TracingContext tracingContext) { + this(src, dst, abfsClient, srcEtag, isAtomicRename, isAtomicRenameRecovery, + tracingContext); + this.srcAbfsLease = srcAbfsLease; + } + + @Override + int getMaxConsumptionParallelism() { + return getAbfsClient().getAbfsConfiguration() + .getBlobRenameDirConsumptionParallelism(); + } + + /** + * Orchestrates the rename operation. 
+ * + * @return AbfsClientRenameResult containing the result of the rename operation + * @throws AzureBlobFileSystemException if server call fails + */ + public boolean execute() throws AzureBlobFileSystemException { + PathInformation pathInformation = new PathInformation(); + boolean result = false; + if (preCheck(src, dst, pathInformation)) { + RenameAtomicity renameAtomicity = null; + if (pathInformation.getIsDirectory() + && pathInformation.getIsImplicit()) { + AbfsRestOperation createMarkerOp = getAbfsClient().createPath(src.toUri().getPath(), + false, false, null, + false, null, null, tracingContext); + pathInformation.setETag(extractEtagHeader(createMarkerOp.getResult())); + } + try { + if (isAtomicRename) { + /* + * Conditionally get a lease on the source blob to prevent other writers + * from changing it. This is used for correctness in HBase when log files + * are renamed. When the HBase master renames a log file folder, the lease + * locks out other writers. This prevents a region server that the master + * thinks is dead, but is still alive, from committing additional updates. + * This is different than when HBase runs on HDFS, where the region server + * recovers the lease on a log file, to gain exclusive access to it, before + * it splits it. + */ + if (srcAbfsLease == null) { + srcAbfsLease = takeLease(src, srcEtag); + } + srcLeaseId = srcAbfsLease.getLeaseID(); + if (!isAtomicRenameRecovery && pathInformation.getIsDirectory()) { + /* + * if it is not a resume of a previous failed atomic rename operation, + * perform the pre-rename operation. 
+ */ + renameAtomicity = getRenameAtomicity(pathInformation); + renameAtomicity.preRename(); + } + } + if (pathInformation.getIsDirectory()) { + result = listRecursiveAndTakeAction() && finalSrcRename(); + } else { + result = renameInternal(src, dst); + } + } finally { + if (srcAbfsLease != null) { + // If the operation is successful, cancel the timer and no need to release + // the lease as delete on the blob-path has taken place. + if (result) { + srcAbfsLease.cancelTimer(); + } else { + srcAbfsLease.free(); + } + } + } + if (result && renameAtomicity != null) { + renameAtomicity.postRename(); + } + } + return result; + } + + /** Final rename operation after all the blobs have been copied. + * + * @return true if rename is successful + * @throws AzureBlobFileSystemException if server call fails + */ + private boolean finalSrcRename() throws AzureBlobFileSystemException { + tracingContext.setOperatedBlobCount(operatedBlobCount.get() + 1); + try { + return renameInternal(src, dst); + } finally { + tracingContext.setOperatedBlobCount(null); + } + } + + /** Gets the rename atomicity object. + * + * @param pathInformation object containing the path information of the source path + * + * @return RenameAtomicity object + */ + @VisibleForTesting + public RenameAtomicity getRenameAtomicity(final PathInformation pathInformation) { + return new RenameAtomicity(src, + dst, + new Path(src.getParent(), src.getName() + RenameAtomicity.SUFFIX), + tracingContext, + pathInformation.getETag(), + getAbfsClient()); + } + + /** Takes a lease on the path. 
+ * + * @param path path on which the lease is to be taken + * @param eTag eTag of the path + * + * @return object containing the lease information + * @throws AzureBlobFileSystemException if server call fails + */ + private AbfsLease takeLease(final Path path, final String eTag) + throws AzureBlobFileSystemException { + AbfsLease lease = new AbfsLease(getAbfsClient(), path.toUri().getPath(), false, + getAbfsClient().getAbfsConfiguration() + .getAtomicRenameLeaseRefreshDuration(), + eTag, tracingContext); + leases.add(lease); + return lease; + } + + /** Checks if the path contains a colon. + * + * @param p path to check + * + * @return true if the path contains a colon + */ + private boolean containsColon(Path p) { + return p.toUri().getPath().contains(":"); Review Comment: The existing COLON constant can be used here instead of the hard-coded ":" string literal. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
