GitHub user fmthoma commented on a diff in the pull request:
https://github.com/apache/flink/pull/6021#discussion_r189432840
--- Diff:
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
---
@@ -218,6 +232,8 @@ public void invoke(OUT value, Context context) throws Exception {
throw new RuntimeException("Kinesis producer has been closed");
}
+ checkAndPropagateAsyncError();
+ checkQueueLimit();
checkAndPropagateAsyncError();
--- End diff --
`snapshotState()` also calls `checkAndPropagateAsyncError()` twice explicitly,
and I think it makes sense to keep the two checks at the same level here as
well. But I won't insist on it if you prefer a more implicit approach.
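
To illustrate why the second explicit check can matter, here is a minimal, self-contained sketch. It is not the Flink code: the class `ExplicitCheckSketch`, the `whileWaiting` hook, `failAsync()` and `pending()` are hypothetical stand-ins, and the draining behaviour of `checkQueueLimit()` is an assumption. Only the two method names come from the diff.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical stand-in for a sink backed by an asynchronous producer. */
final class ExplicitCheckSketch {
    private final Queue<String> queue = new ArrayDeque<>();
    private final int queueLimit;
    private Runnable whileWaiting = () -> { };   // simulates producer callbacks firing
    private volatile Throwable asyncError;       // set by a (simulated) callback

    ExplicitCheckSketch(int queueLimit) {
        this.queueLimit = queueLimit;
    }

    void setWhileWaiting(Runnable hook) {
        this.whileWaiting = hook;
    }

    /** Simulates the producer reporting a failure asynchronously. */
    void failAsync(Throwable t) {
        asyncError = t;
    }

    int pending() {
        return queue.size();
    }

    private void checkAndPropagateAsyncError() {
        Throwable t = asyncError;
        if (t != null) {
            asyncError = null;
            throw new RuntimeException("An async exception was thrown", t);
        }
    }

    /** Assumed behaviour: while the queue is full, wait for it to drain. */
    private void checkQueueLimit() {
        while (queue.size() >= queueLimit) {
            whileWaiting.run();   // callbacks may report errors during the wait
            queue.poll();         // one record leaves the queue
        }
    }

    void invoke(String value) {
        checkAndPropagateAsyncError();   // errors reported before this call
        checkQueueLimit();
        checkAndPropagateAsyncError();   // errors reported while waiting
        queue.add(value);
    }
}
```

In this sketch, an error reported while `checkQueueLimit()` is waiting is caught by the second call, so the record is not enqueued after a failure. Whether that call sits visibly in `invoke()` or inside `checkQueueLimit()` is the style question raised above.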
---