This is an automated email from the ASF dual-hosted git repository.
fanrui pushed a commit to branch release-1.18
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.18 by this push:
new fc54ffd0c3e [FLINK-33981][runtime] Fix not closing DirectoryStream after listing local state files
fc54ffd0c3e is described below
commit fc54ffd0c3e77f7fa01ce04c41291bd80900288f
Author: Feng Jiajie <[email protected]>
AuthorDate: Mon Jan 29 10:39:27 2024 +0800
[FLINK-33981][runtime] Fix not closing DirectoryStream after listing local state files
---
.../flink/runtime/state/TaskExecutorLocalStateStoresManager.java | 9 ++++++---
1 file changed, 6 insertions(+), 3 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
index b4dcd6af729..d56601ee5d9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
@@ -47,6 +47,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
/**
* This class holds the all {@link TaskLocalStateStoreImpl} objects for a task
executor (manager).
@@ -296,9 +297,11 @@ public class TaskExecutorLocalStateStoresManager {
@Nonnull
static Collection<Path> listAllocationDirectoriesIn(File localStateRootDirectory)
throws IOException {
- return Files.list(localStateRootDirectory.toPath())
- .filter(path -> path.getFileName().toString().startsWith(ALLOCATION_DIR_PREFIX))
- .collect(Collectors.toList());
+ try (Stream<Path> fileListStream = Files.list(localStateRootDirectory.toPath())) {
+ return fileListStream
+ .filter(path -> path.getFileName().toString().startsWith(ALLOCATION_DIR_PREFIX))
+ .collect(Collectors.toList());
+ }
}
public void shutdown() {