Github user tzulitai commented on a diff in the pull request:
https://github.com/apache/flink/pull/6408#discussion_r204999405
--- Diff:
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
---
@@ -233,26 +225,69 @@ public void run() {
subscribedShard.getShard().getHashKeyRange().getEndingHashKey());
long recordBatchSizeBytes = 0L;
- long averageRecordSizeBytes = 0L;
-
for (UserRecord record :
fetchedRecords) {
recordBatchSizeBytes +=
record.getData().remaining();
deserializeRecordForCollectionAndUpdateState(record);
}
- if (useAdaptiveReads &&
!fetchedRecords.isEmpty()) {
- averageRecordSizeBytes =
recordBatchSizeBytes / fetchedRecords.size();
- maxNumberOfRecordsPerFetch =
getAdaptiveMaxRecordsPerFetch(averageRecordSizeBytes);
- }
-
nextShardItr =
getRecordsResult.getNextShardIterator();
+
+ long processingEndTimeNanos =
System.nanoTime();
--- End diff --
having a local variable here seems a bit redundant, since we always adjust
it right afterwards.
---