Repository: flink
Updated Branches:
  refs/heads/release-1.5 a0a810720 -> 62839e88e


[FLINK-9691] [kinesis] Modify runloop to try to track a particular getRecords() 
frequency.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/62839e88
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/62839e88
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/62839e88

Branch: refs/heads/release-1.5
Commit: 62839e88e15b338a8af9afcef698c38a194c592f
Parents: a0a8107
Author: Jamie Grier <jgr...@lyft.com>
Authored: Mon Jul 9 14:20:47 2018 -0700
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Jul 10 18:34:15 2018 +0200

----------------------------------------------------------------------
 .../connectors/kinesis/internals/ShardConsumer.java          | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/62839e88/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
index 0d730af..30f0016 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
@@ -199,6 +199,7 @@ public class ShardConsumer<T> implements Runnable {
                                }
                        }
 
+                       long lastTimeNanos = 0;
                        while (isRunning()) {
                                if (nextShardItr == null) {
                                        
fetcherRef.updateState(subscribedShardStateIndex, 
SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get());
@@ -207,7 +208,12 @@ public class ShardConsumer<T> implements Runnable {
                                        break;
                                } else {
                                        if (fetchIntervalMillis != 0) {
-                                               
Thread.sleep(fetchIntervalMillis);
+                                               long elapsedTimeNanos = 
System.nanoTime() - lastTimeNanos;
+                                               long sleepTimeMillis = 
fetchIntervalMillis - (elapsedTimeNanos / 1_000_000);
+                                               if (sleepTimeMillis > 0) {
+                                                       
Thread.sleep(sleepTimeMillis);
+                                               }
+                                               lastTimeNanos = 
System.nanoTime();
                                        }
 
                                        GetRecordsResult getRecordsResult = 
getRecords(nextShardItr, maxNumberOfRecordsPerFetch);

Reply via email to