[
https://issues.apache.org/jira/browse/HADOOP-19233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17913286#comment-17913286
]
ASF GitHub Bot commented on HADOOP-19233:
-----------------------------------------
anmolanmol1234 commented on code in PR #7265:
URL: https://github.com/apache/hadoop/pull/7265#discussion_r1916521221
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/RenameAtomicity.java:
##########
@@ -0,0 +1,343 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+
+//import java.util.ArrayList;
+//import java.util.Collections;
+//import java.util.List;
+import java.util.Random;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsDriverException;
+import
org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
+import
org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
+import
org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters;
+import
org.apache.hadoop.fs.azurebfs.contracts.services.BlobAppendRequestParameters;
+import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
+
+import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
+import static java.net.HttpURLConnection.HTTP_PRECON_FAILED;
+import static
org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.extractEtagHeader;
+import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.BLOCK_ID_LENGTH;
+//import static
org.apache.hadoop.fs.azurebfs.services.AzureIngressHandler.generateBlockListXml;
+
+/**
+ * For a directory enabled for atomic-rename, before rename starts, a file with
+ * -RenamePending.json suffix is created. In this file, the states required for the
+ * rename operation are given. This file is created by {@link #preRename()} method.
+ * This is important in case the JVM process crashes during rename, the atomicity
+ * will be maintained, when the job calls {@link AzureBlobFileSystem#listStatus(Path)}
+ * or {@link AzureBlobFileSystem#getFileStatus(Path)}. On these API calls to filesystem,
+ * it will be checked if there is any RenamePending JSON file. If yes, the crashed rename
+ * operation would be resumed as per the file.
+ */
+public class RenameAtomicity {
+
+ private final TracingContext tracingContext;
+
+ private Path src, dst;
+
+ private String srcEtag;
+
+ private final AbfsBlobClient abfsClient;
+
+ private final Path renameJsonPath;
+
+ public static final String SUFFIX = "-RenamePending.json";
+
+ private int preRenameRetryCount = 0;
+
+ private int renamePendingJsonLen;
+
+ private final AbfsLease sourcePathLease;
+
+ private final ObjectMapper objectMapper = new ObjectMapper();
+
+ private static final Random RANDOM = new Random();
+
+ /**
+ * Performs pre-rename operations. Creates a file with -RenamePending.json
+ * suffix in the source parent directory. This file contains the states
+ * required for the rename operation.
+ *
+ * @param src Source path
+ * @param dst Destination path
+ * @param renameJsonPath Path of the JSON file to be created
+ * @param tracingContext Tracing context
+ * @param srcEtag ETag of the source directory
+ * @param abfsClient AbfsClient instance
+ */
+ public RenameAtomicity(final Path src, final Path dst,
+ final Path renameJsonPath,
+ TracingContext tracingContext,
+ final String srcEtag,
+ final AbfsClient abfsClient) {
+ this.src = src;
+ this.dst = dst;
+ this.abfsClient = (AbfsBlobClient) abfsClient;
+ this.renameJsonPath = renameJsonPath;
+ this.tracingContext = tracingContext;
+ this.srcEtag = srcEtag;
+ this.sourcePathLease = null;
+ }
+
+ /**
+ * Resumes the rename operation from the JSON file.
+ *
+ * @param renameJsonPath Path of the JSON file
+ * @param renamePendingJsonFileLen Length of the JSON file
+ * @param tracingContext Tracing context
+ * @param srcEtag ETag of the source directory
+ * @param abfsClient AbfsClient instance
+ * @param sourceLease Lease of the source directory
+ */
+ public RenameAtomicity(final Path renameJsonPath,
+ final int renamePendingJsonFileLen,
+ TracingContext tracingContext,
+ final String srcEtag,
+ final AbfsClient abfsClient,
+ final AbfsLease sourceLease) {
+ this.abfsClient = (AbfsBlobClient) abfsClient;
+ this.renameJsonPath = renameJsonPath;
+ this.tracingContext = tracingContext;
+ this.srcEtag = srcEtag;
+ this.renamePendingJsonLen = renamePendingJsonFileLen;
+ this.sourcePathLease = sourceLease;
+ }
+
+ /**
+ * Redo the rename operation from the JSON file.
+ *
+ * @throws AzureBlobFileSystemException If the redo operation fails.
+ */
+ public void redo() throws AzureBlobFileSystemException {
+ byte[] buffer = readRenamePendingJson(renameJsonPath,
renamePendingJsonLen);
+ String contents = new String(buffer, Charset.defaultCharset());
+ try {
+ final RenamePendingJsonFormat renamePendingJsonFormatObj;
+ try {
+ renamePendingJsonFormatObj = objectMapper.readValue(contents,
+ RenamePendingJsonFormat.class);
+ } catch (JsonProcessingException e) {
+ return;
+ }
+ if (renamePendingJsonFormatObj != null && StringUtils.isNotEmpty(
+ renamePendingJsonFormatObj.getOldFolderName())
+ && StringUtils.isNotEmpty(
+ renamePendingJsonFormatObj.getNewFolderName())
+ &&
StringUtils.isNotEmpty(renamePendingJsonFormatObj.getETag())) {
+ this.src = new
Path(renamePendingJsonFormatObj.getOldFolderName());
+ this.dst = new
Path(renamePendingJsonFormatObj.getNewFolderName());
+ this.srcEtag = renamePendingJsonFormatObj.getETag();
+
+ BlobRenameHandler blobRenameHandler = new BlobRenameHandler(
+ this.src.toUri().getPath(), dst.toUri().getPath(),
+ abfsClient, srcEtag, true, true,
+ sourcePathLease, tracingContext);
+
+ blobRenameHandler.execute();
+ }
+ } finally {
+ deleteRenamePendingJson();
+ }
+ }
+
+ /** Read the JSON file.
+ *
+ * @param path Path of the JSON file
+ * @param len Length of the JSON file
+ * @return Contents of the JSON file
+ * @throws AzureBlobFileSystemException If the read operation fails.
+ */
+ @VisibleForTesting
+ byte[] readRenamePendingJson(Path path, int len)
+ throws AzureBlobFileSystemException {
+ byte[] bytes = new byte[len];
+ abfsClient.read(path.toUri().getPath(), 0, bytes, 0,
+ len, null, null, null,
+ tracingContext);
+ return bytes;
+ }
+
+ /** Generate a random block ID.
+ *
+ * @return Random block ID
+ */
+ public static String generateBlockId() {
+ // PutBlock on the path.
+ byte[] blockIdByteArray = new byte[BLOCK_ID_LENGTH];
+ RANDOM.nextBytes(blockIdByteArray);
+ return new String(Base64.encodeBase64(blockIdByteArray),
+ StandardCharsets.UTF_8);
+ }
+
+ /** Create the JSON file with the contents.
+ *
+ * @param path Path of the JSON file
+ * @param bytes Contents of the JSON file
+ * @throws AzureBlobFileSystemException If the create operation fails.
+ */
+ @VisibleForTesting
+ void createRenamePendingJson(Path path, byte[] bytes)
+ throws AzureBlobFileSystemException {
+ // PutBlob on the path.
+ AbfsRestOperation putBlobOp =
abfsClient.createPath(path.toUri().getPath(),
+ true,
+ true,
+ null,
+ false,
+ null,
+ null,
+ tracingContext);
+ String eTag = extractEtagHeader(putBlobOp.getResult());
+
+ String blockId = generateBlockId();
+ AppendRequestParameters appendRequestParameters
+ = new AppendRequestParameters(0, 0,
+ bytes.length, AppendRequestParameters.Mode.APPEND_MODE, false,
null,
+ abfsClient.getAbfsConfiguration().isExpectHeaderEnabled(),
+ new BlobAppendRequestParameters(blockId, eTag));
+
+ abfsClient.append(path.toUri().getPath(), bytes,
+ appendRequestParameters, null, null, tracingContext);
+
+// List<String> blockIdList = new
ArrayList<>(Collections.singleton(blockId));
Review Comment:
Remove commented code
> ABFS: [FnsOverBlob] Implementing Rename and Delete APIs over Blob Endpoint
> --------------------------------------------------------------------------
>
> Key: HADOOP-19233
> URL: https://issues.apache.org/jira/browse/HADOOP-19233
> Project: Hadoop Common
> Issue Type: Sub-task
> Components: fs/azure
> Affects Versions: 3.4.0
> Reporter: Anuj Modi
> Assignee: Manish Bhatt
> Priority: Major
> Labels: pull-request-available
>
> Currently, we only support rename and delete operations on the DFS endpoint.
> The reason for supporting rename and delete operations on the Blob endpoint
> is that the Blob endpoint does not account for hierarchy. We need to ensure
> that the HDFS contracts are maintained when performing rename and delete
> operations. Renaming or deleting a directory over the Blob endpoint requires
> the client to handle the orchestration and rename or delete all the blobs
> within the specified directory.
>
> The task outlines the considerations for implementing rename and delete
> operations for the FNS-blob endpoint to ensure compatibility with HDFS
> contracts.
> * {*}Blob Endpoint Usage{*}: The task addresses the need for abstraction in
> the code to maintain HDFS contracts while performing rename and delete
> operations on the blob endpoint, which does not support hierarchy.
> * {*}Rename Operations{*}: The {{AzureBlobFileSystem#rename()}} method will
> use a {{RenameHandler}} instance to handle rename operations, with separate
> handlers for the DFS and blob endpoints. This method includes prechecks,
> destination adjustments, and orchestration of directory renaming for blobs.
> * {*}Atomic Rename{*}: Atomic renaming is essential for blob endpoints, as
> it requires orchestration to copy or delete each blob within the directory. A
> configuration will allow developers to specify directories for atomic
> renaming, with a JSON file to track the status of renames.
> * {*}Delete Operations{*}: Delete operations are simpler than renames,
> requiring fewer HDFS contract checks. For blob endpoints, the client must
> handle orchestration, including managing orphaned directories created by
> AzCopy.
> * {*}Orchestration for Rename/Delete{*}: Orchestration for rename and delete
> operations over blob endpoints involves listing blobs and performing actions
> on each blob. The process must be optimized to handle large numbers of blobs
> efficiently.
> * {*}Need for Optimization{*}: Optimization is crucial because the
> {{ListBlob}} API can return a maximum of 5000 blobs at once, necessitating
> multiple calls for large directories. The task proposes a producer-consumer
> model to handle blobs in parallel, thereby reducing processing time and
> memory usage.
> * {*}Producer-Consumer Design{*}: The proposed design includes a producer to
> list blobs, a queue to store the blobs, and a consumer to process them in
> parallel. This approach aims to improve efficiency and mitigate memory issues.
> More details will follow
> Prerequisites for this patch:
> 1. HADOOP-19187 ABFS: [FnsOverBlob]Making AbfsClient Abstract for supporting
> both DFS and Blob Endpoint - ASF JIRA (apache.org)
> 2. HADOOP-19226 ABFS: [FnsOverBlob]Implementing Azure Rest APIs on Blob
> Endpoint for AbfsBlobClient - ASF JIRA (apache.org)
> 3. HADOOP-19207 ABFS: [FnsOverBlob]Response Handling of Blob Endpoint APIs
> and Metadata APIs - ASF JIRA (apache.org)
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]