Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2432#discussion_r76617364
  
    --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java ---
    @@ -219,19 +228,52 @@ private void deserializeRecordForCollectionAndUpdateState(UserRecord record)
                        subscribedShard.getStreamName(),
                        subscribedShard.getShard().getShardId());
     
    -           if (record.isAggregated()) {
    -                   fetcherRef.emitRecordAndUpdateState(
    -                           value,
    -                           approxArrivalTimestamp,
    -                           subscribedShardStateIndex,
    -                           new SequenceNumber(record.getSequenceNumber(), record.getSubSequenceNumber()));
    -           } else {
    -                   fetcherRef.emitRecordAndUpdateState(
    -                           value,
    -                           approxArrivalTimestamp,
    -                           subscribedShardStateIndex,
    -                           new SequenceNumber(record.getSequenceNumber()));
    +           SequenceNumber collectedSequenceNumber = (record.isAggregated())
    +                   ? new SequenceNumber(record.getSequenceNumber(), record.getSubSequenceNumber())
    +                   : new SequenceNumber(record.getSequenceNumber());
    +
    +           fetcherRef.emitRecordAndUpdateState(
    +                   value,
    +                   approxArrivalTimestamp,
    +                   subscribedShardStateIndex,
    +                   collectedSequenceNumber);
    +
    +           lastSequenceNum = collectedSequenceNumber;
    +   }
    +
    +   /**
    +    * Calls {@link KinesisProxyInterface#getRecords(String, int)}, while also handling unexpected
    +    * AWS {@link ExpiredIteratorException}s to assure that we get results and don't just fail on
    +    * such occasions. The returned shard iterator within the successful {@link GetRecordsResult} should
    +    * be used for the next call to this method.
    +    *
    +    * Note: it is important that this method is not called again before all the records from the last result have been
    +    * fully collected with {@link ShardConsumer#deserializeRecordForCollectionAndUpdateState(UserRecord)}, otherwise
    +    * {@link ShardConsumer#lastSequenceNum} may refer to a sub-record in the middle of an aggregated record, leading to
    +    * incorrect shard iteration if the iterator had to be refreshed.
    +    *
    +    * @param shardItr shard iterator to use
    +    * @param maxNumberOfRecords the maximum number of records to fetch for this getRecords attempt
    +    * @return get records result
    +    * @throws InterruptedException
    +    */
    +   private GetRecordsResult getRecords(String shardItr, int maxNumberOfRecords) throws InterruptedException {
    +           GetRecordsResult getRecordsResult = null;
    +           while (getRecordsResult == null) {
    +                   try {
    +                           getRecordsResult = kinesis.getRecords(shardItr, maxNumberOfRecords);
    +                   } catch (ExpiredIteratorException eiEx) {
    +                           LOG.warn("Encountered an unexpected expired iterator {} for shard {};" +
    +                                   " refreshing the iterator ...", shardItr, subscribedShard);
    +                           shardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(), lastSequenceNum.getSequenceNumber());
    +
    +                           // sleep for the fetch interval before the next getRecords attempt with the refreshed iterator
    +                           if (fetchIntervalMillis != 0) {
    +                                   Thread.sleep(fetchIntervalMillis);
    +                           }
    --- End diff --
    
    This fetchInterval implementation can lead to much larger effective fetch intervals.
    If the getRecords call itself needs `n` milliseconds, the waiting time between each `getRecords` call is `n + fetchInterval`.
    We don't need to fix this in this PR, but I think we should fix it in general (if you agree). We also need to see how to make this efficient (`System.currentTimeMillis()` is a somewhat expensive call).
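    
    To illustrate the idea (a sketch only, not part of this PR — the class and method names below are hypothetical): measure how long the `getRecords` call took and sleep only for the remaining part of the interval, so the period between calls stays close to `fetchInterval` rather than `n + fetchInterval`.
    
    ```java
    // Hypothetical sketch of a drift-free fetch interval. The helper name
    // remainingSleep and the surrounding loop are illustrative, not Flink code.
    public class FetchIntervalSketch {
    
        /** How long to sleep after a call that took elapsedMillis, so the
         *  total period per iteration stays near fetchIntervalMillis. */
        static long remainingSleep(long fetchIntervalMillis, long elapsedMillis) {
            // If the call already used up the whole interval, don't sleep at all.
            return Math.max(0L, fetchIntervalMillis - elapsedMillis);
        }
    
        public static void main(String[] args) {
            // Call took 30 ms of a 200 ms interval: sleep the remaining 170 ms.
            System.out.println(remainingSleep(200, 30));
            // Call took longer than the interval: no extra sleep.
            System.out.println(remainingSleep(200, 250));
        }
    }
    ```
    
    In the consumer loop this would look like `long start = System.currentTimeMillis(); ... getRecords ...; Thread.sleep(remainingSleep(fetchIntervalMillis, System.currentTimeMillis() - start));`, at the cost of one extra time lookup per iteration — which is where the efficiency concern above comes in.
    
    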
    


