This is an automated email from the ASF dual-hosted git repository.

sunchao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 22a6ad99379 HADOOP-19902. [ABFS] Fix small write hflush followed by close (#8513)
22a6ad99379 is described below

commit 22a6ad99379850e1fc43e3032a9d676fff7abbf2
Author: Chao Sun <[email protected]>
AuthorDate: Thu May 28 13:26:48 2026 -0700

    HADOOP-19902. [ABFS] Fix small write hflush followed by close (#8513)
---
 .../fs/azurebfs/services/AbfsOutputStream.java     | 11 +++-
 .../fs/azurebfs/services/TestAbfsOutputStream.java | 61 ++++++++++++++++++++++
 2 files changed, 70 insertions(+), 2 deletions(-)

diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
index cb48fea42fc..c7f090f91c4 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
@@ -793,8 +793,15 @@ && hasActiveBlockDataToUpload()) { // there is
    */
   private synchronized void smallWriteOptimizedflushInternal(boolean isClose) 
throws IOException {
     // writeCurrentBufferToService will increment 
numOfAppendsToServerSinceLastFlush
-    uploadBlockAsync(getBlockManager().getActiveBlock(),
-        true, isClose);
+    try {
+      uploadBlockAsync(getBlockManager().getActiveBlock(),
+          true, isClose);
+    } finally {
+      if (getBlockManager().hasActiveBlock()) {
+        // The block has been consumed by upload; new writes need a new block.
+        getBlockManager().clearActiveBlock();
+      }
+    }
     waitForAppendsToComplete();
     shrinkWriteOperationQueue();
     maybeThrowLastError();
diff --git 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java
 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java
index ec080369e19..0f5bdca1d58 100644
--- 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java
+++ 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java
@@ -59,6 +59,7 @@
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static 
org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters.Mode.APPEND_MODE;
+import static 
org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters.Mode.FLUSH_MODE;
 
 public final class TestAbfsOutputStream {
 
@@ -81,6 +82,23 @@ private AbfsOutputStreamContext 
populateAbfsOutputStreamContext(
       TracingContext tracingContext,
       ExecutorService executorService) throws IOException,
       IllegalAccessException {
+    return populateAbfsOutputStreamContext(writeBufferSize, isFlushEnabled,
+        disableOutputStreamFlush, isAppendBlob, isExpectHeaderEnabled,
+        clientHandler, path, tracingContext, executorService, false);
+  }
+
+  private AbfsOutputStreamContext populateAbfsOutputStreamContext(
+      int writeBufferSize,
+      boolean isFlushEnabled,
+      boolean disableOutputStreamFlush,
+      boolean isAppendBlob,
+      boolean isExpectHeaderEnabled,
+      AbfsClientHandler clientHandler,
+      String path,
+      TracingContext tracingContext,
+      ExecutorService executorService,
+      boolean enableSmallWriteOptimization) throws IOException,
+      IllegalAccessException {
     AbfsConfiguration abfsConf = new AbfsConfiguration(new Configuration(),
         accountName1);
     String blockFactoryName =
@@ -95,6 +113,7 @@ private AbfsOutputStreamContext 
populateAbfsOutputStreamContext(
             .withWriteBufferSize(writeBufferSize)
             .enableExpectHeader(isExpectHeaderEnabled)
             .enableFlush(isFlushEnabled)
+            .enableSmallWriteOptimization(enableSmallWriteOptimization)
             .disableOutputStreamFlush(disableOutputStreamFlush)
             .withStreamStatistics(new AbfsOutputStreamStatisticsImpl())
             .withAppendBlob(isAppendBlob)
@@ -108,6 +127,48 @@ private AbfsOutputStreamContext 
populateAbfsOutputStreamContext(
             .build();
   }
 
+  @Test
+  public void verifySmallWriteOptimizedHFlushFollowedByClose() throws 
Exception { // HADOOP-19902 regression: small write, hflush(), then close()
+    AbfsCounters abfsCounters = Mockito.spy(new AbfsCountersImpl(new 
URI("abcd")));
+    AbfsClientHandler clientHandler = mock(AbfsClientHandler.class);
+    AbfsDfsClient client = mock(AbfsDfsClient.class);
+    AbfsRestOperation op = mock(AbfsRestOperation.class);
+    final Configuration conf = new Configuration();
+    conf.set(accountKey1, accountValue1);
+    AbfsConfiguration abfsConf = new AbfsConfiguration(conf, accountName1);
+    AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1, 
abfsConf);
+    when(client.getAbfsPerfTracker()).thenReturn(tracker);
+    when(client.getAbfsConfiguration()).thenReturn(abfsConf);
+    when(client.getAbfsCounters()).thenReturn(abfsCounters);
+    when(client.append(anyString(), any(byte[].class), // stub append and flush so uploads succeed against the mock
+        any(AppendRequestParameters.class), any(), any(),
+        any(TracingContext.class))).thenReturn(op);
+    when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(), 
any(),
+        isNull(), any(), any(TracingContext.class), 
anyString())).thenReturn(op);
+    when(clientHandler.getClient(any())).thenReturn(client);
+    when(clientHandler.getDfsClient()).thenReturn(client);
+
+    AbfsOutputStream out = Mockito.spy(new AbfsOutputStream(
+        populateAbfsOutputStreamContext(
+            BUFFER_SIZE, true, false, false, true, clientHandler, PATH, // flush on, stream flush not disabled, not append blob, expect header on
+            new TracingContext(abfsConf.getClientCorrelationId(), "test-fs-id",
+                FSOperationType.WRITE, abfsConf.getTracingHeaderFormat(), 
null),
+            createExecutorService(abfsConf), true))); // final true = enableSmallWriteOptimization
+    when(out.getClient()).thenReturn(client);
+    when(out.getMd5()).thenReturn(null);
+
+    out.write(new byte[WRITE_SIZE]); // assumes WRITE_SIZE < BUFFER_SIZE so the bytes stay buffered - confirm
+    out.hflush(); // expected to send the data and flush as one FLUSH_MODE append (verified below)
+    out.close(); // nothing written since hflush(); close() must not append the consumed block again
+
+    AppendRequestParameters reqParameters = new AppendRequestParameters( // presumably (position=0, offset=0, length=WRITE_SIZE, FLUSH_MODE, ...) - confirm arg order
+        0, 0, WRITE_SIZE, FLUSH_MODE, false, null, true, null);
+    verify(client, times(1)).append(eq(PATH), any(byte[].class), // exactly one append with the FLUSH_MODE params above
+        refEq(reqParameters), any(), any(), any(TracingContext.class));
+    verify(client, times(1)).append(eq(PATH), any(byte[].class), any(), // ...and that is the only append call in total
+        any(), any(), any(TracingContext.class));
+  }
+
   /**
    * The test verifies OutputStream shortwrite case(2000bytes write followed 
by flush, hflush, hsync) is making correct HTTP calls to the server
    */


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to