GitHub user tweise commented on a diff in the pull request: https://github.com/apache/flink/pull/6300#discussion_r201792650 --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java --- @@ -224,10 +232,19 @@ public void run() { subscribedShard.getShard().getHashKeyRange().getStartingHashKey(), subscribedShard.getShard().getHashKeyRange().getEndingHashKey()); + long recordBatchSizeBytes = 0L; + long averageRecordSizeBytes = 0L; + for (UserRecord record : fetchedRecords) { + recordBatchSizeBytes += record.getData().remaining(); deserializeRecordForCollectionAndUpdateState(record); } + if (useAdaptiveReads && fetchedRecords.size() != 0) { --- End diff -- nit: prefer `&& !fetchedRecords.isEmpty()` over `fetchedRecords.size() != 0`
---