This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new e77d3a226ba [FLINK-28602][state/changelog] Close stream of StateChangeFsUploader normally while enabling compression
e77d3a226ba is described below

commit e77d3a226bab6d06272976a742c225811dc3ca36
Author: Hangxiang Yu <[email protected]>
AuthorDate: Tue Jul 19 15:03:26 2022 +0800

    [FLINK-28602][state/changelog] Close stream of StateChangeFsUploader normally while enabling compression
---
 .../flink/changelog/fs/StateChangeFsUploader.java  | 16 +----
 .../changelog/fs/StateChangeFsUploaderTest.java    | 77 ++++++++++++++++++++--
 2 files changed, 74 insertions(+), 19 deletions(-)

diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFsUploader.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFsUploader.java
index e60c28f6976..8a37a07f460 100644
--- a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFsUploader.java
+++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFsUploader.java
@@ -38,7 +38,6 @@ import org.slf4j.LoggerFactory;
 
 import java.io.BufferedOutputStream;
 import java.io.IOException;
-import java.io.OutputStream;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
@@ -145,9 +144,7 @@ public class StateChangeFsUploader implements 
StateChangeUploader {
     }
 
     private UploadTasksResult upload(Path path, Collection<UploadTask> tasks) 
throws IOException {
-        boolean wrappedStreamClosed = false;
-        FSDataOutputStream fsStream = fileSystem.create(path, NO_OVERWRITE);
-        try {
+        try (FSDataOutputStream fsStream = fileSystem.create(path, 
NO_OVERWRITE)) {
             fsStream.write(compression ? 1 : 0);
             try (OutputStreamWithPos stream = wrap(fsStream)) {
                 final Map<UploadTask, Map<StateChangeSet, Long>> tasksOffsets 
= new HashMap<>();
@@ -164,12 +161,6 @@ public class StateChangeFsUploader implements 
StateChangeUploader {
                 // WARN: streams have to be closed before returning the results
                 // otherwise JM may receive invalid handles
                 return new UploadTasksResult(tasksOffsets, handle);
-            } finally {
-                wrappedStreamClosed = true;
-            }
-        } finally {
-            if (!wrappedStreamClosed) {
-                fsStream.close();
             }
         }
     }
@@ -179,9 +170,8 @@ public class StateChangeFsUploader implements 
StateChangeUploader {
                 compression
                         ? SnappyStreamCompressionDecorator.INSTANCE
                         : UncompressedStreamCompressionDecorator.INSTANCE;
-        OutputStream compressed =
-                compression ? instance.decorateWithCompression(fsStream) : 
fsStream;
-        return new OutputStreamWithPos(new BufferedOutputStream(compressed, 
bufferSize));
+        return new OutputStreamWithPos(
+                new 
BufferedOutputStream(instance.decorateWithCompression(fsStream), bufferSize));
     }
 
     private String generateFileName() {
diff --git a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/StateChangeFsUploaderTest.java b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/StateChangeFsUploaderTest.java
index d690f951635..abdf873fb3e 100644
--- a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/StateChangeFsUploaderTest.java
+++ b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/StateChangeFsUploaderTest.java
@@ -19,18 +19,30 @@
 package org.apache.flink.changelog.fs;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.local.LocalDataOutputStream;
+import org.apache.flink.core.fs.local.LocalFileSystem;
 
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
 
+import java.io.File;
 import java.io.IOException;
+import java.nio.file.FileAlreadyExistsException;
+import java.util.Collections;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static 
org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups.createUnregisteredTaskManagerJobMetricGroup;
-import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** {@link StateChangeFsUploader} test. */
 class StateChangeFsUploaderTest {
 
+    @TempDir java.nio.file.Path tempFolder;
+
     @Test
     void testBasePath() throws IOException {
         JobID jobID = JobID.generate();
@@ -50,10 +62,63 @@ class StateChangeFsUploaderTest {
                         metrics,
                         TaskChangelogRegistry.NO_OP);
 
-        assertEquals(
-                uploader.getBasePath().getPath(),
-                String.format(
-                        "%s/%s/%s",
-                        rootPath, jobID.toHexString(), 
StateChangeFsUploader.PATH_SUB_DIR));
+        assertThat(uploader.getBasePath().getPath())
+                .isEqualTo(
+                        String.format(
+                                "%s/%s/%s",
+                                rootPath, jobID.toHexString(), 
StateChangeFsUploader.PATH_SUB_DIR));
+    }
+
+    @Test
+    void testCompression() throws IOException {
+        AtomicBoolean outputStreamClosed = new AtomicBoolean(false);
+
+        FileSystem wrappedFileSystem =
+                new LocalFileSystem() {
+                    @Override
+                    public FSDataOutputStream create(Path filePath, WriteMode 
overwrite)
+                            throws IOException {
+                        checkNotNull(filePath, "filePath");
+
+                        if (exists(filePath) && overwrite == 
WriteMode.NO_OVERWRITE) {
+                            throw new FileAlreadyExistsException(
+                                    "File already exists: " + filePath);
+                        }
+
+                        final Path parent = filePath.getParent();
+                        if (parent != null && !mkdirs(parent)) {
+                            throw new IOException("Mkdirs failed to create " + 
parent);
+                        }
+
+                        final File file = pathToFile(filePath);
+                        return new LocalDataOutputStream(file) {
+                            @Override
+                            public void close() throws IOException {
+                                super.close();
+                                outputStreamClosed.set(true);
+                            }
+                        };
+                    }
+                };
+
+        StateChangeFsUploader uploader = createUploader(wrappedFileSystem, 
false);
+        uploader.upload(Collections.emptyList());
+        assertThat(outputStreamClosed.get()).isTrue();
+
+        outputStreamClosed.set(false);
+        uploader = createUploader(wrappedFileSystem, true);
+        uploader.upload(Collections.emptyList());
+        assertThat(outputStreamClosed.get()).isTrue();
+    }
+
+    private StateChangeFsUploader createUploader(FileSystem fileSystem, 
boolean compression) {
+        return new StateChangeFsUploader(
+                JobID.generate(),
+                new Path(tempFolder.toUri()),
+                fileSystem,
+                compression,
+                4096,
+                new 
ChangelogStorageMetricGroup(createUnregisteredTaskManagerJobMetricGroup()),
+                TaskChangelogRegistry.NO_OP);
     }
 }

Reply via email to