akalash commented on a change in pull request #17187:
URL: https://github.com/apache/flink/pull/17187#discussion_r705262345
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -647,6 +647,8 @@ void restoreInternal() throws Exception {
closedOperators = false;
LOG.debug("Initializing {}.", getName());
+ recordWriter.init();
Review comment:
We should somehow guarantee that all resources are closed in case of
an error. Before my changes, if the StreamTask failed right after the
recordWriter was created, nobody closed it, which led to a leak. So the
semantics should be as follows:
```
allocateResources();
try {
    // do something
} catch (Throwable ex) {
    releaseResources();
    throw ex; // propagate the original failure after cleanup
}
```
But it is difficult to follow this rule with the current implementation. For
example, look at StreamTask#createRecordWriters: if creating the second record
writer fails, we lose the reference to the first record writer, so it becomes
impossible to close it.
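To illustrate this failure mode outside of Flink, here is a minimal sketch. The names `TrackedWriter` and `WriterFactory.createAll` are hypothetical and are not part of StreamTask; the only assumption is that each writer can be closed. It keeps a reference to every writer created so far and closes them if a later creation fails:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a record writer; not a Flink class.
final class TrackedWriter implements AutoCloseable {
    boolean closed = false;

    TrackedWriter(boolean failOnCreate) {
        if (failOnCreate) {
            throw new IllegalStateException("writer creation failed");
        }
    }

    @Override
    public void close() {
        closed = true;
    }
}

final class WriterFactory {
    // Every successfully constructed writer, kept only so the sketch can be inspected.
    static final List<TrackedWriter> created = new ArrayList<>();

    // Creates one writer per flag; a true flag simulates a failing creation.
    static List<TrackedWriter> createAll(boolean... failFlags) {
        List<TrackedWriter> writers = new ArrayList<>();
        try {
            for (boolean fail : failFlags) {
                TrackedWriter writer = new TrackedWriter(fail);
                created.add(writer);
                writers.add(writer);
            }
            return writers;
        } catch (Throwable t) {
            // Close everything created before the failure, then rethrow.
            for (TrackedWriter writer : writers) {
                try {
                    writer.close();
                } catch (Throwable closeFailure) {
                    t.addSuppressed(closeFailure);
                }
            }
            throw t;
        }
    }
}
```

With `createAll(false, true)`, the first writer is created, the second throws, and the first one is closed before the exception propagates.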
So in my opinion, we have two choices here: either have an init method that
is invoked inside a try-catch block (which is exactly what I did), or rewrite the
code in such a way that an exception in any constructor (e.g. StreamTask) guarantees
that the resources allocated earlier in that constructor are released.
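A rough sketch of the first choice follows. `LazyResource` and `SketchTask` are hypothetical names and do not reflect the actual RecordWriter or StreamTask APIs. The constructor allocates nothing, and the allocation happens in `init()`, which is called inside a try-catch block that releases the resource on failure:

```java
// Hypothetical resource with two-phase setup; not the Flink RecordWriter API.
final class LazyResource implements AutoCloseable {
    boolean initialized = false;
    boolean closed = false;

    // The constructor allocates nothing, so a failure here cannot leak.
    LazyResource() {}

    // The real allocation would happen here.
    void init() {
        initialized = true;
    }

    @Override
    public void close() {
        initialized = false;
        closed = true;
    }
}

// Hypothetical task that owns the resource.
final class SketchTask {
    final LazyResource resource = new LazyResource();

    // failAfterInit simulates an error that occurs after the resource is allocated.
    void restore(boolean failAfterInit) throws Exception {
        try {
            resource.init();
            if (failAfterInit) {
                throw new Exception("restore failed");
            }
        } catch (Throwable t) {
            // The task still holds the reference, so it can always release it.
            resource.close();
            throw t;
        }
    }
}
```

Because the task holds the reference before any allocation happens, a failure at any point after `init()` can still release the resource.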