rakeshadr commented on code in PR #7272:
URL: https://github.com/apache/hadoop/pull/7272#discussion_r1934191424


##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java:
##########
@@ -237,8 +427,12 @@ public synchronized void write(final byte[] data, final 
int off, final int lengt
     if (hasLease() && isLeaseFreed()) {
       throw new PathIOException(path, ERR_WRITE_WITHOUT_LEASE);
     }
-    DataBlocks.DataBlock block = createBlockIfNeeded();
-    int written = block.write(data, off, length);
+    if (length == 0) {
+      return;

Review Comment:
   Please add a log message conveying this condition
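
A minimal sketch of what this could look like, using a stand-in class rather than the real `AbfsOutputStream`. The class name `ZeroLengthWriteSketch`, its `written` counter, and the use of `java.util.logging` are assumptions for illustration only; the actual class logs through SLF4J, where this would likely be a `LOG.debug(...)` call.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical stand-in for the stream; not the real AbfsOutputStream. */
public class ZeroLengthWriteSketch {
  private static final Logger LOG =
      Logger.getLogger(ZeroLengthWriteSketch.class.getName());

  private final String path;
  private long written;

  public ZeroLengthWriteSketch(String path) {
    this.path = path;
  }

  public synchronized void write(byte[] data, int off, int length) {
    if (length == 0) {
      // Record the no-op so a skipped write is visible when debugging.
      LOG.log(Level.FINE,
          "Zero-length write on path {0}; nothing to upload", path);
      return;
    }
    // Placeholder for the real buffering logic.
    written += length;
  }

  public synchronized long getWritten() {
    return written;
  }
}
```

The message is logged at a debug-equivalent level so that frequent empty writes do not flood the logs.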



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java:
##########
@@ -338,9 +531,14 @@ private void uploadBlockAsync(DataBlocks.DataBlock 
blockToUpload,
              */
             AppendRequestParameters reqParams = new AppendRequestParameters(
                 offset, 0, bytesLength, mode, false, leaseId, 
isExpectHeaderEnabled);
-            AbfsRestOperation op = getClient().append(path,
-                blockUploadData.toByteArray(), reqParams, cachedSasToken.get(),
-                contextEncryptionAdapter, new TracingContext(tracingContext));
+            AbfsRestOperation op;
+            try {
+              op = remoteWrite(blockToUpload, blockUploadData, reqParams, 
tracingContext);
+            } catch (InvalidIngressServiceException ex) {
+              switchHandler();

Review Comment:
   Is this switching deterministic? I mean, in case of an exception, what will 
the behavior of the new handler be?
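
To make the question concrete, here is one possible shape of a bounded "switch once, then retry" flow. This is only a sketch under assumptions: `SwitchAndRetrySketch`, `Handler`, and `WrongServiceException` are hypothetical stand-ins, not the PR's `switchHandler()` or `InvalidIngressServiceException`, and the PR may behave differently.

```java
import java.io.IOException;

/** Hypothetical sketch of a bounded "switch handler and retry once" flow. */
public class SwitchAndRetrySketch {

  /** Stand-in for an "invalid ingress service" style exception. */
  public static class WrongServiceException extends IOException {
    public WrongServiceException(String msg) {
      super(msg);
    }
  }

  /** Stand-in for an ingress handler (for example DFS or Blob). */
  public interface Handler {
    String write(byte[] data) throws IOException;
  }

  private Handler active;
  private final Handler fallback;

  public SwitchAndRetrySketch(Handler primary, Handler fallback) {
    this.active = primary;
    this.fallback = fallback;
  }

  public synchronized String write(byte[] data) throws IOException {
    try {
      return active.write(data);
    } catch (WrongServiceException ex) {
      // Switch exactly once; if the fallback also fails, the exception
      // propagates to the caller instead of triggering another switch.
      active = fallback;
      return active.write(data);
    }
  }
}
```

With this shape the outcome is deterministic: at most one switch per call, and a failure from the new handler surfaces to the caller rather than causing the handlers to flip back and forth.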



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java:
##########
@@ -328,7 +337,7 @@ public AbfsRestOperation listPath(final String 
relativePath, final boolean recur
       abfsUriQueryBuilder.addQuery(QUERY_PARAM_DELIMITER, FORWARD_SLASH);
     }
     abfsUriQueryBuilder.addQuery(QUERY_PARAM_MAX_RESULTS, 
String.valueOf(listMaxResults));
-    appendSASTokenToQuery(relativePath, SASTokenProvider.LIST_OPERATION, 
abfsUriQueryBuilder);
+    appendSASTokenToQuery(relativePath, 
SASTokenProvider.FIXED_SAS_STORE_OPERATION, abfsUriQueryBuilder);

Review Comment:
   I hope backward compatibility is being taken care of?



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java:
##########
@@ -631,45 +805,89 @@ private synchronized void waitForAppendsToComplete() 
throws IOException {
     }
   }
 
+  /**
+   * Flushes the written bytes to the Azure Blob Storage service, ensuring all
+   * appends are completed. This method is typically called during a close 
operation.
+   *
+   * @param isClose indicates whether this flush is happening as part of a 
close operation.
+   * @throws IOException if an I/O error occurs during the flush operation.
+   */
   private synchronized void flushWrittenBytesToService(boolean isClose) throws 
IOException {
+    // Ensure all appends are completed before flushing.
     waitForAppendsToComplete();
+    // Flush the written bytes to the service.
     flushWrittenBytesToServiceInternal(position, false, isClose);
   }
 
+  /**
+   * Asynchronously flushes the written bytes to the Azure Blob Storage 
service.
+   * This method ensures that the write operation queue is managed and only 
flushes
+   * if there are uncommitted data beyond the last flush offset.
+   *
+   * @throws IOException if an I/O error occurs during the flush operation.
+   */
   private synchronized void flushWrittenBytesToServiceAsync() throws 
IOException {
+    // Manage the write operation queue to ensure efficient writes
     shrinkWriteOperationQueue();
 
+    // Only flush if there are uncommitted data beyond the last flush offset
     if (this.lastTotalAppendOffset > this.lastFlushOffset) {
       this.flushWrittenBytesToServiceInternal(this.lastTotalAppendOffset, true,
         false/*Async flush on close not permitted*/);
     }
   }
 
+  /**
+   * Flushes the written bytes to the Azure Blob Storage service.
+   *
+   * @param offset                the offset up to which data needs to be 
flushed.
+   * @param retainUncommitedData whether to retain uncommitted data after 
flush.
+   * @param isClose               whether this flush is happening as part of a 
close operation.
+   * @throws IOException if an I/O error occurs.
+   */
   private synchronized void flushWrittenBytesToServiceInternal(final long 
offset,
       final boolean retainUncommitedData, final boolean isClose) throws 
IOException {
     // flush is called for appendblob only on close
     if (this.isAppendBlob && !isClose) {
       return;
     }
 
+    // Tracker to monitor performance metrics
     AbfsPerfTracker tracker = client.getAbfsPerfTracker();
     try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker,
             "flushWrittenBytesToServiceInternal", "flush")) {
-      AbfsRestOperation op = getClient().flush(path, offset, 
retainUncommitedData,
-          isClose, cachedSasToken.get(), leaseId, contextEncryptionAdapter,
-          new TracingContext(tracingContext));
-      cachedSasToken.update(op.getSasToken());
-      perfInfo.registerResult(op.getResult()).registerSuccess(true);
-    } catch (AzureBlobFileSystemException ex) {
-      if (ex instanceof AbfsRestOperationException) {
-        if (((AbfsRestOperationException) ex).getStatusCode() == 
HttpURLConnection.HTTP_NOT_FOUND) {
+      AbfsRestOperation op;
+      try {
+        // Attempt to flush data to the remote service.
+        op = remoteFlush(offset, retainUncommitedData, isClose, leaseId,
+            tracingContext);
+      } catch (InvalidIngressServiceException ex) {
+        // If an invalid ingress service is encountered, switch handler and 
retry.
+        switchHandler();

Review Comment:
   The above 'switchHandler' comment is applicable here as well.



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java:
##########
@@ -328,7 +337,7 @@ public AbfsRestOperation listPath(final String 
relativePath, final boolean recur
       abfsUriQueryBuilder.addQuery(QUERY_PARAM_DELIMITER, FORWARD_SLASH);
     }
     abfsUriQueryBuilder.addQuery(QUERY_PARAM_MAX_RESULTS, 
String.valueOf(listMaxResults));
-    appendSASTokenToQuery(relativePath, SASTokenProvider.LIST_OPERATION, 
abfsUriQueryBuilder);
+    appendSASTokenToQuery(relativePath, 
SASTokenProvider.FIXED_SAS_STORE_OPERATION, abfsUriQueryBuilder);

Review Comment:
   What's the plan for user delegation SAS? Has a JIRA been created for this? 
If not, please create a follow-up JIRA.



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java:
##########
@@ -338,9 +531,14 @@ private void uploadBlockAsync(DataBlocks.DataBlock 
blockToUpload,
              */
             AppendRequestParameters reqParams = new AppendRequestParameters(
                 offset, 0, bytesLength, mode, false, leaseId, 
isExpectHeaderEnabled);
-            AbfsRestOperation op = getClient().append(path,
-                blockUploadData.toByteArray(), reqParams, cachedSasToken.get(),
-                contextEncryptionAdapter, new TracingContext(tracingContext));
+            AbfsRestOperation op;
+            try {
+              op = remoteWrite(blockToUpload, blockUploadData, reqParams, 
tracingContext);
+            } catch (InvalidIngressServiceException ex) {
+              switchHandler();

Review Comment:
   Also, please add a debug log message in the conditional execution cases.



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureBlobBlockManager.java:
##########
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import 
org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
+import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
+import org.apache.hadoop.fs.store.DataBlocks;
+
+/**
+ * Manages Azure Blob blocks for append operations.
+ */
+public class AzureBlobBlockManager extends AzureBlockManager {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      AbfsOutputStream.class);
+
+
+  /** The list of already committed blocks is stored in this list. */
+  private List<String> committedBlockEntries = new ArrayList<>();
+
+  /** The list to store blockId, position, and status. */
+  private final LinkedList<BlockEntry> blockEntryList = new LinkedList<>();
+
+
+  /**
+   * Constructs an AzureBlobBlockManager.
+   *
+   * @param abfsOutputStream the output stream
+   * @param blockFactory the block factory
+   * @param bufferSize the buffer size
+   * @throws AzureBlobFileSystemException if an error occurs
+   */
+  public AzureBlobBlockManager(AbfsOutputStream abfsOutputStream,
+      DataBlocks.BlockFactory blockFactory,
+      int bufferSize)
+      throws AzureBlobFileSystemException {
+    super(abfsOutputStream, blockFactory, bufferSize);
+    if (abfsOutputStream.getPosition() > 0 && 
!abfsOutputStream.isAppendBlob()) {
+      this.committedBlockEntries = 
getBlockList(abfsOutputStream.getTracingContext());
+    }
+    LOG.debug("Created a new Blob Block Manager for AbfsOutputStream instance 
{} for path {}",
+        abfsOutputStream.getStreamID(), abfsOutputStream.getPath());
+  }
+
+  /**
+   * Creates a new block.
+   *
+   * @param position the position
+   * @return the created block
+   * @throws IOException if an I/O error occurs
+   */
+  @Override
+  protected synchronized AbfsBlock createBlockInternal(long position)
+      throws IOException {
+    if (getActiveBlock() == null) {
+      setBlockCount(getBlockCount() + 1);
+      AbfsBlock activeBlock = new AbfsBlobBlock(getAbfsOutputStream(), 
position);
+      activeBlock.setBlockEntry(addNewEntry(activeBlock.getBlockId(), 
activeBlock.getOffset()));
+      setActiveBlock(activeBlock);
+    }
+    return getActiveBlock();
+  }
+
+  /**
+   * Returns block id's which are committed for the blob.
+   *
+   * @param tracingContext Tracing context object.
+   * @return list of committed block id's.
+   * @throws AzureBlobFileSystemException if an error occurs
+   */
+  private List<String> getBlockList(TracingContext tracingContext)
+      throws AzureBlobFileSystemException {
+    List<String> committedBlockIdList = new ArrayList<>();
+    AbfsBlobClient blobClient = 
getAbfsOutputStream().getClientHandler().getBlobClient();
+    final AbfsRestOperation op = blobClient
+        .getBlockList(getAbfsOutputStream().getPath(), tracingContext);
+    if (op != null && op.getResult() != null) {
+      committedBlockIdList = op.getResult().getBlockIdList();
+    }
+    return committedBlockIdList;
+  }
+
+  /**
+   * Adds a new block entry to the block entry list.
+   * The block entry is added only if the position of the new block
+   * is greater than the position of the last block in the list.
+   *
+   * @param blockId The ID of the new block to be added.
+   * @param position The position of the new block in the stream.
+   * @return The newly added {@link BlockEntry}.
+   * @throws IOException If the position of the new block is not greater than 
the last block in the list.
+   */
+  private synchronized BlockEntry addNewEntry(String blockId, long position) 
throws IOException {
+    if (!blockEntryList.isEmpty()) {
+      BlockEntry lastEntry = blockEntryList.getLast();
+      if (position <= lastEntry.getPosition()) {
+        throw new IOException("New block position " + position  + " must be 
greater than the last block position "
+            + lastEntry.getPosition() + " for path " + 
getAbfsOutputStream().getPath());
+      }
+    }
+    BlockEntry blockEntry = new BlockEntry(blockId, position, 
AbfsBlockStatus.NEW);
+    blockEntryList.addLast(blockEntry);
+    LOG.debug("Added block {} at position {} with status NEW.", blockId, 
position);
+    return blockEntry;
+  }
+
+  /**
+   * Updates the status of an existing block entry to SUCCESS.
+   * This method is used to mark a block as successfully processed.
+   *
+   * @param block The {@link AbfsBlock} whose status needs to be updated to 
SUCCESS.
+   */
+  protected synchronized void updateEntry(AbfsBlock block) {
+    BlockEntry blockEntry = block.getBlockEntry();
+    blockEntry.setStatus(AbfsBlockStatus.SUCCESS);
+    LOG.debug("Added block {} at position {} with status SUCCESS.", 
block.getBlockId(), blockEntry.getPosition());
+  }
+
+  /**
+   * Prepares the list of blocks to commit.
+   *
+   * @return whether we have some data to commit or not.
+   * @throws IOException if an I/O error occurs
+   */
+  protected boolean hasListToCommit() throws IOException {
+    // Adds all the committed blocks if available to the list of blocks to be 
added in putBlockList.
+    if (blockEntryList.isEmpty()) {
+      return false; // No entries to commit
+    }
+    while (!blockEntryList.isEmpty()) {
+      BlockEntry current = blockEntryList.poll();
+      if (current.getStatus() != AbfsBlockStatus.SUCCESS) {
+        LOG.debug(
+            "Block {} with position {} has status {}, flush cannot proceed.",
+            current.getBlockId(), current.getPosition(), current.getStatus());
+        throw new IOException("Flush failed. Block " + current.getBlockId()
+            + " with position " + current.getPosition() + " has status "
+            + current.getStatus() + "for path " + 
getAbfsOutputStream().getPath());
+      }
+      if (!blockEntryList.isEmpty()) {
+        BlockEntry next = blockEntryList.getFirst();
+        if (current.getPosition() >= next.getPosition()) {
+          String errorMessage =
+              "Position check failed. Current block position is greater than 
or equal to the next block's position.\n"
+                  + "Current Block Entry:\n"
+                  + "Block ID: " + current.getBlockId()
+                  + ", Position: " + current.getPosition()
+                  + ", Status: " + current.getStatus()
+                  + ", Path: " + getAbfsOutputStream().getPath()
+                  + ", StreamID: " + getAbfsOutputStream().getStreamID()
+                  + ", Next block position: " + next.getPosition()
+                  + "\n";
+          throw new IOException(errorMessage);
+        }
+      }
+      committedBlockEntries.add(current.getBlockId());

Review Comment:
   Please create a follow-up PR. You can probably group all the open comments 
and handle them in that PR.



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureBlobBlockManager.java:
##########
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import 
org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
+import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
+import org.apache.hadoop.fs.store.DataBlocks;
+
+/**
+ * Manages Azure Blob blocks for append operations.
+ */
+public class AzureBlobBlockManager extends AzureBlockManager {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      AbfsOutputStream.class);
+
+
+  /** The list of already committed blocks is stored in this list. */
+  private List<String> committedBlockEntries = new ArrayList<>();
+
+  /** The list to store blockId, position, and status. */
+  private final LinkedList<BlockEntry> blockEntryList = new LinkedList<>();
+
+
+  /**
+   * Constructs an AzureBlobBlockManager.
+   *
+   * @param abfsOutputStream the output stream
+   * @param blockFactory the block factory
+   * @param bufferSize the buffer size
+   * @throws AzureBlobFileSystemException if an error occurs
+   */
+  public AzureBlobBlockManager(AbfsOutputStream abfsOutputStream,
+      DataBlocks.BlockFactory blockFactory,
+      int bufferSize)
+      throws AzureBlobFileSystemException {
+    super(abfsOutputStream, blockFactory, bufferSize);
+    if (abfsOutputStream.getPosition() > 0 && 
!abfsOutputStream.isAppendBlob()) {
+      this.committedBlockEntries = 
getBlockList(abfsOutputStream.getTracingContext());
+    }
+    LOG.debug("Created a new Blob Block Manager for AbfsOutputStream instance 
{} for path {}",
+        abfsOutputStream.getStreamID(), abfsOutputStream.getPath());
+  }
+
+  /**
+   * Creates a new block.
+   *
+   * @param position the position
+   * @return the created block
+   * @throws IOException if an I/O error occurs
+   */
+  @Override
+  protected synchronized AbfsBlock createBlockInternal(long position)
+      throws IOException {
+    if (getActiveBlock() == null) {
+      setBlockCount(getBlockCount() + 1);
+      AbfsBlock activeBlock = new AbfsBlobBlock(getAbfsOutputStream(), 
position);
+      activeBlock.setBlockEntry(addNewEntry(activeBlock.getBlockId(), 
activeBlock.getOffset()));
+      setActiveBlock(activeBlock);
+    }
+    return getActiveBlock();
+  }
+
+  /**
+   * Returns block id's which are committed for the blob.
+   *
+   * @param tracingContext Tracing context object.
+   * @return list of committed block id's.
+   * @throws AzureBlobFileSystemException if an error occurs
+   */
+  private List<String> getBlockList(TracingContext tracingContext)
+      throws AzureBlobFileSystemException {
+    List<String> committedBlockIdList = new ArrayList<>();
+    AbfsBlobClient blobClient = 
getAbfsOutputStream().getClientHandler().getBlobClient();
+    final AbfsRestOperation op = blobClient
+        .getBlockList(getAbfsOutputStream().getPath(), tracingContext);
+    if (op != null && op.getResult() != null) {
+      committedBlockIdList = op.getResult().getBlockIdList();
+    }
+    return committedBlockIdList;
+  }
+
+  /**
+   * Adds a new block entry to the block entry list.
+   * The block entry is added only if the position of the new block
+   * is greater than the position of the last block in the list.
+   *
+   * @param blockId The ID of the new block to be added.
+   * @param position The position of the new block in the stream.
+   * @return The newly added {@link BlockEntry}.
+   * @throws IOException If the position of the new block is not greater than 
the last block in the list.
+   */
+  private synchronized BlockEntry addNewEntry(String blockId, long position) 
throws IOException {
+    if (!blockEntryList.isEmpty()) {
+      BlockEntry lastEntry = blockEntryList.getLast();
+      if (position <= lastEntry.getPosition()) {
+        throw new IOException("New block position " + position  + " must be 
greater than the last block position "
+            + lastEntry.getPosition() + " for path " + 
getAbfsOutputStream().getPath());
+      }
+    }
+    BlockEntry blockEntry = new BlockEntry(blockId, position, 
AbfsBlockStatus.NEW);
+    blockEntryList.addLast(blockEntry);
+    LOG.debug("Added block {} at position {} with status NEW.", blockId, 
position);
+    return blockEntry;
+  }
+
+  /**
+   * Updates the status of an existing block entry to SUCCESS.
+   * This method is used to mark a block as successfully processed.
+   *
+   * @param block The {@link AbfsBlock} whose status needs to be updated to 
SUCCESS.
+   */
+  protected synchronized void updateEntry(AbfsBlock block) {
+    BlockEntry blockEntry = block.getBlockEntry();
+    blockEntry.setStatus(AbfsBlockStatus.SUCCESS);
+    LOG.debug("Added block {} at position {} with status SUCCESS.", 
block.getBlockId(), blockEntry.getPosition());
+  }
+
+  /**
+   * Prepares the list of blocks to commit.
+   *
+   * @return whether we have some data to commit or not.
+   * @throws IOException if an I/O error occurs
+   */
+  protected boolean hasListToCommit() throws IOException {

Review Comment:
   Since this method is protected, it can be overridden in the future, which 
could unknowingly break this assumption. Since "synchronized" is reentrant by 
nature, it's better to make this method "synchronized" as well.
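
A small illustration of the reentrancy point, using a hypothetical class (`ReentrantSyncSketch` is not part of the PR). A thread that already holds an object's monitor can enter another `synchronized` method on the same object without blocking, so marking the inner method `synchronized` costs little when it is called from an already-synchronized path.

```java
/** Demonstrates that Java intrinsic locks (synchronized) are reentrant. */
public class ReentrantSyncSketch {
  private int committed;

  /** Outer synchronized method, analogous to a synchronized flush path. */
  public synchronized int flush() {
    // This thread already holds the monitor of "this", so calling another
    // synchronized method on the same object does not block or deadlock.
    if (hasListToCommit()) {
      committed++;
    }
    return committed;
  }

  /** Synchronized as well, so direct callers are guarded too. */
  protected synchronized boolean hasListToCommit() {
    // True whenever the current thread holds this object's monitor.
    return Thread.holdsLock(this);
  }
}
```

Note that `synchronized` is not inherited by an overriding method, so a subclass would still need to declare it explicitly.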



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
