This is an automated email from the ASF dual-hosted git repository.
sunchao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 22a6ad99379 HADOOP-19902. [ABFS] Fix small write hflush followed by close (#8513)
22a6ad99379 is described below
commit 22a6ad99379850e1fc43e3032a9d676fff7abbf2
Author: Chao Sun <[email protected]>
AuthorDate: Thu May 28 13:26:48 2026 -0700
HADOOP-19902. [ABFS] Fix small write hflush followed by close (#8513)
---
.../fs/azurebfs/services/AbfsOutputStream.java | 11 +++-
.../fs/azurebfs/services/TestAbfsOutputStream.java | 61 ++++++++++++++++++++++
2 files changed, 70 insertions(+), 2 deletions(-)
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
index cb48fea42fc..c7f090f91c4 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
@@ -793,8 +793,15 @@ && hasActiveBlockDataToUpload()) { // there is
*/
private synchronized void smallWriteOptimizedflushInternal(boolean isClose)
throws IOException {
// writeCurrentBufferToService will increment
numOfAppendsToServerSinceLastFlush
- uploadBlockAsync(getBlockManager().getActiveBlock(),
- true, isClose);
+ try {
+ uploadBlockAsync(getBlockManager().getActiveBlock(),
+ true, isClose);
+ } finally {
+ if (getBlockManager().hasActiveBlock()) {
+ // The block has been consumed by upload; new writes need a new block.
+ getBlockManager().clearActiveBlock();
+ }
+ }
waitForAppendsToComplete();
shrinkWriteOperationQueue();
maybeThrowLastError();
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java
index ec080369e19..0f5bdca1d58 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java
@@ -59,6 +59,7 @@
import static org.assertj.core.api.Assertions.assertThat;
import static
org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters.Mode.APPEND_MODE;
+import static
org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters.Mode.FLUSH_MODE;
public final class TestAbfsOutputStream {
@@ -81,6 +82,23 @@ private AbfsOutputStreamContext populateAbfsOutputStreamContext(
TracingContext tracingContext,
ExecutorService executorService) throws IOException,
IllegalAccessException {
+ return populateAbfsOutputStreamContext(writeBufferSize, isFlushEnabled,
+ disableOutputStreamFlush, isAppendBlob, isExpectHeaderEnabled,
+ clientHandler, path, tracingContext, executorService, false);
+ }
+
+ private AbfsOutputStreamContext populateAbfsOutputStreamContext(
+ int writeBufferSize,
+ boolean isFlushEnabled,
+ boolean disableOutputStreamFlush,
+ boolean isAppendBlob,
+ boolean isExpectHeaderEnabled,
+ AbfsClientHandler clientHandler,
+ String path,
+ TracingContext tracingContext,
+ ExecutorService executorService,
+ boolean enableSmallWriteOptimization) throws IOException,
+ IllegalAccessException {
AbfsConfiguration abfsConf = new AbfsConfiguration(new Configuration(),
accountName1);
String blockFactoryName =
@@ -95,6 +113,7 @@ private AbfsOutputStreamContext populateAbfsOutputStreamContext(
.withWriteBufferSize(writeBufferSize)
.enableExpectHeader(isExpectHeaderEnabled)
.enableFlush(isFlushEnabled)
+ .enableSmallWriteOptimization(enableSmallWriteOptimization)
.disableOutputStreamFlush(disableOutputStreamFlush)
.withStreamStatistics(new AbfsOutputStreamStatisticsImpl())
.withAppendBlob(isAppendBlob)
@@ -108,6 +127,48 @@ private AbfsOutputStreamContext populateAbfsOutputStreamContext(
.build();
}
+ @Test
+ public void verifySmallWriteOptimizedHFlushFollowedByClose() throws
Exception {
+ AbfsCounters abfsCounters = Mockito.spy(new AbfsCountersImpl(new
URI("abcd")));
+ AbfsClientHandler clientHandler = mock(AbfsClientHandler.class);
+ AbfsDfsClient client = mock(AbfsDfsClient.class);
+ AbfsRestOperation op = mock(AbfsRestOperation.class);
+ final Configuration conf = new Configuration();
+ conf.set(accountKey1, accountValue1);
+ AbfsConfiguration abfsConf = new AbfsConfiguration(conf, accountName1);
+ AbfsPerfTracker tracker = new AbfsPerfTracker("test", accountName1,
abfsConf);
+ when(client.getAbfsPerfTracker()).thenReturn(tracker);
+ when(client.getAbfsConfiguration()).thenReturn(abfsConf);
+ when(client.getAbfsCounters()).thenReturn(abfsCounters);
+ when(client.append(anyString(), any(byte[].class),
+ any(AppendRequestParameters.class), any(), any(),
+ any(TracingContext.class))).thenReturn(op);
+ when(client.flush(anyString(), anyLong(), anyBoolean(), anyBoolean(),
any(),
+ isNull(), any(), any(TracingContext.class),
anyString())).thenReturn(op);
+ when(clientHandler.getClient(any())).thenReturn(client);
+ when(clientHandler.getDfsClient()).thenReturn(client);
+
+ AbfsOutputStream out = Mockito.spy(new AbfsOutputStream(
+ populateAbfsOutputStreamContext(
+ BUFFER_SIZE, true, false, false, true, clientHandler, PATH,
+ new TracingContext(abfsConf.getClientCorrelationId(), "test-fs-id",
+ FSOperationType.WRITE, abfsConf.getTracingHeaderFormat(),
null),
+ createExecutorService(abfsConf), true)));
+ when(out.getClient()).thenReturn(client);
+ when(out.getMd5()).thenReturn(null);
+
+ out.write(new byte[WRITE_SIZE]);
+ out.hflush();
+ out.close();
+
+ AppendRequestParameters reqParameters = new AppendRequestParameters(
+ 0, 0, WRITE_SIZE, FLUSH_MODE, false, null, true, null);
+ verify(client, times(1)).append(eq(PATH), any(byte[].class),
+ refEq(reqParameters), any(), any(), any(TracingContext.class));
+ verify(client, times(1)).append(eq(PATH), any(byte[].class), any(),
+ any(), any(), any(TracingContext.class));
+ }
+
/**
* The test verifies OutputStream shortwrite case(2000bytes write followed
by flush, hflush, hsync) is making correct HTTP calls to the server
*/
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]