Hi devs,

I'll raise a bug ticket once my Jira account request is approved, and I'd be glad to contribute a fix. This appears to be the same issue as FLINK-24431, which was fixed in the FlinkKinesisConsumer connector.
Thank you,
Jonathan Du

On Thu, 29 May 2025 at 15:39, Jonathan Du <j...@atlassian.com> wrote:
> Hi devs,
>
> I've recently upgraded my Flink v1.20 apps from the FlinkKinesisConsumer
> API to the newly recommended KinesisStreamsSource API. Since then, my
> Kinesis source operator (and hence the entire application) can stall in
> the initialising phase when restarting.
>
> I've been able to replicate the issue under the following conditions:
>
> - Source operator uses the KinesisStreamsSource Flink-Kinesis connector
>   (org.apache.flink:flink-connector-kinesis:5.0.0-1.20)
> - Reader type: EFO
> - Autoscaling enabled
> - Trigger: Flink application restart shortly after an earlier restart
>
> Looking into the job manager logs, I observe the following events leading
> up to the failure:
>
> 1. Registering stream consumer
> 2. Starting consumption from stream
> 3. Required number of readers: 2, Registered readers: 0
> 4. Required number of readers: 2, Registered readers: 1
> 5. Can change the parallelism of the job. Restarting the job.
> 6. Job switched from state RUNNING to CANCELLING
> 7. Job switched from state CANCELLING to CANCELED
> 8. De-registering stream consumer
> 9. Running initialization on master for JOB_NAME
> 10. Waiting for stream consumer to be deregistered
> 11. Job switched from state CREATED to RUNNING.
> 12. Registering stream consumer
> 13. Found existing consumer. Proceeding to read from consumer.
> 14. Starting consumption from stream
> 15. Stream consumer has been deregistered
> 16. Required number of readers: 2, Registered readers: 0
>     (indefinitely; the operator stalls in the initializing phase)
>
> My best guess at a solution: the old connector offers lazy or eager
> consumer initialisation, and we used the default, lazy initialisation. I
> see the new connector has no option for lazy initialisation and defaults
> to an eager approach.
>
> Has anyone seen this issue before? Any advice on how to proceed?
>
> Thank you,
> Jonathan Du
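Reading events 8 to 16 together, one plausible interpretation is a restart race: the new job's eager registration finds the consumer that the cancelled job is still deleting, reuses it, and then the deletion completes, leaving the readers nothing to subscribe to. The sketch below replays that sequence against a hypothetical in-memory registry (`FakeConsumerRegistry`, `naiveRegister`, `guardedRegister` and the consumer name are all illustrative, not the connector's or the AWS SDK's API) and contrasts reusing any existing consumer with a guarded variant that waits for a DELETING consumer to disappear before registering a fresh one. It is an assumption that the real fix, or the one made for FLINK-24431, takes this shape.

```java
import java.util.HashMap;
import java.util.Map;

public class EfoRestartRaceSketch {

    // Two of the consumer statuses Kinesis reports (the real API also has CREATING).
    enum ConsumerStatus { ACTIVE, DELETING }

    // Hypothetical in-memory stand-in for the stream-consumer registry.
    static final class FakeConsumerRegistry {
        private final Map<String, ConsumerStatus> consumers = new HashMap<>();

        void register(String name) { consumers.put(name, ConsumerStatus.ACTIVE); }

        // Deregistration starts: the consumer lingers in DELETING for a while.
        void beginDeregister(String name) {
            consumers.computeIfPresent(name, (k, s) -> ConsumerStatus.DELETING);
        }

        // Deregistration completes: only removes a consumer that is still DELETING,
        // so a freshly registered consumer with the same name is left alone.
        void finishDeregister(String name) { consumers.remove(name, ConsumerStatus.DELETING); }

        ConsumerStatus status(String name) { return consumers.get(name); }
    }

    // Naive eager registration: reuse any existing consumer regardless of status
    // (compare "Found existing consumer. Proceeding to read from consumer.").
    static void naiveRegister(FakeConsumerRegistry registry, String name) {
        if (registry.status(name) == null) {
            registry.register(name);
        }
    }

    // Guarded registration: if the previous job is still deleting the consumer, wait
    // for the deletion (a real implementation would poll the consumer's status),
    // then register a fresh consumer.
    static void guardedRegister(FakeConsumerRegistry registry, String name, Runnable awaitDeletion) {
        if (registry.status(name) == ConsumerStatus.DELETING) {
            awaitDeletion.run();
        }
        if (registry.status(name) == null) {
            registry.register(name);
        }
    }

    // Replays the logged restart sequence; returns true if readers could subscribe
    // afterwards (consumer ACTIVE), false if they would stall.
    static boolean replayRestart(boolean guarded) {
        FakeConsumerRegistry registry = new FakeConsumerRegistry();
        String consumer = "example-efo-consumer"; // hypothetical name
        registry.register(consumer);          // first job: "Registering stream consumer"
        registry.beginDeregister(consumer);   // cancelled job: "De-registering stream consumer"
        Runnable oldJobDeregistration = () -> registry.finishDeregister(consumer);

        if (guarded) {
            guardedRegister(registry, consumer, oldJobDeregistration);
        } else {
            naiveRegister(registry, consumer);
        }

        // "Stream consumer has been deregistered": a no-op if it already completed.
        oldJobDeregistration.run();
        return registry.status(consumer) == ConsumerStatus.ACTIVE;
    }

    public static void main(String[] args) {
        System.out.println("naive:   readers can subscribe = " + replayRestart(false));
        System.out.println("guarded: readers can subscribe = " + replayRestart(true));
    }
}
```

Running `main` prints `false` for the naive path, matching the stall at event 16, and `true` for the guarded path. Lazy initialisation, as in the old connector, would sidestep the same window differently, by deferring registration until after the previous job's cleanup has had time to finish.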