This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new d2d6e207e2c NIFI-15835 Improved stability of
PollingKinesisClientTest.testExpiredIteratorRecoveryDoesNotDeliverSameShardOutOfOrder
(#11135)
d2d6e207e2c is described below
commit d2d6e207e2cb4a3e15568b76b7c6f01f9822d885
Author: Pierre Villard <[email protected]>
AuthorDate: Mon Apr 13 23:26:13 2026 +0200
NIFI-15835 Improved stability of
PollingKinesisClientTest.testExpiredIteratorRecoveryDoesNotDeliverSameShardOutOfOrder
(#11135)
Signed-off-by: David Handermann <[email protected]>
---
.../aws/kinesis/PollingKinesisClientTest.java | 20 ++++++++++----------
1 file changed, 10 insertions(+), 10 deletions(-)
diff --git
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/test/java/org/apache/nifi/processors/aws/kinesis/PollingKinesisClientTest.java
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/test/java/org/apache/nifi/processors/aws/kinesis/PollingKinesisClientTest.java
index d89ad1bb0b2..0dba001e6c4 100644
---
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/test/java/org/apache/nifi/processors/aws/kinesis/PollingKinesisClientTest.java
+++
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/test/java/org/apache/nifi/processors/aws/kinesis/PollingKinesisClientTest.java
@@ -194,6 +194,7 @@ class PollingKinesisClientTest {
void testExpiredIteratorRecoveryDoesNotDeliverSameShardOutOfOrder() throws
Exception {
final AtomicInteger getRecordsCallCount = new AtomicInteger();
final AtomicInteger getShardIteratorCallCount = new AtomicInteger();
+ final CountDownLatch firstResultPolled = new CountDownLatch(1);
when(mockShardManager.readCheckpoint(anyString())).thenReturn("100");
when(mockKinesisClient.getShardIterator(any(GetShardIteratorRequest.class))).thenAnswer(invocation
-> {
@@ -211,11 +212,14 @@ class PollingKinesisClientTest {
.nextShardIterator("iter-1a")
.millisBehindLatest(0L)
.build();
- case "iter-1a" -> GetRecordsResponse.builder()
- .records(record("300", "B"))
- .nextShardIterator("iter-1b")
- .millisBehindLatest(0L)
- .build();
+ case "iter-1a" -> {
+ firstResultPolled.await(10, TimeUnit.SECONDS);
+ yield GetRecordsResponse.builder()
+ .records(record("300", "B"))
+ .nextShardIterator("iter-1b")
+ .millisBehindLatest(0L)
+ .build();
+ }
case "iter-1b" -> throw
ExpiredIteratorException.builder().message("expired").build();
case "iter-2" -> GetRecordsResponse.builder()
.records(record("200", "A-replay"))
@@ -236,11 +240,7 @@ class PollingKinesisClientTest {
assertNotNull(firstResult, "Initial result must be available");
assertEquals(new BigInteger("200"), firstResult.firstSequenceNumber());
- final long newerQueuedDeadline = System.nanoTime() +
TimeUnit.SECONDS.toNanos(5);
- while (System.nanoTime() < newerQueuedDeadline &&
getRecordsCallCount.get() < 2) {
- Thread.sleep(20);
- }
- Thread.sleep(50);
+ firstResultPolled.countDown();
final long replayQueuedDeadline = System.nanoTime() +
TimeUnit.SECONDS.toNanos(5);
while (System.nanoTime() < replayQueuedDeadline &&
(getShardIteratorCallCount.get() < 2 || getRecordsCallCount.get() < 4)) {