bhattmanish98 commented on code in PR #7265:
URL: https://github.com/apache/hadoop/pull/7265#discussion_r1917776552


##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/RenameAtomicity.java:
##########
@@ -0,0 +1,343 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+
+//import java.util.ArrayList;
+//import java.util.Collections;
+//import java.util.List;
+import java.util.Random;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsDriverException;
+import 
org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
+import 
org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
+import 
org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters;
+import 
org.apache.hadoop.fs.azurebfs.contracts.services.BlobAppendRequestParameters;
+import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
+
+import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
+import static java.net.HttpURLConnection.HTTP_PRECON_FAILED;
+import static 
org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.extractEtagHeader;
+import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.BLOCK_ID_LENGTH;
+//import static 
org.apache.hadoop.fs.azurebfs.services.AzureIngressHandler.generateBlockListXml;
+
+/**
+ * For a directory enabled for atomic-rename, before rename starts, a file with
+ * -RenamePending.json suffix is created. In this file, the states required 
for the
+ * rename operation are given. This file is created by {@link #preRename()} 
method.
+ * This is important in case the JVM process crashes during rename, the 
atomicity
+ * will be maintained, when the job calls {@link 
AzureBlobFileSystem#listStatus(Path)}
+ * or {@link AzureBlobFileSystem#getFileStatus(Path)}. On these API calls to 
filesystem,
+ * it will be checked if there is any RenamePending JSON file. If yes, the 
crashed rename
+ * operation would be resumed as per the file.
+ */
+public class RenameAtomicity {
+
+    private final TracingContext tracingContext;
+
+    private Path src, dst;
+
+    private String srcEtag;
+
+    private final AbfsBlobClient abfsClient;
+
+    private final Path renameJsonPath;
+
+    public static final String SUFFIX = "-RenamePending.json";
+
+    private int preRenameRetryCount = 0;
+
+    private int renamePendingJsonLen;
+
+    private final AbfsLease sourcePathLease;
+
+    private final ObjectMapper objectMapper = new ObjectMapper();
+
+    private static final Random RANDOM = new Random();
+
+    /**
+     * Performs pre-rename operations. Creates a file with -RenamePending.json
+     * suffix in the source parent directory. This file contains the states
+     * required for the rename operation.
+     *
+     * @param src Source path
+     * @param dst Destination path
+     * @param renameJsonPath Path of the JSON file to be created
+     * @param tracingContext Tracing context
+     * @param srcEtag ETag of the source directory
+     * @param abfsClient AbfsClient instance
+     */
+    public RenameAtomicity(final Path src, final Path dst,
+                           final Path renameJsonPath,
+                           TracingContext tracingContext,
+                           final String srcEtag,
+                           final AbfsClient abfsClient) {
+        this.src = src;
+        this.dst = dst;
+        this.abfsClient = (AbfsBlobClient) abfsClient;
+        this.renameJsonPath = renameJsonPath;
+        this.tracingContext = tracingContext;
+        this.srcEtag = srcEtag;
+        this.sourcePathLease = null;
+    }
+
+    /**
+     * Resumes the rename operation from the JSON file.
+     *
+     * @param renameJsonPath Path of the JSON file
+     * @param renamePendingJsonFileLen Length of the JSON file
+     * @param tracingContext Tracing context
+     * @param srcEtag ETag of the source directory
+     * @param abfsClient AbfsClient instance
+     * @param sourceLease Lease of the source directory
+     */
+    public RenameAtomicity(final Path renameJsonPath,
+                           final int renamePendingJsonFileLen,
+                           TracingContext tracingContext,
+                           final String srcEtag,
+                           final AbfsClient abfsClient,
+                           final AbfsLease sourceLease) {
+        this.abfsClient = (AbfsBlobClient) abfsClient;
+        this.renameJsonPath = renameJsonPath;
+        this.tracingContext = tracingContext;
+        this.srcEtag = srcEtag;
+        this.renamePendingJsonLen = renamePendingJsonFileLen;
+        this.sourcePathLease = sourceLease;
+    }
+
+    /**
+     * Redo the rename operation from the JSON file.
+     *
+     * @throws AzureBlobFileSystemException If the redo operation fails.
+     */
+    public void redo() throws AzureBlobFileSystemException {
+        byte[] buffer = readRenamePendingJson(renameJsonPath, 
renamePendingJsonLen);
+        String contents = new String(buffer, Charset.defaultCharset());
+        try {
+            final RenamePendingJsonFormat renamePendingJsonFormatObj;
+            try {
+                renamePendingJsonFormatObj = objectMapper.readValue(contents,
+                        RenamePendingJsonFormat.class);
+            } catch (JsonProcessingException e) {
+                return;
+            }
+            if (renamePendingJsonFormatObj != null && StringUtils.isNotEmpty(
+                    renamePendingJsonFormatObj.getOldFolderName())
+                    && StringUtils.isNotEmpty(
+                    renamePendingJsonFormatObj.getNewFolderName())
+                    && 
StringUtils.isNotEmpty(renamePendingJsonFormatObj.getETag())) {
+                this.src = new 
Path(renamePendingJsonFormatObj.getOldFolderName());
+                this.dst = new 
Path(renamePendingJsonFormatObj.getNewFolderName());
+                this.srcEtag = renamePendingJsonFormatObj.getETag();
+
+                BlobRenameHandler blobRenameHandler = new BlobRenameHandler(
+                        this.src.toUri().getPath(), dst.toUri().getPath(),
+                        abfsClient, srcEtag, true, true,
+                        sourcePathLease, tracingContext);
+
+                blobRenameHandler.execute();
+            }
+        } finally {
+            deleteRenamePendingJson();
+        }
+    }
+
+    /** Read the JSON file.
+     *
+     * @param path Path of the JSON file
+     * @param len Length of the JSON file
+     * @return Contents of the JSON file
+     * @throws AzureBlobFileSystemException If the read operation fails.
+     */
+    @VisibleForTesting
+    byte[] readRenamePendingJson(Path path, int len)
+            throws AzureBlobFileSystemException {
+        byte[] bytes = new byte[len];
+        abfsClient.read(path.toUri().getPath(), 0, bytes, 0,
+                len, null, null, null,
+                tracingContext);
+        return bytes;
+    }
+
+    /** Generate a random block ID.
+     *
+     * @return Random block ID
+     */
+    public static String generateBlockId() {
+        // PutBlock on the path.
+        byte[] blockIdByteArray = new byte[BLOCK_ID_LENGTH];
+        RANDOM.nextBytes(blockIdByteArray);
+        return new String(Base64.encodeBase64(blockIdByteArray),
+                StandardCharsets.UTF_8);
+    }
+
+    /** Create the JSON file with the contents.
+     *
+     * @param path Path of the JSON file
+     * @param bytes Contents of the JSON file
+     * @throws AzureBlobFileSystemException If the create operation fails.
+     */
+    @VisibleForTesting
+    void createRenamePendingJson(Path path, byte[] bytes)
+            throws AzureBlobFileSystemException {
+        // PutBlob on the path.
+        AbfsRestOperation putBlobOp = 
abfsClient.createPath(path.toUri().getPath(),
+                true,
+                true,
+                null,
+                false,
+                null,
+                null,
+                tracingContext);
+        String eTag = extractEtagHeader(putBlobOp.getResult());
+
+        String blockId = generateBlockId();
+        AppendRequestParameters appendRequestParameters
+                = new AppendRequestParameters(0, 0,
+                bytes.length, AppendRequestParameters.Mode.APPEND_MODE, false, 
null,
+                abfsClient.getAbfsConfiguration().isExpectHeaderEnabled(),
+                new BlobAppendRequestParameters(blockId, eTag));
+
+        abfsClient.append(path.toUri().getPath(), bytes,
+                appendRequestParameters, null, null, tracingContext);
+
+//        List<String> blockIdList = new 
ArrayList<>(Collections.singleton(blockId));
+//        String blockList = generateBlockListXml(blockIdList);
+        // PutBlockList on the path.
+        String blockList = "";

Review Comment:
   Added this line just to pass the build. The above commented line is calling 
generateBlockListXml which requires ingress handler changes. Will pick the 
changes of ingress handler once it is merged and this line is no longer needed 
after that.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to