Repository: flink
Updated Branches:
  refs/heads/release-1.5 a0a810720 -> 62839e88e
[FLINK-9691] [kinesis] Modify runloop to try to track a particular getRecords() frequency.

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/62839e88
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/62839e88
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/62839e88

Branch: refs/heads/release-1.5
Commit: 62839e88e15b338a8af9afcef698c38a194c592f
Parents: a0a8107
Author: Jamie Grier <jgr...@lyft.com>
Authored: Mon Jul 9 14:20:47 2018 -0700
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Jul 10 18:34:15 2018 +0200

----------------------------------------------------------------------
 .../connectors/kinesis/internals/ShardConsumer.java | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/62839e88/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
index 0d730af..30f0016 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
@@ -199,6 +199,7 @@ public class ShardConsumer<T> implements Runnable {
                 }
             }
 
+            long lastTimeNanos = 0;
             while (isRunning()) {
                 if (nextShardItr == null) {
                     fetcherRef.updateState(subscribedShardStateIndex, SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get());
@@ -207,7 +208,12 @@ public class ShardConsumer<T> implements Runnable {
                     break;
                 } else {
                     if (fetchIntervalMillis != 0) {
-                        Thread.sleep(fetchIntervalMillis);
+                        long elapsedTimeNanos = System.nanoTime() - lastTimeNanos;
+                        long sleepTimeMillis = fetchIntervalMillis - (elapsedTimeNanos / 1_000_000);
+                        if (sleepTimeMillis > 0) {
+                            Thread.sleep(sleepTimeMillis);
+                        }
+                        lastTimeNanos = System.nanoTime();
                     }
 
                     GetRecordsResult getRecordsResult = getRecords(nextShardItr, maxNumberOfRecordsPerFetch);