mehakmeet commented on code in PR #6010:
URL: https://github.com/apache/hadoop/pull/6010#discussion_r1319617733
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java:
##########
@@ -345,6 +345,7 @@ private void uploadBlockAsync(DataBlocks.DataBlock
blockToUpload,
return null;
} finally {
IOUtils.close(blockUploadData);
+ blockToUpload.close();
Review Comment:
Add `blockToUpload` on L347 alongside `blockUploadData`, for consistency;
that call also checks for null.
##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAppend.java:
##########
@@ -90,4 +103,38 @@ public void testTracingForAppend() throws IOException {
fs.getFileSystemId(), FSOperationType.APPEND, false, 0));
fs.append(testPath, 10);
}
+
+ @Test
+ public void testCloseOfDataBlockOnAppendComplete() throws Exception {
+ Set<String> blockBufferTypes = new HashSet<>();
+ blockBufferTypes.add(DATA_BLOCKS_BUFFER_DISK);
+ blockBufferTypes.add(DATA_BLOCKS_BYTEBUFFER);
+ blockBufferTypes.add(DATA_BLOCKS_BUFFER_ARRAY);
+ for (String blockBufferType : blockBufferTypes) {
+ Configuration configuration = new Configuration(getRawConfiguration());
+ configuration.set(DATA_BLOCKS_BUFFER, blockBufferType);
+ AzureBlobFileSystem fs = Mockito.spy(
+ (AzureBlobFileSystem) FileSystem.newInstance(configuration));
+ AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore());
+ Mockito.doReturn(store).when(fs).getAbfsStore();
+ DataBlocks.DataBlock[] dataBlock = new DataBlocks.DataBlock[1];
+ Mockito.doAnswer(getBlobFactoryInvocation -> {
+ DataBlocks.BlockFactory factory = Mockito.spy(
+ (DataBlocks.BlockFactory)
getBlobFactoryInvocation.callRealMethod());
+ Mockito.doAnswer(factoryCreateInvocation -> {
+ dataBlock[0] = Mockito.spy(
+ (DataBlocks.DataBlock)
factoryCreateInvocation.callRealMethod());
+ return dataBlock[0];
+ })
+ .when(factory)
+ .create(Mockito.anyLong(), Mockito.anyInt(), Mockito.any(
+ BlockUploadStatistics.class));
+ return factory;
+ }).when(store).getBlockFactory();
+ OutputStream os = fs.create(new Path("/file_" + blockBufferType));
Review Comment:
Use try-with-resources or try-finally to close the stream, so that on the
off chance we see a failure after creating it, we still clean up.
##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAppend.java:
##########
@@ -90,4 +103,38 @@ public void testTracingForAppend() throws IOException {
fs.getFileSystemId(), FSOperationType.APPEND, false, 0));
fs.append(testPath, 10);
}
+
+ @Test
+ public void testCloseOfDataBlockOnAppendComplete() throws Exception {
+ Set<String> blockBufferTypes = new HashSet<>();
+ blockBufferTypes.add(DATA_BLOCKS_BUFFER_DISK);
+ blockBufferTypes.add(DATA_BLOCKS_BYTEBUFFER);
+ blockBufferTypes.add(DATA_BLOCKS_BUFFER_ARRAY);
+ for (String blockBufferType : blockBufferTypes) {
+ Configuration configuration = new Configuration(getRawConfiguration());
+ configuration.set(DATA_BLOCKS_BUFFER, blockBufferType);
+ AzureBlobFileSystem fs = Mockito.spy(
+ (AzureBlobFileSystem) FileSystem.newInstance(configuration));
+ AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore());
+ Mockito.doReturn(store).when(fs).getAbfsStore();
+ DataBlocks.DataBlock[] dataBlock = new DataBlocks.DataBlock[1];
+ Mockito.doAnswer(getBlobFactoryInvocation -> {
+ DataBlocks.BlockFactory factory = Mockito.spy(
+ (DataBlocks.BlockFactory)
getBlobFactoryInvocation.callRealMethod());
+ Mockito.doAnswer(factoryCreateInvocation -> {
+ dataBlock[0] = Mockito.spy(
+ (DataBlocks.DataBlock)
factoryCreateInvocation.callRealMethod());
+ return dataBlock[0];
+ })
+ .when(factory)
+ .create(Mockito.anyLong(), Mockito.anyInt(), Mockito.any(
+ BlockUploadStatistics.class));
+ return factory;
+ }).when(store).getBlockFactory();
+ OutputStream os = fs.create(new Path("/file_" + blockBufferType));
+ os.write(new byte[1]);
+ os.close();
+ Mockito.verify(dataBlock[0], Mockito.times(1)).close();
Review Comment:
Add an assertion here that dataBlock[0] is closed.
Suggestion: getState() can be used to verify that the block is in the
`Closed` state (we may need to make the getter public). We can also add an
assertion after L135 that the block is in the `Writing` state.
##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemAppend.java:
##########
@@ -90,4 +103,38 @@ public void testTracingForAppend() throws IOException {
fs.getFileSystemId(), FSOperationType.APPEND, false, 0));
fs.append(testPath, 10);
}
+
+ @Test
+ public void testCloseOfDataBlockOnAppendComplete() throws Exception {
+ Set<String> blockBufferTypes = new HashSet<>();
+ blockBufferTypes.add(DATA_BLOCKS_BUFFER_DISK);
+ blockBufferTypes.add(DATA_BLOCKS_BYTEBUFFER);
+ blockBufferTypes.add(DATA_BLOCKS_BUFFER_ARRAY);
+ for (String blockBufferType : blockBufferTypes) {
+ Configuration configuration = new Configuration(getRawConfiguration());
+ configuration.set(DATA_BLOCKS_BUFFER, blockBufferType);
+ AzureBlobFileSystem fs = Mockito.spy(
+ (AzureBlobFileSystem) FileSystem.newInstance(configuration));
+ AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore());
+ Mockito.doReturn(store).when(fs).getAbfsStore();
+ DataBlocks.DataBlock[] dataBlock = new DataBlocks.DataBlock[1];
+ Mockito.doAnswer(getBlobFactoryInvocation -> {
+ DataBlocks.BlockFactory factory = Mockito.spy(
+ (DataBlocks.BlockFactory)
getBlobFactoryInvocation.callRealMethod());
+ Mockito.doAnswer(factoryCreateInvocation -> {
+ dataBlock[0] = Mockito.spy(
+ (DataBlocks.DataBlock)
factoryCreateInvocation.callRealMethod());
+ return dataBlock[0];
+ })
+ .when(factory)
+ .create(Mockito.anyLong(), Mockito.anyInt(), Mockito.any(
+ BlockUploadStatistics.class));
+ return factory;
+ }).when(store).getBlockFactory();
+ OutputStream os = fs.create(new Path("/file_" + blockBufferType));
Review Comment:
Use "getMethodName()" instead of hardcoding `"/file"`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]