[ https://issues.apache.org/jira/browse/FLINK-4574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15849898#comment-15849898 ]
ASF GitHub Bot commented on FLINK-4574:
---------------------------------------
Github user tony810430 commented on a diff in the pull request:
https://github.com/apache/flink/pull/2925#discussion_r99117155
--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java ---
@@ -154,42 +166,115 @@ public void run() {
 						}
 					}
-					// set the nextShardItr so we can continue iterating in the next while loop
-					nextShardItr = getRecordsResult.getNextShardIterator();
+					// set the startShardItr so we can continue iterating in the next while loop
+					startShardItr = getRecordsResult.getNextShardIterator();
 				} else {
 					// the last record was non-aggregated, so we can simply start from the next record
-					nextShardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(), lastSequenceNum.getSequenceNumber());
+					startShardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(), lastSequenceNum.getSequenceNumber());
 				}
 			}
-			while(isRunning()) {
-				if (nextShardItr == null) {
-					fetcherRef.updateState(subscribedShardStateIndex, SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get());
-
-					// we can close this consumer thread once we've reached the end of the subscribed shard
-					break;
-				} else {
-					if (fetchIntervalMillis != 0) {
-						Thread.sleep(fetchIntervalMillis);
-					}
+			ArrayBlockingQueue<UserRecord> queue = new ArrayBlockingQueue<>(maxNumberOfRecordsPerFetch);
+			ShardConsumerFetcher shardConsumerFetcher;
-					GetRecordsResult getRecordsResult = getRecords(nextShardItr, maxNumberOfRecordsPerFetch);
+			if (fetchIntervalMillis > 0L) {
+				shardConsumerFetcher = new ShardConsumerFetcher(this, startShardItr, queue, false);
+				timer.scheduleAtFixedRate(shardConsumerFetcher, 0L, fetchIntervalMillis);
+			} else {
+				// if fetchIntervalMillis is 0, make the task run forever and schedule it once only.
+				shardConsumerFetcher = new ShardConsumerFetcher(this, startShardItr, queue, true);
+				timer.schedule(shardConsumerFetcher, 0L);
+			}
-					// each of the Kinesis records may be aggregated, so we must deaggregate them before proceeding
-					List<UserRecord> fetchedRecords = deaggregateRecords(
-						getRecordsResult.getRecords(),
-						subscribedShard.getShard().getHashKeyRange().getStartingHashKey(),
-						subscribedShard.getShard().getHashKeyRange().getEndingHashKey());
+			while(isRunning()) {
+				UserRecord record = queue.poll();
+				if (record != null) {
+					deserializeRecordForCollectionAndUpdateState(record);
+				} else {
+					if (shardConsumerFetcher.nextShardItr == null) {
+						fetcherRef.updateState(subscribedShardStateIndex, SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get());
-					for (UserRecord record : fetchedRecords) {
-						deserializeRecordForCollectionAndUpdateState(record);
+						// we can close this consumer thread once we've reached the end of the subscribed shard
+						break;
 					}
+				}
-					nextShardItr = getRecordsResult.getNextShardIterator();
+				Throwable throwable = this.error.get();
+				if (throwable != null) {
+					throw throwable;
 				}
 			}
 		} catch (Throwable t) {
 			fetcherRef.stopWithError(t);
+		} finally {
+			timer.cancel();
+		}
+	}
+
+	private class ShardConsumerFetcher extends TimerTask {
+		private String nextShardItr;
+
+		private final ShardConsumer<T> shardConsumerRef;
+
+		private final ArrayBlockingQueue<UserRecord> userRecordQueue;
+
+		/** The latest finish time for fetching data from Kinesis used to recognize if the following task has been delayed.*/
+		private Long lastFinishTime = -1L;
+
+		private boolean runForever;
+
+		ShardConsumerFetcher(ShardConsumer<T> shardConsumerRef,
+				String nextShardItr,
+				ArrayBlockingQueue<UserRecord> userRecordQueue,
+				boolean runForever) {
+			this.shardConsumerRef = shardConsumerRef;
+			this.nextShardItr = nextShardItr;
+			this.userRecordQueue = userRecordQueue;
+			this.runForever = runForever;
+		}
+
+		@Override
+		public void run() {
+			try {
+				do {
+					if (nextShardItr != null) {
+						// ignore to log this warning if runForever is true, since fetchIntervalMillis is 0
+						if (!runForever && this.scheduledExecutionTime() < lastFinishTime) {
--- End diff --
The JDK implementation of `TimerTask.scheduledExecutionTime()` is:
```java
public long scheduledExecutionTime() {
    synchronized(lock) {
        return (period < 0 ? nextExecutionTime + period
                           : nextExecutionTime - period);
    }
}
```
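In fixed-rate mode, the `Timer` thread advances `nextExecutionTime` to `nextExecutionTime + period` just before each execution, no matter when that execution actually starts. So with `scheduleAtFixedRate`, `scheduledExecutionTime()` returns the time at which the current execution was expected to start, which is why I would use it to determine whether the task was delayed.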
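To see this fixed-rate behavior in isolation, here is a minimal sketch that uses only `java.util.Timer`. `FixedRateDelayDemo`, `DelayAwareTask` and `runDemo` are hypothetical names; the task applies the same `scheduledExecutionTime() < lastFinishTime` test as the diff, and a slow Kinesis fetch is simulated with `Thread.sleep` (an assumption: 150 ms for the first run against a 50 ms period).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class FixedRateDelayDemo {

    /** Hypothetical task: flags a run as delayed if it was due before the previous run finished. */
    static final class DelayAwareTask extends TimerTask {
        private final int maxRuns;
        private final long firstRunSleepMillis;
        private final CountDownLatch done = new CountDownLatch(1);
        // Same sentinel as lastFinishTime in the PR: -1 means no run has finished yet.
        private long lastFinishTime = -1L;
        final List<Long> scheduledTimes = new ArrayList<>();
        final List<Boolean> delayed = new ArrayList<>();

        DelayAwareTask(int maxRuns, long firstRunSleepMillis) {
            this.maxRuns = maxRuns;
            this.firstRunSleepMillis = firstRunSleepMillis;
        }

        @Override
        public void run() {
            if (scheduledTimes.size() >= maxRuns) {
                cancel(); // enough samples; stop future executions of this task
                return;
            }
            // With scheduleAtFixedRate this is when this run was due, not when it actually started.
            long scheduled = scheduledExecutionTime();
            scheduledTimes.add(scheduled);
            delayed.add(scheduled < lastFinishTime);
            if (scheduledTimes.size() == 1) {
                sleepQuietly(firstRunSleepMillis); // simulate one slow getRecords() call
            }
            lastFinishTime = System.currentTimeMillis();
            if (scheduledTimes.size() == maxRuns) {
                done.countDown();
            }
        }

        boolean awaitDone(long timeoutMillis) {
            try {
                return done.await(timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Runs the task at a fixed rate until it has recorded {@code runs} executions. */
    static DelayAwareTask runDemo(int runs, long periodMillis, long firstRunSleepMillis) {
        Timer timer = new Timer("fetch-timer", true); // daemon thread, so it never blocks JVM exit
        DelayAwareTask task = new DelayAwareTask(runs, firstRunSleepMillis);
        timer.scheduleAtFixedRate(task, 0L, periodMillis);
        boolean finished = task.awaitDone(5_000L);
        timer.cancel();
        if (!finished) {
            throw new IllegalStateException("timer did not complete " + runs + " runs in time");
        }
        return task;
    }

    public static void main(String[] args) {
        DelayAwareTask task = runDemo(3, 50L, 150L);
        System.out.println("scheduled = " + task.scheduledTimes);
        System.out.println("delayed   = " + task.delayed); // [false, true, true]
    }
}
```

The recorded scheduled times advance by exactly one period even though the first run overruns it, so the second and third runs, which `Timer` fires back-to-back to catch up, are flagged as delayed.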
> Strengthen fetch interval implementation in Kinesis consumer
> ------------------------------------------------------------
>
> Key: FLINK-4574
> URL: https://issues.apache.org/jira/browse/FLINK-4574
> Project: Flink
> Issue Type: Improvement
> Components: Kinesis Connector
> Affects Versions: 1.1.0
> Reporter: Tzu-Li (Gordon) Tai
> Assignee: Wei-Che Wei
>
> As pointed out by [~rmetzger], the current fetch interval implementation in
> the {{ShardConsumer}} class of the Kinesis consumer can lead to much longer
> intervals than the user specified. For example, if the specified fetch
> interval is {{f}}, a {{getRecords()}} call takes {{x}} to complete, and
> processing the fetched records for emitting takes {{y}}, then the actual
> interval between fetches is {{f+x+y}}.
>
> The main problem with this is that we can never guarantee how much time has
> passed since the last {{getRecords}} call, and thus cannot guarantee that the
> returned shard iterators will not have expired by the next time we use them,
> even if we limit the user-given value of {{f}} to be no longer than the
> iterator expiry time.
>
> I propose to improve this by using, per {{ShardConsumer}}, a
> {{ScheduledExecutorService}} / {{Timer}} to do the fixed-interval fetching,
> and a separate blocking queue that collects the fetched records for emitting.
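The {{f+x+y}} arithmetic above, and why the proposed timer-plus-queue design restores an interval of {{f}}, can be checked with a small virtual-clock model. This is a sketch under stated assumptions, not the connector's code: `FetchScheduleModel` and its methods are hypothetical, the fixed-rate variant assumes records are handed to a separate consumer thread (so only {{x}} is spent on the timer thread), and a late fixed-rate run is assumed to start as soon as the previous one finishes, as `java.util.Timer` does.

```java
import java.util.Arrays;

public class FetchScheduleModel {

    /**
     * Start times of each getRecords() call in the pre-PR loop:
     * sleep f, fetch (takes x), then process and emit (takes y), all on one thread.
     */
    static long[] sleepLoopFetchStarts(long f, long x, long y, int n) {
        long[] starts = new long[n];
        long clock = 0L;
        for (int i = 0; i < n; i++) {
            clock += f;          // Thread.sleep(fetchIntervalMillis)
            starts[i] = clock;   // getRecords() begins here
            clock += x + y;      // fetch, then deaggregate and emit before looping again
        }
        return starts;
    }

    /**
     * Start times of each fetch when a fixed-rate timer drives fetching and emitting
     * happens on another thread: run i is due at i * f, but cannot start before the
     * previous fetch (duration x) has finished.
     */
    static long[] fixedRateFetchStarts(long f, long x, int n) {
        long[] starts = new long[n];
        long fetcherFreeAt = 0L;
        for (int i = 0; i < n; i++) {
            long due = i * f;                           // first run with delay 0, as in the PR
            long start = Math.max(due, fetcherFreeAt);  // a late run starts as soon as possible
            starts[i] = start;
            fetcherFreeAt = start + x;
        }
        return starts;
    }

    public static void main(String[] args) {
        long f = 200L, x = 50L, y = 100L;
        // Prints [200, 550, 900, 1250]: consecutive fetches are f + x + y = 350 apart.
        System.out.println(Arrays.toString(sleepLoopFetchStarts(f, x, y, 4)));
        // Prints [0, 200, 400, 600]: consecutive fetches are f = 200 apart.
        System.out.println(Arrays.toString(fixedRateFetchStarts(f, x, 4)));
    }
}
```

If a single fetch takes longer than {{f}} (for example, `fixedRateFetchStarts(100, 250, 3)` yields `[0, 250, 500]`), the fixed-rate schedule degrades to back-to-back fetches, which is the situation the PR's `scheduledExecutionTime() < lastFinishTime` check is meant to detect.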
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)