awelless commented on code in PR #10881:
URL: https://github.com/apache/nifi/pull/10881#discussion_r2793047072


##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesis.java:
##########
@@ -761,7 +770,8 @@ public void processRecords(final ProcessRecordsInput processRecordsInput) {
             if (bufferId == null) {
                 throw new IllegalStateException("Buffer ID not found: Record Processor not initialized");
             }
-            recordBuffer.addRecords(bufferId, processRecordsInput.records(), processRecordsInput.checkpointer());
+            recordBuffer.addRecords(bufferId, processRecordsInput.records(),
+                    processRecordsInput.checkpointer(), processRecordsInput.millisBehindLatest());

Review Comment:
   I'm not sure `millisBehindLatest` alone will accurately represent the lag.
   Kinesis computes the value when it returns records from a stream
   ([GetRecords docs](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html#API_GetRecords_ResponseSyntax),
   [SubscribeToShardEvent docs](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SubscribeToShardEvent.html#Streams-Type-SubscribeToShardEvent-MillisBehindLatest)).
   But it **does not** take into account the time records spend in KCL caches
   (`processRecordsInput.timeSpentInCache()`) or the time spent in
   `MemoryBoundRecordBuffer`.
   
   Because of that it might hide lag increases in scenarios where 
`ConsumeKinesis` backpressure kicks in. 
   Some of the records are still delivered from Kinesis, so 
`millisBehindLatest` stays relatively low while the actual lag grows. 
   After some time `millisBehindLatest` will increase as well. Yet it will be 
visible only when these records are consumed from the buffer.
   
   I'm fine with exposing `millisBehindLatest`, provided the documentation
   explains what this gauge really represents, together with its caveats.
   At the moment `additionalDetails.md` seems to be the best place for that.
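
   To illustrate the gap described above, here is a rough sketch of how an
   effective lag could be estimated. The `EffectiveLag` class and the
   `timeInBuffer` parameter are hypothetical and not part of the PR, NiFi, or
   the KCL; the sketch assumes the buffer tracks how long a batch has waited,
   and that adding the waiting times to the reported value is a good enough
   approximation.

```java
import java.time.Duration;

/** Hypothetical helper for illustration only; not part of NiFi or the KCL. */
final class EffectiveLag {

    private EffectiveLag() {
    }

    /**
     * Rough lag estimate: the value reported by Kinesis plus the time the
     * batch waited before being consumed.
     *
     * @param millisBehindLatest value reported by Kinesis for the batch, may be null
     * @param timeSpentInCache   time the batch waited in the KCL cache
     * @param timeInBuffer       time the batch waited in the processor buffer
     *                           (assumed to be tracked by the caller)
     */
    static long estimateMillis(final Long millisBehindLatest,
                               final Duration timeSpentInCache,
                               final Duration timeInBuffer) {
        // Treat a missing value as zero rather than failing.
        final long reported = millisBehindLatest == null ? 0L : millisBehindLatest;
        return reported + timeSpentInCache.toMillis() + timeInBuffer.toMillis();
    }
}
```

   With backpressure active, a batch reported at 200 ms that waits 50 ms in
   the KCL cache and 1500 ms in the buffer is closer to 1750 ms behind by the
   time it is consumed, while the gauge alone would show 200 ms.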



##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/MemoryBoundRecordBuffer.java:
##########
@@ -504,7 +508,13 @@ List<KinesisClientRecord> consumeRecords() {
                 recordsToConsume.addAll(batch.records());
             }
 
-            return recordsToConsume;
+            final Long maxMillisBehindLatest = inProgressBatches.stream()
+                    .map(RecordBatch::millisBehindLatest)
+                    .filter(Objects::nonNull)
+                    .min(Long::compareTo)

Review Comment:
   I think we should just use the last `millisBehindLatest` here. Since the 
batches are ordered, the last batch will store the latest value.
   
   Consider the following scenario:
   1. Batch 1 arrives with 200 ms value. 
   2. Batch 2 arrives with 10 ms value. 
   3. Batch 3 arrives with 1500 ms value. 
   
   Having a value of 10 ms would be misleading, as the delay was 1500 ms when 
the records were received for the last time.
   
   For some reason this field is nullable, although the Amazon docs claim
   it must be present.
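
   A minimal sketch of the "take the last value" idea, using a plain list of
   `Long` values in arrival order instead of the PR's `RecordBatch` objects.
   The `LatestMillisBehindLatest` class is hypothetical. Because the field is
   nullable, the sketch walks backwards until it finds a non-null value; in
   the common case that is a single step.

```java
import java.util.List;

/** Hypothetical helper for illustration only; not part of the PR. */
final class LatestMillisBehindLatest {

    private LatestMillisBehindLatest() {
    }

    /**
     * Returns the most recently received non-null value,
     * or null when no batch carried one.
     */
    static Long latestNonNull(final List<Long> valuesInArrivalOrder) {
        // Plain indexed loop, walking from the newest batch to the oldest.
        for (int i = valuesInArrivalOrder.size() - 1; i >= 0; i--) {
            final Long value = valuesInArrivalOrder.get(i);
            if (value != null) {
                return value;
            }
        }
        return null;
    }
}
```

   For the scenario above (200 ms, 10 ms, 1500 ms) this returns 1500, whereas
   taking the minimum returns 10.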



##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/test/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesisIT.java:
##########
@@ -205,6 +206,12 @@ void testConsumeSingleMessageFromSingleShard() {
                 List.of(applicationName),
                 streamClient.getEnhancedFanOutConsumerNames(),
                 "Expected a single enhanced fan-out consumer with an application name");
+
+        // Verify millisBehindLatest gauge is recorded.
+        final String shardId = flowFile.getAttribute("aws.kinesis.shard.id");
+        final String gaugeName = "kinesis.stream." + streamName + ".shard." + shardId + ".millisBehindLatest";
+        final List<Double> gaugeValues = runner.getGaugeValues(gaugeName);
+        assertFalse(gaugeValues.isEmpty(), "Expected millisBehindLatest gauge to be recorded");

Review Comment:
   Let's add this in
`testConsumeSingleMessageFromSingleShard_withoutEnhancedFanOut` to make sure
the gauge is recorded for `SHARED_THROUGHPUT` consumers as well.



##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/MemoryBoundRecordBuffer.java:
##########
@@ -504,7 +508,13 @@ List<KinesisClientRecord> consumeRecords() {
                 recordsToConsume.addAll(batch.records());
             }
 
-            return recordsToConsume;
+            final Long maxMillisBehindLatest = inProgressBatches.stream()
+                    .map(RecordBatch::millisBehindLatest)
+                    .filter(Objects::nonNull)
+                    .min(Long::compareTo)

Review Comment:
   Also nit: since the buffer class methods are executed very often, it was
   decided to use plain `for` loops there to avoid any unnecessary overhead.
   That shouldn't matter here, though, since no iteration is needed when
   taking the latest value.



##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesis.java:
##########
@@ -761,7 +770,8 @@ public void processRecords(final ProcessRecordsInput processRecordsInput) {
             if (bufferId == null) {
                 throw new IllegalStateException("Buffer ID not found: Record Processor not initialized");
             }
-            recordBuffer.addRecords(bufferId, processRecordsInput.records(), processRecordsInput.checkpointer());
+            recordBuffer.addRecords(bufferId, processRecordsInput.records(),
+                    processRecordsInput.checkpointer(), processRecordsInput.millisBehindLatest());

Review Comment:
   @exceptionfactory in the long run, what do you think about introducing an
   annotation indicating that a processor exposes a particular gauge/counter?
   Then the gauges/counters could be added to the processor documentation
   automatically, similar to how `@WritesAttribute` works.
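
   Purely as an illustration of the idea, such an annotation might look like
   the sketch below. `@WritesGauge` and `ExampleProcessor` are hypothetical
   names that do not exist in NiFi; the gauge name follows the pattern used
   in this PR's integration test.

```java
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/** Hypothetical annotation; NiFi does not currently provide it. */
@Documented
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@interface WritesGauge {

    /** Name (or name pattern) of the gauge the processor records. */
    String name();

    /** Description that a documentation generator could render. */
    String description();
}

/** Stand-in for a processor class; only shows how the annotation would be applied. */
@WritesGauge(
        name = "kinesis.stream.<stream>.shard.<shardId>.millisBehindLatest",
        description = "Value reported by Kinesis when the records were fetched; "
                + "excludes time spent in KCL caches and in the record buffer")
final class ExampleProcessor {
}
```

   A documentation generator could then read the annotation reflectively,
   for example with `ExampleProcessor.class.getAnnotation(WritesGauge.class)`.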



##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/test/java/org/apache/nifi/processors/aws/kinesis/MemoryBoundRecordBufferTest.java:
##########
@@ -559,26 +561,67 @@ void testCommitRecordsWhileNewRecordsArrive() {
 
         // Add records to pending queue.
         final List<KinesisClientRecord> batch1 = createTestRecords(2);
-        recordBuffer.addRecords(bufferId, batch1, checkpointer1);
+        recordBuffer.addRecords(bufferId, batch1, checkpointer1, 100L);
 
         // Consume batch1 records (moves from pending to in-progress).
         final Lease lease = recordBuffer.acquireBufferLease().orElseThrow();
 
-        final List<KinesisClientRecord> consumedRecords = recordBuffer.consumeRecords(lease);
+        final List<KinesisClientRecord> consumedRecords = recordBuffer.consumeRecords(lease).records();
         assertEquals(batch1, consumedRecords);
 
         // Add more records while others are in-progress.
         final List<KinesisClientRecord> batch2 = createTestRecords(1);
-        recordBuffer.addRecords(bufferId, batch2, checkpointer2);
+        recordBuffer.addRecords(bufferId, batch2, checkpointer2, 100L);
 
         // Commit in-progress records.
         recordBuffer.commitConsumedRecords(lease);
 
         // Consume batch2 records.
-        final List<KinesisClientRecord> remainingRecords = recordBuffer.consumeRecords(lease);
+        final List<KinesisClientRecord> remainingRecords = recordBuffer.consumeRecords(lease).records();
         assertEquals(batch2, remainingRecords);
     }
 
+    @Test
+    void testConsumeRecordsReturnsMinMillisBehindLatest() {
+        final ShardBufferId bufferId = recordBuffer.createBuffer(SHARD_ID_1);
+
+        recordBuffer.addRecords(bufferId, createTestRecords(1), checkpointer1, 200L);
+        recordBuffer.addRecords(bufferId, createTestRecords(1), checkpointer2, 500L);
+
+        final Lease lease = recordBuffer.acquireBufferLease().orElseThrow();
+        final ConsumeRecordsResult result = recordBuffer.consumeRecords(lease);
+
+        assertEquals(2, result.records().size());
+        assertEquals(200L, result.maxMillisBehindLatest());

Review Comment:
   Already mentioned in the `MemoryBoundRecordBuffer` comment.
   I believe we should see 500 ms as the gauge value in this scenario.



##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/RecordBuffer.java:
##########
@@ -37,7 +37,7 @@ interface ForKinesisClientLibrary {
 
         ShardBufferId createBuffer(String shardId);
 
-        void addRecords(ShardBufferId bufferId, List<KinesisClientRecord> records, RecordProcessorCheckpointer checkpointer);
+        void addRecords(ShardBufferId bufferId, List<KinesisClientRecord> records, RecordProcessorCheckpointer checkpointer, Long millisBehindLatest);

Review Comment:
   Nit: passing the whole `ProcessRecordsInput` to the buffer might be
   neater, especially if we decide to use more fields from that class in the
   future.
   
   I don't have a strong preference as to whether we pass `records`,
   `checkpointer`, `millis...` separately or in the `ProcessRecordsInput`.


