dannycranmer commented on a change in pull request #13886:
URL: https://github.com/apache/flink/pull/13886#discussion_r517186975
##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber.java
##########
@@ -235,8 +243,13 @@ private boolean consumeAllRecordsFromKinesisShard(
String continuationSequenceNumber;
do {
- // Read timeout will occur after 30 seconds, add a sanity timeout here to prevent lockup
- FanOutSubscriptionEvent subscriptionEvent = queue.poll(DEQUEUE_WAIT_SECONDS, SECONDS);
+ FanOutSubscriptionEvent subscriptionEvent;
+ if (queue.isEmpty() && subscriptionErrorEvent.get() != null) {
Review comment:
Events are processed in chronological order: the consumer processes events
in the order they arrive (an error cancels the subscription, so no more records
will be received on the same subscription after it). This approach is
equivalent to using a queue with sufficient space, but avoids the problem that
arises when the queue is full. For example, consider the following sequence:
1. batch A
2. batch B
3. batch C
4. error
The four events will be processed in that order. In the previous approach, we
would have had to discard a batch when the queue was at capacity.
If errors were processed immediately, that would likely mean cancelling and
reacquiring the EFO subscription, which would probably result in discarding
the queued record batches.
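To make the ordering concrete, here is a minimal Java sketch, not the Flink implementation: record batches go through a bounded `BlockingQueue`, while the terminal error is parked in a separate `AtomicReference`, loosely mirroring `queue` and `subscriptionErrorEvent` in the diff. The class name `ErrorAfterQueueSketch`, the methods `drain` and `demo`, the string batches, the 100 ms poll and the queue capacity of 3 are all hypothetical, and the sketch assumes the producer sets the error only after enqueuing its last batch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch: batches use a bounded queue, the terminal error is
// parked in an AtomicReference so it never needs a queue slot.
public class ErrorAfterQueueSketch {

    // Consumer side: process every queued batch first and handle the parked
    // error only once the queue is empty.
    static List<String> drain(BlockingQueue<String> queue, AtomicReference<Throwable> error) {
        List<String> processed = new ArrayList<>();
        while (true) {
            // Read the error before checking emptiness. Assuming the producer sets
            // the error only after enqueuing its last batch, any such batch is
            // already visible in the queue once the error is seen.
            Throwable err = error.get();
            if (err != null && queue.isEmpty()) {
                processed.add("error: " + err.getMessage());
                return processed;
            }
            try {
                // Bounded wait so the loop periodically re-checks for the error.
                String batch = queue.poll(100, TimeUnit.MILLISECONDS);
                if (batch != null) {
                    processed.add(batch);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return processed;
            }
        }
    }

    // Replays the sequence from the comment: batch A, batch B, batch C, error.
    static List<String> demo() {
        // Capacity 3 (arbitrary): the queue is full after batch C.
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(3);
        AtomicReference<Throwable> error = new AtomicReference<>();

        // Producer side, single-threaded here so the output is deterministic.
        queue.offer("batch A");
        queue.offer("batch B");
        queue.offer("batch C");
        // The queue is now full, but the error is stored beside it, so no batch
        // has to be dropped to make room for it.
        error.set(new RuntimeException("subscription failed"));

        return drain(queue, error);
    }

    public static void main(String[] args) {
        // prints [batch A, batch B, batch C, error: subscription failed]
        System.out.println(demo());
    }
}
```

Reading the error before testing `queue.isEmpty()` is a deliberate choice in this sketch: under the stated assumption, once the error is visible every batch that preceded it is already in the queue, so the error is handled only after batch C.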