sfc-gh-lkucharski commented on code in PR #10881:
URL: https://github.com/apache/nifi/pull/10881#discussion_r2797782554
##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/test/java/org/apache/nifi/processors/aws/kinesis/MemoryBoundRecordBufferTest.java:
##########
@@ -559,26 +561,67 @@ void testCommitRecordsWhileNewRecordsArrive() {
// Add records to pending queue.
final List<KinesisClientRecord> batch1 = createTestRecords(2);
- recordBuffer.addRecords(bufferId, batch1, checkpointer1);
+ recordBuffer.addRecords(bufferId, batch1, checkpointer1, 100L);
// Consume batch1 records (moves from pending to in-progress).
final Lease lease = recordBuffer.acquireBufferLease().orElseThrow();
- final List<KinesisClientRecord> consumedRecords = recordBuffer.consumeRecords(lease);
+ final List<KinesisClientRecord> consumedRecords = recordBuffer.consumeRecords(lease).records();
assertEquals(batch1, consumedRecords);
// Add more records while others are in-progress.
final List<KinesisClientRecord> batch2 = createTestRecords(1);
- recordBuffer.addRecords(bufferId, batch2, checkpointer2);
+ recordBuffer.addRecords(bufferId, batch2, checkpointer2, 100L);
// Commit in-progress records.
recordBuffer.commitConsumedRecords(lease);
// Consume batch2 records.
- final List<KinesisClientRecord> remainingRecords = recordBuffer.consumeRecords(lease);
+ final List<KinesisClientRecord> remainingRecords = recordBuffer.consumeRecords(lease).records();
assertEquals(batch2, remainingRecords);
}
+ @Test
+ void testConsumeRecordsReturnsMinMillisBehindLatest() {
+ final ShardBufferId bufferId = recordBuffer.createBuffer(SHARD_ID_1);
+
+ recordBuffer.addRecords(bufferId, createTestRecords(1), checkpointer1, 200L);
+ recordBuffer.addRecords(bufferId, createTestRecords(1), checkpointer2, 500L);
+
+ final Lease lease = recordBuffer.acquireBufferLease().orElseThrow();
+ final ConsumeRecordsResult result = recordBuffer.consumeRecords(lease);
+
+ assertEquals(2, result.records().size());
+ assertEquals(200L, result.maxMillisBehindLatest());
Review Comment:
Changed
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]