Roman Khachatryan created FLINK-35769:
-----------------------------------------
Summary: State files might not be deleted on task cancellation
Key: FLINK-35769
URL: https://issues.apache.org/jira/browse/FLINK-35769
Project: Flink
Issue Type: Bug
Components: Runtime / State Backends
Affects Versions: 1.19.1
Reporter: Roman Khachatryan
Assignee: Roman Khachatryan
Fix For: 1.20.0
We have a job stuck in a fast, infinite restart loop; it keeps crashing with a
serialization issue.
Each restart seems to leak state files, because the files from the previous run
are not cleaned up:
{{/tmp/tm_10.56.9.147:6122-c560c5/tmp $ ls | grep KeyedProcessOperator | wc -l
7990}}
{{/tmp/tm_10.56.9.147:6122-c560c5/tmp $ ls | grep StreamingJoinOperator | wc -l
689}}
Eventually, the TM will use too much disk space.
The problem is in
[https://github.com/apache/flink/blob/64f745a5b1fc14a2cba1ddd977ab8e8db9cf45a4/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java#L75]
{code:java}
try {
    List<CompletableFuture<Void>> futures =
            transferAllStateDataToDirectoryAsync(downloadRequests, internalCloser)
                    .collect(Collectors.toList());
    // Wait until either all futures completed successfully or one failed exceptionally.
    FutureUtils.completeAll(futures).get();
} catch (Exception e) {
    downloadRequests.stream()
            .map(StateHandleDownloadSpec::getDownloadDestination)
            .map(Path::toFile)
            .forEach(FileUtils::deleteDirectoryQuietly); {code}
Here, {{FileUtils::deleteDirectoryQuietly}} lists the files in the directory and
deletes them.
However, if the wait on {{completeAll}} is interrupted, a download runnable may
still be running and might re-create the directory or its files after the deletion.
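
To illustrate the race described above, here is a minimal, self-contained sketch using only the JDK. It is not Flink code and not necessarily how the issue is fixed there: {{SafeDownloadCleanup}}, {{download}}, and {{leftoverFiles}} are hypothetical names, and the 100 ms sleep merely simulates a download that is still in flight when cleanup starts. The sketch compares deleting the destination immediately with waiting for all download futures to terminate before deleting.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class SafeDownloadCleanup {

    // Hypothetical stand-in for one download task: after a short delay it
    // (re-)creates the destination directory and writes a single file.
    static Runnable download(Path dir, int id) {
        return () -> {
            try {
                Thread.sleep(100); // simulated download still in flight
                Files.createDirectories(dir);
                Files.write(dir.resolve("file-" + id), new byte[] {1});
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
    }

    // Deletes a directory tree, children first; does nothing if it is absent.
    static void deleteRecursively(Path dir) throws IOException {
        if (!Files.exists(dir)) {
            return;
        }
        List<Path> paths;
        try (Stream<Path> walk = Files.walk(dir)) {
            paths = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
        }
        for (Path p : paths) {
            Files.deleteIfExists(p);
        }
    }

    static long countFiles(Path dir) throws IOException {
        if (!Files.exists(dir)) {
            return 0;
        }
        try (Stream<Path> entries = Files.list(dir)) {
            return entries.count();
        }
    }

    // Returns how many files remain in the destination after cleanup.
    static long leftoverFiles(boolean waitBeforeDelete) {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            Path root = Files.createTempDirectory("download-demo");
            Path dest = root.resolve("state");
            List<CompletableFuture<Void>> futures = new ArrayList<>();
            for (int i = 0; i < 3; i++) {
                futures.add(CompletableFuture.runAsync(download(dest, i), pool));
            }
            // Completes once every download has terminated, normally or not.
            CompletableFuture<Void> allDone =
                    CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                            .handle((result, error) -> null);

            // Pretend the waiting thread was interrupted at this point.
            if (waitBeforeDelete) {
                allDone.join(); // no task can write after this returns
            }
            deleteRecursively(dest);

            allDone.join(); // let any stragglers finish before counting
            long leftover = countFiles(dest);
            deleteRecursively(root); // tidy up the demo directory
            return leftover;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("delete immediately: " + leftoverFiles(false));
        System.out.println("wait, then delete:  " + leftoverFiles(true));
    }
}
```

With {{waitBeforeDelete}} set to true, no files remain, because nothing can write once all futures have terminated. With false, the tasks typically write their files after the deletion has already run, which mirrors the leak reported here. The exact count in that case depends on timing.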