[ 
https://issues.apache.org/jira/browse/HADOOP-19232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17922257#comment-17922257
 ] 

ASF GitHub Bot commented on HADOOP-19232:
-----------------------------------------

anmolanmol1234 commented on code in PR #7272:
URL: https://github.com/apache/hadoop/pull/7272#discussion_r1935007959


##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java:
##########
@@ -177,12 +195,181 @@ public AbfsOutputStream(AbfsOutputStreamContext 
abfsOutputStreamContext)
     this.tracingContext.setOperation(FSOperationType.WRITE);
     this.ioStatistics = outputStreamStatistics.getIOStatistics();
     this.blockFactory = abfsOutputStreamContext.getBlockFactory();
-    this.blockSize = bufferSize;
-    // create that first block. This guarantees that an open + close sequence
-    // writes a 0-byte entry.
-    createBlockIfNeeded();
+    this.isDFSToBlobFallbackEnabled
+        = abfsOutputStreamContext.isDFSToBlobFallbackEnabled();
+    this.serviceTypeAtInit = abfsOutputStreamContext.getIngressServiceType();
+    this.currentExecutingServiceType = 
abfsOutputStreamContext.getIngressServiceType();
+    this.clientHandler = abfsOutputStreamContext.getClientHandler();
+    createIngressHandler(serviceTypeAtInit,
+        abfsOutputStreamContext.getBlockFactory(), bufferSize, false, null);
+  }
+
+  /**
+   * Retrieves the current ingress handler.
+   *
+   * @return the current {@link AzureIngressHandler}.
+   */
+  public AzureIngressHandler getIngressHandler() {
+    return ingressHandler;
+  }
+
+  private final Lock lock = new ReentrantLock();
+
+  private volatile boolean switchCompleted = false;
+
+  /**
+   * Creates or retrieves an existing Azure ingress handler based on the 
service type and provided parameters.
+   * <p>
+   * If the `ingressHandler` is already initialized and the switch operation 
is complete, the existing
+   * handler is returned without acquiring a lock to minimize performance 
overhead.
+   * If the `ingressHandler` is initialized but the switch is incomplete, a 
lock is acquired to safely modify
+   * or create a new handler. Double-checked locking is used to ensure thread 
safety while minimizing the
+   * time spent in the critical section.
+   * If the `ingressHandler` is `null`, the handler is safely initialized 
outside of the lock as no other
+   * thread would be modifying it.
+   * </p>
+   *
+   * @param serviceType   The type of Azure service to handle (e.g., ABFS, 
Blob, etc.).
+   * @param blockFactory  The factory to create data blocks used in the 
handler.
+   * @param bufferSize    The buffer size used by the handler for data 
processing.
+   * @param isSwitch      A flag indicating whether a switch operation is in 
progress.
+   * @param blockManager  The manager responsible for handling blocks of data 
during processing.
+   *
+   * @return The initialized or existing Azure ingress handler.
+   * @throws IOException If an I/O error occurs during handler creation or 
data processing.
+   */
+  private AzureIngressHandler createIngressHandler(AbfsServiceType serviceType,
+      DataBlocks.BlockFactory blockFactory,
+      int bufferSize, boolean isSwitch, AzureBlockManager blockManager) throws 
IOException {
+    if (ingressHandler != null) {
+      if (switchCompleted) {
+        return ingressHandler; // Return the handler if it's already 
initialized and the switch is completed
+      }
+      // If the switch is incomplete, lock to safely modify
+      lock.lock();
+      try {
+        // Double-check the condition after acquiring the lock
+        if (switchCompleted) {
+          return ingressHandler; // Return the handler if it's now completed
+        }
+        // If the switch is still incomplete, create a new handler
+        return createNewHandler(serviceType, blockFactory, bufferSize, 
isSwitch, blockManager);
+      } finally {
+        lock.unlock();
+      }
+    }
+    // If ingressHandler is null, no lock is needed; safely initialize it 
outside the lock
+    return createNewHandler(serviceType, blockFactory, bufferSize, isSwitch, 
blockManager);
   }
 
