rkhachatryan commented on code in PR #20306:
URL: https://github.com/apache/flink/pull/20306#discussion_r930864433
##########
flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFsUploader.java:
##########
@@ -168,7 +168,7 @@ private UploadTasksResult upload(Path path,
Collection<UploadTask> tasks) throws
wrappedStreamClosed = true;
}
} finally {
- if (!wrappedStreamClosed) {
+ if (!wrappedStreamClosed || compression) {
fsStream.close();
}
Review Comment:
I mean that with [this
change](https://github.com/apache/flink/pull/20306#discussion_r930161714),
neither wrapping, nor additional flag check (`wrappedStreamClosed`) would be
required.
Currently, the composition of streams looks like this:
1. `OutputStreamWithPos` <-- closed by the inner `try` block
1. `BufferedOutputStream`
1. `SnappyFramedOutputStream` or `UncompressedStreamCompressionDecorator`
1. maybe `NonClosingOutputStreamDecorator` <- doesn't proxy `close()`
1. `FSDataOutputStream` <-- closed by the outer `try` block
Where (4) `NonClosingOutputStreamDecorator` is conditional in `wrap()`.
But that condition is unnecessary because `StreamCompressionDecorator`
instance is already choosen in (3).
So if `decorateWithCompression` would **always** add
`NonClosingOutputStreamDecorator` then it should solve the problem.
WDYT?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]