Github user tzulitai commented on a diff in the pull request:
https://github.com/apache/flink/pull/6021#discussion_r189163394
--- Diff:
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
---
@@ -218,6 +232,8 @@ public void invoke(OUT value, Context context) throws Exception {
throw new RuntimeException("Kinesis producer has been closed");
}
+ checkAndPropagateAsyncError();
+ checkQueueLimit();
checkAndPropagateAsyncError();
--- End diff --
This second check is there to catch any async errors that occurred during
the queue flush, correct?
If so, we should probably move this second invocation into
`checkQueueLimit`, so that the error check after a flush happens implicitly
and `invoke` does not have to repeat it.
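
To illustrate the suggestion, here is a minimal, self-contained sketch. It is not the actual `FlinkKinesisProducer` code: the class `QueueLimitSketch`, the in-memory queue, the `flush` helper, and the `setFailNextFlush` hook are all hypothetical stand-ins, and only the method names `invoke`, `checkQueueLimit`, and `checkAndPropagateAsyncError` come from the diff. The point is only that `invoke` calls the error check once up front, and `checkQueueLimit` itself re-checks after it has flushed.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical stand-in for the producer; not the real Flink class.
class QueueLimitSketch {
    private final int queueLimit;
    private final Queue<String> pending = new ArrayDeque<>();
    // Recorded by the (simulated) async send path when a record fails.
    private Throwable asyncError;
    // Hook for this sketch only: makes the next flush record an error.
    private boolean failNextFlush;

    QueueLimitSketch(int queueLimit) {
        this.queueLimit = queueLimit;
    }

    void setFailNextFlush(boolean fail) {
        this.failNextFlush = fail;
    }

    int pendingCount() {
        return pending.size();
    }

    void invoke(String value) {
        // Surface errors from earlier sends before accepting a new record.
        checkAndPropagateAsyncError();
        // May flush; any error raised by that flush is propagated inside.
        checkQueueLimit();
        pending.add(value);
    }

    private void checkQueueLimit() {
        if (pending.size() >= queueLimit) {
            flush();
            // The "second check" from the diff, now owned by this method.
            checkAndPropagateAsyncError();
        }
    }

    // Simulated flush: drains the queue and optionally records a failure.
    private void flush() {
        pending.clear();
        if (failNextFlush) {
            failNextFlush = false;
            asyncError = new RuntimeException("simulated async send failure");
        }
    }

    private void checkAndPropagateAsyncError() {
        if (asyncError != null) {
            Throwable cause = asyncError;
            asyncError = null;
            throw new RuntimeException("An async error occurred while sending records", cause);
        }
    }
}
```

With this shape, callers of `checkQueueLimit` cannot forget the post-flush check, and `invoke` reads as two steps instead of three.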
---