Vararu Vadim created FLINK-29015:
------------------------------------
Summary: Flink stop with/without savepoint does not work with
kinesis consumer
Key: FLINK-29015
URL: https://issues.apache.org/jira/browse/FLINK-29015
Project: Flink
Issue Type: Bug
Components: Connectors / Kinesis
Affects Versions: 1.15.1
Environment: Flink 1.15.1. Same version for flink-connector-kinesis
1.15.1
Reporter: Vararu Vadim
Trying to migrate the job from Flink 1.14 to 1.15.1, observed that stop
with/without savepoint does not work anymore.
The checkpointing/savepointing by themselves are working OK.
The exception when stopping the job seems to come from the kinesis connector.
{code:java}
2022-08-17 12:57:16,266 INFO
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] -
Starting shutdown of shard consumer threads and AWS SDK resources of subtask 2
...
java.lang.InterruptedException: sleep interrupted
at java.lang.Thread.sleep(Native Method) ~[?:?]
at
org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling.PollingRecordPublisher.adjustRunLoopFrequency(PollingRecordPublisher.java:216)
~[blob_p-956a630701b5709969a8029f6fefe9d3cf05a778-c2e9ace4f09a95dc67692581fea51a5b:?]
at
org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling.PollingRecordPublisher.run(PollingRecordPublisher.java:124)
~[blob_p-956a630701b5709969a8029f6fefe9d3cf05a778-c2e9ace4f09a95dc67692581fea51a5b:?]
at
org.apache.flink.streaming.connectors.kinesis.internals.publisher.polling.PollingRecordPublisher.run(PollingRecordPublisher.java:102)
~[blob_p-956a630701b5709969a8029f6fefe9d3cf05a778-c2e9ace4f09a95dc67692581fea51a5b:?]
at
org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:114)
[blob_p-956a630701b5709969a8029f6fefe9d3cf05a778-c2e9ace4f09a95dc67692581fea51a5b:?]
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) [?:?]
at java.util.concurrent.FutureTask.run(Unknown Source) [?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:?]
at java.lang.Thread.run(Unknown Source) [?:?]{code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)