Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2432#discussion_r76620194
  
    --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java ---
    @@ -219,19 +228,52 @@ private void deserializeRecordForCollectionAndUpdateState(UserRecord record)
                        subscribedShard.getStreamName(),
                        subscribedShard.getShard().getShardId());
     
    -           if (record.isAggregated()) {
    -                   fetcherRef.emitRecordAndUpdateState(
    -                           value,
    -                           approxArrivalTimestamp,
    -                           subscribedShardStateIndex,
    -                           new SequenceNumber(record.getSequenceNumber(), record.getSubSequenceNumber()));
    -           } else {
    -                   fetcherRef.emitRecordAndUpdateState(
    -                           value,
    -                           approxArrivalTimestamp,
    -                           subscribedShardStateIndex,
    -                           new SequenceNumber(record.getSequenceNumber()));
    +           SequenceNumber collectedSequenceNumber = (record.isAggregated())
    +                   ? new SequenceNumber(record.getSequenceNumber(), record.getSubSequenceNumber())
    +                   : new SequenceNumber(record.getSequenceNumber());
    +
    +           fetcherRef.emitRecordAndUpdateState(
    +                   value,
    +                   approxArrivalTimestamp,
    +                   subscribedShardStateIndex,
    +                   collectedSequenceNumber);
    +
    +           lastSequenceNum = collectedSequenceNumber;
    +   }
    +
    +   /**
    +    * Calls {@link KinesisProxyInterface#getRecords(String, int)}, while also handling unexpected
    +    * AWS {@link ExpiredIteratorException}s to assure that we get results and don't just fail on
    +    * such occasions. The returned shard iterator within the successful {@link GetRecordsResult} should
    +    * be used for the next call to this method.
    +    *
    +    * Note: it is important that this method is not called again before all the records from the last result have been
    +    * fully collected with {@link ShardConsumer#deserializeRecordForCollectionAndUpdateState(UserRecord)}, otherwise
    +    * {@link ShardConsumer#lastSequenceNum} may refer to a sub-record in the middle of an aggregated record, leading to
    +    * incorrect shard iteration if the iterator had to be refreshed.
    +    *
    +    * @param shardItr shard iterator to use
    +    * @param maxNumberOfRecords the maximum number of records to fetch for this getRecords attempt
    +    * @return get records result
    +    * @throws InterruptedException
    +    */
    +   private GetRecordsResult getRecords(String shardItr, int maxNumberOfRecords) throws InterruptedException {
    +           GetRecordsResult getRecordsResult = null;
    +           while (getRecordsResult == null) {
    +                   try {
    +                           getRecordsResult = kinesis.getRecords(shardItr, maxNumberOfRecords);
    +                   } catch (ExpiredIteratorException eiEx) {
    +                           LOG.warn("Encountered an unexpected expired iterator {} for shard {};" +
    +                                   " refreshing the iterator ...", shardItr, subscribedShard);
    +                           shardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(), lastSequenceNum.getSequenceNumber());
    +
    +                           // sleep for the fetch interval before the next getRecords attempt with the refreshed iterator
    +                           if (fetchIntervalMillis != 0) {
    +                                   Thread.sleep(fetchIntervalMillis);
    +                           }
    --- End diff --
    
    Sorry for the race commit, didn't realize you were still reviewing.
    
    I agree. So if we limit the fetch interval configuration to 5 minutes, we'll likely get stuck in this loop indefinitely, right? I think that's why I had a stricter 4.5 min in mind, to make sure this doesn't happen :P But still, logically, we never know what `n` will be.

