This is an automated email from the ASF dual-hosted git repository.

mehakmeet pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new a3e132e20c88 HADOOP-18873. ABFS: AbfsOutputStream doesn't close DataBlocks object. (#6105)
a3e132e20c88 is described below

commit a3e132e20c88f9e658ba25eba08d1302a8a94721
Author: Pranav Saxena <108325433+saxenapra...@users.noreply.github.com>
AuthorDate: Mon Sep 25 01:49:32 2023 -0700

    HADOOP-18873. ABFS: AbfsOutputStream doesn't close DataBlocks object. (#6105)
    
    
    AbfsOutputStream now closes the DataBlock object created for the upload.
    
    Contributed By: Pranav Saxena
---
 .../org/apache/hadoop/fs/store/DataBlocks.java     |  4 +-
 .../hadoop/fs/azurebfs/AzureBlobFileSystem.java    |  2 +-
 .../fs/azurebfs/AzureBlobFileSystemStore.java      |  7 ++-
 .../fs/azurebfs/services/AbfsOutputStream.java     |  2 +-
 .../azurebfs/ITestAzureBlobFileSystemAppend.java   | 59 ++++++++++++++++++++++
 5 files changed, 69 insertions(+), 5 deletions(-)

diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/store/DataBlocks.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/store/DataBlocks.java
index a267ce67660f..2c2ee1818113 100644
--- 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/store/DataBlocks.java
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/store/DataBlocks.java
@@ -329,7 +329,7 @@ public final class DataBlocks {
    */
   public static abstract class DataBlock implements Closeable {
 
-    enum DestState {Writing, Upload, Closed}
+    public enum DestState {Writing, Upload, Closed}
 
     private volatile DestState state = Writing;
     private final long index;
@@ -375,7 +375,7 @@ public final class DataBlocks {
      *
      * @return the current state.
      */
-    final DestState getState() {
+    public final DestState getState() {
       return state;
     }
 
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
index 426ad8ca1e19..8f7fbc570221 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
@@ -330,7 +330,7 @@ public class AzureBlobFileSystem extends FileSystem
     try {
       TracingContext tracingContext = new TracingContext(clientCorrelationId,
           fileSystemId, FSOperationType.CREATE, overwrite, 
tracingHeaderFormat, listener);
-      OutputStream outputStream = abfsStore.createFile(qualifiedPath, 
statistics, overwrite,
+      OutputStream outputStream = getAbfsStore().createFile(qualifiedPath, 
statistics, overwrite,
           permission == null ? FsPermission.getFileDefault() : permission,
           FsPermission.getUMask(getConf()), tracingContext);
       statIncrement(FILES_CREATED);
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
index 2ffd354dc0b9..a4ac1fb3c130 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
@@ -707,7 +707,7 @@ public class AzureBlobFileSystemStore implements Closeable, 
ListingSupport {
             
.withWriteMaxConcurrentRequestCount(abfsConfiguration.getWriteMaxConcurrentRequestCount())
             
.withMaxWriteRequestsToQueue(abfsConfiguration.getMaxWriteRequestsToQueue())
             .withLease(lease)
-            .withBlockFactory(blockFactory)
+            .withBlockFactory(getBlockFactory())
             .withBlockOutputActiveBlocks(blockOutputActiveBlocks)
             .withClient(client)
             .withPosition(position)
@@ -1940,6 +1940,11 @@ public class AzureBlobFileSystemStore implements 
Closeable, ListingSupport {
     this.client = client;
   }
 
+  @VisibleForTesting
+  DataBlocks.BlockFactory getBlockFactory() {
+    return blockFactory;
+  }
+
   @VisibleForTesting
   void setNamespaceEnabled(Trilean isNamespaceEnabled){
     this.isNamespaceEnabled = isNamespaceEnabled;
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
index 0384b719d26b..f7a33e708d8e 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
@@ -344,7 +344,7 @@ public class AbfsOutputStream extends OutputStream 
implements Syncable,
             outputStreamStatistics.uploadSuccessful(bytesLength);
             return null;
           } finally {
-            IOUtils.close(blockUploadData);
+            IOUtils.close(blockUploadData, blockToUpload);
           }
         });
     writeOperations.add(new WriteOperation(job, offset, bytesLength));
diff --git 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAppend.java
 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAppend.java
index dbe4b42a67df..7d182f936b7b 100644
--- 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAppend.java
+++ 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAppend.java
@@ -20,15 +20,31 @@ package org.apache.hadoop.fs.azurebfs;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.OutputStream;
+import java.util.HashSet;
 import java.util.Random;
+import java.util.Set;
 
+import org.assertj.core.api.Assertions;
 import org.junit.Test;
+import org.mockito.Mockito;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
 import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderValidator;
 import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.fs.store.BlockUploadStatistics;
+import org.apache.hadoop.fs.store.DataBlocks;
+
+import static 
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.DATA_BLOCKS_BUFFER;
+import static org.apache.hadoop.fs.store.DataBlocks.DATA_BLOCKS_BUFFER_ARRAY;
+import static org.apache.hadoop.fs.store.DataBlocks.DATA_BLOCKS_BUFFER_DISK;
+import static org.apache.hadoop.fs.store.DataBlocks.DATA_BLOCKS_BYTEBUFFER;
+import static org.apache.hadoop.fs.store.DataBlocks.DataBlock.DestState.Closed;
+import static 
org.apache.hadoop.fs.store.DataBlocks.DataBlock.DestState.Writing;
 
 /**
  * Test append operations.
@@ -90,4 +106,47 @@ public class ITestAzureBlobFileSystemAppend extends
         fs.getFileSystemId(), FSOperationType.APPEND, false, 0));
     fs.append(testPath, 10);
   }
+
+  @Test
+  public void testCloseOfDataBlockOnAppendComplete() throws Exception {
+    Set<String> blockBufferTypes = new HashSet<>();
+    blockBufferTypes.add(DATA_BLOCKS_BUFFER_DISK);
+    blockBufferTypes.add(DATA_BLOCKS_BYTEBUFFER);
+    blockBufferTypes.add(DATA_BLOCKS_BUFFER_ARRAY);
+    for (String blockBufferType : blockBufferTypes) {
+      Configuration configuration = new Configuration(getRawConfiguration());
+      configuration.set(DATA_BLOCKS_BUFFER, blockBufferType);
+      AzureBlobFileSystem fs = Mockito.spy(
+          (AzureBlobFileSystem) FileSystem.newInstance(configuration));
+      AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore());
+      Mockito.doReturn(store).when(fs).getAbfsStore();
+      DataBlocks.DataBlock[] dataBlock = new DataBlocks.DataBlock[1];
+      Mockito.doAnswer(getBlobFactoryInvocation -> {
+        DataBlocks.BlockFactory factory = Mockito.spy(
+            (DataBlocks.BlockFactory) 
getBlobFactoryInvocation.callRealMethod());
+        Mockito.doAnswer(factoryCreateInvocation -> {
+              dataBlock[0] = Mockito.spy(
+                  (DataBlocks.DataBlock) 
factoryCreateInvocation.callRealMethod());
+              return dataBlock[0];
+            })
+            .when(factory)
+            .create(Mockito.anyLong(), Mockito.anyInt(), Mockito.any(
+                BlockUploadStatistics.class));
+        return factory;
+      }).when(store).getBlockFactory();
+      try (OutputStream os = fs.create(
+          new Path(getMethodName() + "_" + blockBufferType))) {
+        os.write(new byte[1]);
+        Assertions.assertThat(dataBlock[0].getState())
+            .describedAs(
+                "On write of data in outputStream, state should become Writing")
+            .isEqualTo(Writing);
+        os.close();
+        Mockito.verify(dataBlock[0], Mockito.times(1)).close();
+        Assertions.assertThat(dataBlock[0].getState())
+            .describedAs("On close of outputStream, state should become Closed")
+            .isEqualTo(Closed);
+      }
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to