[
https://issues.apache.org/jira/browse/FLINK-30224?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17650815#comment-17650815
]
Danny Cranmer commented on FLINK-30224:
---------------------------------------
Merged commit
[{{9ee0fe3}}|https://github.com/apache/flink-connector-aws/commit/9ee0fe32d1a9e6d62e514824ab553956fe88ee9d]
into apache:main
> Add IT Case for NPE in FlinkKinesisConsumer's close() method
> ------------------------------------------------------------
>
> Key: FLINK-30224
> URL: https://issues.apache.org/jira/browse/FLINK-30224
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / Kinesis
> Reporter: Astamur Kirillin
> Assignee: Astamur Kirillin
> Priority: Minor
> Labels: pull-request-available
> Fix For: aws-connector-4.1.0
>
>
> In our project we observed an issue when the {{stop-with-savepoint}}
> operation is called too early, 4-6 seconds after a job was started.
> We use the Kinesis EFO consumer in our application, and it looks like the
> {{KinesisDataFetcher}} takes some time to initialize, which makes the
> {{FlinkKinesisConsumer#run(SourceContext<T>)}} method slow. In some tests
> we call {{stop-with-savepoint}} 4-6 seconds after a job was started and
> get a {{NullPointerException}} in {{FlinkKinesisConsumer#close()}}:
> {code:java}
> java.lang.NullPointerException: null
>     at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.close(FlinkKinesisConsumer.java:421) ~[classes/:?]
>     at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41) ~[classes/:?]
>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.close(AbstractUdfStreamOperator.java:114) ~[classes/:?]
>     at org.apache.flink.streaming.api.operators.StreamSource.close(StreamSource.java:124) ~[classes/:?]
>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:163) ~[classes/:?]
>     at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.closeAllOperators(RegularOperatorChain.java:125) ~[classes/:?]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.closeAllOperators(StreamTask.java:997) ~[classes/:?]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:876) ~[classes/:?]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:759) ~[classes/:?]
>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953) ~[classes/:?]
>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932) ~[classes/:?]
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746) ~[classes/:?]
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:568) ~[classes/:?]
>     at java.lang.Thread.run(Thread.java:829) ~[?:?]
> {code}
> We were able to reproduce this issue in {{FlinkKinesisITCase}}, even with a
> standard KDS consumer. The fix is simply an additional non-null check in the
> consumer's close method. We'll also add an IT case for it.
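
The non-null check described above could look roughly like the following sketch. It is not the merged patch: {{SketchConsumer}}, {{SketchFetcher}}, and {{shutdownFetcher()}} are hypothetical stand-ins for {{FlinkKinesisConsumer}}, {{KinesisDataFetcher}}, and its shutdown logic, and the sketch assumes only that the fetcher field stays null until {{run()}} has progressed far enough to create it.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class NullSafeCloseSketch {

    /** Hypothetical stand-in for KinesisDataFetcher; only counts shutdown calls. */
    static class SketchFetcher {
        final AtomicInteger shutdowns = new AtomicInteger();

        void shutdownFetcher() {
            shutdowns.incrementAndGet();
        }
    }

    /** Hypothetical stand-in for FlinkKinesisConsumer. */
    static class SketchConsumer {
        // Assumption: the fetcher is created inside run(), so it is null
        // if close() is invoked before run() has got that far.
        private volatile SketchFetcher fetcher;

        void run() {
            fetcher = new SketchFetcher();
        }

        SketchFetcher getFetcher() {
            return fetcher;
        }

        void close() {
            // Read the field once so the null check and the call use the same reference.
            SketchFetcher current = fetcher;
            if (current != null) {
                current.shutdownFetcher();
            }
        }
    }

    public static void main(String[] args) {
        // Early close, as with a stop-with-savepoint issued before the fetcher exists.
        SketchConsumer early = new SketchConsumer();
        early.close(); // no NullPointerException thanks to the guard

        // Normal lifecycle: the fetcher exists and is shut down exactly once.
        SketchConsumer started = new SketchConsumer();
        started.run();
        started.close();
        System.out.println("shutdown calls: " + started.getFetcher().shutdowns.get());
    }
}
```

Copying the field into a local variable before the check is a design choice for the case where another thread assigns the field concurrently; whether the real consumer needs that depends on its threading model, which this message does not describe.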