Github user glaksh100 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6300#discussion_r202227845

    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java ---
    @@ -330,4 +347,24 @@ private GetRecordsResult getRecords(String shardItr, int maxNumberOfRecords) thr
     protected static List<UserRecord> deaggregateRecords(List<Record> records, String startingHashKey, String endingHashKey) {
         return UserRecord.deaggregate(records, new BigInteger(startingHashKey), new BigInteger(endingHashKey));
     }
    +
    + /**
    +  * Adapts the maxNumberOfRecordsPerFetch based on the current average record size
    +  * to optimize 2 Mb / sec read limits.
    +  *
    +  * @param averageRecordSizeBytes
    +  * @return adaptedMaxRecordsPerFetch
    +  */
    +
    + protected int getAdaptiveMaxRecordsPerFetch(long averageRecordSizeBytes) {
    +     int adaptedMaxRecordsPerFetch = maxNumberOfRecordsPerFetch;
    +     if (averageRecordSizeBytes != 0 && fetchIntervalMillis != 0) {
    +         adaptedMaxRecordsPerFetch = (int) (KINESIS_SHARD_BYTES_PER_SECOND_LIMIT / (averageRecordSizeBytes * 1000L / fetchIntervalMillis));
    +
    +         // Ensure the value is not more than 10000L
    +         adaptedMaxRecordsPerFetch = adaptedMaxRecordsPerFetch <= ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_MAX ?
    --- End diff --

    Changed.
---