dannycranmer commented on a change in pull request #13886:
URL: https://github.com/apache/flink/pull/13886#discussion_r517193360



##########
File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber.java
##########
@@ -272,8 +289,6 @@ private boolean consumeAllRecordsFromKinesisShard(
 
                private final CountDownLatch waitForSubscriptionLatch;
 
-               private final Object lockObject = new Object();

Review comment:
       Yes, you are correct. 
   
   The processing order, in which errors are handled only after the queue is 
fully consumed, is similar to before. When an exception is thrown in the 
network thread, the error is always the last thing the consumer (record 
publisher) receives. The difference is that we no longer discard queued 
batches in order to deliver that exception. 
   
   A better way to look at it is to ask what handling the error actually 
means: here, it means reacquiring the EFO subscription to continue consuming 
data. We do not want to do that until we have processed all the records that 
have already arrived. 
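   
   As a rough sketch of that ordering (this is not the actual 
FanOutShardSubscriber code; the class `ErrorAfterRecordsSketch`, the `Event` 
type and the `consume` method are made up for illustration), assume the 
network thread enqueues the error as just another event behind any batches 
it has already received: 

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Illustrative only: hypothetical names, not the connector's real classes.
class ErrorAfterRecordsSketch {

    /** Hypothetical queue element: either a record batch or a terminal error. */
    static final class Event {
        private final String batch;    // non-null for a record batch
        private final Throwable error; // non-null for an error

        private Event(String batch, Throwable error) {
            this.batch = batch;
            this.error = error;
        }

        static Event batch(String batch) {
            return new Event(batch, null);
        }

        static Event error(Throwable error) {
            return new Event(null, error);
        }

        boolean isError() {
            return error != null;
        }
    }

    /**
     * Consumes events in FIFO order. Because the error sits behind the batches
     * that arrived before it, every queued batch is processed before the error
     * is handled (here, "handled" stands in for reacquiring the subscription).
     */
    static List<String> consume(BlockingQueue<Event> queue) {
        List<String> log = new ArrayList<>();
        while (true) {
            Event event;
            try {
                event = queue.take();
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop consuming.
                Thread.currentThread().interrupt();
                return log;
            }
            if (event.isError()) {
                log.add("resubscribe after: " + event.error.getMessage());
                return log;
            }
            log.add("processed: " + event.batch);
        }
    }

    public static void main(String[] args) {
        BlockingQueue<Event> queue = new LinkedBlockingQueue<>();
        // The "network thread" delivered two batches and then failed.
        queue.add(Event.batch("batch-1"));
        queue.add(Event.batch("batch-2"));
        queue.add(Event.error(new RuntimeException("connection reset")));

        consume(queue).forEach(System.out::println);
    }
}
```

   Nothing is dropped to fast-track the exception here: both batches are 
processed first, and only then does the consumer reach the error and 
resubscribe. 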
   
   Let me know if this is still not clear.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

