1996fanrui commented on code in PR #21855:
URL: https://github.com/apache/flink/pull/21855#discussion_r1097010183


##########
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateUploaderTest.java:
##########
@@ -96,13 +100,92 @@ public List<StreamStateHandle> duplicate(
                     filePaths,
                     checkpointStreamFactory,
                     CheckpointedStateScope.SHARED,
+                    new CloseableRegistry(),
                     new CloseableRegistry());
             fail();
         } catch (Exception e) {
             assertEquals(expectedException, e);
         }
     }
 
+    @Test
+    public void testUploadedSstCanBeCleanedUp() throws Exception {
+        SpecifiedException expectedException =
+                new SpecifiedException("throw exception while multi thread upload states.");
+
+        File checkpointPrivateFolder = temporaryFolder.newFolder("private");
+        org.apache.flink.core.fs.Path checkpointPrivateDirectory =
+                org.apache.flink.core.fs.Path.fromLocalFile(checkpointPrivateFolder);
+
+        File checkpointSharedFolder = temporaryFolder.newFolder("shared");
+        org.apache.flink.core.fs.Path checkpointSharedDirectory =
+                org.apache.flink.core.fs.Path.fromLocalFile(checkpointSharedFolder);
+
+        FileSystem fileSystem = checkpointPrivateDirectory.getFileSystem();
+
+        int sstFileCount = 6;
+        int fileStateSizeThreshold = 1024;
+        int writeBufferSize = 4096;
+        CheckpointStreamFactory checkpointStreamFactory =
+                new FsCheckpointStreamFactory(
+                        fileSystem,
+                        checkpointPrivateDirectory,
+                        checkpointSharedDirectory,
+                        fileStateSizeThreshold,
+                        writeBufferSize);
+
+        String localFolder = "local";
+        temporaryFolder.newFolder(localFolder);
+
+        Map<StateHandleID, Path> filePaths =
+                generateRandomSstFiles(localFolder, sstFileCount, fileStateSizeThreshold);
+        CloseableRegistry tmpResourcesRegistry = new CloseableRegistry();
+        try (RocksDBStateUploader rocksDBStateUploader = new RocksDBStateUploader(sstFileCount)) {
+            rocksDBStateUploader.uploadFilesToCheckpointFs(
+                    filePaths,
+                    checkpointStreamFactory,
+                    CheckpointedStateScope.SHARED,
+                    new CloseableRegistry(),
+                    tmpResourcesRegistry);
+
+            try {
+                rocksDBStateUploader.uploadFilesToCheckpointFs(
+                        filePaths,
+                        new LastFailingCheckpointStateOutputStreamFactory(
+                                checkpointStreamFactory, sstFileCount, expectedException),
+                        CheckpointedStateScope.SHARED,
+                        new CloseableRegistry(),
+                        tmpResourcesRegistry);
+                fail();
+            } catch (Exception e) {
+                assertEquals(expectedException, e);
+            }
+            assertEquals(0, checkNotNull(checkpointPrivateFolder.list()).length);
+            assertTrue(checkNotNull(checkpointSharedFolder.list()).length > 0);
+
+            tmpResourcesRegistry.close();
+            // Check whether the temporary file before the exception can be cleaned up
+            assertEquals(0, checkNotNull(checkpointPrivateFolder.list()).length);
+            assertEquals(0, checkNotNull(checkpointSharedFolder.list()).length);

Review Comment:
   Hi @curcur @Zakelly @fredia masters,
   
   Sorry, I found that this unit test has a thread-safety bug that can make it fail [1].
   
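   Root cause: `RocksDBStateUploader#uploadLocalFileToCheckpointFs` creates the file and then registers a cleanup action with `tmpResourcesRegistry`, so the file is discarded when the registry is closed. If the registry is already closed at registration time, registering closes the argument immediately, which discards the file at that moment.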
   
   However, consider the following interleaving:
   
   1. The normal stream calls `result = outputStream.closeAndGetHandle()`, which creates the file.
   2. The failing stream throws, and the unit test calls `tmpResourcesRegistry.close();`.
   3. The unit test checks `assertEquals(0, checkNotNull(checkpointSharedFolder.list()).length);`.
   4. The normal stream calls `tmpResourcesRegistry.registerCloseable(() -> StateUtil.discardStateObjectQuietly(result));`, which cleans up the file.
   
   The check in step 3 fails because the normal stream's file is only cleaned up after the check, in step 4.
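   To make the interleaving concrete, here is a minimal, self-contained sketch that replays steps 1 to 4 deterministically with two `CountDownLatch`es. It does not use Flink: `MiniRegistry` is a hypothetical stand-in that only mimics the behavior step 4 relies on (registering with an already-closed registry closes the argument right away), and the file name `normal.sst` is made up for illustration.

```java
import java.io.Closeable;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Stream;

final class RegistryRaceSketch {

    /** Hypothetical stand-in for a closeable registry; not Flink's CloseableRegistry. */
    static final class MiniRegistry implements Closeable {
        private final List<Closeable> closeables = new ArrayList<>();
        private boolean closed;

        void register(Closeable closeable) throws IOException {
            synchronized (this) {
                if (!closed) {
                    closeables.add(closeable);
                    return;
                }
            }
            // Already closed: close the argument right away, then report the late registration.
            closeable.close();
            throw new IOException("Registry already closed; closed the argument.");
        }

        @Override
        public void close() throws IOException {
            List<Closeable> toClose;
            synchronized (this) {
                closed = true;
                toClose = new ArrayList<>(closeables);
                closeables.clear();
            }
            for (Closeable closeable : toClose) {
                closeable.close();
            }
        }
    }

    /** Replays steps 1-4; returns {file count at the step-3 check, file count after step 4}. */
    static int[] replayInterleaving(Path sharedDir) throws Exception {
        MiniRegistry tmpResourcesRegistry = new MiniRegistry();
        CountDownLatch fileCreated = new CountDownLatch(1);
        CountDownLatch testChecked = new CountDownLatch(1);

        Thread normalUpload = new Thread(() -> {
            try {
                // Step 1: the normal upload creates its file.
                Path file = Files.createFile(sharedDir.resolve("normal.sst"));
                fileCreated.countDown();
                // Hold back step 4 until the test has run steps 2 and 3.
                testChecked.await();
                // Step 4: late registration; the file is deleted only now.
                tmpResourcesRegistry.register(() -> Files.deleteIfExists(file));
            } catch (IOException lateRegistration) {
                // Expected: the registry was already closed and deleted the file.
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        normalUpload.start();

        fileCreated.await();
        // Step 2: the failing upload surfaced its exception; the test closes the registry.
        tmpResourcesRegistry.close();
        // Step 3: the test counts files while the normal upload's file still exists.
        int countAtCheck = countFiles(sharedDir);
        testChecked.countDown();
        normalUpload.join();
        return new int[] {countAtCheck, countFiles(sharedDir)};
    }

    static int countFiles(Path dir) throws IOException {
        try (Stream<Path> entries = Files.list(dir)) {
            return (int) entries.count();
        }
    }

    public static void main(String[] args) throws Exception {
        int[] counts = replayInterleaving(Files.createTempDirectory("shared"));
        System.out.println("files at step-3 check: " + counts[0] + ", after step 4: " + counts[1]);
    }
}
```

   Running `main` prints `files at step-3 check: 1, after step 4: 0`: the folder is only empty after the late registration, which is exactly why the assertion in step 3 can fail.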
   
   My proposal is to set the number of threads of `RocksDBStateUploader` to 1 in this test to avoid the thread-safety bug. What do you think?
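   A rough sketch of why a single thread helps, using plain `java.util.concurrent` instead of Flink classes. It assumes, as the name `LastFailingCheckpointStateOutputStreamFactory` suggests, that only the last stream fails; `registeredWhenLastFails` and the counter standing in for `registerCloseable(...)` are hypothetical. On one thread the uploads run one at a time in submission order, so every earlier upload has finished (and registered its cleanup) before the failing one throws.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

final class SingleThreadUploadSketch {

    /**
     * Runs fileCount (at least 1) hypothetical uploads on a single thread, where only the
     * last one fails, and returns how many had "registered" cleanup when the failure is seen.
     */
    static int registeredWhenLastFails(int fileCount) throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        AtomicInteger registered = new AtomicInteger();
        List<CompletableFuture<Void>> uploads = new ArrayList<>();
        try {
            for (int i = 0; i < fileCount; i++) {
                boolean last = i == fileCount - 1;
                uploads.add(CompletableFuture.runAsync(() -> {
                    if (last) {
                        throw new IllegalStateException("simulated upload failure");
                    }
                    // Stands in for closeAndGetHandle() followed by registerCloseable(...).
                    registered.incrementAndGet();
                }, executor));
            }
            try {
                uploads.get(fileCount - 1).get();
            } catch (ExecutionException expected) {
                // The failing (last) upload surfaced its exception, as in the test.
            }
            // Tasks ran one at a time in submission order, so all earlier uploads are done.
            return registered.get();
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // 6 matches sstFileCount in the test; prints 5.
        System.out.println(registeredWhenLastFails(6));
    }
}
```

   With more than one thread, the failing upload can finish before the others have registered their cleanup, so the value observed at that point is not deterministic; that is the same race as in steps 1 to 4.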
   
   Also, could I fix the bug under FLINK-30461, or should I create a new JIRA for the unit test bug, since FLINK-30461 has already been merged into the master branch?
   
   Screenshot: https://user-images.githubusercontent.com/38427477/216901271-50f740b7-e5d0-4b2e-a8a1-7aeb54b52b19.png
   
   
   [1] https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=45749&view=logs&j=a57e0635-3fad-5b08-57c7-a4142d7d6fa9&t=2ef0effc-1da1-50e5-c2bd-aab434b1c5b7



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.