Zakelly commented on code in PR #21547:
URL: https://github.com/apache/flink/pull/21547#discussion_r1092716236


##########
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateUploaderTest.java:
##########
@@ -67,50 +70,125 @@ public void testMultiThreadUploadThreadPoolExceptionRethrow() throws IOException
                 new CheckpointStreamFactory() {
                     @Override
                     public CheckpointStateOutputStream createCheckpointStateOutputStream(
-                            CheckpointedStateScope scope) throws IOException {
+                            CheckpointedStateScope scope) {
                         return outputStream;
                     }
 
                     @Override
                     public boolean canFastDuplicate(
-                            StreamStateHandle stateHandle, CheckpointedStateScope scope)
-                            throws IOException {
+                            StreamStateHandle stateHandle, CheckpointedStateScope scope) {
                         return false;
                     }
 
                     @Override
                     public List<StreamStateHandle> duplicate(
-                            List<StreamStateHandle> stateHandles, CheckpointedStateScope scope)
-                            throws IOException {
+                            List<StreamStateHandle> stateHandles, CheckpointedStateScope scope) {
                         return null;
                     }
                 };
 
-        File file = temporaryFolder.newFile(String.valueOf(UUID.randomUUID()));
+        File file = TempDirUtils.newFile(temporaryFolder, String.valueOf(UUID.randomUUID()));
         generateRandomFileContent(file.getPath(), 20);
 
         Map<StateHandleID, Path> filePaths = new HashMap<>(1);
         filePaths.put(new StateHandleID("mockHandleID"), file.toPath());
         try (RocksDBStateUploader rocksDBStateUploader = new RocksDBStateUploader(5)) {
+            assertThatThrownBy(
+                            () ->
+                                    rocksDBStateUploader.uploadFilesToCheckpointFs(
+                                            filePaths,
+                                            checkpointStreamFactory,
+                                            CheckpointedStateScope.SHARED,
+                                            new CloseableRegistry(),
+                                            new CloseableRegistry()))
+                    .isEqualTo(expectedException);
+        }
+    }
+
+    @Test
+    void testUploadedSstCanBeCleanedUp() throws Exception {
+        SpecifiedException expectedException =
+                new SpecifiedException("throw exception while multi thread upload states.");
+
+        File checkpointPrivateFolder = TempDirUtils.newFolder(temporaryFolder, "private");
+        org.apache.flink.core.fs.Path checkpointPrivateDirectory =
+                org.apache.flink.core.fs.Path.fromLocalFile(checkpointPrivateFolder);
+
+        File checkpointSharedFolder = TempDirUtils.newFolder(temporaryFolder, "shared");
+        org.apache.flink.core.fs.Path checkpointSharedDirectory =
+                org.apache.flink.core.fs.Path.fromLocalFile(checkpointSharedFolder);
+
+        FileSystem fileSystem = checkpointPrivateDirectory.getFileSystem();
+
+        int sstFileCount = 6;
+        int fileStateSizeThreshold = 1024;
+        int writeBufferSize = 4096;
+        CheckpointStreamFactory checkpointStreamFactory =
+                new FsCheckpointStreamFactory(
+                        fileSystem,
+                        checkpointPrivateDirectory,
+                        checkpointSharedDirectory,
+                        fileStateSizeThreshold,
+                        writeBufferSize);
+
+        String localFolder = "local";
+        TempDirUtils.newFolder(temporaryFolder, localFolder);
+
+        Map<StateHandleID, Path> filePaths =
+                generateRandomSstFiles(localFolder, sstFileCount, fileStateSizeThreshold);
+        CloseableRegistry tmpResourcesRegistry = new CloseableRegistry();
+        try (RocksDBStateUploader rocksDBStateUploader = new RocksDBStateUploader(sstFileCount)) {
             rocksDBStateUploader.uploadFilesToCheckpointFs(
                     filePaths,
                     checkpointStreamFactory,
                     CheckpointedStateScope.SHARED,
-                    new CloseableRegistry());
-            fail();
-        } catch (Exception e) {
-            assertEquals(expectedException, e);
+                    new CloseableRegistry(),
+                    tmpResourcesRegistry);
+
+            assertThatThrownBy(
+                            () ->
+                                    rocksDBStateUploader.uploadFilesToCheckpointFs(
+                                            filePaths,
+                                            new LastFailingCheckpointStateOutputStreamFactory(
+                                                    checkpointStreamFactory,
+                                                    sstFileCount,
+                                                    expectedException),
+                                            CheckpointedStateScope.SHARED,
+                                            new CloseableRegistry(),
+                                            tmpResourcesRegistry))
+                    .isEqualTo(expectedException);
+            tmpResourcesRegistry.close();

Review Comment:
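   Before closing `tmpResourcesRegistry`, would you mind checking that the private or shared folder is not empty? Otherwise the test cannot tell whether the cleanup actually removed anything.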
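
A minimal sketch of such a check, using plain `java.nio.file` instead of Flink's `FileSystem` API. The class `FolderCheckSketch` and the helper `countFiles` are hypothetical names and not part of the PR; in the test the helper would presumably be called on `checkpointSharedFolder.toPath()` (and the private folder) once before and once after `tmpResourcesRegistry.close()`.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.stream.Stream;

class FolderCheckSketch {

    /** Counts regular files below {@code dir}, recursively. Hypothetical helper. */
    static long countFiles(Path dir) throws IOException {
        // Files.walk returns a lazily populated stream that must be closed.
        try (Stream<Path> files = Files.walk(dir)) {
            return files.filter(Files::isRegularFile).count();
        }
    }

    public static void main(String[] args) throws IOException {
        // Stand-in for the shared checkpoint folder (assumption for illustration).
        Path shared = Files.createTempDirectory("shared");
        Path uploaded = Files.createFile(shared.resolve("000001.sst"));

        // Before cleanup: the folder should contain the uploaded file.
        System.out.println("files before cleanup: " + countFiles(shared));

        // Stand-in for what closing the registry is expected to trigger.
        Files.delete(uploaded);

        // After cleanup: the folder should be empty again.
        System.out.println("files after cleanup: " + countFiles(shared));
    }
}
```

Asserting a non-zero count before the close and a zero count after it would make the test fail if the upload never wrote anything, rather than passing trivially.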



##########
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateUploader.java:
##########
@@ -147,6 +157,11 @@ private StreamStateHandle uploadLocalFileToCheckpointFs(
                 result = outputStream.closeAndGetHandle();
                 outputStream = null;
             }
+            if (result != null) {
+                StreamStateHandle finalResult = result;

Review Comment:
   How about adding the `final` keyword to `finalResult`? Or just make `result` itself final, i.e.:
   ```
   final StreamStateHandle result;
   if (closeableRegistry.unregisterCloseable(outputStream)) {
       result = outputStream.closeAndGetHandle();
       outputStream = null;
   } else {
       result = null;
   }
   // following other code...
   ```
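
For context, a self-contained sketch of the two options with stand-in types: `String` takes the place of `StreamStateHandle`, and a `List<Runnable>` takes the place of the registry. All names in it are hypothetical and only illustrate why a lambda needs a final (or effectively final) local.

```java
import java.util.ArrayList;
import java.util.List;

class FinalCaptureSketch {

    // Option 1: result is reassigned, so the lambda needs a separate final copy.
    static String withFinalCopy(boolean unregistered, List<Runnable> registry, List<String> log) {
        String result = null;
        if (unregistered) {
            result = "handle";
        }
        if (result != null) {
            final String finalResult = result;
            registry.add(() -> log.add(finalResult));
        }
        return result;
    }

    // Option 2: result is assigned exactly once on every path, so it can be final
    // and captured directly.
    static String withFinalResult(boolean unregistered, List<Runnable> registry, List<String> log) {
        final String result;
        if (unregistered) {
            result = "handle";
        } else {
            result = null;
        }
        if (result != null) {
            registry.add(() -> log.add(result));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Runnable> registry = new ArrayList<>();
        List<String> log = new ArrayList<>();
        withFinalCopy(true, registry, log);
        withFinalResult(true, registry, log);
        // Run the registered cleanup actions; each logs the handle it captured.
        registry.forEach(Runnable::run);
        System.out.println(log);
    }
}
```

Both variants behave the same; the second simply avoids the extra local variable.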



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
