This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
The following commit(s) were added to refs/heads/main by this push:
     new 0f8d4e9  Fix AWS2 Kinesis Sink issues
0f8d4e9 is described below

commit 0f8d4e9dac91e382e026072b3283372a2e23e378
Author: Otavio Rodolfo Piske <opi...@redhat.com>
AuthorDate: Fri May 14 13:44:53 2021 +0200

    Fix AWS2 Kinesis Sink issues

    AWS2 Kinesis Sink test was failing because the shard was not available at the time the check was occurring
---
 .../aws/v2/kinesis/common/KinesisUtils.java | 24 +++++++++++++++++++-----
 1 file changed, 19 insertions(+), 5 deletions(-)

diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/common/KinesisUtils.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/common/KinesisUtils.java
index 2e5a7d1..fb1d64a 100644
--- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/common/KinesisUtils.java
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/common/KinesisUtils.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.camel.kafkaconnector.common.utils.TestUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import software.amazon.awssdk.awscore.exception.AwsServiceException;
@@ -191,18 +192,31 @@ public final class KinesisUtils {
         } while (retries > 0);
     }
 
-    public static GetRecordsRequest getGetRecordsRequest(KinesisClient kinesisClient, String streamName) {
-        DescribeStreamRequest describeStreamRequest = DescribeStreamRequest.builder()
-                .streamName(streamName)
-                .build();
-        List<Shard> shards = new ArrayList<>();
+    private static boolean hasShards(KinesisClient kinesisClient, DescribeStreamRequest describeStreamRequest) {
+        DescribeStreamResponse streamRes = kinesisClient.describeStream(describeStreamRequest);
+
+        return streamRes.streamDescription().shards().isEmpty();
+    }
 
+    private static List<Shard> getAllShards(KinesisClient kinesisClient, DescribeStreamRequest describeStreamRequest) {
+        List<Shard> shards = new ArrayList<>();
         DescribeStreamResponse streamRes;
         do {
             streamRes = kinesisClient.describeStream(describeStreamRequest);
+
             shards.addAll(streamRes.streamDescription().shards());
         } while (streamRes.streamDescription().hasMoreShards());
 
+        return shards;
+    }
+
+    public static GetRecordsRequest getGetRecordsRequest(KinesisClient kinesisClient, String streamName) {
+        DescribeStreamRequest describeStreamRequest = DescribeStreamRequest.builder()
+                .streamName(streamName)
+                .build();
+
+        TestUtils.waitFor(() -> hasShards(kinesisClient, describeStreamRequest));
+        List<Shard> shards = getAllShards(kinesisClient, describeStreamRequest);
 
         GetShardIteratorRequest iteratorRequest = GetShardIteratorRequest.builder()
                 .streamName(streamName)