+  // Helper method to create a new handler, used in both scenarios (locked and 
unlocked)
+  private AzureIngressHandler createNewHandler(AbfsServiceType serviceType,
+      DataBlocks.BlockFactory blockFactory,
+      int bufferSize,
+      boolean isSwitch,
+      AzureBlockManager blockManager) throws IOException {
+    this.client = clientHandler.getClient(serviceType);
+    if (isDFSToBlobFallbackEnabled && serviceTypeAtInit != 
AbfsServiceType.DFS) {
+      throw new InvalidConfigurationValueException(
+          "The ingress service type must be configured as DFS");
+    }
+    if (isDFSToBlobFallbackEnabled && !isSwitch) {
+      ingressHandler = new AzureDfsToBlobIngressFallbackHandler(this,
+          blockFactory, bufferSize, eTag, clientHandler);
+    } else if (serviceType == AbfsServiceType.BLOB) {
+      ingressHandler = new AzureBlobIngressHandler(this, blockFactory,
+          bufferSize, eTag, clientHandler, blockManager);
+    } else {
+      ingressHandler = new AzureDFSIngressHandler(this, blockFactory,
+          bufferSize, eTag, clientHandler);
+    }
+    if (isSwitch) {
+      switchCompleted = true;
+    }
+    return ingressHandler;
+  }
+
+  /**
+   * Switches the current ingress handler and service type if necessary.
+   *
+   * @throws IOException if there is an error creating the new ingress handler.
+   */
+  protected void switchHandler() throws IOException {
+    if (serviceTypeAtInit != currentExecutingServiceType) {
+      return;

Review Comment:
   taken



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java:
##########
@@ -177,12 +195,181 @@ public AbfsOutputStream(AbfsOutputStreamContext 
abfsOutputStreamContext)
     this.tracingContext.setOperation(FSOperationType.WRITE);
     this.ioStatistics = outputStreamStatistics.getIOStatistics();
     this.blockFactory = abfsOutputStreamContext.getBlockFactory();
-    this.blockSize = bufferSize;
-    // create that first block. This guarantees that an open + close sequence
-    // writes a 0-byte entry.
-    createBlockIfNeeded();
+    this.isDFSToBlobFallbackEnabled
+        = abfsOutputStreamContext.isDFSToBlobFallbackEnabled();
+    this.serviceTypeAtInit = abfsOutputStreamContext.getIngressServiceType();
+    this.currentExecutingServiceType = 
abfsOutputStreamContext.getIngressServiceType();
+    this.clientHandler = abfsOutputStreamContext.getClientHandler();
+    createIngressHandler(serviceTypeAtInit,
+        abfsOutputStreamContext.getBlockFactory(), bufferSize, false, null);
+  }
+
+  /**
+   * Retrieves the current ingress handler.
+   *
+   * @return the current {@link AzureIngressHandler}.
+   */
+  public AzureIngressHandler getIngressHandler() {
+    return ingressHandler;
+  }
+
+  private final Lock lock = new ReentrantLock();
+
+  private volatile boolean switchCompleted = false;
+
+  /**
+   * Creates or retrieves an existing Azure ingress handler based on the 
service type and provided parameters.
+   * <p>
+   * If the `ingressHandler` is already initialized and the switch operation 
is complete, the existing
+   * handler is returned without acquiring a lock to minimize performance 
overhead.
+   * If the `ingressHandler` is initialized but the switch is incomplete, a 
lock is acquired to safely modify
+   * or create a new handler. Double-checked locking is used to ensure thread 
safety while minimizing the
+   * time spent in the critical section.
+   * If the `ingressHandler` is `null`, the handler is safely initialized 
outside of the lock as no other
+   * thread would be modifying it.
+   * </p>
+   *
+   * @param serviceType   The type of Azure service to handle (e.g., ABFS, 
Blob, etc.).
+   * @param blockFactory  The factory to create data blocks used in the 
handler.
+   * @param bufferSize    The buffer size used by the handler for data 
processing.
+   * @param isSwitch      A flag indicating whether a switch operation is in 
progress.
+   * @param blockManager  The manager responsible for handling blocks of data 
during processing.
+   *
+   * @return The initialized or existing Azure ingress handler.
+   * @throws IOException If an I/O error occurs during handler creation or 
data processing.
+   */
+  private AzureIngressHandler createIngressHandler(AbfsServiceType serviceType,
+      DataBlocks.BlockFactory blockFactory,
+      int bufferSize, boolean isSwitch, AzureBlockManager blockManager) throws 
IOException {
+    if (ingressHandler != null) {
+      if (switchCompleted) {
+        return ingressHandler; // Return the handler if it's already 
initialized and the switch is completed
+      }
+      // If the switch is incomplete, lock to safely modify
+      lock.lock();
+      try {
+        // Double-check the condition after acquiring the lock
+        if (switchCompleted) {
+          return ingressHandler; // Return the handler if it's now completed
+        }
+        // If the switch is still incomplete, create a new handler
+        return createNewHandler(serviceType, blockFactory, bufferSize, 
isSwitch, blockManager);
+      } finally {
+        lock.unlock();
+      }
+    }
+    // If ingressHandler is null, no lock is needed; safely initialize it 
outside the lock
+    return createNewHandler(serviceType, blockFactory, bufferSize, isSwitch, 
blockManager);
   }
 
+  // Helper method to create a new handler, used in both scenarios (locked and 
unlocked)
+  private AzureIngressHandler createNewHandler(AbfsServiceType serviceType,
+      DataBlocks.BlockFactory blockFactory,
+      int bufferSize,
+      boolean isSwitch,
+      AzureBlockManager blockManager) throws IOException {
+    this.client = clientHandler.getClient(serviceType);
+    if (isDFSToBlobFallbackEnabled && serviceTypeAtInit != 
AbfsServiceType.DFS) {
+      throw new InvalidConfigurationValueException(
+          "The ingress service type must be configured as DFS");
+    }
+    if (isDFSToBlobFallbackEnabled && !isSwitch) {
+      ingressHandler = new AzureDfsToBlobIngressFallbackHandler(this,
+          blockFactory, bufferSize, eTag, clientHandler);
+    } else if (serviceType == AbfsServiceType.BLOB) {
+      ingressHandler = new AzureBlobIngressHandler(this, blockFactory,
+          bufferSize, eTag, clientHandler, blockManager);
+    } else {
+      ingressHandler = new AzureDFSIngressHandler(this, blockFactory,
+          bufferSize, eTag, clientHandler);
+    }
+    if (isSwitch) {
+      switchCompleted = true;
+    }
+    return ingressHandler;
+  }
+
+  /**
+   * Switches the current ingress handler and service type if necessary.
+   *
+   * @throws IOException if there is an error creating the new ingress handler.
+   */
+  protected void switchHandler() throws IOException {
+    if (serviceTypeAtInit != currentExecutingServiceType) {
+      return;
+    }
+    if (serviceTypeAtInit == AbfsServiceType.BLOB) {

Review Comment:
   taken





> ABFS: [FnsOverBlob] Implementing Ingress Support with various Fallback 
> Handling
> -------------------------------------------------------------------------------
>
>                 Key: HADOOP-19232
>                 URL: https://issues.apache.org/jira/browse/HADOOP-19232
>             Project: Hadoop Common
>          Issue Type: Sub-task
>          Components: fs/azure
>    Affects Versions: 3.4.0
>            Reporter: Anuj Modi
>            Assignee: Anmol Asrani
>            Priority: Major
>              Labels: pull-request-available
>
> The scope of this task is to refactor the AbfsOutputStream class to handle 
> ingress for both the DFS and Blob endpoints effectively.
> More details will be added soon.
> Prerequisites for this Patch:
> 1. [HADOOP-19187] ABFS: [FnsOverBlob]Making AbfsClient Abstract for 
> supporting both DFS and Blob Endpoint - ASF JIRA (apache.org)
> 2. [HADOOP-19226] ABFS: [FnsOverBlob]Implementing Azure Rest APIs on Blob 
> Endpoint for AbfsBlobClient - ASF JIRA (apache.org)
> 3. [HADOOP-19207] ABFS: [FnsOverBlob]Response Handling of Blob Endpoint APIs 
> and Metadata APIs - ASF JIRA (apache.org)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-issues-h...@hadoop.apache.org

Reply via email to