Hi devs,

I'll raise a bug ticket once I get approval for a Jira account. I'd be glad
to contribute a fix for the issue. It appears to be the same issue as
FLINK-24431 that was fixed in the FlinkKinesisConsumer connector.

Thank you,
Jonathan Du

On Thu, 29 May 2025 at 15:39, Jonathan Du <j...@atlassian.com> wrote:

> Hi devs,
>
> I've recently upgraded my Flink v1.20 apps using the FlinkKinesisConsumer
> API to the newly recommended KinesisStreamsSource API. I'm now experiencing
> that my Kinesis source operator (and hence, the entire application) can
> stall in the initialising phase when restarting.
>
> I've been able to replicate the issue with the following conditions:
>
>    - Source operator uses KinesisStreamsSource Flink-Kinesis connector
>    (org.apache.flink:flink-connector-kinesis:5.0.0-1.20)
>    - Reader type: EFO
>    - Autoscaling enabled
>    - Trigger: Flink application restart near an earlier restart
>
> Looking into the job manager logs, I observe the following events leading
> up to the failure:
>
>    1. Registering stream consumer
>    2. Starting consumption from stream
>    3. Required number of readers: 2, Registered readers: 0
>    4. Required number of readers: 2, Registered readers: 1
>    5. Can change the parallelism of the job. Restarting the job.
>    6. Job switched from state RUNNING to CANCELLING
>    7. Job switched from state CANCELLING to CANCELED
>    8. De-registering stream consumer
>    9. Running initialization on master for JOB_NAME
>    10. Waiting for stream consumer to be deregistered
>    11. Job switched from state CREATED to RUNNING.
>    12. Registering stream consumer
>    13. Found existing consumer. Proceeding to read from consumer.
>    14. Starting consumption from stream
>    15. Stream consumer has been deregistered
>    16. Required number of readers: 2, Registered readers: 0
>    (indefinitely, operator stalls on initializing phase)
>
> My best guess at a solution to this: The old connector has the option for
> lazy or eager consumer initialisation, and we used the default lazy
> initialisation. I see there's no option for lazy initialisation in the new
> connector, defaulting to an eager approach.
>
> Has anyone seen this issue before? Any advice on how to proceed?
>
> Thank you,
> Jonathan Du
>

Reply via email to