pnowojski commented on a change in pull request #16271:
URL: https://github.com/apache/flink/pull/16271#discussion_r658706028
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
##########
@@ -335,7 +335,11 @@ public void open() throws Exception {}
* @throws Exception An exception in this method causes the operator to fail.
*/
@Override
- public void close() throws Exception {}
+ public void close() throws Exception {
+ if (output != null) {
+ output.close();
+ }
+ }
Review comment:
I think the `OperatorChain` should be the owner of the outputs. It
references some of the created outputs and uses them for other things:
- registering some metrics (maybe this can be safely ignored?)
- `OperatorChain#chainedSources` also references some outputs
Hence I would suggest that `OperatorChain#releaseOutputs()` should be
responsible for closing all outputs that were created inside that `OperatorChain`.
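To illustrate the ownership idea, here is a minimal sketch, not the actual Flink code. All names in it (`SketchOutput`, `CountingOutput`, `SketchOperatorChain`, `SketchOperator`) are hypothetical stand-ins, and the error handling in `releaseOutputs()` is an assumption about how failures might be aggregated:

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for an operator output; not Flink's actual Output interface. */
interface SketchOutput {
    void close();
}

/** Hypothetical output that records how many times it was closed. */
final class CountingOutput implements SketchOutput {
    int closeCount = 0;

    @Override
    public void close() {
        closeCount++;
    }
}

/** Hypothetical chain that owns every output it creates. */
final class SketchOperatorChain {
    private final List<SketchOutput> ownedOutputs = new ArrayList<>();

    /** Creates an output and records it so the chain can close it later. */
    CountingOutput createOutput() {
        CountingOutput output = new CountingOutput();
        ownedOutputs.add(output);
        return output;
    }

    /** Closes every owned output; rethrows the first failure with the rest suppressed. */
    void releaseOutputs() {
        RuntimeException firstFailure = null;
        for (SketchOutput output : ownedOutputs) {
            try {
                output.close();
            } catch (RuntimeException e) {
                if (firstFailure == null) {
                    firstFailure = e;
                } else {
                    firstFailure.addSuppressed(e);
                }
            }
        }
        // Forget the outputs so a second call does not close them again.
        ownedOutputs.clear();
        if (firstFailure != null) {
            throw firstFailure;
        }
    }
}

/** Hypothetical operator that uses an output but leaves closing it to the chain. */
final class SketchOperator {
    private final SketchOutput output;

    SketchOperator(SketchOutput output) {
        this.output = output;
    }

    void close() {
        // Releases operator-local resources only; the output is not closed here.
    }
}
```

With this split, closing an operator leaves its output untouched, and each output is closed exactly once when the chain releases its outputs.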
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ChainingOutput.java
##########
@@ -129,13 +126,7 @@ public void emitLatencyMarker(LatencyMarker latencyMarker) {
@Override
public void close() {
- try {
- if (closeable != null) {
- closeable.close();
- }
- } catch (Exception e) {
- throw new ExceptionInChainedOperatorException(e);
- }
Review comment:
Could you add a unit test for this change?
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java
##########
@@ -124,7 +124,7 @@ public AbstractStreamOperatorV2(StreamOperatorParameters<OUT> parameters, int nu
operatorMetricGroup = null;
Review comment:
```
operatorMetricGroup = null;
```