[ https://issues.apache.org/jira/browse/FLINK-21522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17295736#comment-17295736 ]
Kezhu Wang commented on FLINK-21522:
------------------------------------

Hi all watchers and subscribers, the reported case is not related to iterative streams in DataStream but to StatefulFunction; please check FLINK-18894.

I have pushed a [branch|https://github.com/kezhuw/flink/commits/FLINK-21522-iterative-stream-and-stop-with-savepoint] for iterative streams in DataStream. That branch contains two commits for now:
* [https://github.com/kezhuw/flink/commit/d01ef7088fb6fba5adaf8eebbeddacc2c894595c] A test case for stop-with-savepoint and iterative streams.
* [https://github.com/kezhuw/flink/commit/c9b8c94394c0d14ea43043d0af30d4e71d4fc846] A possible fix based on the current stop-with-savepoint mechanism (e.g. {{EndOfPartitionEvent}}) and the discussion on FLINK-21133 (e.g. sources are muted after the savepoint is triggered).

> Iterative stream could not work with stop-with-savepoint
> --------------------------------------------------------
>
>                 Key: FLINK-21522
>                 URL: https://issues.apache.org/jira/browse/FLINK-21522
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Task
>    Affects Versions: 1.11.3, 1.12.1, 1.13.0
>            Reporter: Kezhu Wang
>            Priority: Major
>
> A user reported this on the user mailing list:
> [http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Stateful-functions-2-2-and-stop-with-savepoint-td41772.html]
> I copied the full mail body here:
> {quote}
> I have an embedded function with a SinkFunction as an egress, implemented as
> this pseudo-code:
> val serializationSchema = KafkaSchemaSerializationSchema(... props required
> to use a Confluent Schema Registry with Avro, auth etc ...)
> return SinkFunctionSpec(EGRESS_ID, FlinkKafkaProducer(serializationSchema,
> props, AT_LEAST_ONCE))
> Checkpointing and taking a savepoint without stopping work as expected.
> However, when I run "flink stop <job-id>" or even "flink stop --drain
> <job-id>", the operation never completes, reporting IN_PROGRESS until I hit
> the "failure-cause: org.apache.flink.runtime.checkpoint.CheckpointException:
> Checkpoint expired before completing" CompletedException.
> In the "Checkpoint History" it shows only 2 of my 3 operators completed their
> work:
> Source: my-ingress-ingress -> router (my-ingress) | acknowledge: 1/1 (100%) | end-to-end duration: 638ms | data-size 1.38 KB
> feedback-union -> functions -> Sink: my-egress-egress | acknowledge 0/1 0% | end-to-end duration: n/a | data-size: n/a
> feedback | acknowledge: 1/1 (100%) | end-to-end duration: 626ms | data-size: 0 B
> I've been unable to gain any insights from logs so far. Thoughts?
> {quote}
> I think this is what we overlooked when evaluating
> [apache/iceberg#2033|https://github.com/apache/iceberg/issues/2033],
> FLINK-21132 and FLINK-21133.
> I think the problem in the current implementation is twofold:
> # {{StreamIterationHead}} does not finish itself.
> # There is a local feedback edge from {{StreamIterationTail}} to
> {{StreamIterationHead}} which could cause {{StreamIterationTail}} to block
> after {{StreamIterationHead}} has finished. Globally speaking, it is a loop.
> [~pnowojski] emphasized this in FLINK-21132 and FLINK-21133.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
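The two problems listed in the issue (a head that never finishes, and a tail that can block on the feedback edge) can be illustrated with a toy model. This is a minimal sketch that assumes the feedback edge behaves like a bounded in-memory queue. The capacity of 2 is arbitrary, and the class and method names are hypothetical; this is not Flink's {{StreamIterationHead}}/{{StreamIterationTail}} code. The head side shows that, without an end-of-input signal on the feedback edge, the only exit is an idle timeout, loosely like the wait time accepted by DataStream#iterate(long maxWaitTimeMillis). The tail side shows that once the head stops draining, the tail stalls as soon as the queue is full.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical toy model of an iteration feedback edge; NOT Flink's implementation.
class FeedbackLoopModel {
    // Assumed capacity of the feedback channel (illustrative only).
    static final int FEEDBACK_CAPACITY = 2;

    // Problem 1: the feedback edge carries no end-of-input signal, so a head
    // draining it can only stop when nothing arrives for maxWaitMillis.
    // Expects maxWaitMillis > 0. Returns the number of records consumed.
    static int headDrainUntilIdle(BlockingQueue<Integer> feedback, long maxWaitMillis) {
        int consumed = 0;
        try {
            while (feedback.poll(maxWaitMillis, TimeUnit.MILLISECONDS) != null) {
                consumed++;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return consumed;
    }

    // Problem 2: once the head has finished, nothing drains the feedback
    // queue, so the tail stalls as soon as it is full. A blocking put()
    // would hang forever here; offer() with a timeout lets the model
    // report the stall instead. Returns how many records got through.
    static int tailPushesAfterHeadFinished(int recordsToFeedBack) {
        BlockingQueue<Integer> feedback = new ArrayBlockingQueue<>(FEEDBACK_CAPACITY);
        int pushed = 0;
        try {
            for (int i = 0; i < recordsToFeedBack; i++) {
                if (!feedback.offer(i, 10, TimeUnit.MILLISECONDS)) {
                    break; // in a blocking loop this is where the tail would hang
                }
                pushed++;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return pushed;
    }

    public static void main(String[] args) {
        BlockingQueue<Integer> fb = new ArrayBlockingQueue<>(FEEDBACK_CAPACITY);
        fb.add(1);
        fb.add(2);
        System.out.println("head consumed " + headDrainUntilIdle(fb, 10) + " records, then timed out");
        System.out.println("tail pushed " + tailPushesAfterHeadFinished(5) + " of 5 records before stalling");
    }
}
```

In the model, the stall appears as soon as the feedback queue fills, regardless of its capacity. A larger buffer only delays the stall, which is why the issue treats the head-to-tail cycle as a loop that must be broken by finishing it explicitly rather than by buffering.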