This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new b34ee94  [FLINK-10154][connectors] Make sure we always read at least one record in KinesisConnector.
b34ee94 is described below

commit b34ee9475a5828651c551a067272acdcc9e94968
Author: Jamie Grier <[email protected]>
AuthorDate: Wed Aug 15 19:45:38 2018 +0200

    [FLINK-10154][connectors] Make sure we always read at least one record in KinesisConnector.
    
    This closes #6564.
---
 .../flink/streaming/connectors/kinesis/internals/ShardConsumer.java   | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
index d698ecf..6de7278 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
@@ -278,8 +278,8 @@ public class ShardConsumer<T> implements Runnable {
                        double loopFrequencyHz = 1000000000.0d / 
runLoopTimeNanos;
                        double bytesPerRead = 
KINESIS_SHARD_BYTES_PER_SECOND_LIMIT / loopFrequencyHz;
                        maxNumberOfRecordsPerFetch = (int) (bytesPerRead / 
averageRecordSizeBytes);
-                       // Ensure the value is not more than 10000L
-                       maxNumberOfRecordsPerFetch = 
Math.min(maxNumberOfRecordsPerFetch, 
ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_MAX);
+                       // Ensure the value is greater than 0 and not more than 
10000L
+                       maxNumberOfRecordsPerFetch = Math.max(1, 
Math.min(maxNumberOfRecordsPerFetch, 
ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_MAX));
                }
                return maxNumberOfRecordsPerFetch;
        }

Reply via email to