This is an automated email from the ASF dual-hosted git repository.
roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new e77d3a226ba [FLINK-28602][state/changelog] Close stream of
StateChangeFsUploader normally while enabling compression
e77d3a226ba is described below
commit e77d3a226bab6d06272976a742c225811dc3ca36
Author: Hangxiang Yu <[email protected]>
AuthorDate: Tue Jul 19 15:03:26 2022 +0800
[FLINK-28602][state/changelog] Close stream of StateChangeFsUploader
normally while enabling compression
---
.../flink/changelog/fs/StateChangeFsUploader.java | 16 +----
.../changelog/fs/StateChangeFsUploaderTest.java | 77 ++++++++++++++++++++--
2 files changed, 74 insertions(+), 19 deletions(-)
diff --git
a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFsUploader.java
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFsUploader.java
index e60c28f6976..8a37a07f460 100644
---
a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFsUploader.java
+++
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFsUploader.java
@@ -38,7 +38,6 @@ import org.slf4j.LoggerFactory;
import java.io.BufferedOutputStream;
import java.io.IOException;
-import java.io.OutputStream;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
@@ -145,9 +144,7 @@ public class StateChangeFsUploader implements
StateChangeUploader {
}
private UploadTasksResult upload(Path path, Collection<UploadTask> tasks)
throws IOException {
- boolean wrappedStreamClosed = false;
- FSDataOutputStream fsStream = fileSystem.create(path, NO_OVERWRITE);
- try {
+ try (FSDataOutputStream fsStream = fileSystem.create(path,
NO_OVERWRITE)) {
fsStream.write(compression ? 1 : 0);
try (OutputStreamWithPos stream = wrap(fsStream)) {
final Map<UploadTask, Map<StateChangeSet, Long>> tasksOffsets
= new HashMap<>();
@@ -164,12 +161,6 @@ public class StateChangeFsUploader implements
StateChangeUploader {
// WARN: streams have to be closed before returning the results
// otherwise JM may receive invalid handles
return new UploadTasksResult(tasksOffsets, handle);
- } finally {
- wrappedStreamClosed = true;
- }
- } finally {
- if (!wrappedStreamClosed) {
- fsStream.close();
}
}
}
@@ -179,9 +170,8 @@ public class StateChangeFsUploader implements
StateChangeUploader {
compression
? SnappyStreamCompressionDecorator.INSTANCE
: UncompressedStreamCompressionDecorator.INSTANCE;
- OutputStream compressed =
- compression ? instance.decorateWithCompression(fsStream) :
fsStream;
- return new OutputStreamWithPos(new BufferedOutputStream(compressed,
bufferSize));
+ return new OutputStreamWithPos(
+ new
BufferedOutputStream(instance.decorateWithCompression(fsStream), bufferSize));
}
private String generateFileName() {
diff --git
a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/StateChangeFsUploaderTest.java
b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/StateChangeFsUploaderTest.java
index d690f951635..abdf873fb3e 100644
---
a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/StateChangeFsUploaderTest.java
+++
b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/StateChangeFsUploaderTest.java
@@ -19,18 +19,30 @@
package org.apache.flink.changelog.fs;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.local.LocalDataOutputStream;
+import org.apache.flink.core.fs.local.LocalFileSystem;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import java.io.File;
import java.io.IOException;
+import java.nio.file.FileAlreadyExistsException;
+import java.util.Collections;
+import java.util.concurrent.atomic.AtomicBoolean;
import static
org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups.createUnregisteredTaskManagerJobMetricGroup;
-import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.assertj.core.api.Assertions.assertThat;
/** {@link StateChangeFsUploader} test. */
class StateChangeFsUploaderTest {
+ @TempDir java.nio.file.Path tempFolder;
+
@Test
void testBasePath() throws IOException {
JobID jobID = JobID.generate();
@@ -50,10 +62,63 @@ class StateChangeFsUploaderTest {
metrics,
TaskChangelogRegistry.NO_OP);
- assertEquals(
- uploader.getBasePath().getPath(),
- String.format(
- "%s/%s/%s",
- rootPath, jobID.toHexString(),
StateChangeFsUploader.PATH_SUB_DIR));
+ assertThat(uploader.getBasePath().getPath())
+ .isEqualTo(
+ String.format(
+ "%s/%s/%s",
+ rootPath, jobID.toHexString(),
StateChangeFsUploader.PATH_SUB_DIR));
+ }
+
+ @Test
+ void testCompression() throws IOException {
+ AtomicBoolean outputStreamClosed = new AtomicBoolean(false);
+
+ FileSystem wrappedFileSystem =
+ new LocalFileSystem() {
+ @Override
+ public FSDataOutputStream create(Path filePath, WriteMode
overwrite)
+ throws IOException {
+ checkNotNull(filePath, "filePath");
+
+ if (exists(filePath) && overwrite ==
WriteMode.NO_OVERWRITE) {
+ throw new FileAlreadyExistsException(
+ "File already exists: " + filePath);
+ }
+
+ final Path parent = filePath.getParent();
+ if (parent != null && !mkdirs(parent)) {
+ throw new IOException("Mkdirs failed to create " +
parent);
+ }
+
+ final File file = pathToFile(filePath);
+ return new LocalDataOutputStream(file) {
+ @Override
+ public void close() throws IOException {
+ super.close();
+ outputStreamClosed.set(true);
+ }
+ };
+ }
+ };
+
+ StateChangeFsUploader uploader = createUploader(wrappedFileSystem,
false);
+ uploader.upload(Collections.emptyList());
+ assertThat(outputStreamClosed.get()).isTrue();
+
+ outputStreamClosed.set(false);
+ uploader = createUploader(wrappedFileSystem, true);
+ uploader.upload(Collections.emptyList());
+ assertThat(outputStreamClosed.get()).isTrue();
+ }
+
+ private StateChangeFsUploader createUploader(FileSystem fileSystem,
boolean compression) {
+ return new StateChangeFsUploader(
+ JobID.generate(),
+ new Path(tempFolder.toUri()),
+ fileSystem,
+ compression,
+ 4096,
+ new
ChangelogStorageMetricGroup(createUnregisteredTaskManagerJobMetricGroup()),
+ TaskChangelogRegistry.NO_OP);
}
}