This is an automated email from the ASF dual-hosted git repository.
tangyun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 804eb8d [FLINK-25429][state] Avoid closing output streams twice
while uploading changelogs
804eb8d is described below
commit 804eb8dda556a2bea35c69a2662f13d1dafb9255
Author: Yun Tang <[email protected]>
AuthorDate: Thu Dec 23 19:32:07 2021 +0800
[FLINK-25429][state] Avoid closing output streams twice while uploading
changelogs
---
.../org/apache/flink/changelog/fs/StateChangeFsUploader.java | 12 ++++++++++--
1 file changed, 10 insertions(+), 2 deletions(-)
diff --git
a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFsUploader.java
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFsUploader.java
index c477ffa..551855b 100644
---
a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFsUploader.java
+++
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFsUploader.java
@@ -114,9 +114,11 @@ class StateChangeFsUploader implements StateChangeUploader
{
}
private LocalResult upload(Path path, Collection<UploadTask> tasks) throws
IOException {
- try (FSDataOutputStream fsStream = fileSystem.create(path,
NO_OVERWRITE)) {
+ boolean wrappedStreamClosed = false;
+ FSDataOutputStream fsStream = fileSystem.create(path, NO_OVERWRITE);
+ try {
fsStream.write(compression ? 1 : 0);
- try (OutputStreamWithPos stream = wrap(fsStream); ) {
+ try (OutputStreamWithPos stream = wrap(fsStream)) {
final Map<UploadTask, Map<StateChangeSet, Long>> tasksOffsets
= new HashMap<>();
for (UploadTask task : tasks) {
tasksOffsets.put(task, format.write(stream,
task.changeSets));
@@ -125,6 +127,12 @@ class StateChangeFsUploader implements StateChangeUploader
{
// WARN: streams have to be closed before returning the results
// otherwise JM may receive invalid handles
return new LocalResult(tasksOffsets, handle);
+ } finally {
+ wrappedStreamClosed = true;
+ }
+ } finally {
+ if (!wrappedStreamClosed) {
+ fsStream.close();
}
}
}