Zakelly commented on code in PR #21547:
URL: https://github.com/apache/flink/pull/21547#discussion_r1092716236
##########
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateUploaderTest.java:
##########
@@ -67,50 +70,125 @@ public void
testMultiThreadUploadThreadPoolExceptionRethrow() throws IOException
new CheckpointStreamFactory() {
@Override
public CheckpointStateOutputStream
createCheckpointStateOutputStream(
- CheckpointedStateScope scope) throws IOException {
+ CheckpointedStateScope scope) {
return outputStream;
}
@Override
public boolean canFastDuplicate(
- StreamStateHandle stateHandle,
CheckpointedStateScope scope)
- throws IOException {
+ StreamStateHandle stateHandle,
CheckpointedStateScope scope) {
return false;
}
@Override
public List<StreamStateHandle> duplicate(
- List<StreamStateHandle> stateHandles,
CheckpointedStateScope scope)
- throws IOException {
+ List<StreamStateHandle> stateHandles,
CheckpointedStateScope scope) {
return null;
}
};
- File file = temporaryFolder.newFile(String.valueOf(UUID.randomUUID()));
+ File file = TempDirUtils.newFile(temporaryFolder,
String.valueOf(UUID.randomUUID()));
generateRandomFileContent(file.getPath(), 20);
Map<StateHandleID, Path> filePaths = new HashMap<>(1);
filePaths.put(new StateHandleID("mockHandleID"), file.toPath());
try (RocksDBStateUploader rocksDBStateUploader = new
RocksDBStateUploader(5)) {
+ assertThatThrownBy(
+ () ->
+
rocksDBStateUploader.uploadFilesToCheckpointFs(
+ filePaths,
+ checkpointStreamFactory,
+ CheckpointedStateScope.SHARED,
+ new CloseableRegistry(),
+ new CloseableRegistry()))
+ .isEqualTo(expectedException);
+ }
+ }
+
+ @Test
+ void testUploadedSstCanBeCleanedUp() throws Exception {
+ SpecifiedException expectedException =
+ new SpecifiedException("throw exception while multi thread
upload states.");
+
+ File checkpointPrivateFolder = TempDirUtils.newFolder(temporaryFolder,
"private");
+ org.apache.flink.core.fs.Path checkpointPrivateDirectory =
+
org.apache.flink.core.fs.Path.fromLocalFile(checkpointPrivateFolder);
+
+ File checkpointSharedFolder = TempDirUtils.newFolder(temporaryFolder,
"shared");
+ org.apache.flink.core.fs.Path checkpointSharedDirectory =
+
org.apache.flink.core.fs.Path.fromLocalFile(checkpointSharedFolder);
+
+ FileSystem fileSystem = checkpointPrivateDirectory.getFileSystem();
+
+ int sstFileCount = 6;
+ int fileStateSizeThreshold = 1024;
+ int writeBufferSize = 4096;
+ CheckpointStreamFactory checkpointStreamFactory =
+ new FsCheckpointStreamFactory(
+ fileSystem,
+ checkpointPrivateDirectory,
+ checkpointSharedDirectory,
+ fileStateSizeThreshold,
+ writeBufferSize);
+
+ String localFolder = "local";
+ TempDirUtils.newFolder(temporaryFolder, localFolder);
+
+ Map<StateHandleID, Path> filePaths =
+ generateRandomSstFiles(localFolder, sstFileCount,
fileStateSizeThreshold);
+ CloseableRegistry tmpResourcesRegistry = new CloseableRegistry();
+ try (RocksDBStateUploader rocksDBStateUploader = new
RocksDBStateUploader(sstFileCount)) {
rocksDBStateUploader.uploadFilesToCheckpointFs(
filePaths,
checkpointStreamFactory,
CheckpointedStateScope.SHARED,
- new CloseableRegistry());
- fail();
- } catch (Exception e) {
- assertEquals(expectedException, e);
+ new CloseableRegistry(),
+ tmpResourcesRegistry);
+
+ assertThatThrownBy(
+ () ->
+
rocksDBStateUploader.uploadFilesToCheckpointFs(
+ filePaths,
+ new
LastFailingCheckpointStateOutputStreamFactory(
+ checkpointStreamFactory,
+ sstFileCount,
+ expectedException),
+ CheckpointedStateScope.SHARED,
+ new CloseableRegistry(),
+ tmpResourcesRegistry))
+ .isEqualTo(expectedException);
+ tmpResourcesRegistry.close();
Review Comment:
Before this, would you mind checking the private folder or shared folder is
not empty?
##########
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateUploader.java:
##########
@@ -147,6 +157,11 @@ private StreamStateHandle uploadLocalFileToCheckpointFs(
result = outputStream.closeAndGetHandle();
outputStream = null;
}
+ if (result != null) {
+ StreamStateHandle finalResult = result;
Review Comment:
How about adding a keyword 'final'? Or just let 'result' be final, i.e.:
```
final StreamStateHandle result;
if (closeableRegistry.unregisterCloseable(outputStream)) {
result = outputStream.closeAndGetHandle();
outputStream = null;
} else {
result = null;
}
// following other code...
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]