[ 
https://issues.apache.org/jira/browse/FLINK-4574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15849620#comment-15849620
 ] 

ASF GitHub Bot commented on FLINK-4574:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2925#discussion_r95134466
  
    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java ---
    @@ -154,42 +166,115 @@ public void run() {
                                                }
                                        }
     
    -                                   // set the nextShardItr so we can continue iterating in the next while loop
    -                                   nextShardItr = getRecordsResult.getNextShardIterator();
    +                                   // set the startShardItr so we can continue iterating in the next while loop
    +                                   startShardItr = getRecordsResult.getNextShardIterator();
                                } else {
                                        // the last record was non-aggregated, so we can simply start from the next record
    -                                   nextShardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(), lastSequenceNum.getSequenceNumber());
    +                                   startShardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(), lastSequenceNum.getSequenceNumber());
                                }
                        }
     
    -                   while(isRunning()) {
    -                           if (nextShardItr == null) {
    -                                   fetcherRef.updateState(subscribedShardStateIndex, SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get());
    -
    -                                   // we can close this consumer thread once we've reached the end of the subscribed shard
    -                                   break;
    -                           } else {
    -                                   if (fetchIntervalMillis != 0) {
    -                                           Thread.sleep(fetchIntervalMillis);
    -                                   }
    +                   ArrayBlockingQueue<UserRecord> queue = new ArrayBlockingQueue<>(maxNumberOfRecordsPerFetch);
    +                   ShardConsumerFetcher shardConsumerFetcher;
     
    -                                   GetRecordsResult getRecordsResult = getRecords(nextShardItr, maxNumberOfRecordsPerFetch);
    +                   if (fetchIntervalMillis > 0L) {
    +                           shardConsumerFetcher = new ShardConsumerFetcher(this, startShardItr, queue, false);
    +                           timer.scheduleAtFixedRate(shardConsumerFetcher, 0L, fetchIntervalMillis);
    +                   } else {
    +                           // if fetchIntervalMillis is 0, make the task run forever and schedule it once only.
    +                           shardConsumerFetcher = new ShardConsumerFetcher(this, startShardItr, queue, true);
    +                           timer.schedule(shardConsumerFetcher, 0L);
    +                   }
     
    -                                   // each of the Kinesis records may be aggregated, so we must deaggregate them before proceeding
    -                                   List<UserRecord> fetchedRecords = deaggregateRecords(
    -                                           getRecordsResult.getRecords(),
    -                                           subscribedShard.getShard().getHashKeyRange().getStartingHashKey(),
    -                                           subscribedShard.getShard().getHashKeyRange().getEndingHashKey());
    +                   while(isRunning()) {
    +                           UserRecord record = queue.poll();
    +                           if (record != null) {
    +                                   deserializeRecordForCollectionAndUpdateState(record);
    +                           } else {
    +                                   if (shardConsumerFetcher.nextShardItr == null) {
    +                                           fetcherRef.updateState(subscribedShardStateIndex, SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get());
     
    -                                   for (UserRecord record : fetchedRecords) {
    -                                           deserializeRecordForCollectionAndUpdateState(record);
    +                                           // we can close this consumer thread once we've reached the end of the subscribed shard
    +                                           break;
                                        }
    +                           }
     
    -                                   nextShardItr = getRecordsResult.getNextShardIterator();
    +                           Throwable throwable = this.error.get();
    +                           if (throwable != null) {
    +                                   throw throwable;
                                }
                        }
                } catch (Throwable t) {
                        fetcherRef.stopWithError(t);
    +           } finally {
    +                   timer.cancel();
    --- End diff --
    
    Calling `cancel` on a `Timer` does not stop a task that is already
    executing; it only discards the tasks that are currently _scheduled_. So when
    `fetchIntervalMillis` is 0, the forever-running fetcher task will never be
    terminated.
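
To make the point above concrete, here is a minimal, self-contained sketch of the behavior. It is not code from the pull request: `TimerCancelSketch`, `LoopingTask`, and the `running` flag are hypothetical names, and the looping task only stands in for a "run forever" fetcher. It shows that the task keeps making progress after `Timer.cancel()` returns and stops only once it is signalled explicitly.

```java
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicLong;

final class TimerCancelSketch {

    /** Hypothetical stand-in for a fetcher task that loops until told to stop. */
    static final class LoopingTask extends TimerTask {
        private volatile boolean running = true;
        final AtomicLong iterations = new AtomicLong();

        @Override
        public void run() {
            while (running) {
                iterations.incrementAndGet();
                sleepQuietly(5);
            }
        }

        /** Explicit stop signal; Timer.cancel() alone does not end the loop. */
        void shutdown() {
            running = false;
        }
    }

    static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Returns true if the task kept iterating after timer.cancel() was called. */
    static boolean stillRunningAfterCancel() {
        Timer timer = new Timer(true); // daemon thread so the JVM can always exit
        LoopingTask task = new LoopingTask();
        timer.schedule(task, 0L);      // scheduled once, then loops "forever"

        sleepQuietly(100);             // let the task start
        timer.cancel();                // discards scheduled tasks only
        long before = task.iterations.get();
        sleepQuietly(100);
        long after = task.iterations.get();

        task.shutdown();               // the task needs its own termination signal
        return after > before;
    }

    public static void main(String[] args) {
        System.out.println("task kept running after cancel: " + stillRunningAfterCancel());
    }
}
```

One possible remedy, under the same assumptions, is for the `finally` block to signal the task (as `shutdown()` does here) in addition to cancelling the timer.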


> Strengthen fetch interval implementation in Kinesis consumer
> ------------------------------------------------------------
>
>                 Key: FLINK-4574
>                 URL: https://issues.apache.org/jira/browse/FLINK-4574
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kinesis Connector
>    Affects Versions: 1.1.0
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Wei-Che Wei
>
> As pointed out by [~rmetzger], the current fetch interval implementation in
> the {{ShardConsumer}} class of the Kinesis consumer can lead to much longer
> intervals than the user specified. For example, if the specified fetch
> interval is {{f}}, a {{getRecords()}} call takes {{x}} to complete, and
> processing the fetched records for emitting takes {{y}}, then the actual
> interval between fetches is {{f+x+y}}.
> The main problem is that we can never guarantee how much time has passed
> since the last {{getRecords()}} call, and thus cannot guarantee that returned
> shard iterators will not have expired by the next time we use them, even if
> we limit the user-given value for {{f}} to be no longer than the iterator
> expiration time.
> I propose to improve this by using, per {{ShardConsumer}}, a
> {{ScheduledExecutorService}} / {{Timer}} to do the fixed-interval fetching,
> and a separate blocking queue that collects the fetched records for emitting.
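
The proposal above can be sketched roughly as follows. This is only an illustration under stated assumptions, not the connector's implementation: `FixedIntervalFetchSketch` and `consume` are hypothetical names, and the "fetch" merely fabricates string records in place of a real `getRecords()` call. A single-threaded `ScheduledExecutorService` triggers fetches at a fixed rate, independent of how long emitting takes, while the calling thread drains a bounded queue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class FixedIntervalFetchSketch {

    /** Fetches on a fixed schedule and hands records to the caller through a bounded queue. */
    static List<String> consume(int fetches, int recordsPerFetch, long intervalMillis) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(recordsPerFetch);
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger fetchCount = new AtomicInteger();
        List<String> emitted = new ArrayList<>();
        int expected = fetches * recordsPerFetch;

        try {
            // Fetch side: fires every intervalMillis, regardless of emit time.
            scheduler.scheduleAtFixedRate(() -> {
                int n = fetchCount.getAndIncrement();
                if (n >= fetches) {
                    return; // nothing left to fetch in this sketch
                }
                try {
                    for (int i = 0; i < recordsPerFetch; i++) {
                        // put() blocks when the emitting side falls behind
                        queue.put("fetch-" + n + "-record-" + i);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, 0L, intervalMillis, TimeUnit.MILLISECONDS);

            // Emit side: drain the queue on the calling thread.
            while (emitted.size() < expected) {
                String record = queue.poll(1, TimeUnit.SECONDS);
                if (record == null) {
                    break; // fetcher stalled; give up rather than wait forever
                }
                emitted.add(record);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdownNow(); // interrupts a fetch that is still in progress
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(consume(3, 2, 50L));
    }
}
```

Note that `scheduleAtFixedRate` never runs two executions of the same task concurrently; if one fetch overruns the period, the next one starts late. A real consumer would still have to decide how to handle that case.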



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
