This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new d2d6e207e2c NIFI-15835 Improved stability of 
PollingKinesisClientTest.testExpiredIteratorRecoveryDoesNotDeliverSameShardOutOfOrder
 (#11135)
d2d6e207e2c is described below

commit d2d6e207e2cb4a3e15568b76b7c6f01f9822d885
Author: Pierre Villard <[email protected]>
AuthorDate: Mon Apr 13 23:26:13 2026 +0200

    NIFI-15835 Improved stability of 
PollingKinesisClientTest.testExpiredIteratorRecoveryDoesNotDeliverSameShardOutOfOrder
 (#11135)
    
    Signed-off-by: David Handermann <[email protected]>
---
 .../aws/kinesis/PollingKinesisClientTest.java        | 20 ++++++++++----------
 1 file changed, 10 insertions(+), 10 deletions(-)

diff --git 
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/test/java/org/apache/nifi/processors/aws/kinesis/PollingKinesisClientTest.java
 
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/test/java/org/apache/nifi/processors/aws/kinesis/PollingKinesisClientTest.java
index d89ad1bb0b2..0dba001e6c4 100644
--- 
a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/test/java/org/apache/nifi/processors/aws/kinesis/PollingKinesisClientTest.java
+++ 
b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-kinesis/src/test/java/org/apache/nifi/processors/aws/kinesis/PollingKinesisClientTest.java
@@ -194,6 +194,7 @@ class PollingKinesisClientTest {
     void testExpiredIteratorRecoveryDoesNotDeliverSameShardOutOfOrder() throws 
Exception {
         final AtomicInteger getRecordsCallCount = new AtomicInteger();
         final AtomicInteger getShardIteratorCallCount = new AtomicInteger();
+        final CountDownLatch firstResultPolled = new CountDownLatch(1);
 
         when(mockShardManager.readCheckpoint(anyString())).thenReturn("100");
         
when(mockKinesisClient.getShardIterator(any(GetShardIteratorRequest.class))).thenAnswer(invocation
 -> {
@@ -211,11 +212,14 @@ class PollingKinesisClientTest {
                         .nextShardIterator("iter-1a")
                         .millisBehindLatest(0L)
                         .build();
-                case "iter-1a" -> GetRecordsResponse.builder()
-                        .records(record("300", "B"))
-                        .nextShardIterator("iter-1b")
-                        .millisBehindLatest(0L)
-                        .build();
+                case "iter-1a" -> {
+                    firstResultPolled.await(10, TimeUnit.SECONDS);
+                    yield GetRecordsResponse.builder()
+                            .records(record("300", "B"))
+                            .nextShardIterator("iter-1b")
+                            .millisBehindLatest(0L)
+                            .build();
+                }
                 case "iter-1b" -> throw 
ExpiredIteratorException.builder().message("expired").build();
                 case "iter-2" -> GetRecordsResponse.builder()
                         .records(record("200", "A-replay"))
@@ -236,11 +240,7 @@ class PollingKinesisClientTest {
         assertNotNull(firstResult, "Initial result must be available");
         assertEquals(new BigInteger("200"), firstResult.firstSequenceNumber());
 
-        final long newerQueuedDeadline = System.nanoTime() + 
TimeUnit.SECONDS.toNanos(5);
-        while (System.nanoTime() < newerQueuedDeadline && 
getRecordsCallCount.get() < 2) {
-            Thread.sleep(20);
-        }
-        Thread.sleep(50);
+        firstResultPolled.countDown();
 
         final long replayQueuedDeadline = System.nanoTime() + 
TimeUnit.SECONDS.toNanos(5);
         while (System.nanoTime() < replayQueuedDeadline && 
(getShardIteratorCallCount.get() < 2 || getRecordsCallCount.get() < 4)) {

Reply via email to