awelless commented on code in PR #10881:
URL: https://github.com/apache/nifi/pull/10881#discussion_r2793047072
##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesis.java:
##########
@@ -761,7 +770,8 @@ public void processRecords(final ProcessRecordsInput processRecordsInput) {
if (bufferId == null) {
throw new IllegalStateException("Buffer ID not found: Record Processor not initialized");
}
- recordBuffer.addRecords(bufferId, processRecordsInput.records(), processRecordsInput.checkpointer());
+ recordBuffer.addRecords(bufferId, processRecordsInput.records(),
+ processRecordsInput.checkpointer(), processRecordsInput.millisBehindLatest());
Review Comment:
I'm not sure `millisBehindLatest` alone will accurately represent the lag.
The value is computed by Kinesis when returning records from a stream
([GetRecords docs](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html#API_GetRecords_ResponseSyntax),
[SubscribeToShardEvent docs](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SubscribeToShardEvent.html#Streams-Type-SubscribeToShardEvent-MillisBehindLatest)).
But it **does not** take into account the time records spend in KCL caches
(`processRecordsInput.timeSpentInCache()`) or the time spent in
`MemoryBoundRecordBuffer`.
Because of that, it might hide lag increases in scenarios where
`ConsumeKinesis` backpressure kicks in: some records are still delivered
from Kinesis, so `millisBehindLatest` stays relatively low while the actual
lag grows. Eventually `millisBehindLatest` increases as well, but the increase
only becomes visible once those records are consumed from the buffer.
I'm fine with exposing `millisBehindLatest`, provided we document what this
gauge really represents, together with its caveats.
At the moment `additionalDetails.md` seems to be the best place for that.
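To illustrate the gap: because the stream tip keeps moving while fetched records wait, a rough end-to-end estimate would add the waiting time after the fetch to the reported value. A minimal sketch, assuming the stream keeps receiving new records (if it goes idle, the sum overestimates). `LagEstimate` and its parameters are hypothetical and not part of the PR; `timeSpentInCache` corresponds to the KCL's `ProcessRecordsInput.timeSpentInCache()`.

```java
import java.time.Duration;
import java.time.Instant;

final class LagEstimate {

    private LagEstimate() {
    }

    /**
     * Rough end-to-end lag estimate (hypothetical helper, not the PR's gauge).
     *
     * @param millisBehindLatest value reported by Kinesis when the records were fetched
     * @param timeSpentInCache   time the records waited in the KCL cache
     * @param bufferedAt         when the records were added to the processor's buffer
     * @param consumedAt         when the processor consumed them from the buffer
     */
    static long estimatedLagMillis(final long millisBehindLatest,
                                   final Duration timeSpentInCache,
                                   final Instant bufferedAt,
                                   final Instant consumedAt) {
        // Time spent waiting in the processor's own buffer, clamped to be non-negative.
        final long timeInBufferMillis = Math.max(0L, Duration.between(bufferedAt, consumedAt).toMillis());
        // Assumes new data keeps arriving, so every millisecond of waiting adds to the lag.
        return millisBehindLatest + timeSpentInCache.toMillis() + timeInBufferMillis;
    }
}
```

For example, a batch reported at 100 ms behind, held 50 ms in the KCL cache and 1600 ms in the buffer, would be roughly 1750 ms behind when consumed, while the gauge still shows 100 ms.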
##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/MemoryBoundRecordBuffer.java:
##########
@@ -504,7 +508,13 @@ List<KinesisClientRecord> consumeRecords() {
recordsToConsume.addAll(batch.records());
}
- return recordsToConsume;
+ final Long maxMillisBehindLatest = inProgressBatches.stream()
+ .map(RecordBatch::millisBehindLatest)
+ .filter(Objects::nonNull)
+ .min(Long::compareTo)
Review Comment:
I think we should just use the last `millisBehindLatest` here. Since the
batches are ordered, the last batch will store the latest value.
Consider the following scenario:
1. Batch 1 arrives with 200 ms value.
2. Batch 2 arrives with 10 ms value.
3. Batch 3 arrives with 1500 ms value.
Reporting 10 ms would be misleading, since the lag was 1500 ms when records
were last received.
For some reason this field is nullable, although the Amazon docs state that
it must be present.
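A minimal sketch of taking the latest value with a plain loop, assuming batches are kept in arrival order as described above. `RecordBatch` here is a hypothetical stand-in reduced to the nullable `millisBehindLatest` field. Walking backwards only costs more than one step when the most recent batch has no value.

```java
import java.util.List;

final class LatestLag {

    private LatestLag() {
    }

    // Hypothetical stand-in for the PR's RecordBatch, reduced to the field of interest.
    record RecordBatch(Long millisBehindLatest) {
    }

    /**
     * Returns millisBehindLatest of the most recently received batch, skipping batches
     * where the (nullable) value is missing; returns null if no batch has a value.
     */
    static Long latestMillisBehindLatest(final List<RecordBatch> inProgressBatches) {
        // Batches are in arrival order, so the first non-null value from the end is the latest.
        for (int i = inProgressBatches.size() - 1; i >= 0; i--) {
            final Long value = inProgressBatches.get(i).millisBehindLatest();
            if (value != null) {
                return value;
            }
        }
        return null;
    }
}
```

With the 200 / 10 / 1500 ms scenario this returns 1500, and with batches of 200 and 500 ms it returns 500.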
##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/test/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesisIT.java:
##########
@@ -205,6 +206,12 @@ void testConsumeSingleMessageFromSingleShard() {
List.of(applicationName),
streamClient.getEnhancedFanOutConsumerNames(),
"Expected a single enhanced fan-out consumer with an application name");
+
+ // Verify millisBehindLatest gauge is recorded.
+ final String shardId = flowFile.getAttribute("aws.kinesis.shard.id");
+ final String gaugeName = "kinesis.stream." + streamName + ".shard." + shardId + ".millisBehindLatest";
+ final List<Double> gaugeValues = runner.getGaugeValues(gaugeName);
+ assertFalse(gaugeValues.isEmpty(), "Expected millisBehindLatest gauge to be recorded");
Review Comment:
Let's add this check to
`testConsumeSingleMessageFromSingleShard_withoutEnhancedFanOut` as well, to
make sure the gauge is recorded for `SHARED_THROUGHPUT` consumers too.
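If the same assertion is added to `testConsumeSingleMessageFromSingleShard_withoutEnhancedFanOut`, the gauge name construction could be shared between the two tests. A minimal sketch; `GaugeNames.millisBehindLatestGauge` is a hypothetical helper, and the name format is copied from the assertion in `testConsumeSingleMessageFromSingleShard`.

```java
final class GaugeNames {

    private GaugeNames() {
    }

    // Hypothetical helper: builds the per-shard gauge name used by the integration tests.
    static String millisBehindLatestGauge(final String streamName, final String shardId) {
        return "kinesis.stream." + streamName + ".shard." + shardId + ".millisBehindLatest";
    }
}
```

Both tests could then call `runner.getGaugeValues(GaugeNames.millisBehindLatestGauge(streamName, shardId))` and assert that the result is not empty.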
##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/MemoryBoundRecordBuffer.java:
##########
@@ -504,7 +508,13 @@ List<KinesisClientRecord> consumeRecords() {
recordsToConsume.addAll(batch.records());
}
- return recordsToConsume;
+ final Long maxMillisBehindLatest = inProgressBatches.stream()
+ .map(RecordBatch::millisBehindLatest)
+ .filter(Objects::nonNull)
+ .min(Long::compareTo)
Review Comment:
Also a nit: since the buffer class methods are executed very often, it was
decided to use plain for loops there to avoid any unnecessary overhead.
This won't matter here, though, since no iteration is needed when taking the
latest value.
##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/ConsumeKinesis.java:
##########
@@ -761,7 +770,8 @@ public void processRecords(final ProcessRecordsInput processRecordsInput) {
if (bufferId == null) {
throw new IllegalStateException("Buffer ID not found: Record Processor not initialized");
}
- recordBuffer.addRecords(bufferId, processRecordsInput.records(), processRecordsInput.checkpointer());
+ recordBuffer.addRecords(bufferId, processRecordsInput.records(),
+ processRecordsInput.checkpointer(), processRecordsInput.millisBehindLatest());
Review Comment:
@exceptionfactory in the long run, what do you think about introducing an
annotation indicating that a processor exposes a particular gauge/counter?
The gauges/counters could then be included in the processor documentation
automatically, similar to how `@WritesAttribute` works.
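A minimal sketch of what such an annotation could look like, modelled loosely on attribute annotations like `@WritesAttribute`. `@RecordsGauge`, `@RecordsGauges`, `ExampleKinesisProcessor`, and `GaugeDocs` are all hypothetical names; only the repeatable-annotation and reflection mechanics are standard Java.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Repeatable;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.Arrays;
import java.util.List;

// Hypothetical annotation declaring a gauge that a processor records.
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Repeatable(RecordsGauges.class)
@interface RecordsGauge {
    String name();

    String description();
}

// Container annotation required by @Repeatable.
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@interface RecordsGauges {
    RecordsGauge[] value();
}

// Illustrative processor declaring the gauge it records, including its caveat.
@RecordsGauge(
        name = "kinesis.stream.<stream>.shard.<shard>.millisBehindLatest",
        description = "millisBehindLatest reported by Kinesis; excludes time spent in KCL caches and the processor buffer")
final class ExampleKinesisProcessor {
}

final class GaugeDocs {

    private GaugeDocs() {
    }

    // What a documentation generator could do: read the declared gauges reflectively.
    static List<String> gaugeNames(final Class<?> processorClass) {
        return Arrays.stream(processorClass.getAnnotationsByType(RecordsGauge.class))
                .map(RecordsGauge::name)
                .toList();
    }
}
```

Putting the caveat into the annotation's description would keep the generated documentation and the `additionalDetails.md` explanation from drifting apart.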
##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/test/java/org/apache/nifi/processors/aws/kinesis/MemoryBoundRecordBufferTest.java:
##########
@@ -559,26 +561,67 @@ void testCommitRecordsWhileNewRecordsArrive() {
// Add records to pending queue.
final List<KinesisClientRecord> batch1 = createTestRecords(2);
- recordBuffer.addRecords(bufferId, batch1, checkpointer1);
+ recordBuffer.addRecords(bufferId, batch1, checkpointer1, 100L);
// Consume batch1 records (moves from pending to in-progress).
final Lease lease = recordBuffer.acquireBufferLease().orElseThrow();
- final List<KinesisClientRecord> consumedRecords = recordBuffer.consumeRecords(lease);
+ final List<KinesisClientRecord> consumedRecords = recordBuffer.consumeRecords(lease).records();
assertEquals(batch1, consumedRecords);
// Add more records while others are in-progress.
final List<KinesisClientRecord> batch2 = createTestRecords(1);
- recordBuffer.addRecords(bufferId, batch2, checkpointer2);
+ recordBuffer.addRecords(bufferId, batch2, checkpointer2, 100L);
// Commit in-progress records.
recordBuffer.commitConsumedRecords(lease);
// Consume batch2 records.
- final List<KinesisClientRecord> remainingRecords = recordBuffer.consumeRecords(lease);
+ final List<KinesisClientRecord> remainingRecords = recordBuffer.consumeRecords(lease).records();
assertEquals(batch2, remainingRecords);
}
+ @Test
+ void testConsumeRecordsReturnsMinMillisBehindLatest() {
+ final ShardBufferId bufferId = recordBuffer.createBuffer(SHARD_ID_1);
+
+ recordBuffer.addRecords(bufferId, createTestRecords(1), checkpointer1, 200L);
+ recordBuffer.addRecords(bufferId, createTestRecords(1), checkpointer2, 500L);
+
+ final Lease lease = recordBuffer.acquireBufferLease().orElseThrow();
+ final ConsumeRecordsResult result = recordBuffer.consumeRecords(lease);
+
+ assertEquals(2, result.records().size());
+ assertEquals(200L, result.maxMillisBehindLatest());
Review Comment:
As mentioned in the `MemoryBoundRecordBuffer` comment, I believe we should
see 500 ms as the gauge value in this scenario.
##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/main/java/org/apache/nifi/processors/aws/kinesis/RecordBuffer.java:
##########
@@ -37,7 +37,7 @@ interface ForKinesisClientLibrary {
ShardBufferId createBuffer(String shardId);
- void addRecords(ShardBufferId bufferId, List<KinesisClientRecord> records, RecordProcessorCheckpointer checkpointer);
+ void addRecords(ShardBufferId bufferId, List<KinesisClientRecord> records, RecordProcessorCheckpointer checkpointer, Long millisBehindLatest);
Review Comment:
Nit: passing the whole `ProcessRecordsInput` to the buffer might be neater,
especially if we decide to use more fields from that class in the future.
I don't have a strong preference regarding whether we pass `records`,
`checkpointer`, `millis...` separately or as a `ProcessRecordsInput`.
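A minimal sketch of the parameter-object variant. It uses hypothetical stand-in types instead of the KCL classes so it compiles on its own: `BatchInput` plays the role of `ProcessRecordsInput`, and `Buffer` and `InMemoryBuffer` are illustrative only. The buffer reads whatever fields it needs from one input object, so adding a field later does not change the `addRecords` signature.

```java
import java.util.ArrayList;
import java.util.List;

final class ParameterObjectSketch {

    private ParameterObjectSketch() {
    }

    // Hypothetical stand-in for the KCL input object (records, checkpointer, lag).
    record BatchInput(List<String> records, Object checkpointer, Long millisBehindLatest) {
    }

    // Buffer API taking the whole input object instead of separate parameters.
    interface Buffer {
        void addRecords(String bufferId, BatchInput input);
    }

    // Toy implementation: stores records and remembers the latest non-null lag.
    static final class InMemoryBuffer implements Buffer {
        final List<String> records = new ArrayList<>();
        Long latestMillisBehindLatest;

        @Override
        public void addRecords(final String bufferId, final BatchInput input) {
            records.addAll(input.records());
            if (input.millisBehindLatest() != null) {
                latestMillisBehindLatest = input.millisBehindLatest();
            }
        }
    }
}
```

The trade-off is that the buffer interface then depends on the KCL input type, although it already depends on `KinesisClientRecord` and `RecordProcessorCheckpointer`.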