This is an automated email from the ASF dual-hosted git repository. dannycranmer pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 6080840e9c1f1ded457845a675dbfab4cacad12c Author: Arvid Heise <[email protected]> AuthorDate: Tue Sep 7 20:02:42 2021 +0200 [FLINK-23528][datastream] Let CollectSinkOperator publish results in #close. DataStream#executeAndCollect expects the CollectSinkOperator to register the accumulator at the end of the application or fails with some exception. However, a stop-with-savepoint without drain would not trigger CollectSinkOperator#finish and thus skip the registration. --- .../flink/streaming/api/operators/collect/CollectSinkOperator.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperator.java index 8b78bc30fc9..5c84c9b1378 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperator.java @@ -51,9 +51,9 @@ public class CollectSinkOperator<IN> extends StreamSink<IN> implements OperatorE } @Override - public void finish() throws Exception { + public void close() throws Exception { sinkFunction.accumulateFinalResults(); - super.finish(); + super.close(); } public CompletableFuture<OperatorID> getOperatorIdFuture() {
