This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fafeb7f9534c684b76db14b5cbd26c44251c8647
Author: Matthias Pohl <[email protected]>
AuthorDate: Thu Mar 31 13:49:24 2022 +0200

    [FLINK-26957][runtime] Removes flush in FileSystemJobResultStore
    
    The writeValue call closes the stream internally by default, so calling
    flush afterwards could cause errors. The flush is also not really
    necessary: according to its JavaDoc, OutputStream.flush does not guarantee
    persistence, whereas calling close does.
---
 .../flink/runtime/highavailability/FileSystemJobResultStore.java  | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStore.java
index 5f05180c05c..010ce77e74c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStore.java
@@ -28,6 +28,7 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.rest.messages.json.JobResultDeserializer;
 import org.apache.flink.runtime.rest.messages.json.JobResultSerializer;
+import org.apache.flink.runtime.util.NonClosingOutputStreamDecorator;
 import org.apache.flink.util.Preconditions;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
@@ -137,8 +138,11 @@ public class FileSystemJobResultStore extends AbstractThreadsafeJobResultStore {
     public void createDirtyResultInternal(JobResultEntry jobResultEntry) throws IOException {
         final Path path = constructDirtyPath(jobResultEntry.getJobId());
         try (OutputStream os = fileSystem.create(path, FileSystem.WriteMode.NO_OVERWRITE)) {
-            mapper.writeValue(os, new JsonJobResultEntry(jobResultEntry));
-            os.flush();
+            mapper.writeValue(
+                    // working around the internally used _writeAndClose method to ensure that close
+                    // is only called once
+                    new NonClosingOutputStreamDecorator(os),
+                    new JsonJobResultEntry(jobResultEntry));
         }
     }
 

Reply via email to