[ https://issues.apache.org/jira/browse/HADOOP-19232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17922257#comment-17922257 ]
ASF GitHub Bot commented on HADOOP-19232: ----------------------------------------- anmolanmol1234 commented on code in PR #7272: URL: https://github.com/apache/hadoop/pull/7272#discussion_r1935007959 ########## hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java: ########## @@ -177,12 +195,181 @@ public AbfsOutputStream(AbfsOutputStreamContext abfsOutputStreamContext) this.tracingContext.setOperation(FSOperationType.WRITE); this.ioStatistics = outputStreamStatistics.getIOStatistics(); this.blockFactory = abfsOutputStreamContext.getBlockFactory(); - this.blockSize = bufferSize; - // create that first block. This guarantees that an open + close sequence - // writes a 0-byte entry. - createBlockIfNeeded(); + this.isDFSToBlobFallbackEnabled + = abfsOutputStreamContext.isDFSToBlobFallbackEnabled(); + this.serviceTypeAtInit = abfsOutputStreamContext.getIngressServiceType(); + this.currentExecutingServiceType = abfsOutputStreamContext.getIngressServiceType(); + this.clientHandler = abfsOutputStreamContext.getClientHandler(); + createIngressHandler(serviceTypeAtInit, + abfsOutputStreamContext.getBlockFactory(), bufferSize, false, null); + } + + /** + * Retrieves the current ingress handler. + * + * @return the current {@link AzureIngressHandler}. + */ + public AzureIngressHandler getIngressHandler() { + return ingressHandler; + } + + private final Lock lock = new ReentrantLock(); + + private volatile boolean switchCompleted = false; + + /** + * Creates or retrieves an existing Azure ingress handler based on the service type and provided parameters. + * <p> + * If the `ingressHandler` is already initialized and the switch operation is complete, the existing + * handler is returned without acquiring a lock to minimize performance overhead. + * If the `ingressHandler` is initialized but the switch is incomplete, a lock is acquired to safely modify + * or create a new handler. 
Double-checked locking is used to ensure thread safety while minimizing the + * time spent in the critical section. + * If the `ingressHandler` is `null`, the handler is safely initialized outside of the lock as no other + * thread would be modifying it. + * </p> + * + * @param serviceType The type of Azure service to handle (e.g., ABFS, Blob, etc.). + * @param blockFactory The factory to create data blocks used in the handler. + * @param bufferSize The buffer size used by the handler for data processing. + * @param isSwitch A flag indicating whether a switch operation is in progress. + * @param blockManager The manager responsible for handling blocks of data during processing. + * + * @return The initialized or existing Azure ingress handler. + * @throws IOException If an I/O error occurs during handler creation or data processing. + */ + private AzureIngressHandler createIngressHandler(AbfsServiceType serviceType, + DataBlocks.BlockFactory blockFactory, + int bufferSize, boolean isSwitch, AzureBlockManager blockManager) throws IOException { + if (ingressHandler != null) { + if (switchCompleted) { + return ingressHandler; // Return the handler if it's already initialized and the switch is completed + } + // If the switch is incomplete, lock to safely modify + lock.lock(); + try { + // Double-check the condition after acquiring the lock + if (switchCompleted) { + return ingressHandler; // Return the handler if it's now completed + } + // If the switch is still incomplete, create a new handler + return createNewHandler(serviceType, blockFactory, bufferSize, isSwitch, blockManager); + } finally { + lock.unlock(); + } + } + // If ingressHandler is null, no lock is needed; safely initialize it outside the lock + return createNewHandler(serviceType, blockFactory, bufferSize, isSwitch, blockManager); } + // Helper method to create a new handler, used in both scenarios (locked and unlocked) + private AzureIngressHandler createNewHandler(AbfsServiceType serviceType, + 
DataBlocks.BlockFactory blockFactory, + int bufferSize, + boolean isSwitch, + AzureBlockManager blockManager) throws IOException { + this.client = clientHandler.getClient(serviceType); + if (isDFSToBlobFallbackEnabled && serviceTypeAtInit != AbfsServiceType.DFS) { + throw new InvalidConfigurationValueException( + "The ingress service type must be configured as DFS"); + } + if (isDFSToBlobFallbackEnabled && !isSwitch) { + ingressHandler = new AzureDfsToBlobIngressFallbackHandler(this, + blockFactory, bufferSize, eTag, clientHandler); + } else if (serviceType == AbfsServiceType.BLOB) { + ingressHandler = new AzureBlobIngressHandler(this, blockFactory, + bufferSize, eTag, clientHandler, blockManager); + } else { + ingressHandler = new AzureDFSIngressHandler(this, blockFactory, + bufferSize, eTag, clientHandler); + } + if (isSwitch) { + switchCompleted = true; + } + return ingressHandler; + } + + /** + * Switches the current ingress handler and service type if necessary. + * + * @throws IOException if there is an error creating the new ingress handler. + */ + protected void switchHandler() throws IOException { + if (serviceTypeAtInit != currentExecutingServiceType) { + return; Review Comment: taken ########## hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java: ########## @@ -177,12 +195,181 @@ public AbfsOutputStream(AbfsOutputStreamContext abfsOutputStreamContext) this.tracingContext.setOperation(FSOperationType.WRITE); this.ioStatistics = outputStreamStatistics.getIOStatistics(); this.blockFactory = abfsOutputStreamContext.getBlockFactory(); - this.blockSize = bufferSize; - // create that first block. This guarantees that an open + close sequence - // writes a 0-byte entry. 
- createBlockIfNeeded(); + this.isDFSToBlobFallbackEnabled + = abfsOutputStreamContext.isDFSToBlobFallbackEnabled(); + this.serviceTypeAtInit = abfsOutputStreamContext.getIngressServiceType(); + this.currentExecutingServiceType = abfsOutputStreamContext.getIngressServiceType(); + this.clientHandler = abfsOutputStreamContext.getClientHandler(); + createIngressHandler(serviceTypeAtInit, + abfsOutputStreamContext.getBlockFactory(), bufferSize, false, null); + } + + /** + * Retrieves the current ingress handler. + * + * @return the current {@link AzureIngressHandler}. + */ + public AzureIngressHandler getIngressHandler() { + return ingressHandler; + } + + private final Lock lock = new ReentrantLock(); + + private volatile boolean switchCompleted = false; + + /** + * Creates or retrieves an existing Azure ingress handler based on the service type and provided parameters. + * <p> + * If the `ingressHandler` is already initialized and the switch operation is complete, the existing + * handler is returned without acquiring a lock to minimize performance overhead. + * If the `ingressHandler` is initialized but the switch is incomplete, a lock is acquired to safely modify + * or create a new handler. Double-checked locking is used to ensure thread safety while minimizing the + * time spent in the critical section. + * If the `ingressHandler` is `null`, the handler is safely initialized outside of the lock as no other + * thread would be modifying it. + * </p> + * + * @param serviceType The type of Azure service to handle (e.g., ABFS, Blob, etc.). + * @param blockFactory The factory to create data blocks used in the handler. + * @param bufferSize The buffer size used by the handler for data processing. + * @param isSwitch A flag indicating whether a switch operation is in progress. + * @param blockManager The manager responsible for handling blocks of data during processing. + * + * @return The initialized or existing Azure ingress handler. 
+ * @throws IOException If an I/O error occurs during handler creation or data processing. + */ + private AzureIngressHandler createIngressHandler(AbfsServiceType serviceType, + DataBlocks.BlockFactory blockFactory, + int bufferSize, boolean isSwitch, AzureBlockManager blockManager) throws IOException { + if (ingressHandler != null) { + if (switchCompleted) { + return ingressHandler; // Return the handler if it's already initialized and the switch is completed + } + // If the switch is incomplete, lock to safely modify + lock.lock(); + try { + // Double-check the condition after acquiring the lock + if (switchCompleted) { + return ingressHandler; // Return the handler if it's now completed + } + // If the switch is still incomplete, create a new handler + return createNewHandler(serviceType, blockFactory, bufferSize, isSwitch, blockManager); + } finally { + lock.unlock(); + } + } + // If ingressHandler is null, no lock is needed; safely initialize it outside the lock + return createNewHandler(serviceType, blockFactory, bufferSize, isSwitch, blockManager); } + // Helper method to create a new handler, used in both scenarios (locked and unlocked) + private AzureIngressHandler createNewHandler(AbfsServiceType serviceType, + DataBlocks.BlockFactory blockFactory, + int bufferSize, + boolean isSwitch, + AzureBlockManager blockManager) throws IOException { + this.client = clientHandler.getClient(serviceType); + if (isDFSToBlobFallbackEnabled && serviceTypeAtInit != AbfsServiceType.DFS) { + throw new InvalidConfigurationValueException( + "The ingress service type must be configured as DFS"); + } + if (isDFSToBlobFallbackEnabled && !isSwitch) { + ingressHandler = new AzureDfsToBlobIngressFallbackHandler(this, + blockFactory, bufferSize, eTag, clientHandler); + } else if (serviceType == AbfsServiceType.BLOB) { + ingressHandler = new AzureBlobIngressHandler(this, blockFactory, + bufferSize, eTag, clientHandler, blockManager); + } else { + ingressHandler = new 
AzureDFSIngressHandler(this, blockFactory, + bufferSize, eTag, clientHandler); + } + if (isSwitch) { + switchCompleted = true; + } + return ingressHandler; + } + + /** + * Switches the current ingress handler and service type if necessary. + * + * @throws IOException if there is an error creating the new ingress handler. + */ + protected void switchHandler() throws IOException { + if (serviceTypeAtInit != currentExecutingServiceType) { + return; + } + if (serviceTypeAtInit == AbfsServiceType.BLOB) { Review Comment: taken > ABFS: [FnsOverBlob] Implementing Ingress Support with various Fallback > Handling > ------------------------------------------------------------------------------- > > Key: HADOOP-19232 > URL: https://issues.apache.org/jira/browse/HADOOP-19232 > Project: Hadoop Common > Issue Type: Sub-task > Components: fs/azure > Affects Versions: 3.4.0 > Reporter: Anuj Modi > Assignee: Anmol Asrani > Priority: Major > Labels: pull-request-available > > Scope of this task is to refactor the AbfsOutputStream class to handle the > ingress for DFS and Blob endpoints effectively. > More details will be added soon. > Prerequisites for this Patch: > 1. [HADOOP-19187] ABFS: [FnsOverBlob]Making AbfsClient Abstract for > supporting both DFS and Blob Endpoint - ASF JIRA (apache.org) > 2. [HADOOP-19226] ABFS: [FnsOverBlob]Implementing Azure Rest APIs on Blob > Endpoint for AbfsBlobClient - ASF JIRA (apache.org) > 3. [HADOOP-19207] ABFS: [FnsOverBlob]Response Handling of Blob Endpoint APIs > and Metadata APIs - ASF JIRA (apache.org) -- This message was sent by Atlassian Jira (v8.20.10#820010) --------------------------------------------------------------------- To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-issues-h...@hadoop.apache.org