This is an automated email from the ASF dual-hosted git repository.

hong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-aws.git


The following commit(s) were added to refs/heads/main by this push:
     new b207606  [FLINK-34407][Connectors/Kinesis] Fix unstable test
b207606 is described below

commit b207606a95d0ce508c55e69dd0dc6c598eb2fb3c
Author: Aleksandr Pilipenko <[email protected]>
AuthorDate: Fri Feb 9 21:02:03 2024 +0000

    [FLINK-34407][Connectors/Kinesis] Fix unstable test
---
 .../FakeKinesisFanOutBehavioursFactory.java        | 41 ++++++++++++++--------
 1 file changed, 26 insertions(+), 15 deletions(-)

diff --git a/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisFanOutBehavioursFactory.java b/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisFanOutBehavioursFactory.java
index 9985fbf..0b49ac5 100644
--- a/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisFanOutBehavioursFactory.java
+++ b/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisFanOutBehavioursFactory.java
@@ -56,9 +56,6 @@ import java.util.stream.IntStream;
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
 import static software.amazon.awssdk.services.kinesis.model.ConsumerStatus.ACTIVE;
 import static software.amazon.awssdk.services.kinesis.model.ConsumerStatus.CREATING;
 import static software.amazon.awssdk.services.kinesis.model.ConsumerStatus.DELETING;
@@ -479,23 +476,18 @@ public class FakeKinesisFanOutBehavioursFactory {
                                                                 .build());
                                     }
 
-                                    Subscription subscription = mock(Subscription.class);
                                     Iterator<SubscribeToShardEvent> iterator =
                                             eventsToSend.iterator();
 
-                                    doAnswer(
-                                                    a -> {
-                                                        if (!iterator.hasNext()) {
-                                                            completeSubscription(subscriber);
-                                                        } else {
+                                    Subscription subscription =
+                                            new FakeSubscription(
+                                                    (n) -> {
+                                                        if (iterator.hasNext()) {
                                                             subscriber.onNext(iterator.next());
+                                                        } else {
+                                                            completeSubscription(subscriber);
                                                         }
-
-                                                        return null;
-                                                    })
-                                            .when(subscription)
-                                            .request(anyLong());
-
+                                                    });
                                     subscriber.onSubscribe(subscription);
                                 });
                         return null;
@@ -692,6 +684,25 @@ public class FakeKinesisFanOutBehavioursFactory {
         }
     }
 
+    private static class FakeSubscription implements Subscription {
+
+        private final java.util.function.Consumer<Long> onRequest;
+
+        public FakeSubscription(java.util.function.Consumer<Long> onRequest) {
+            this.onRequest = onRequest;
+        }
+
+        @Override
+        public void request(long n) {
+            onRequest.accept(n);
+        }
+
+        @Override
+        public void cancel() {
+            // Nothing to do
+        }
+    }
+
     private static Record createRecord(final AtomicInteger sequenceNumber) {
         return createRecord(randomAlphabetic(32).getBytes(UTF_8), sequenceNumber);
     }

Reply via email to