This is an automated email from the ASF dual-hosted git repository.

tangyun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 804eb8d  [FLINK-25429][state] Avoid to close output streams twice 
during uploading changelogs
804eb8d is described below

commit 804eb8dda556a2bea35c69a2662f13d1dafb9255
Author: Yun Tang <[email protected]>
AuthorDate: Thu Dec 23 19:32:07 2021 +0800

    [FLINK-25429][state] Avoid to close output streams twice during uploading 
changelogs
---
 .../org/apache/flink/changelog/fs/StateChangeFsUploader.java | 12 ++++++++++--
 1 file changed, 10 insertions(+), 2 deletions(-)

diff --git 
a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFsUploader.java
 
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFsUploader.java
index c477ffa..551855b 100644
--- 
a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFsUploader.java
+++ 
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFsUploader.java
@@ -114,9 +114,11 @@ class StateChangeFsUploader implements StateChangeUploader 
{
     }
 
     private LocalResult upload(Path path, Collection<UploadTask> tasks) throws 
IOException {
-        try (FSDataOutputStream fsStream = fileSystem.create(path, 
NO_OVERWRITE)) {
+        boolean wrappedStreamClosed = false;
+        FSDataOutputStream fsStream = fileSystem.create(path, NO_OVERWRITE);
+        try {
             fsStream.write(compression ? 1 : 0);
-            try (OutputStreamWithPos stream = wrap(fsStream); ) {
+            try (OutputStreamWithPos stream = wrap(fsStream)) {
                 final Map<UploadTask, Map<StateChangeSet, Long>> tasksOffsets 
= new HashMap<>();
                 for (UploadTask task : tasks) {
                     tasksOffsets.put(task, format.write(stream, 
task.changeSets));
@@ -125,6 +127,12 @@ class StateChangeFsUploader implements StateChangeUploader 
{
                 // WARN: streams have to be closed before returning the results
                 // otherwise JM may receive invalid handles
                 return new LocalResult(tasksOffsets, handle);
+            } finally {
+                wrappedStreamClosed = true;
+            }
+        } finally {
+            if (!wrappedStreamClosed) {
+                fsStream.close();
             }
         }
     }

Reply via email to