This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch release-1.18
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.18 by this push:
     new fc54ffd0c3e [FLINK-33981][runtime] Fix not closing DirectoryStream after listing local state files
fc54ffd0c3e is described below

commit fc54ffd0c3e77f7fa01ce04c41291bd80900288f
Author: Feng Jiajie <[email protected]>
AuthorDate: Mon Jan 29 10:39:27 2024 +0800

    [FLINK-33981][runtime] Fix not closing DirectoryStream after listing local state files
---
 .../flink/runtime/state/TaskExecutorLocalStateStoresManager.java | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
index b4dcd6af729..d56601ee5d9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
@@ -47,6 +47,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Executor;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 /**
  * This class holds the all {@link TaskLocalStateStoreImpl} objects for a task executor (manager).
@@ -296,9 +297,11 @@ public class TaskExecutorLocalStateStoresManager {
     @Nonnull
     static Collection<Path> listAllocationDirectoriesIn(File localStateRootDirectory)
             throws IOException {
-        return Files.list(localStateRootDirectory.toPath())
-                .filter(path -> path.getFileName().toString().startsWith(ALLOCATION_DIR_PREFIX))
-                .collect(Collectors.toList());
+        try (Stream<Path> fileListStream = Files.list(localStateRootDirectory.toPath())) {
+            return fileListStream
+                    .filter(path -> path.getFileName().toString().startsWith(ALLOCATION_DIR_PREFIX))
+                    .collect(Collectors.toList());
+        }
     }
 
     public void shutdown() {

Reply via email to