[
https://issues.apache.org/jira/browse/FLINK-35769?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Roman Khachatryan updated FLINK-35769:
--------------------------------------
Description:
We have a job in an infinite (fast) restart loop that is crashing with a
serialization issue.
Each restart seems to leak state files, because the files from the previous run
are not cleaned up:
{code:bash}
/tmp/tm_10.56.9.147:6122-c560c5/tmp $ ls | grep KeyedProcessOperator | wc -l
7990
/tmp/tm_10.56.9.147:6122-c560c5/tmp $ ls | grep StreamingJoinOperator | wc -l
689{code}
Eventually the TaskManager (TM) will use too much disk space.
The problem is in
[https://github.com/apache/flink/blob/64f745a5b1fc14a2cba1ddd977ab8e8db9cf45a4/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java#L75]
{code:java}
try {
    List<CompletableFuture<Void>> futures =
            transferAllStateDataToDirectoryAsync(downloadRequests, internalCloser)
                    .collect(Collectors.toList());
    // Wait until either all futures completed successfully or one failed exceptionally.
    FutureUtils.completeAll(futures).get();
} catch (Exception e) {
    downloadRequests.stream()
            .map(StateHandleDownloadSpec::getDownloadDestination)
            .map(Path::toFile)
            .forEach(FileUtils::deleteDirectoryQuietly); {code}
Here {{FileUtils::deleteDirectoryQuietly}} lists the files and deletes them.
But if {{completeAll}} is interrupted, a download runnable may still be running
and might re-create the directory afterwards.
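One possible way to close this gap, sketched here under stated assumptions, is
to signal cancellation and then wait until every download task has actually
terminated before deleting the destination directories. This is not the Flink
code and not necessarily the fix that was committed: {{DownloadCleanupSketch}},
{{Download}} and {{downloadAll}} are hypothetical names, an {{AtomicBoolean}}
stands in for {{internalCloser}}, and plain {{CompletableFuture}} calls stand in
for {{FutureUtils.completeAll}}.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical sketch of "wait for the downloads, then delete"; not Flink code. */
final class DownloadCleanupSketch {

    /** Hypothetical stand-in for one download runnable. */
    interface Download {
        void run(Path destination, AtomicBoolean cancelled) throws Exception;
    }

    static void downloadAll(List<Path> destinations, Download download, Executor executor)
            throws Exception {
        AtomicBoolean cancelled = new AtomicBoolean(false);
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        for (Path destination : destinations) {
            futures.add(CompletableFuture.runAsync(() -> {
                try {
                    download.run(destination, cancelled);
                } catch (Exception e) {
                    throw new CompletionException(e);
                }
            }, executor));
        }
        try {
            // get() is interruptible, so it can return while tasks are still running.
            CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0])).get();
        } catch (Exception e) {
            // 1. Ask the tasks that are still running to stop.
            cancelled.set(true);
            // 2. Wait until every task has terminated; join() does not throw
            //    InterruptedException, so this wait cannot be cut short by an interrupt.
            for (CompletableFuture<Void> future : futures) {
                try {
                    future.join();
                } catch (RuntimeException ignored) {
                    // The original failure is already reported through e.
                }
            }
            // 3. Only now delete: no task is left that could re-create the files.
            for (Path destination : destinations) {
                try {
                    deleteRecursively(destination);
                } catch (IOException io) {
                    e.addSuppressed(io);
                }
            }
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt(); // restore the interrupt flag
            }
            throw e;
        }
    }

    static void deleteRecursively(Path dir) throws IOException {
        if (!Files.exists(dir)) {
            return;
        }
        List<Path> paths;
        try (Stream<Path> walk = Files.walk(dir)) {
            // Reverse order puts children before their parent directory.
            paths = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
        }
        for (Path path : paths) {
            Files.deleteIfExists(path);
        }
    }
}
```

The step that matters in this sketch is the second one: without the wait, a
task that reacts slowly to cancellation can still write into a directory that
has already been deleted. The wait is only bounded if the download tasks
actually observe the cancellation signal.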
was:
We have a job in an infinite (fast) restart loop, that’s crashing with a
serialization issue.
The issue here is that each restart seems to leak state files (not cleaning up
ones from the previous run):
{{/tmp/tm_10.56.9.147:6122-c560c5/tmp $ ls | grep KeyedProcessOperator | wc -l
7990}}
{{/tmp/tm_10.56.9.147:6122-c560c5/tmp $ ls | grep StreamingJoinOperator | wc -l
689}}
Eventually TM will use too much disk space.
The problem is in
[https://github.com/apache/flink/blob/64f745a5b1fc14a2cba1ddd977ab8e8db9cf45a4/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java#L75]
{code:java}
try {
    List<CompletableFuture<Void>> futures =
            transferAllStateDataToDirectoryAsync(downloadRequests, internalCloser)
                    .collect(Collectors.toList());
    // Wait until either all futures completed successfully or one failed exceptionally.
    FutureUtils.completeAll(futures).get();
} catch (Exception e) {
    downloadRequests.stream()
            .map(StateHandleDownloadSpec::getDownloadDestination)
            .map(Path::toFile)
            .forEach(FileUtils::deleteDirectoryQuietly); {code}
Where {{FileUtils::deleteDirectoryQuietly}} will list the files and delete them.
But if {{completeAll}} is interrupted, then download runnable might re-create
it.
> State files might not be deleted on task cancellation
> -----------------------------------------------------
>
> Key: FLINK-35769
> URL: https://issues.apache.org/jira/browse/FLINK-35769
> Project: Flink
> Issue Type: Bug
> Components: Runtime / State Backends
> Affects Versions: 1.19.1
> Reporter: Roman Khachatryan
> Assignee: Roman Khachatryan
> Priority: Major
> Fix For: 1.20.0