This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit c50b0706237114adec195b84202b969a148ccece Author: Matthias Pohl <[email protected]> AuthorDate: Thu Mar 31 13:49:24 2022 +0200 [FLINK-26957][runtime] Removes flush in FileSystemJobResultStore The writeValue calls close by default internally. Calling flush afterwards could cause errors. It's also not really necessary. OutputStream.flush does not guarantee persistence according to its JavaDoc. In contrast, calling close does guarantee it. --- .../flink/runtime/highavailability/FileSystemJobResultStore.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStore.java index 5f05180c05c..010ce77e74c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/FileSystemJobResultStore.java @@ -28,6 +28,7 @@ import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.jobmaster.JobResult; import org.apache.flink.runtime.rest.messages.json.JobResultDeserializer; import org.apache.flink.runtime.rest.messages.json.JobResultSerializer; +import org.apache.flink.runtime.util.NonClosingOutputStreamDecorator; import org.apache.flink.util.Preconditions; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; @@ -137,8 +138,11 @@ public class FileSystemJobResultStore extends AbstractThreadsafeJobResultStore { public void createDirtyResultInternal(JobResultEntry jobResultEntry) throws IOException { final Path path = constructDirtyPath(jobResultEntry.getJobId()); try (OutputStream os = fileSystem.create(path, FileSystem.WriteMode.NO_OVERWRITE)) { - mapper.writeValue(os, new JsonJobResultEntry(jobResultEntry)); - os.flush(); + mapper.writeValue( + // working around the internally used _writeAndClose method to ensure that close + // is only called once + new NonClosingOutputStreamDecorator(os), + new JsonJobResultEntry(jobResultEntry)); } }
