anmolanmol1234 commented on code in PR #7272:
URL: https://github.com/apache/hadoop/pull/7272#discussion_r1934323725


##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java:
##########
@@ -338,9 +531,14 @@ private void uploadBlockAsync(DataBlocks.DataBlock blockToUpload,
              */
             AppendRequestParameters reqParams = new AppendRequestParameters(
                 offset, 0, bytesLength, mode, false, leaseId, isExpectHeaderEnabled);
-            AbfsRestOperation op = getClient().append(path,
-                blockUploadData.toByteArray(), reqParams, cachedSasToken.get(),
-                contextEncryptionAdapter, new TracingContext(tracingContext));
+            AbfsRestOperation op;
+            try {
+              op = remoteWrite(blockToUpload, blockUploadData, reqParams, tracingContext);
+            } catch (InvalidIngressServiceException ex) {
+              switchHandler();

Review Comment:
   This fallback mechanism keeps writes working when a file is accessed over 
different endpoints. For example, if a file is created over DFS and later 
updated over the Blob endpoint, the operation would otherwise fail. With 
deterministic switching, an InvalidIngressServiceException causes the stream to 
switch automatically to the correct endpoint.
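
   To illustrate the catch, switch, and retry flow described above, here is a 
minimal, self-contained sketch. It is not the PR's implementation: the class 
IngressFallbackSketch, the Endpoint enum, and the simulated remoteWrite are 
hypothetical stand-ins, and the only names borrowed from the diff are 
remoteWrite, switchHandler, and InvalidIngressServiceException. It assumes 
exactly two endpoints, so a single switch always lands on the right one.

```java
import java.io.IOException;

class IngressFallbackSketch {
  /** Hypothetical stand-in for the exception named in the diff. */
  static class InvalidIngressServiceException extends IOException {
    InvalidIngressServiceException(String message) {
      super(message);
    }
  }

  /** The two ingress endpoints assumed by this sketch. */
  enum Endpoint { DFS, BLOB }

  // Simulated service-side state: the endpoint the file was created over.
  private final Endpoint fileEndpoint;
  // Endpoint the stream is currently configured to write through.
  private Endpoint current;
  // Number of handler switches performed so far.
  int switches = 0;

  IngressFallbackSketch(Endpoint configured, Endpoint fileEndpoint) {
    this.current = configured;
    this.fileEndpoint = fileEndpoint;
  }

  // Simulated remote call: rejects writes sent over the wrong endpoint.
  private String remoteWrite(byte[] data) throws IOException {
    if (current != fileEndpoint) {
      throw new InvalidIngressServiceException(
          "file was not created over " + current);
    }
    return current + ":" + data.length;
  }

  // With only two endpoints, switching is deterministic: pick the other one.
  private void switchHandler() {
    current = (current == Endpoint.DFS) ? Endpoint.BLOB : Endpoint.DFS;
    switches++;
  }

  // Try the configured endpoint first; on a mismatch, switch once and retry.
  String write(byte[] data) throws IOException {
    try {
      return remoteWrite(data);
    } catch (InvalidIngressServiceException ex) {
      switchHandler();
      return remoteWrite(data);
    }
  }

  public static void main(String[] args) throws IOException {
    // Stream configured for Blob, but the file was created over DFS.
    IngressFallbackSketch stream =
        new IngressFallbackSketch(Endpoint.BLOB, Endpoint.DFS);
    System.out.println(stream.write(new byte[4])
        + " after " + stream.switches + " switch(es)");
  }
}
```

   In this sketch the retry happens at most once per write, which keeps the 
behaviour predictable: a second failure would propagate to the caller instead 
of looping between endpoints.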



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java:
##########
@@ -631,45 +805,89 @@ private synchronized void waitForAppendsToComplete() throws IOException {
     }
   }
 
+  /**
+   * Flushes the written bytes to the Azure Blob Storage service, ensuring all
+   * appends are completed. This method is typically called during a close operation.
+   *
+   * @param isClose indicates whether this flush is happening as part of a close operation.
+   * @throws IOException if an I/O error occurs during the flush operation.
+   */
   private synchronized void flushWrittenBytesToService(boolean isClose) throws IOException {
+    // Ensure all appends are completed before flushing.
     waitForAppendsToComplete();
+    // Flush the written bytes to the service.
     flushWrittenBytesToServiceInternal(position, false, isClose);
   }
 
+  /**
+   * Asynchronously flushes the written bytes to the Azure Blob Storage service.
+   * This method ensures that the write operation queue is managed and only flushes
+   * if there are uncommitted data beyond the last flush offset.
+   *
+   * @throws IOException if an I/O error occurs during the flush operation.
+   */
   private synchronized void flushWrittenBytesToServiceAsync() throws IOException {
+    // Manage the write operation queue to ensure efficient writes
     shrinkWriteOperationQueue();
 
+    // Only flush if there are uncommitted data beyond the last flush offset
     if (this.lastTotalAppendOffset > this.lastFlushOffset) {
       this.flushWrittenBytesToServiceInternal(this.lastTotalAppendOffset, true,
         false/*Async flush on close not permitted*/);
     }
   }
 
+  /**
+   * Flushes the written bytes to the Azure Blob Storage service.
+   *
+   * @param offset                the offset up to which data needs to be flushed.
+   * @param retainUncommitedData whether to retain uncommitted data after flush.
+   * @param isClose               whether this flush is happening as part of a close operation.
+   * @throws IOException if an I/O error occurs.
+   */
   private synchronized void flushWrittenBytesToServiceInternal(final long offset,
       final boolean retainUncommitedData, final boolean isClose) throws IOException {
     // flush is called for appendblob only on close
     if (this.isAppendBlob && !isClose) {
       return;
     }
 
+    // Tracker to monitor performance metrics
     AbfsPerfTracker tracker = client.getAbfsPerfTracker();
     try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker,
             "flushWrittenBytesToServiceInternal", "flush")) {
-      AbfsRestOperation op = getClient().flush(path, offset, retainUncommitedData,
-          isClose, cachedSasToken.get(), leaseId, contextEncryptionAdapter,
-          new TracingContext(tracingContext));
-      cachedSasToken.update(op.getSasToken());
-      perfInfo.registerResult(op.getResult()).registerSuccess(true);
-    } catch (AzureBlobFileSystemException ex) {
-      if (ex instanceof AbfsRestOperationException) {
-        if (((AbfsRestOperationException) ex).getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) {
+      AbfsRestOperation op;
+      try {
+        // Attempt to flush data to the remote service.
+        op = remoteFlush(offset, retainUncommitedData, isClose, leaseId,
+            tracingContext);
+      } catch (InvalidIngressServiceException ex) {
+        // If an invalid ingress service is encountered, switch handler and retry.
+        switchHandler();

Review Comment:
   Addressed above; a log message has also been added.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


