[ 
https://issues.apache.org/jira/browse/HADOOP-19232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17911359#comment-17911359
 ] 

ASF GitHub Bot commented on HADOOP-19232:
-----------------------------------------

bhattmanish98 commented on code in PR #7272:
URL: https://github.com/apache/hadoop/pull/7272#discussion_r1908246055


##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java:
##########
@@ -51,6 +51,8 @@ public final class AbfsHttpConstants {
   public static final String DEFAULT_TIMEOUT = "90";
   public static final String APPEND_BLOB_TYPE = "appendblob";
   public static final String LIST = "list";
+  public static final String BLOCK_BLOB_TYPE = "BlockBlob";

Review Comment:
   BlockBlob should be in camel case.



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java:
##########
@@ -51,6 +51,8 @@ public final class AbfsHttpConstants {
   public static final String DEFAULT_TIMEOUT = "90";
   public static final String APPEND_BLOB_TYPE = "appendblob";
   public static final String LIST = "list";
+  public static final String BLOCK_BLOB_TYPE = "BlockBlob";
+  public static final String APPEND_BLOCK = "appendblock";

Review Comment:
   Same as above: appendBlock



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java:
##########
@@ -786,18 +1016,165 @@ public String toString() {
     return sb.toString();
   }
 
+  /**
+   * Gets the reference to the file system back.
+   *
+   * @return The back reference to the file system.
+   */
   @VisibleForTesting
   BackReference getFsBackRef() {
     return fsBackRef;
   }
 
+  /**
+   * Gets the executor service used for asynchronous operations.
+   *
+   * @return The executor service.
+   */
   @VisibleForTesting
   ListeningExecutorService getExecutorService() {
     return executorService;
   }
 
+  /**
+   * Gets the Azure Blob Storage client.
+   *
+   * @return The Azure Blob Storage client.
+   */
   @VisibleForTesting
-  AbfsClient getClient() {
+  synchronized AbfsClient getClient() {

Review Comment:
   Why is this change needed?



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java:
##########
@@ -376,31 +576,15 @@ private void failureWhileSubmit(Exception ex) throws IOException {
     throw lastError;
   }
 
-  /**
-   * Synchronized accessor to the active block.
-   *
-   * @return the active block; null if there isn't one.
-   */
-  private synchronized DataBlocks.DataBlock getActiveBlock() {
-    return activeBlock;
-  }
-
-  /**
-   * Predicate to query whether or not there is an active block.
-   *
-   * @return true if there is an active block.
-   */
-  private synchronized boolean hasActiveBlock() {
-    return activeBlock != null;
-  }
-
   /**
    * Is there an active block and is there any data in it to upload?
    *
    * @return true if there is some data to upload in an active block else false.
    */
-  private boolean hasActiveBlockDataToUpload() {
-    return hasActiveBlock() && getActiveBlock().hasData();
+  boolean hasActiveBlockDataToUpload() {
+    AzureBlockManager blockManager = getBlockManager();
+    AbfsBlock activeBlock = blockManager.getActiveBlock();
+    return blockManager.hasActiveBlock() && activeBlock.hasData();

Review Comment:
   Can `blockManager` or `activeBlock` be null in any case?
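
A minimal sketch of one null-tolerant ordering, for illustration only. `BlockSketch`, `BlockManagerSketch` and `ActiveBlockCheckSketch` are hypothetical stand-ins, not the PR's `AbfsBlock` or `AzureBlockManager`, and the hunk does not establish whether `getBlockManager()` can actually return null.

```java
// Hypothetical stand-in for AbfsBlock: only the method the check needs.
interface BlockSketch {
  boolean hasData();
}

// Hypothetical stand-in for AzureBlockManager.
final class BlockManagerSketch {
  private final BlockSketch activeBlock;

  BlockManagerSketch(BlockSketch activeBlock) {
    this.activeBlock = activeBlock;
  }

  BlockSketch getActiveBlock() {
    return activeBlock;
  }
}

final class ActiveBlockCheckSketch {
  // Reads the active block once and checks that same reference, so the
  // null test and the hasData() call cannot observe different blocks.
  static boolean hasActiveBlockDataToUpload(BlockManagerSketch blockManager) {
    if (blockManager == null) {
      return false; // assumption: a missing manager means nothing to upload
    }
    BlockSketch activeBlock = blockManager.getActiveBlock();
    return activeBlock != null && activeBlock.hasData();
  }

  public static void main(String[] args) {
    System.out.println(hasActiveBlockDataToUpload(null));
    System.out.println(hasActiveBlockDataToUpload(new BlockManagerSketch(() -> true)));
  }
}
```

Checking the local reference instead of calling `hasActiveBlock()` separately avoids depending on two reads of the manager's state agreeing with each other.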



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java:
##########
@@ -1140,7 +1387,7 @@ public boolean checkIsDir(AbfsHttpOperation result) {
   public boolean checkUserError(int responseStatusCode) {
     return (responseStatusCode >= HttpURLConnection.HTTP_BAD_REQUEST
         && responseStatusCode < HttpURLConnection.HTTP_INTERNAL_ERROR
-        && responseStatusCode != HttpURLConnection.HTTP_CONFLICT);
+        && responseStatusCode != HTTP_CONFLICT);

Review Comment:
   We can make the same change for the other two codes above
   (HTTP_BAD_REQUEST, HTTP_INTERNAL_ERROR) as well.
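
As a sketch of what the suggestion amounts to: all three constants are public static fields of `java.net.HttpURLConnection`, so they can be statically imported together. The class name `UserErrorCheckSketch` is hypothetical; only the status-code comparison from the hunk is reproduced.

```java
import static java.net.HttpURLConnection.HTTP_BAD_REQUEST;
import static java.net.HttpURLConnection.HTTP_CONFLICT;
import static java.net.HttpURLConnection.HTTP_INTERNAL_ERROR;

// Hypothetical stand-in for the method in AbfsBlobClient.
final class UserErrorCheckSketch {
  // A user error is any 4xx status except 409 (Conflict).
  static boolean checkUserError(int responseStatusCode) {
    return responseStatusCode >= HTTP_BAD_REQUEST
        && responseStatusCode < HTTP_INTERNAL_ERROR
        && responseStatusCode != HTTP_CONFLICT;
  }

  public static void main(String[] args) {
    System.out.println(checkUserError(404)); // true: 4xx and not 409
    System.out.println(checkUserError(409)); // false: conflict is excluded
    System.out.println(checkUserError(500)); // false: server error
  }
}
```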



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlock.java:
##########
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.hadoop.fs.store.DataBlocks;
+
+/**
+ * Return activeBlock with blockId.
+ */
+public class AbfsBlock implements Closeable {
+
+  private final DataBlocks.DataBlock activeBlock;
+  protected AbfsOutputStream outputStream;
+  private final long offset;
+  private BlockEntry blockEntry;
+
+  /**
+   * Gets the activeBlock and the blockId.
+   * @param outputStream AbfsOutputStream Instance.
+   * @param offset Used to generate blockId based on offset.
+   * @throws IOException
+   */
+  AbfsBlock(AbfsOutputStream outputStream, long offset) throws IOException {
+    this.outputStream = outputStream;
+    this.offset = offset;
+    DataBlocks.BlockFactory blockFactory = outputStream.getBlockManager().getBlockFactory();
+    long blockCount = outputStream.getBlockManager().getBlockCount();

Review Comment:
   Since these variables (blockCount, blockSize) are each used in only one
   place, it would be better to inline the calls.



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsErrors.java:
##########
@@ -43,11 +45,18 @@ public final class AbfsErrors {
   public static final String ERR_LEASE_DID_NOT_MATCH = "The lease ID specified did not match the "
       + "lease ID for the resource with the specified lease operation";
   public static final String ERR_LEASE_BROKEN = "The lease ID matched, but the lease has been "
-    + "broken explicitly and cannot be renewed";
+      + "broken explicitly and cannot be renewed";
   public static final String ERR_LEASE_FUTURE_EXISTS = "There is already an existing lease "
       + "operation";
   public static final String ERR_NO_LEASE_THREADS = "Lease desired but no lease threads "
       + "configured, set " + FS_AZURE_LEASE_THREADS;
   public static final String ERR_CREATE_ON_ROOT = "Cannot create file over root path";
+  public static final String PATH_EXISTS = "The specified path, or an element of the path, "
+      + "exists and its resource type is invalid for this operation.";
+  public static final String BLOB_OPERATION_NOT_SUPPORTED = "Blob operation is not supported.";
+  public static final String INVALID_APPEND_OPERATION = "The resource was created or modified by the Azure Blob Service API "
+      + "and cannot be appended to by the Azure Data Lake Storage Service API";

Review Comment:
   Typo: this should read `and cannot be appended by the Azure Data Lake
   Storage Service API`.



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureBlobBlockManager.java:
##########
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
+import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
+import org.apache.hadoop.fs.store.DataBlocks;
+
+/**
+ * Manages Azure Blob blocks for append operations.
+ */
+public class AzureBlobBlockManager extends AzureBlockManager {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      AbfsOutputStream.class);
+
+
+  /** The list of already committed blocks is stored in this list. */
+  private List<String> committedBlockEntries = new ArrayList<>();
+
+  /** The list to store blockId, position, and status. */
+  private final LinkedList<BlockEntry> blockEntryList = new LinkedList<>();
+
+
+  /**
+   * Constructs an AzureBlobBlockManager.
+   *
+   * @param abfsOutputStream the output stream
+   * @param blockFactory the block factory
+   * @param bufferSize the buffer size
+   * @throws AzureBlobFileSystemException if an error occurs
+   */
+  public AzureBlobBlockManager(AbfsOutputStream abfsOutputStream,
+      DataBlocks.BlockFactory blockFactory,
+      int bufferSize)
+      throws AzureBlobFileSystemException {
+    super(abfsOutputStream, blockFactory, bufferSize);
+    if (abfsOutputStream.getPosition() > 0 && !abfsOutputStream.isAppendBlob()) {
+      this.committedBlockEntries = getBlockList(abfsOutputStream.getTracingContext());
+    }
+    LOG.debug("Created a new Blob Block Manager for AbfsOutputStream instance {} for path {}",
+        abfsOutputStream.getStreamID(), abfsOutputStream.getPath());
+  }
+
+  /**
+   * Creates a new block.
+   *
+   * @param position the position
+   * @return the created block
+   * @throws IOException if an I/O error occurs
+   */
+  @Override
+  protected synchronized AbfsBlock createBlockInternal(long position)
+      throws IOException {
+    if (activeBlock == null) {
+      blockCount++;
+      activeBlock = new AbfsBlobBlock(abfsOutputStream, position);
+      activeBlock.setBlockEntry(addNewEntry(activeBlock.getBlockId(), activeBlock.getOffset()));
+    }
+    return activeBlock;
+  }
+
+  /**
+   * Returns block id's which are committed for the blob.
+   *
+   * @param tracingContext Tracing context object.
+   * @return list of committed block id's.
+   * @throws AzureBlobFileSystemException if an error occurs
+   */
+  private List<String> getBlockList(TracingContext tracingContext)
+      throws AzureBlobFileSystemException {
+    List<String> committedBlockIdList;
+    AbfsBlobClient blobClient = abfsOutputStream.getClientHandler().getBlobClient();
+    final AbfsRestOperation op = blobClient
+        .getBlockList(abfsOutputStream.getPath(), tracingContext);
+    committedBlockIdList = op.getResult().getBlockIdList();

Review Comment:
   `op` and `op.getResult()` should be null-checked before calling `getBlockIdList()`.
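
One possible shape for such a guard, as a sketch only. `ResultSketch`, `OperationSketch` and `BlockListSketch` are hypothetical stand-ins for the PR's result and operation types, and returning an empty list (rather than throwing) when either reference is missing is an assumption, not something the PR specifies.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for the operation result.
final class ResultSketch {
  private final List<String> blockIds;

  ResultSketch(List<String> blockIds) {
    this.blockIds = blockIds;
  }

  List<String> getBlockIdList() {
    return blockIds;
  }
}

// Hypothetical stand-in for the REST operation.
final class OperationSketch {
  private final ResultSketch result;

  OperationSketch(ResultSketch result) {
    this.result = result;
  }

  ResultSketch getResult() {
    return result;
  }
}

final class BlockListSketch {
  // Returns the committed block ids, or an empty list when the operation,
  // its result, or the id list itself is missing.
  static List<String> committedBlockIds(OperationSketch op) {
    if (op == null || op.getResult() == null) {
      return Collections.emptyList();
    }
    List<String> ids = op.getResult().getBlockIdList();
    if (ids == null) {
      return Collections.emptyList();
    }
    return new ArrayList<>(ids); // defensive copy
  }
}
```

Whether a missing result should instead surface as an exception depends on how callers treat an empty committed-block list, which this hunk does not show.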



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpOperation.java:
##########
@@ -59,7 +59,7 @@ public abstract class AbfsHttpOperation implements AbfsPerfLoggable {
 
   private static final int CLEAN_UP_BUFFER_SIZE = 64 * 1024;
 
-  private static final int ONE_THOUSAND = 1000;
+  public static final int ONE_THOUSAND = 1000;

Review Comment:
   Any reason for making it public?



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java:
##########
@@ -522,13 +716,21 @@ public synchronized void close() throws IOException {
       bufferIndex = 0;
       closed = true;
       writeOperations.clear();
-      if (hasActiveBlock()) {
-        clearActiveBlock();
-      }
+      getBlockManager().clearActiveBlock();

Review Comment:
   Shouldn't we check hasActiveBlock() before calling clearActiveBlock()?



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureBlobIngressHandler.java:
##########
@@ -0,0 +1,341 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidIngressServiceException;
+import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters;
+import org.apache.hadoop.fs.azurebfs.contracts.services.BlobAppendRequestParameters;
+import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
+import org.apache.hadoop.fs.store.DataBlocks;
+import org.apache.hadoop.io.IOUtils;
+
+public class AzureBlobIngressHandler extends AzureIngressHandler {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      AbfsOutputStream.class);
+
+  private volatile String eTag;
+
+  private final AzureBlobBlockManager blobBlockManager;
+
+  private final AbfsBlobClient blobClient;
+
+  private final AbfsClientHandler clientHandler;
+
+  /**
+   * Constructs an AzureBlobIngressHandler.
+   *
+   * @param abfsOutputStream the AbfsOutputStream.
+   * @param blockFactory the block factory.
+   * @param bufferSize the buffer size.
+   * @param eTag the eTag.
+   * @param clientHandler the client handler.
+   * @param blockManager the block manager.
+   * @throws AzureBlobFileSystemException if an error occurs.
+   */
+  public AzureBlobIngressHandler(AbfsOutputStream abfsOutputStream,
+      DataBlocks.BlockFactory blockFactory,
+      int bufferSize, String eTag, AbfsClientHandler clientHandler, AzureBlockManager blockManager)
+      throws AzureBlobFileSystemException {
+    super(abfsOutputStream);
+    this.eTag = eTag;
+    if (blockManager instanceof AzureBlobBlockManager) {
+      this.blobBlockManager = (AzureBlobBlockManager) blockManager;
+    } else {
+      this.blobBlockManager = new AzureBlobBlockManager(this.abfsOutputStream,
+          blockFactory, bufferSize);
+    }
+    this.clientHandler = clientHandler;
+    this.blobClient = clientHandler.getBlobClient();
+    LOG.trace("Created a new BlobIngress Handler for AbfsOutputStream instance {} for path {}",
+        abfsOutputStream.getStreamID(), abfsOutputStream.getPath());
+  }
+
+  /**
+   * Buffers data into the specified block.
+   *
+   * @param block the block to buffer data into.
+   * @param data  the data to be buffered.
+   * @param off   the start offset in the data.
+   * @param length the number of bytes to buffer.
+   * @return the number of bytes buffered.
+   * @throws IOException if an I/O error occurs.
+   */
+  @Override
+  protected int bufferData(AbfsBlock block,
+      final byte[] data,
+      final int off,
+      final int length)
+      throws IOException {
+    LOG.trace("Buffering data of length {} to block at offset {}", length, off);
+    return block.write(data, off, length);
+  }
+
+  /**
+   * Performs a remote write operation.
+   *
+   * @param blockToUpload the block to upload.
+   * @param uploadData    the data to upload.
+   * @param reqParams     the request parameters.
+   * @param tracingContext the tracing context.
+   * @return the resulting AbfsRestOperation.
+   * @throws IOException if an I/O error occurs.
+   */
+  @Override
+  protected AbfsRestOperation remoteWrite(AbfsBlock blockToUpload,
+      DataBlocks.BlockUploadData uploadData,
+      AppendRequestParameters reqParams,
+      TracingContext tracingContext)
+      throws IOException {
+    BlobAppendRequestParameters blobParams = new BlobAppendRequestParameters(blockToUpload.getBlockId(), getETag());
+    reqParams.setBlobParams(blobParams);
+    AbfsRestOperation op;
+    long threadId = Thread.currentThread().getId();

Review Comment:
   Can we call Thread.currentThread().getId() inline, since it is used only
   once?





> ABFS: [FnsOverBlob] Implementing Ingress Support with various Fallback 
> Handling
> -------------------------------------------------------------------------------
>
>                 Key: HADOOP-19232
>                 URL: https://issues.apache.org/jira/browse/HADOOP-19232
>             Project: Hadoop Common
>          Issue Type: Sub-task
>          Components: fs/azure
>    Affects Versions: 3.4.0
>            Reporter: Descifrado
>            Assignee: Anmol Asrani
>            Priority: Major
>              Labels: pull-request-available
>
> The scope of this task is to refactor the AbfsOutputStream class to handle
> ingress for both the DFS and Blob endpoints effectively.
> More details will be added soon.
> Prerequisites for this patch:
> 1. [HADOOP-19187] ABFS: [FnsOverBlob]Making AbfsClient Abstract for 
> supporting both DFS and Blob Endpoint - ASF JIRA (apache.org)
> 2. [HADOOP-19226] ABFS: [FnsOverBlob]Implementing Azure Rest APIs on Blob 
> Endpoint for AbfsBlobClient - ASF JIRA (apache.org)
> 3. [HADOOP-19207] ABFS: [FnsOverBlob]Response Handling of Blob Endpoint APIs 
> and Metadata APIs - ASF JIRA (apache.org)


