anmolanmol1234 commented on code in PR #7272:
URL: https://github.com/apache/hadoop/pull/7272#discussion_r1934323725
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java:
##########
@@ -338,9 +531,14 @@ private void uploadBlockAsync(DataBlocks.DataBlock blockToUpload,
        */
       AppendRequestParameters reqParams = new AppendRequestParameters(
           offset, 0, bytesLength, mode, false, leaseId, isExpectHeaderEnabled);
-      AbfsRestOperation op = getClient().append(path,
-          blockUploadData.toByteArray(), reqParams, cachedSasToken.get(),
-          contextEncryptionAdapter, new TracingContext(tracingContext));
+      AbfsRestOperation op;
+      try {
+        op = remoteWrite(blockToUpload, blockUploadData, reqParams,
+            tracingContext);
+      } catch (InvalidIngressServiceException ex) {
+        switchHandler();
Review Comment:
This fallback mechanism ensures compatibility across endpoints. For example, if a file is created over the DFS endpoint and later updated over the Blob endpoint, the append would otherwise fail. Deterministic switching ensures that, when an InvalidIngressServiceException is encountered, the client automatically switches to the correct endpoint.
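
To make the deterministic switching concrete, here is a minimal sketch of the catch-switch-retry flow this comment describes. The retry call after `switchHandler()` is not visible in the truncated diff, so treat it as an assumption rather than the exact PR code:

```java
AbfsRestOperation op;
try {
  // First attempt goes through the currently active ingress handler
  // (e.g. the DFS handler if the file was created over the DFS endpoint).
  op = remoteWrite(blockToUpload, blockUploadData, reqParams, tracingContext);
} catch (InvalidIngressServiceException ex) {
  // The service rejected the append because the file lives on the other
  // endpoint; switch the ingress handler deterministically and retry once.
  switchHandler();
  // Assumption: the retry simply re-invokes remoteWrite through the new handler.
  op = remoteWrite(blockToUpload, blockUploadData, reqParams, tracingContext);
}
```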
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java:
##########
@@ -631,45 +805,89 @@ private synchronized void waitForAppendsToComplete() throws IOException {
     }
   }
+  /**
+   * Flushes the written bytes to the Azure Blob Storage service, ensuring all
+   * appends are completed. This method is typically called during a close operation.
+   *
+   * @param isClose indicates whether this flush is happening as part of a close operation.
+   * @throws IOException if an I/O error occurs during the flush operation.
+   */
   private synchronized void flushWrittenBytesToService(boolean isClose) throws IOException {
+    // Ensure all appends are completed before flushing.
     waitForAppendsToComplete();
+    // Flush the written bytes to the service.
     flushWrittenBytesToServiceInternal(position, false, isClose);
   }
+  /**
+   * Asynchronously flushes the written bytes to the Azure Blob Storage service.
+   * This method ensures that the write operation queue is managed and only flushes
+   * if there is uncommitted data beyond the last flush offset.
+   *
+   * @throws IOException if an I/O error occurs during the flush operation.
+   */
   private synchronized void flushWrittenBytesToServiceAsync() throws IOException {
+    // Manage the write operation queue to ensure efficient writes.
     shrinkWriteOperationQueue();
+    // Only flush if there is uncommitted data beyond the last flush offset.
     if (this.lastTotalAppendOffset > this.lastFlushOffset) {
       this.flushWrittenBytesToServiceInternal(this.lastTotalAppendOffset, true,
           false/*Async flush on close not permitted*/);
     }
   }
+  /**
+   * Flushes the written bytes to the Azure Blob Storage service.
+   *
+   * @param offset               the offset up to which data needs to be flushed.
+   * @param retainUncommitedData whether to retain uncommitted data after flush.
+   * @param isClose              whether this flush is happening as part of a close operation.
+   * @throws IOException if an I/O error occurs.
+   */
   private synchronized void flushWrittenBytesToServiceInternal(final long offset,
       final boolean retainUncommitedData, final boolean isClose) throws IOException {
     // flush is called for appendblob only on close
     if (this.isAppendBlob && !isClose) {
       return;
     }
+    // Tracker to monitor performance metrics.
     AbfsPerfTracker tracker = client.getAbfsPerfTracker();
     try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker,
         "flushWrittenBytesToServiceInternal", "flush")) {
-      AbfsRestOperation op = getClient().flush(path, offset, retainUncommitedData,
-          isClose, cachedSasToken.get(), leaseId, contextEncryptionAdapter,
-          new TracingContext(tracingContext));
-      cachedSasToken.update(op.getSasToken());
-      perfInfo.registerResult(op.getResult()).registerSuccess(true);
-    } catch (AzureBlobFileSystemException ex) {
-      if (ex instanceof AbfsRestOperationException) {
-        if (((AbfsRestOperationException) ex).getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) {
+      AbfsRestOperation op;
+      try {
+        // Attempt to flush data to the remote service.
+        op = remoteFlush(offset, retainUncommitedData, isClose, leaseId,
+            tracingContext);
+      } catch (InvalidIngressServiceException ex) {
+        // If an invalid ingress service is encountered, switch handler and retry.
+        switchHandler();
Review Comment:
Addressed above; a log message has also been added.
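
For reference, a minimal sketch of what the switch plus log message could look like in the flush path; the exact log wording and the retry call are assumptions, since neither is shown in the truncated diff above:

```java
} catch (InvalidIngressServiceException ex) {
  // Assumed log message; the exact wording added in the PR may differ.
  LOG.debug("InvalidIngressServiceException caught while flushing {}; "
      + "switching ingress handler and retrying", path, ex);
  switchHandler();
  // Assumption: the flush is retried through the switched handler.
  op = remoteFlush(offset, retainUncommitedData, isClose, leaseId, tracingContext);
}
```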