[
https://issues.apache.org/jira/browse/FLINK-35769?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Weijie Guo updated FLINK-35769:
-------------------------------
Fix Version/s: 2.0.0
(was: 1.20.0)
> State files might not be deleted on task cancellation
> -----------------------------------------------------
>
> Key: FLINK-35769
> URL: https://issues.apache.org/jira/browse/FLINK-35769
> Project: Flink
> Issue Type: Bug
> Components: Runtime / State Backends
> Affects Versions: 1.19.1
> Reporter: Roman Khachatryan
> Assignee: Roman Khachatryan
> Priority: Major
> Fix For: 2.0.0
>
>
> We have a job in an infinite (fast) restart loop that is crashing with a
> serialization issue.
> The problem is that each restart seems to leak state files (the ones from
> the previous run are not cleaned up):
> {code:java}
> /tmp/tm_10.56.9.147:6122-c560c5/tmp $ ls | grep KeyedProcessOperator | wc -l
> 7990
> /tmp/tm_10.56.9.147:6122-c560c5/tmp $ ls | grep StreamingJoinOperator | wc -l
> 689{code}
> Eventually the TM will use too much disk space.
>
> The problem is in
> [https://github.com/apache/flink/blob/64f745a5b1fc14a2cba1ddd977ab8e8db9cf45a4/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java#L75]
> {code:java}
> try {
>     List<CompletableFuture<Void>> futures =
>             transferAllStateDataToDirectoryAsync(downloadRequests, internalCloser)
>                     .collect(Collectors.toList());
>     // Wait until either all futures completed successfully or one failed exceptionally.
>     FutureUtils.completeAll(futures).get();
> } catch (Exception e) {
>     downloadRequests.stream()
>             .map(StateHandleDownloadSpec::getDownloadDestination)
>             .map(Path::toFile)
>             .forEach(FileUtils::deleteDirectoryQuietly); {code}
> Here, {{FileUtils::deleteDirectoryQuietly}} lists the files and deletes
> them.
> But if the wait on {{completeAll}} is interrupted, the download runnables may
> still be running, and they might re-create the files after the cleanup.
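
The race described above can be reproduced in isolation. The following is a minimal sketch, not Flink code and not necessarily the fix that was applied: the class name CleanupRaceSketch, the helpers deleteQuietly and leaksAfterCleanup, and the file name state.sst are all hypothetical. A latch stands in for a download that is still in flight when the waiting thread gives up, so the outcome is deterministic.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Stream;

class CleanupRaceSketch {

    /** Hypothetical stand-in for a quiet recursive delete: errors are ignored. */
    static void deleteQuietly(Path dir) {
        if (!Files.exists(dir)) {
            return;
        }
        try (Stream<Path> paths = Files.walk(dir)) {
            // Delete children before their parents.
            paths.sorted(Comparator.reverseOrder()).forEach(p -> p.toFile().delete());
        } catch (IOException ignored) {
            // Quiet by design.
        }
    }

    /**
     * Runs one simulated download plus cleanup and reports whether the
     * destination directory still exists afterwards (that is, whether it leaked).
     *
     * @param waitForDownload if true, cleanup runs only after the download finished;
     *                        if false, cleanup runs while the download is in flight
     */
    static boolean leaksAfterCleanup(boolean waitForDownload) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Path root = null;
        try {
            root = Files.createTempDirectory("cleanup-race");
            Path dest = root.resolve("download");
            CountDownLatch resume = new CountDownLatch(1);

            CompletableFuture<Void> download = CompletableFuture.runAsync(() -> {
                try {
                    resume.await(); // the download is "in flight" until released
                    Files.createDirectories(dest);
                    Files.write(dest.resolve("state.sst"), new byte[] {1, 2, 3});
                } catch (IOException | InterruptedException e) {
                    throw new IllegalStateException(e);
                }
            }, pool);

            if (waitForDownload) {
                resume.countDown();
                download.join();     // download has fully finished
                deleteQuietly(dest); // cleanup sees everything it wrote
            } else {
                deleteQuietly(dest); // cleanup runs first and finds nothing
                resume.countDown();
                download.join();     // download re-creates the directory
            }
            return Files.exists(dest);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            pool.shutdown();
            if (root != null) {
                deleteQuietly(root); // remove the sketch's own temp files
            }
        }
    }

    public static void main(String[] args) {
        System.out.println("leak when cleanup races the download: " + leaksAfterCleanup(false));
        System.out.println("leak when cleanup waits for the download: " + leaksAfterCleanup(true));
    }
}
```

In this sketch the directory survives only when cleanup runs before the download has finished, which suggests that any cleanup on the failure path needs to happen after the download tasks have actually stopped, not just after the waiting thread has returned.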
--
This message was sent by Atlassian Jira
(v8.20.10#820010)