Hoang Tri Tam created FLINK-29827:
-------------------------------------
Summary: [Connector][AsyncSinkWriter] Checkpointed states block
writer from sending records
Key: FLINK-29827
URL: https://issues.apache.org/jira/browse/FLINK-29827
Project: Flink
Issue Type: Bug
Components: Connectors / Common
Affects Versions: 1.15.2
Reporter: Hoang Tri Tam
Hi everyone,
We recently discovered an issue that blocks Sink operators from sending
records to the client's endpoint.
To *reproduce* the issue, we started our Flink app from an existing savepoint
in which some Sink operators hold buffered records. For instance, the app
employs KinesisStreamSink with a parallelism of 4. Two of the subtasks have no
buffered records, while the other two start with existing state containing
records left over from the previous run.
{*}Behavior{*}: during runtime, we sent records (say, 200) to this sink in
rebalance mode, but only 100 of them (50%) were dispatched by the sink
operators.
After {*}investigation{*}, we found that the AsyncSinkWriter implementation
invokes submitRequestEntries() to send the records to their destination. This
invocation happens when a registered callback fires, when flush(true) (a
forced flush) is called, or when the buffer is full (either in size or in
number of entries).
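The triggering conditions described above can be sketched as follows. This is a simplified, hypothetical model for illustration only, not the actual Flink source; the class name, field names, and threshold values are all assumptions.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Simplified, hypothetical model of when an async sink writer decides to
// flush its buffer. Not the real AsyncSinkWriter; thresholds are made up.
class AsyncBufferSketch {
    private final Deque<String> buffer = new ArrayDeque<>();
    private long bufferedBytes = 0;
    private final int maxBufferedRequests = 100;    // quantity threshold (assumed)
    private final long maxBufferSizeInBytes = 1024; // size threshold (assumed)

    int flushCount = 0;

    // A record is buffered on write; a flush is triggered only when the
    // buffer is full in quantity or in size.
    void write(String record) {
        buffer.add(record);
        bufferedBytes += record.length();
        if (isFull()) {
            flush(false);
        }
    }

    // flush(true) models the forced flush; flush(false) only submits
    // when a fullness threshold has been reached.
    void flush(boolean forced) {
        if (forced || isFull()) {
            // In the real writer, submitRequestEntries(...) would run here.
            buffer.clear();
            bufferedBytes = 0;
            flushCount++;
        }
    }

    private boolean isFull() {
        return buffer.size() >= maxBufferedRequests
                || bufferedBytes >= maxBufferSizeInBytes;
    }

    int buffered() {
        return buffer.size();
    }
}
```

Under this model, records only leave the buffer when one of the three triggers occurs; absent any trigger, they simply accumulate.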
Our case falls into the first scenario: the _callback is not registered_
{_}when the writer starts with pre-existing buffered records{_} initialized
from the savepoint. Hence those operators held their records until their
buffers became full, while the other operators kept sending as usual.
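The stuck-buffer scenario can be illustrated with a small model. This is a hypothetical sketch, not Flink code; the constructor standing in for state restoration and the threshold value are assumptions made for the example.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

// Hypothetical model of the restore path reported here: entries restored
// from a snapshot land in the buffer, but no flush or callback is scheduled
// for them, so they sit until enough NEW records arrive to fill the buffer.
class RestoreSketch {
    private final Deque<String> buffer = new ArrayDeque<>();
    private final int maxBufferedRequests = 100; // assumed quantity threshold
    int submitted = 0;

    // Stands in for initializing the writer from snapshotted state.
    // Note: nothing triggers a send here -- the behavior this issue reports.
    RestoreSketch(List<String> restoredEntries) {
        buffer.addAll(restoredEntries);
    }

    void write(String record) {
        buffer.add(record);
        if (buffer.size() >= maxBufferedRequests) {
            submitted += buffer.size();
            buffer.clear();
        }
    }

    int pending() {
        return buffer.size();
    }
}
```

With low subsequent traffic, the restored entries (and the new ones) stay pending indefinitely; an eager flush right after restoring state would be one way to dispatch them, though whether that is the intended design is exactly the question this issue raises.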
Impacted {*}scope{*}: Flink 1.15.2 or later versions, for any Sink that
implements AsyncSinkWriter.
We currently treat this as abnormal behavior of Flink, but please let us know
if this behavior is intended by design.
Thanks in advance.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)