[
https://issues.apache.org/jira/browse/HADOOP-19233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17913554#comment-17913554
]
ASF GitHub Bot commented on HADOOP-19233:
-----------------------------------------
bhattmanish98 commented on code in PR #7265:
URL: https://github.com/apache/hadoop/pull/7265#discussion_r1917776552
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/RenameAtomicity.java:
##########
@@ -0,0 +1,343 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+
+//import java.util.ArrayList;
+//import java.util.Collections;
+//import java.util.List;
+import java.util.Random;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsDriverException;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
+import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters;
+import org.apache.hadoop.fs.azurebfs.contracts.services.BlobAppendRequestParameters;
+import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
+
+import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
+import static java.net.HttpURLConnection.HTTP_PRECON_FAILED;
+import static org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.extractEtagHeader;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.BLOCK_ID_LENGTH;
+//import static org.apache.hadoop.fs.azurebfs.services.AzureIngressHandler.generateBlockListXml;
+
+/**
+ * For a directory enabled for atomic-rename, before rename starts, a file with
+ * -RenamePending.json suffix is created. In this file, the states required for the
+ * rename operation are given. This file is created by {@link #preRename()} method.
+ * This is important in case the JVM process crashes during rename, the atomicity
+ * will be maintained, when the job calls {@link AzureBlobFileSystem#listStatus(Path)}
+ * or {@link AzureBlobFileSystem#getFileStatus(Path)}. On these API calls to filesystem,
+ * it will be checked if there is any RenamePending JSON file. If yes, the crashed rename
+ * operation would be resumed as per the file.
+ */
+public class RenameAtomicity {
+
+ private final TracingContext tracingContext;
+
+ private Path src, dst;
+
+ private String srcEtag;
+
+ private final AbfsBlobClient abfsClient;
+
+ private final Path renameJsonPath;
+
+ public static final String SUFFIX = "-RenamePending.json";
+
+ private int preRenameRetryCount = 0;
+
+ private int renamePendingJsonLen;
+
+ private final AbfsLease sourcePathLease;
+
+ private final ObjectMapper objectMapper = new ObjectMapper();
+
+ private static final Random RANDOM = new Random();
+
+ /**
+ * Performs pre-rename operations. Creates a file with -RenamePending.json
+ * suffix in the source parent directory. This file contains the states
+ * required for the rename operation.
+ *
+ * @param src Source path
+ * @param dst Destination path
+ * @param renameJsonPath Path of the JSON file to be created
+ * @param tracingContext Tracing context
+ * @param srcEtag ETag of the source directory
+ * @param abfsClient AbfsClient instance
+ */
+ public RenameAtomicity(final Path src, final Path dst,
+ final Path renameJsonPath,
+ TracingContext tracingContext,
+ final String srcEtag,
+ final AbfsClient abfsClient) {
+ this.src = src;
+ this.dst = dst;
+ this.abfsClient = (AbfsBlobClient) abfsClient;
+ this.renameJsonPath = renameJsonPath;
+ this.tracingContext = tracingContext;
+ this.srcEtag = srcEtag;
+ this.sourcePathLease = null;
+ }
+
+ /**
+ * Resumes the rename operation from the JSON file.
+ *
+ * @param renameJsonPath Path of the JSON file
+ * @param renamePendingJsonFileLen Length of the JSON file
+ * @param tracingContext Tracing context
+ * @param srcEtag ETag of the source directory
+ * @param abfsClient AbfsClient instance
+ * @param sourceLease Lease of the source directory
+ */
+ public RenameAtomicity(final Path renameJsonPath,
+ final int renamePendingJsonFileLen,
+ TracingContext tracingContext,
+ final String srcEtag,
+ final AbfsClient abfsClient,
+ final AbfsLease sourceLease) {
+ this.abfsClient = (AbfsBlobClient) abfsClient;
+ this.renameJsonPath = renameJsonPath;
+ this.tracingContext = tracingContext;
+ this.srcEtag = srcEtag;
+ this.renamePendingJsonLen = renamePendingJsonFileLen;
+ this.sourcePathLease = sourceLease;
+ }
+
+ /**
+ * Redo the rename operation from the JSON file.
+ *
+ * @throws AzureBlobFileSystemException If the redo operation fails.
+ */
+ public void redo() throws AzureBlobFileSystemException {
+ byte[] buffer = readRenamePendingJson(renameJsonPath, renamePendingJsonLen);
+ String contents = new String(buffer, Charset.defaultCharset());
+ try {
+ final RenamePendingJsonFormat renamePendingJsonFormatObj;
+ try {
+ renamePendingJsonFormatObj = objectMapper.readValue(contents,
+ RenamePendingJsonFormat.class);
+ } catch (JsonProcessingException e) {
+ return;
+ }
+ if (renamePendingJsonFormatObj != null && StringUtils.isNotEmpty(
+ renamePendingJsonFormatObj.getOldFolderName())
+ && StringUtils.isNotEmpty(
+ renamePendingJsonFormatObj.getNewFolderName())
+ && StringUtils.isNotEmpty(renamePendingJsonFormatObj.getETag())) {
+ this.src = new Path(renamePendingJsonFormatObj.getOldFolderName());
+ this.dst = new Path(renamePendingJsonFormatObj.getNewFolderName());
+ this.srcEtag = renamePendingJsonFormatObj.getETag();
+
+ BlobRenameHandler blobRenameHandler = new BlobRenameHandler(
+ this.src.toUri().getPath(), dst.toUri().getPath(),
+ abfsClient, srcEtag, true, true,
+ sourcePathLease, tracingContext);
+
+ blobRenameHandler.execute();
+ }
+ } finally {
+ deleteRenamePendingJson();
+ }
+ }
+
+ /** Read the JSON file.
+ *
+ * @param path Path of the JSON file
+ * @param len Length of the JSON file
+ * @return Contents of the JSON file
+ * @throws AzureBlobFileSystemException If the read operation fails.
+ */
+ @VisibleForTesting
+ byte[] readRenamePendingJson(Path path, int len)
+ throws AzureBlobFileSystemException {
+ byte[] bytes = new byte[len];
+ abfsClient.read(path.toUri().getPath(), 0, bytes, 0,
+ len, null, null, null,
+ tracingContext);
+ return bytes;
+ }
+
+ /** Generate a random block ID.
+ *
+ * @return Random block ID
+ */
+ public static String generateBlockId() {
+ // PutBlock on the path.
+ byte[] blockIdByteArray = new byte[BLOCK_ID_LENGTH];
+ RANDOM.nextBytes(blockIdByteArray);
+ return new String(Base64.encodeBase64(blockIdByteArray),
+ StandardCharsets.UTF_8);
+ }
+
+ /** Create the JSON file with the contents.
+ *
+ * @param path Path of the JSON file
+ * @param bytes Contents of the JSON file
+ * @throws AzureBlobFileSystemException If the create operation fails.
+ */
+ @VisibleForTesting
+ void createRenamePendingJson(Path path, byte[] bytes)
+ throws AzureBlobFileSystemException {
+ // PutBlob on the path.
+ AbfsRestOperation putBlobOp = abfsClient.createPath(path.toUri().getPath(),
+ true,
+ true,
+ null,
+ false,
+ null,
+ null,
+ tracingContext);
+ String eTag = extractEtagHeader(putBlobOp.getResult());
+
+ String blockId = generateBlockId();
+ AppendRequestParameters appendRequestParameters
+ = new AppendRequestParameters(0, 0,
+ bytes.length, AppendRequestParameters.Mode.APPEND_MODE, false, null,
+ abfsClient.getAbfsConfiguration().isExpectHeaderEnabled(),
+ new BlobAppendRequestParameters(blockId, eTag));
+
+ abfsClient.append(path.toUri().getPath(), bytes,
+ appendRequestParameters, null, null, tracingContext);
+
+// List<String> blockIdList = new ArrayList<>(Collections.singleton(blockId));
+// String blockList = generateBlockListXml(blockIdList);
+ // PutBlockList on the path.
+ String blockList = "";
Review Comment:
I added this line just to pass the build. The commented-out line above calls
generateBlockListXml, which requires ingress-handler changes. I will pick up
those changes once the ingress-handler work is merged; this placeholder line
will no longer be needed after that.
> ABFS: [FnsOverBlob] Implementing Rename and Delete APIs over Blob Endpoint
> --------------------------------------------------------------------------
>
> Key: HADOOP-19233
> URL: https://issues.apache.org/jira/browse/HADOOP-19233
> Project: Hadoop Common
> Issue Type: Sub-task
> Components: fs/azure
> Affects Versions: 3.4.0
> Reporter: Anuj Modi
> Assignee: Manish Bhatt
> Priority: Major
> Labels: pull-request-available
>
> Currently, we only support rename and delete operations on the DFS endpoint.
> The reason for supporting rename and delete operations on the Blob endpoint
> is that the Blob endpoint does not account for hierarchy. We need to ensure
> that the HDFS contracts are maintained when performing rename and delete
> operations. Renaming or deleting a directory over the Blob endpoint requires
> the client to handle the orchestration and rename or delete all the blobs
> within the specified directory.
>
> The task outlines the considerations for implementing rename and delete
> operations for the FNS-blob endpoint to ensure compatibility with HDFS
> contracts.
> * {*}Blob Endpoint Usage{*}: The task addresses the need for abstraction in
> the code to maintain HDFS contracts while performing rename and delete
> operations on the blob endpoint, which does not support hierarchy.
> * {*}Rename Operations{*}: The {{AzureBlobFileSystem#rename()}} method will
> use a {{RenameHandler}} instance to handle rename operations, with separate
> handlers for the DFS and blob endpoints. This method includes prechecks,
> destination adjustments, and orchestration of directory renaming for blobs.
> * {*}Atomic Rename{*}: Atomic renaming is essential for blob endpoints, as
> it requires orchestration to copy or delete each blob within the directory. A
> configuration will allow developers to specify directories for atomic
> renaming, with a JSON file to track the status of renames.
> * {*}Delete Operations{*}: Delete operations are simpler than renames,
> requiring fewer HDFS contract checks. For blob endpoints, the client must
> handle orchestration, including managing orphaned directories created by
> Az-copy.
> * {*}Orchestration for Rename/Delete{*}: Orchestration for rename and delete
> operations over blob endpoints involves listing blobs and performing actions
> on each blob. The process must be optimized to handle large numbers of blobs
> efficiently.
> * {*}Need for Optimization{*}: Optimization is crucial because the
> {{ListBlob}} API can return a maximum of 5000 blobs at once, necessitating
> multiple calls for large directories. The task proposes a producer-consumer
> model to handle blobs in parallel, thereby reducing processing time and
> memory usage.
> * {*}Producer-Consumer Design{*}: The proposed design includes a producer to
> list blobs, a queue to store the blobs, and a consumer to process them in
> parallel. This approach aims to improve efficiency and mitigate memory issues.
> More details will follow
> Prerequisites for this Patch:
> 1. HADOOP-19187 ABFS: [FnsOverBlob]Making AbfsClient Abstract for supporting
> both DFS and Blob Endpoint - ASF JIRA (apache.org)
> 2. HADOOP-19226 ABFS: [FnsOverBlob]Implementing Azure Rest APIs on Blob
> Endpoint for AbfsBlobClient - ASF JIRA (apache.org)
> 3. HADOOP-19207 ABFS: [FnsOverBlob]Response Handling of Blob Endpoint APIs
> and Metadata APIs - ASF JIRA (apache.org)
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]