This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 61a497e AWS Kinesis test fixes when in remote mode
new 94de150 Merge pull request #345 from orpiske/fix-kinesis-qa
61a497e is described below
commit 61a497e069df96bd055da62f96a6d6e812c2346c
Author: Otavio Rodolfo Piske <[email protected]>
AuthorDate: Wed Jul 29 13:24:09 2020 +0200
AWS Kinesis test fixes when in remote mode
Includes:
- Check for the stream presence before creating it
- Adjusted the code to use a different stream for every test
- Removed unused test constants
- Removed unnecessary sleep after deletion
---
.../camel/kafkaconnector/aws/common/AWSCommon.java | 10 ---
.../source/CamelSourceAWSKinesisITCase.java | 90 +++++++++++++++-------
2 files changed, 64 insertions(+), 36 deletions(-)
diff --git
a/tests/itests-aws/src/test/java/org/apache/camel/kafkaconnector/aws/common/AWSCommon.java
b/tests/itests-aws/src/test/java/org/apache/camel/kafkaconnector/aws/common/AWSCommon.java
index f5abf42..98b2c4e 100644
---
a/tests/itests-aws/src/test/java/org/apache/camel/kafkaconnector/aws/common/AWSCommon.java
+++
b/tests/itests-aws/src/test/java/org/apache/camel/kafkaconnector/aws/common/AWSCommon.java
@@ -38,20 +38,10 @@ public final class AWSCommon {
public static final String DEFAULT_SQS_QUEUE_FOR_SNS = "ckcsns";
/**
- * The default SNS queue name used during the tests
- */
- public static final String DEFAULT_SNS_QUEUE = "ckc-sns";
-
- /**
* The default S3 bucket name used during the tests
*/
public static final String DEFAULT_S3_BUCKET = "ckc-s3";
- /**
- * The default Kinesis stream name used during the tests
- */
- public static final String DEFAULT_KINESIS_STREAM = "ckc-kin-stream";
-
private AWSCommon() {
}
diff --git
a/tests/itests-aws/src/test/java/org/apache/camel/kafkaconnector/aws/kinesis/source/CamelSourceAWSKinesisITCase.java
b/tests/itests-aws/src/test/java/org/apache/camel/kafkaconnector/aws/kinesis/source/CamelSourceAWSKinesisITCase.java
index 7ed229b..d27bac1 100644
---
a/tests/itests-aws/src/test/java/org/apache/camel/kafkaconnector/aws/kinesis/source/CamelSourceAWSKinesisITCase.java
+++
b/tests/itests-aws/src/test/java/org/apache/camel/kafkaconnector/aws/kinesis/source/CamelSourceAWSKinesisITCase.java
@@ -27,10 +27,12 @@ import com.amazonaws.AmazonServiceException;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.model.CreateStreamResult;
import com.amazonaws.services.kinesis.model.DeleteStreamResult;
+import com.amazonaws.services.kinesis.model.DescribeStreamResult;
import com.amazonaws.services.kinesis.model.PutRecordsRequest;
import com.amazonaws.services.kinesis.model.PutRecordsRequestEntry;
import com.amazonaws.services.kinesis.model.PutRecordsResult;
-import org.apache.camel.kafkaconnector.aws.common.AWSCommon;
+import com.amazonaws.services.kinesis.model.ResourceInUseException;
+import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
import org.apache.camel.kafkaconnector.aws.services.AWSService;
import org.apache.camel.kafkaconnector.aws.services.AWSServiceFactory;
import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
@@ -54,9 +56,12 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
public class CamelSourceAWSKinesisITCase extends AbstractKafkaTest {
@RegisterExtension
public static AWSService<AmazonKinesis> service =
AWSServiceFactory.createKinesisService();
-
+
private static final Logger LOG =
LoggerFactory.getLogger(CamelSourceAWSKinesisITCase.class);
+ private static final String KINESIS_STREAM_BASE_NAME = "ckc-kin-stream";
+ private String streamName;
+
private AmazonKinesis awsKinesisClient;
private volatile int received;
@@ -67,13 +72,8 @@ public class CamelSourceAWSKinesisITCase extends
AbstractKafkaTest {
return new String[] {"camel-aws-kinesis-kafka-connector"};
}
-
- @BeforeEach
- public void setUp() {
- awsKinesisClient = service.getClient();
- received = 0;
-
- CreateStreamResult result =
awsKinesisClient.createStream(AWSCommon.DEFAULT_KINESIS_STREAM, 1);
+ private void doCreateStream() {
+ CreateStreamResult result = awsKinesisClient.createStream(streamName,
1);
if (result.getSdkHttpMetadata().getHttpStatusCode() != 200) {
fail("Failed to create the stream");
} else {
@@ -81,30 +81,67 @@ public class CamelSourceAWSKinesisITCase extends
AbstractKafkaTest {
}
}
- @AfterEach
- public void tearDown() {
- DeleteStreamResult result =
awsKinesisClient.deleteStream(AWSCommon.DEFAULT_KINESIS_STREAM);
+ private void createStream() {
+ try {
+ LOG.info("Checking whether the stream exists already");
+ DescribeStreamResult describeStreamResult =
awsKinesisClient.describeStream(streamName);
+
+ int status =
describeStreamResult.getSdkHttpMetadata().getHttpStatusCode();
+ LOG.info("Kinesis stream check result: {}", status);
+ } catch (ResourceNotFoundException e) {
+ LOG.info("The stream does not exist, auto creating it ...");
+ doCreateStream();
+ }
+ }
+
+ private void doDeleteStream() {
+ DeleteStreamResult result = awsKinesisClient.deleteStream(streamName);
if (result.getSdkHttpMetadata().getHttpStatusCode() != 200) {
fail("Failed to delete the stream");
} else {
- try {
- // Because of the latency used to simulate the Kinesis API
call (defined by the KINESIS_LATENCY) in
- // the LocalStack configuration, the test needs to wait at
least the same amount of time as set there
- // in order to proceed. Otherwise the it fails to create the
stream in the setUp phase.
- // Ref.:
https://github.com/localstack/localstack/issues/231#issuecomment-319959693
- Thread.sleep(500);
- LOG.info("Stream deleted successfully");
- } catch (InterruptedException e) {
- fail("Test interrupted while waiting for the stream to cool
down");
- }
+ LOG.info("Stream deleted successfully");
}
+ }
+
+ private void deleteStream() {
+ try {
+ LOG.info("Checking whether the stream exists already");
+ DescribeStreamResult describeStreamResult =
awsKinesisClient.describeStream(streamName);
+
+ int status =
describeStreamResult.getSdkHttpMetadata().getHttpStatusCode();
+ LOG.info("Kinesis stream check result: {}", status);
+ doDeleteStream();
+ } catch (ResourceNotFoundException e) {
+ LOG.info("The stream does not exist, skipping deletion");
+ } catch (ResourceInUseException e) {
+ LOG.info("The stream exist but cannot be deleted because it's in
use");
+ doDeleteStream();
+ }
+ }
+
+
+ @BeforeEach
+ public void setUp() {
+ streamName = KINESIS_STREAM_BASE_NAME + "-" +
TestUtils.randomWithRange(0, 100);
+
+ awsKinesisClient = service.getClient();
+ received = 0;
+
+ createStream();
+ }
+
+
+ @AfterEach
+ public void tearDown() {
+ deleteStream();
awsKinesisClient.shutdown();
deleteKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()));
}
+
private boolean checkRecord(ConsumerRecord<String, String> record) {
LOG.debug("Received: {}", record.value());
received++;
@@ -139,11 +176,12 @@ public class CamelSourceAWSKinesisITCase extends
AbstractKafkaTest {
.withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
.withAmazonConfig(service.getConnectionProperties())
.withConfiguration(TestKinesisConfiguration.class.getName())
- .withStreamName(AWSCommon.DEFAULT_KINESIS_STREAM);
+ .withStreamName(streamName);
runtTest(connectorPropertyFactory);
}
+
@Test
@Timeout(120)
public void testBasicSendReceiveWithKafkaStyle() throws
ExecutionException, InterruptedException {
@@ -152,7 +190,7 @@ public class CamelSourceAWSKinesisITCase extends
AbstractKafkaTest {
.withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
.withAmazonConfig(service.getConnectionProperties(),
CamelAWSKinesisPropertyFactory.KAFKA_STYLE)
.withConfiguration(TestKinesisConfiguration.class.getName())
- .withStreamName(AWSCommon.DEFAULT_KINESIS_STREAM);
+ .withStreamName(streamName);
runtTest(connectorPropertyFactory);
}
@@ -165,7 +203,7 @@ public class CamelSourceAWSKinesisITCase extends
AbstractKafkaTest {
.withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
.withAmazonConfig(service.getConnectionProperties())
.withConfiguration(TestKinesisConfiguration.class.getName())
- .withUrl(AWSCommon.DEFAULT_KINESIS_STREAM)
+ .withUrl(streamName)
.buildUrl();
runtTest(connectorPropertyFactory);
@@ -173,7 +211,7 @@ public class CamelSourceAWSKinesisITCase extends
AbstractKafkaTest {
private void putRecords() {
PutRecordsRequest putRecordsRequest = new PutRecordsRequest();
- putRecordsRequest.setStreamName(AWSCommon.DEFAULT_KINESIS_STREAM);
+ putRecordsRequest.setStreamName(streamName);
List<PutRecordsRequestEntry> putRecordsRequestEntryList = new
ArrayList<>();