This is an automated email from the ASF dual-hosted git repository.

mehakmeet pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new a3e132e20c88 HADOOP-18873. ABFS: AbfsOutputStream doesnt close DataBlocks object. (#6105)
a3e132e20c88 is described below

commit a3e132e20c88f9e658ba25eba08d1302a8a94721
Author: Pranav Saxena <108325433+saxenapra...@users.noreply.github.com>
AuthorDate: Mon Sep 25 01:49:32 2023 -0700

    HADOOP-18873. ABFS: AbfsOutputStream doesnt close DataBlocks object. (#6105)

    AbfsOutputStream to close the dataBlock object created for the upload.

    Contributed By: Pranav Saxena
---
 .../org/apache/hadoop/fs/store/DataBlocks.java     |  4 +-
 .../hadoop/fs/azurebfs/AzureBlobFileSystem.java    |  2 +-
 .../fs/azurebfs/AzureBlobFileSystemStore.java      |  7 ++-
 .../fs/azurebfs/services/AbfsOutputStream.java     |  2 +-
 .../azurebfs/ITestAzureBlobFileSystemAppend.java   | 59 ++++++++++++++++++++++
 5 files changed, 69 insertions(+), 5 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/store/DataBlocks.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/store/DataBlocks.java
index a267ce67660f..2c2ee1818113 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/store/DataBlocks.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/store/DataBlocks.java
@@ -329,7 +329,7 @@ public final class DataBlocks {
    */
   public static abstract class DataBlock implements Closeable {
 
-    enum DestState {Writing, Upload, Closed}
+    public enum DestState {Writing, Upload, Closed}
 
     private volatile DestState state = Writing;
     private final long index;
@@ -375,7 +375,7 @@ public final class DataBlocks {
      *
      * @return the current state.
      */
-    final DestState getState() {
+    public final DestState getState() {
       return state;
     }
 
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
index 426ad8ca1e19..8f7fbc570221 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
@@ -330,7 +330,7 @@ public class AzureBlobFileSystem extends FileSystem
     try {
       TracingContext tracingContext = new TracingContext(clientCorrelationId,
           fileSystemId, FSOperationType.CREATE, overwrite, tracingHeaderFormat, listener);
-      OutputStream outputStream = abfsStore.createFile(qualifiedPath, statistics, overwrite,
+      OutputStream outputStream = getAbfsStore().createFile(qualifiedPath, statistics, overwrite,
          permission == null ? FsPermission.getFileDefault() : permission,
          FsPermission.getUMask(getConf()), tracingContext);
       statIncrement(FILES_CREATED);
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
index 2ffd354dc0b9..a4ac1fb3c130 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
@@ -707,7 +707,7 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
            .withWriteMaxConcurrentRequestCount(abfsConfiguration.getWriteMaxConcurrentRequestCount())
            .withMaxWriteRequestsToQueue(abfsConfiguration.getMaxWriteRequestsToQueue())
            .withLease(lease)
-            .withBlockFactory(blockFactory)
+            .withBlockFactory(getBlockFactory())
            .withBlockOutputActiveBlocks(blockOutputActiveBlocks)
            .withClient(client)
            .withPosition(position)
@@ -1940,6 +1940,11 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
     this.client = client;
   }
 
+  @VisibleForTesting
+  DataBlocks.BlockFactory getBlockFactory() {
+    return blockFactory;
+  }
+
   @VisibleForTesting
   void setNamespaceEnabled(Trilean isNamespaceEnabled){
     this.isNamespaceEnabled = isNamespaceEnabled;
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
index 0384b719d26b..f7a33e708d8e 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
@@ -344,7 +344,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable,
         outputStreamStatistics.uploadSuccessful(bytesLength);
         return null;
       } finally {
-        IOUtils.close(blockUploadData);
+        IOUtils.close(blockUploadData, blockToUpload);
       }
     });
     writeOperations.add(new WriteOperation(job, offset, bytesLength));
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAppend.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAppend.java
index dbe4b42a67df..7d182f936b7b 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAppend.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAppend.java
@@ -20,15 +20,31 @@ package org.apache.hadoop.fs.azurebfs;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.OutputStream;
+import java.util.HashSet;
 import java.util.Random;
+import java.util.Set;
 
+import org.assertj.core.api.Assertions;
 import org.junit.Test;
+import org.mockito.Mockito;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
 import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderValidator;
 import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.fs.store.BlockUploadStatistics;
+import org.apache.hadoop.fs.store.DataBlocks;
+
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.DATA_BLOCKS_BUFFER;
+import static org.apache.hadoop.fs.store.DataBlocks.DATA_BLOCKS_BUFFER_ARRAY;
+import static org.apache.hadoop.fs.store.DataBlocks.DATA_BLOCKS_BUFFER_DISK;
+import static org.apache.hadoop.fs.store.DataBlocks.DATA_BLOCKS_BYTEBUFFER;
+import static org.apache.hadoop.fs.store.DataBlocks.DataBlock.DestState.Closed;
+import static org.apache.hadoop.fs.store.DataBlocks.DataBlock.DestState.Writing;
 
 /**
  * Test append operations.
@@ -90,4 +106,47 @@ public class ITestAzureBlobFileSystemAppend extends
         fs.getFileSystemId(), FSOperationType.APPEND, false, 0));
     fs.append(testPath, 10);
   }
+
+  @Test
+  public void testCloseOfDataBlockOnAppendComplete() throws Exception {
+    Set<String> blockBufferTypes = new HashSet<>();
+    blockBufferTypes.add(DATA_BLOCKS_BUFFER_DISK);
+    blockBufferTypes.add(DATA_BLOCKS_BYTEBUFFER);
+    blockBufferTypes.add(DATA_BLOCKS_BUFFER_ARRAY);
+    for (String blockBufferType : blockBufferTypes) {
+      Configuration configuration = new Configuration(getRawConfiguration());
+      configuration.set(DATA_BLOCKS_BUFFER, blockBufferType);
+      AzureBlobFileSystem fs = Mockito.spy(
+          (AzureBlobFileSystem) FileSystem.newInstance(configuration));
+      AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore());
+      Mockito.doReturn(store).when(fs).getAbfsStore();
+      DataBlocks.DataBlock[] dataBlock = new DataBlocks.DataBlock[1];
+      Mockito.doAnswer(getBlobFactoryInvocation -> {
+        DataBlocks.BlockFactory factory = Mockito.spy(
+            (DataBlocks.BlockFactory) getBlobFactoryInvocation.callRealMethod());
+        Mockito.doAnswer(factoryCreateInvocation -> {
+          dataBlock[0] = Mockito.spy(
+              (DataBlocks.DataBlock) factoryCreateInvocation.callRealMethod());
+          return dataBlock[0];
+        })
+            .when(factory)
+            .create(Mockito.anyLong(), Mockito.anyInt(), Mockito.any(
+                BlockUploadStatistics.class));
+        return factory;
+      }).when(store).getBlockFactory();
+      try (OutputStream os = fs.create(
+          new Path(getMethodName() + "_" + blockBufferType))) {
+        os.write(new byte[1]);
+        Assertions.assertThat(dataBlock[0].getState())
+            .describedAs(
+                "On write of data in outputStream, state should become Writing")
+            .isEqualTo(Writing);
+        os.close();
+        Mockito.verify(dataBlock[0], Mockito.times(1)).close();
+        Assertions.assertThat(dataBlock[0].getState())
+            .describedAs("On close of outputStream, state should become Closed")
+            .isEqualTo(Closed);
+      }
+    }
+  }
 }
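The substance of the patch is the one-line change in AbfsOutputStream: the upload task's finally block now closes the DataBlock it was draining, not just the BlockUploadData, so the block's backing resource (a temporary file for disk buffering, a buffer otherwise) is released as soon as the upload attempt finishes. Below is a minimal, self-contained sketch of that close-both-in-finally pattern; UploadBlock, UploadPayload and uploadAndRelease are illustrative stand-ins, not the actual ABFS or DataBlocks types.

import java.io.Closeable;
import java.io.IOException;

/**
 * Illustrative sketch only: UploadBlock stands in for DataBlocks.DataBlock and
 * UploadPayload for the BlockUploadData it yields; neither is a real ABFS type.
 */
public final class UploadCloseSketch {

  /** A buffered block whose close() releases its backing resource. */
  interface UploadBlock extends Closeable { }

  /** The data handed to the HTTP layer; also closeable. */
  interface UploadPayload extends Closeable { }

  /**
   * The pattern the patch applies: however the upload attempt ends, close
   * BOTH the payload and the block in the finally clause (the patch itself
   * does this with IOUtils.close(blockUploadData, blockToUpload)).
   */
  static void uploadAndRelease(UploadBlock block, UploadPayload payload) {
    try {
      // ... issue the append request with the payload here ...
    } finally {
      closeQuietly(payload);
      // Before the fix the block itself was never closed, so its temporary
      // file or buffer lingered long after the upload had completed.
      closeQuietly(block);
    }
  }

  private static void closeQuietly(Closeable c) {
    if (c == null) {
      return;
    }
    try {
      c.close();
    } catch (IOException ignored) {
      // best-effort cleanup; a close failure must not mask the upload outcome
    }
  }
}

This is also what the new ITestAzureBlobFileSystemAppend test asserts for each configured buffer type: once the output stream is closed, the spied DataBlock must have had close() invoked and must report the Closed state.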