rakeshadr commented on code in PR #7272:
URL: https://github.com/apache/hadoop/pull/7272#discussion_r1934934413
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java:
##########
@@ -177,12 +195,181 @@ public AbfsOutputStream(AbfsOutputStreamContext
abfsOutputStreamContext)
this.tracingContext.setOperation(FSOperationType.WRITE);
this.ioStatistics = outputStreamStatistics.getIOStatistics();
this.blockFactory = abfsOutputStreamContext.getBlockFactory();
- this.blockSize = bufferSize;
- // create that first block. This guarantees that an open + close sequence
- // writes a 0-byte entry.
- createBlockIfNeeded();
+ this.isDFSToBlobFallbackEnabled
+ = abfsOutputStreamContext.isDFSToBlobFallbackEnabled();
+ this.serviceTypeAtInit = abfsOutputStreamContext.getIngressServiceType();
+ this.currentExecutingServiceType =
abfsOutputStreamContext.getIngressServiceType();
+ this.clientHandler = abfsOutputStreamContext.getClientHandler();
+ createIngressHandler(serviceTypeAtInit,
+ abfsOutputStreamContext.getBlockFactory(), bufferSize, false, null);
+ }
+
+ /**
+ * Retrieves the current ingress handler.
+ *
+ * @return the current {@link AzureIngressHandler}.
+ */
+ public AzureIngressHandler getIngressHandler() {
+ return ingressHandler;
+ }
+
+ private final Lock lock = new ReentrantLock();
+
+ private volatile boolean switchCompleted = false;
+
+ /**
+ * Creates or retrieves an existing Azure ingress handler based on the
service type and provided parameters.
+ * <p>
+ * If the `ingressHandler` is already initialized and the switch operation
is complete, the existing
+ * handler is returned without acquiring a lock to minimize performance
overhead.
+ * If the `ingressHandler` is initialized but the switch is incomplete, a
lock is acquired to safely modify
+ * or create a new handler. Double-checked locking is used to ensure thread
safety while minimizing the
+ * time spent in the critical section.
+ * If the `ingressHandler` is `null`, the handler is safely initialized
outside of the lock as no other
+ * thread would be modifying it.
+ * </p>
+ *
+ * @param serviceType The type of Azure service to handle (e.g., ABFS,
Blob, etc.).
+ * @param blockFactory The factory to create data blocks used in the
handler.
+ * @param bufferSize The buffer size used by the handler for data
processing.
+ * @param isSwitch A flag indicating whether a switch operation is in
progress.
+ * @param blockManager The manager responsible for handling blocks of data
during processing.
+ *
+ * @return The initialized or existing Azure ingress handler.
+ * @throws IOException If an I/O error occurs during handler creation or
data processing.
+ */
+ private AzureIngressHandler createIngressHandler(AbfsServiceType serviceType,
+ DataBlocks.BlockFactory blockFactory,
+ int bufferSize, boolean isSwitch, AzureBlockManager blockManager) throws
IOException {
+ if (ingressHandler != null) {
+ if (switchCompleted) {
+ return ingressHandler; // Return the handler if it's already
initialized and the switch is completed
+ }
+ // If the switch is incomplete, lock to safely modify
+ lock.lock();
+ try {
+ // Double-check the condition after acquiring the lock
+ if (switchCompleted) {
+ return ingressHandler; // Return the handler if it's now completed
+ }
+ // If the switch is still incomplete, create a new handler
+ return createNewHandler(serviceType, blockFactory, bufferSize,
isSwitch, blockManager);
+ } finally {
+ lock.unlock();
+ }
+ }
+ // If ingressHandler is null, no lock is needed; safely initialize it
outside the lock
+ return createNewHandler(serviceType, blockFactory, bufferSize, isSwitch,
blockManager);
}
+ // Helper method to create a new handler, used in both scenarios (locked and
unlocked)
+ private AzureIngressHandler createNewHandler(AbfsServiceType serviceType,
+ DataBlocks.BlockFactory blockFactory,
+ int bufferSize,
+ boolean isSwitch,
+ AzureBlockManager blockManager) throws IOException {
+ this.client = clientHandler.getClient(serviceType);
+ if (isDFSToBlobFallbackEnabled && serviceTypeAtInit !=
AbfsServiceType.DFS) {
+ throw new InvalidConfigurationValueException(
+ "The ingress service type must be configured as DFS");
+ }
+ if (isDFSToBlobFallbackEnabled && !isSwitch) {
+ ingressHandler = new AzureDfsToBlobIngressFallbackHandler(this,
+ blockFactory, bufferSize, eTag, clientHandler);
+ } else if (serviceType == AbfsServiceType.BLOB) {
+ ingressHandler = new AzureBlobIngressHandler(this, blockFactory,
+ bufferSize, eTag, clientHandler, blockManager);
+ } else {
+ ingressHandler = new AzureDFSIngressHandler(this, blockFactory,
+ bufferSize, eTag, clientHandler);
+ }
+ if (isSwitch) {
+ switchCompleted = true;
+ }
+ return ingressHandler;
+ }
+
+ /**
+ * Switches the current ingress handler and service type if necessary.
+ *
+ * @throws IOException if there is an error creating the new ingress handler.
+ */
+ protected void switchHandler() throws IOException {
+ if (serviceTypeAtInit != currentExecutingServiceType) {
+ return;
Review Comment:
Please add a debug log message for this `!=` condition, explaining the
situation in which this case occurs.
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java:
##########
@@ -177,12 +195,181 @@ public AbfsOutputStream(AbfsOutputStreamContext
abfsOutputStreamContext)
this.tracingContext.setOperation(FSOperationType.WRITE);
this.ioStatistics = outputStreamStatistics.getIOStatistics();
this.blockFactory = abfsOutputStreamContext.getBlockFactory();
- this.blockSize = bufferSize;
- // create that first block. This guarantees that an open + close sequence
- // writes a 0-byte entry.
- createBlockIfNeeded();
+ this.isDFSToBlobFallbackEnabled
+ = abfsOutputStreamContext.isDFSToBlobFallbackEnabled();
+ this.serviceTypeAtInit = abfsOutputStreamContext.getIngressServiceType();
+ this.currentExecutingServiceType =
abfsOutputStreamContext.getIngressServiceType();
+ this.clientHandler = abfsOutputStreamContext.getClientHandler();
+ createIngressHandler(serviceTypeAtInit,
+ abfsOutputStreamContext.getBlockFactory(), bufferSize, false, null);
+ }
+
+ /**
+ * Retrieves the current ingress handler.
+ *
+ * @return the current {@link AzureIngressHandler}.
+ */
+ public AzureIngressHandler getIngressHandler() {
+ return ingressHandler;
+ }
+
+ private final Lock lock = new ReentrantLock();
+
+ private volatile boolean switchCompleted = false;
+
+ /**
+ * Creates or retrieves an existing Azure ingress handler based on the
service type and provided parameters.
+ * <p>
+ * If the `ingressHandler` is already initialized and the switch operation
is complete, the existing
+ * handler is returned without acquiring a lock to minimize performance
overhead.
+ * If the `ingressHandler` is initialized but the switch is incomplete, a
lock is acquired to safely modify
+ * or create a new handler. Double-checked locking is used to ensure thread
safety while minimizing the
+ * time spent in the critical section.
+ * If the `ingressHandler` is `null`, the handler is safely initialized
outside of the lock as no other
+ * thread would be modifying it.
+ * </p>
+ *
+ * @param serviceType The type of Azure service to handle (e.g., ABFS,
Blob, etc.).
+ * @param blockFactory The factory to create data blocks used in the
handler.
+ * @param bufferSize The buffer size used by the handler for data
processing.
+ * @param isSwitch A flag indicating whether a switch operation is in
progress.
+ * @param blockManager The manager responsible for handling blocks of data
during processing.
+ *
+ * @return The initialized or existing Azure ingress handler.
+ * @throws IOException If an I/O error occurs during handler creation or
data processing.
+ */
+ private AzureIngressHandler createIngressHandler(AbfsServiceType serviceType,
+ DataBlocks.BlockFactory blockFactory,
+ int bufferSize, boolean isSwitch, AzureBlockManager blockManager) throws
IOException {
+ if (ingressHandler != null) {
+ if (switchCompleted) {
+ return ingressHandler; // Return the handler if it's already
initialized and the switch is completed
+ }
+ // If the switch is incomplete, lock to safely modify
+ lock.lock();
+ try {
+ // Double-check the condition after acquiring the lock
+ if (switchCompleted) {
+ return ingressHandler; // Return the handler if it's now completed
+ }
+ // If the switch is still incomplete, create a new handler
+ return createNewHandler(serviceType, blockFactory, bufferSize,
isSwitch, blockManager);
+ } finally {
+ lock.unlock();
+ }
+ }
+ // If ingressHandler is null, no lock is needed; safely initialize it
outside the lock
+ return createNewHandler(serviceType, blockFactory, bufferSize, isSwitch,
blockManager);
}
+ // Helper method to create a new handler, used in both scenarios (locked and
unlocked)
+ private AzureIngressHandler createNewHandler(AbfsServiceType serviceType,
+ DataBlocks.BlockFactory blockFactory,
+ int bufferSize,
+ boolean isSwitch,
+ AzureBlockManager blockManager) throws IOException {
+ this.client = clientHandler.getClient(serviceType);
+ if (isDFSToBlobFallbackEnabled && serviceTypeAtInit !=
AbfsServiceType.DFS) {
+ throw new InvalidConfigurationValueException(
+ "The ingress service type must be configured as DFS");
+ }
+ if (isDFSToBlobFallbackEnabled && !isSwitch) {
+ ingressHandler = new AzureDfsToBlobIngressFallbackHandler(this,
+ blockFactory, bufferSize, eTag, clientHandler);
+ } else if (serviceType == AbfsServiceType.BLOB) {
+ ingressHandler = new AzureBlobIngressHandler(this, blockFactory,
+ bufferSize, eTag, clientHandler, blockManager);
+ } else {
+ ingressHandler = new AzureDFSIngressHandler(this, blockFactory,
+ bufferSize, eTag, clientHandler);
+ }
+ if (isSwitch) {
+ switchCompleted = true;
+ }
+ return ingressHandler;
+ }
+
+ /**
+ * Switches the current ingress handler and service type if necessary.
+ *
+ * @throws IOException if there is an error creating the new ingress handler.
+ */
+ protected void switchHandler() throws IOException {
+ if (serviceTypeAtInit != currentExecutingServiceType) {
+ return;
+ }
+ if (serviceTypeAtInit == AbfsServiceType.BLOB) {
Review Comment:
Since this is used for fallback ingress, please add a log message about the
auto-switching (including the newly selected service type). Please keep it at
INFO log level, as this would be useful to users.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]