This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git


The following commit(s) were added to refs/heads/main by this push:
     new 0f8d4e9  Fix AWS2 Kinesis Sink issues
0f8d4e9 is described below

commit 0f8d4e9dac91e382e026072b3283372a2e23e378
Author: Otavio Rodolfo Piske <opi...@redhat.com>
AuthorDate: Fri May 14 13:44:53 2021 +0200

    Fix AWS2 Kinesis Sink issues
    
    AWS2 Kinesis Sink test was failing because the shard was not available
    at the time the check was occurring
---
 .../aws/v2/kinesis/common/KinesisUtils.java        | 24 +++++++++++++++++-----
 1 file changed, 19 insertions(+), 5 deletions(-)

diff --git 
a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/common/KinesisUtils.java
 
b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/common/KinesisUtils.java
index 2e5a7d1..fb1d64a 100644
--- 
a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/common/KinesisUtils.java
+++ 
b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/common/KinesisUtils.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.camel.kafkaconnector.common.utils.TestUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import software.amazon.awssdk.awscore.exception.AwsServiceException;
@@ -191,18 +192,31 @@ public final class KinesisUtils {
         } while (retries > 0);
     }
 
-    public static GetRecordsRequest getGetRecordsRequest(KinesisClient 
kinesisClient, String streamName) {
-        DescribeStreamRequest describeStreamRequest = 
DescribeStreamRequest.builder()
-                .streamName(streamName)
-                .build();
-        List<Shard> shards = new ArrayList<>();
+    private static boolean hasShards(KinesisClient kinesisClient, 
DescribeStreamRequest describeStreamRequest) {
+        DescribeStreamResponse streamRes = 
kinesisClient.describeStream(describeStreamRequest);
+
+        return streamRes.streamDescription().shards().isEmpty();
+    }
 
+    private static List<Shard> getAllShards(KinesisClient kinesisClient, 
DescribeStreamRequest describeStreamRequest) {
+        List<Shard> shards = new ArrayList<>();
         DescribeStreamResponse streamRes;
         do {
             streamRes = kinesisClient.describeStream(describeStreamRequest);
+
             shards.addAll(streamRes.streamDescription().shards());
         } while (streamRes.streamDescription().hasMoreShards());
 
+        return shards;
+    }
+
+    public static GetRecordsRequest getGetRecordsRequest(KinesisClient 
kinesisClient, String streamName) {
+        DescribeStreamRequest describeStreamRequest = 
DescribeStreamRequest.builder()
+                .streamName(streamName)
+                .build();
+
+        TestUtils.waitFor(() -> hasShards(kinesisClient, 
describeStreamRequest));
+        List<Shard> shards = getAllShards(kinesisClient, 
describeStreamRequest);
 
         GetShardIteratorRequest iteratorRequest = 
GetShardIteratorRequest.builder()
                 .streamName(streamName)

Reply via email to