This is an automated email from the ASF dual-hosted git repository.
srichter pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.6 by this push:
new 02152e7 [FLINK-10154][connectors] Make sure we always read at least
one record in KinesisConnector.
02152e7 is described below
commit 02152e7d362ff1694c24f2ab11c300ebd7e1de5e
Author: Jamie Grier <[email protected]>
AuthorDate: Wed Aug 15 19:45:38 2018 +0200
[FLINK-10154][connectors] Make sure we always read at least one record in
KinesisConnector.
This closes #6564.
---
.../flink/streaming/connectors/kinesis/internals/ShardConsumer.java | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
index d698ecf..6de7278 100644
---
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
+++
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
@@ -278,8 +278,8 @@ public class ShardConsumer<T> implements Runnable {
double loopFrequencyHz = 1000000000.0d /
runLoopTimeNanos;
double bytesPerRead =
KINESIS_SHARD_BYTES_PER_SECOND_LIMIT / loopFrequencyHz;
maxNumberOfRecordsPerFetch = (int) (bytesPerRead /
averageRecordSizeBytes);
- // Ensure the value is not more than 10000L
- maxNumberOfRecordsPerFetch =
Math.min(maxNumberOfRecordsPerFetch,
ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_MAX);
+ // Ensure the value is greater than 0 and not more than
10000L
+ maxNumberOfRecordsPerFetch = Math.max(1,
Math.min(maxNumberOfRecordsPerFetch,
ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_MAX));
}
return maxNumberOfRecordsPerFetch;
}