anmolanmol1234 commented on code in PR #7265: URL: https://github.com/apache/hadoop/pull/7265#discussion_r1916525068
########## hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/RenameAtomicity.java: ########## @@ -0,0 +1,343 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.services; + +import java.io.IOException; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; + +//import java.util.ArrayList; +//import java.util.Collections; +//import java.util.List; +import java.util.Random; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; + +import org.apache.commons.codec.binary.Base64; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsDriverException; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; +import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters; +import 
org.apache.hadoop.fs.azurebfs.contracts.services.BlobAppendRequestParameters; +import org.apache.hadoop.fs.azurebfs.utils.TracingContext; + +import static java.net.HttpURLConnection.HTTP_NOT_FOUND; +import static java.net.HttpURLConnection.HTTP_PRECON_FAILED; +import static org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.extractEtagHeader; +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.BLOCK_ID_LENGTH; +//import static org.apache.hadoop.fs.azurebfs.services.AzureIngressHandler.generateBlockListXml; + +/** + * For a directory enabled for atomic-rename, before rename starts, a file with + * -RenamePending.json suffix is created. In this file, the states required for the + * rename operation are given. This file is created by {@link #preRename()} method. + * This is important in case the JVM process crashes during rename, the atomicity + * will be maintained, when the job calls {@link AzureBlobFileSystem#listStatus(Path)} + * or {@link AzureBlobFileSystem#getFileStatus(Path)}. On these API calls to filesystem, + * it will be checked if there is any RenamePending JSON file. If yes, the crashed rename + * operation would be resumed as per the file. + */ +public class RenameAtomicity { + + private final TracingContext tracingContext; + + private Path src, dst; + + private String srcEtag; + + private final AbfsBlobClient abfsClient; + + private final Path renameJsonPath; + + public static final String SUFFIX = "-RenamePending.json"; + + private int preRenameRetryCount = 0; + + private int renamePendingJsonLen; + + private final AbfsLease sourcePathLease; + + private final ObjectMapper objectMapper = new ObjectMapper(); + + private static final Random RANDOM = new Random(); + + /** + * Performs pre-rename operations. Creates a file with -RenamePending.json + * suffix in the source parent directory. This file contains the states + * required for the rename operation. 
+ * + * @param src Source path + * @param dst Destination path + * @param renameJsonPath Path of the JSON file to be created + * @param tracingContext Tracing context + * @param srcEtag ETag of the source directory + * @param abfsClient AbfsClient instance + */ + public RenameAtomicity(final Path src, final Path dst, + final Path renameJsonPath, + TracingContext tracingContext, + final String srcEtag, + final AbfsClient abfsClient) { + this.src = src; + this.dst = dst; + this.abfsClient = (AbfsBlobClient) abfsClient; + this.renameJsonPath = renameJsonPath; + this.tracingContext = tracingContext; + this.srcEtag = srcEtag; + this.sourcePathLease = null; + } + + /** + * Resumes the rename operation from the JSON file. + * + * @param renameJsonPath Path of the JSON file + * @param renamePendingJsonFileLen Length of the JSON file + * @param tracingContext Tracing context + * @param srcEtag ETag of the source directory + * @param abfsClient AbfsClient instance + * @param sourceLease Lease of the source directory + */ + public RenameAtomicity(final Path renameJsonPath, + final int renamePendingJsonFileLen, + TracingContext tracingContext, + final String srcEtag, + final AbfsClient abfsClient, + final AbfsLease sourceLease) { + this.abfsClient = (AbfsBlobClient) abfsClient; + this.renameJsonPath = renameJsonPath; + this.tracingContext = tracingContext; + this.srcEtag = srcEtag; + this.renamePendingJsonLen = renamePendingJsonFileLen; + this.sourcePathLease = sourceLease; + } + + /** + * Redo the rename operation from the JSON file. + * + * @throws AzureBlobFileSystemException If the redo operation fails. 
+ */ + public void redo() throws AzureBlobFileSystemException { + byte[] buffer = readRenamePendingJson(renameJsonPath, renamePendingJsonLen); + String contents = new String(buffer, Charset.defaultCharset()); + try { + final RenamePendingJsonFormat renamePendingJsonFormatObj; + try { + renamePendingJsonFormatObj = objectMapper.readValue(contents, + RenamePendingJsonFormat.class); + } catch (JsonProcessingException e) { + return; + } + if (renamePendingJsonFormatObj != null && StringUtils.isNotEmpty( + renamePendingJsonFormatObj.getOldFolderName()) + && StringUtils.isNotEmpty( + renamePendingJsonFormatObj.getNewFolderName()) + && StringUtils.isNotEmpty(renamePendingJsonFormatObj.getETag())) { + this.src = new Path(renamePendingJsonFormatObj.getOldFolderName()); + this.dst = new Path(renamePendingJsonFormatObj.getNewFolderName()); + this.srcEtag = renamePendingJsonFormatObj.getETag(); + + BlobRenameHandler blobRenameHandler = new BlobRenameHandler( + this.src.toUri().getPath(), dst.toUri().getPath(), + abfsClient, srcEtag, true, true, + sourcePathLease, tracingContext); + + blobRenameHandler.execute(); + } + } finally { + deleteRenamePendingJson(); + } + } + + /** Read the JSON file. + * + * @param path Path of the JSON file + * @param len Length of the JSON file + * @return Contents of the JSON file + * @throws AzureBlobFileSystemException If the read operation fails. + */ + @VisibleForTesting + byte[] readRenamePendingJson(Path path, int len) + throws AzureBlobFileSystemException { + byte[] bytes = new byte[len]; + abfsClient.read(path.toUri().getPath(), 0, bytes, 0, + len, null, null, null, + tracingContext); + return bytes; + } + + /** Generate a random block ID. + * + * @return Random block ID + */ + public static String generateBlockId() { + // PutBlock on the path. 
+ byte[] blockIdByteArray = new byte[BLOCK_ID_LENGTH]; + RANDOM.nextBytes(blockIdByteArray); + return new String(Base64.encodeBase64(blockIdByteArray), + StandardCharsets.UTF_8); + } + + /** Create the JSON file with the contents. + * + * @param path Path of the JSON file + * @param bytes Contents of the JSON file + * @throws AzureBlobFileSystemException If the create operation fails. + */ + @VisibleForTesting + void createRenamePendingJson(Path path, byte[] bytes) + throws AzureBlobFileSystemException { + // PutBlob on the path. + AbfsRestOperation putBlobOp = abfsClient.createPath(path.toUri().getPath(), + true, + true, + null, + false, + null, + null, + tracingContext); + String eTag = extractEtagHeader(putBlobOp.getResult()); + + String blockId = generateBlockId(); + AppendRequestParameters appendRequestParameters + = new AppendRequestParameters(0, 0, + bytes.length, AppendRequestParameters.Mode.APPEND_MODE, false, null, + abfsClient.getAbfsConfiguration().isExpectHeaderEnabled(), + new BlobAppendRequestParameters(blockId, eTag)); + + abfsClient.append(path.toUri().getPath(), bytes, + appendRequestParameters, null, null, tracingContext); + +// List<String> blockIdList = new ArrayList<>(Collections.singleton(blockId)); +// String blockList = generateBlockListXml(blockIdList); + // PutBlockList on the path. + String blockList = ""; Review Comment: If flush is called with an empty string, how is the blockId put to use? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
