This is an automated email from the ASF dual-hosted git repository.
hong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-aws.git
The following commit(s) were added to refs/heads/main by this push:
new b207606 [FLINK-34407][Connectors/Kinesis] Fix unstable test
b207606 is described below
commit b207606a95d0ce508c55e69dd0dc6c598eb2fb3c
Author: Aleksandr Pilipenko <[email protected]>
AuthorDate: Fri Feb 9 21:02:03 2024 +0000
[FLINK-34407][Connectors/Kinesis] Fix unstable test
---
.../FakeKinesisFanOutBehavioursFactory.java | 41 ++++++++++++++--------
1 file changed, 26 insertions(+), 15 deletions(-)
diff --git a/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisFanOutBehavioursFactory.java b/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisFanOutBehavioursFactory.java
index 9985fbf..0b49ac5 100644
--- a/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisFanOutBehavioursFactory.java
+++ b/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/FakeKinesisFanOutBehavioursFactory.java
@@ -56,9 +56,6 @@ import java.util.stream.IntStream;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
import static org.assertj.core.api.Assertions.assertThat;
-import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
import static
software.amazon.awssdk.services.kinesis.model.ConsumerStatus.ACTIVE;
import static
software.amazon.awssdk.services.kinesis.model.ConsumerStatus.CREATING;
import static
software.amazon.awssdk.services.kinesis.model.ConsumerStatus.DELETING;
@@ -479,23 +476,18 @@ public class FakeKinesisFanOutBehavioursFactory {
.build());
}
- Subscription subscription =
mock(Subscription.class);
Iterator<SubscribeToShardEvent> iterator =
eventsToSend.iterator();
- doAnswer(
- a -> {
- if
(!iterator.hasNext()) {
-
completeSubscription(subscriber);
- } else {
+ Subscription subscription =
+ new FakeSubscription(
+ (n) -> {
+ if
(iterator.hasNext()) {
subscriber.onNext(iterator.next());
+ } else {
+
completeSubscription(subscriber);
}
-
- return null;
- })
- .when(subscription)
- .request(anyLong());
-
+ });
subscriber.onSubscribe(subscription);
});
return null;
@@ -692,6 +684,25 @@ public class FakeKinesisFanOutBehavioursFactory {
}
}
+ private static class FakeSubscription implements Subscription {
+ // Callback that receives the demand passed to every request(n) call on this fake.
+ private final java.util.function.Consumer<Long> onRequest;
+ // Stores the callback as-is (no null check); it is only invoked from request(long).
+ public FakeSubscription(java.util.function.Consumer<Long> onRequest) {
+ this.onRequest = onRequest;
+ }
+ // Forwards n (boxed to Long) to the callback unchanged; no demand bookkeeping is done here.
+ @Override
+ public void request(long n) {
+ onRequest.accept(n);
+ }
+ // cancel() records nothing, so request(n) still invokes the callback after a cancel.
+ @Override
+ public void cancel() {
+ // Intentionally empty: this fake ignores cancellation.
+ }
+ }
+
/**
 * Creates a record by delegating to the two-argument {@code createRecord} overload, passing the
 * UTF-8 bytes of a 32-character random alphabetic string as the first argument.
 *
 * @param sequenceNumber forwarded unchanged to the two-argument overload
 * @return whatever the two-argument overload returns
 */
private static Record createRecord(final AtomicInteger sequenceNumber) {
return createRecord(randomAlphabetic(32).getBytes(UTF_8),
sequenceNumber);
}