This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 9957652 Fixes AWS test cases when running with remote AWS services
new a27894e Merge pull request #308 from
orpiske/fix-aws-cases-remote-execution
9957652 is described below
commit 995765277c241d5012bee4b4952b792bd054d8e8
Author: Otavio R. Piske <[email protected]>
AuthorDate: Fri Jun 26 22:05:27 2020 +0200
Fixes AWS test cases when running with remote AWS services
---
.../aws/s3/source/CamelSourceAWSS3ITCase.java | 2 +-
.../aws/sqs/sink/CamelSinkAWSSQSITCase.java | 19 ++++++++++++-------
.../aws/sqs/source/CamelSourceAWSSQSITCase.java | 12 +++++++-----
3 files changed, 20 insertions(+), 13 deletions(-)
diff --git
a/tests/itests-aws/src/test/java/org/apache/camel/kafkaconnector/aws/s3/source/CamelSourceAWSS3ITCase.java
b/tests/itests-aws/src/test/java/org/apache/camel/kafkaconnector/aws/s3/source/CamelSourceAWSS3ITCase.java
index 9f4e657..e9538e1 100644
---
a/tests/itests-aws/src/test/java/org/apache/camel/kafkaconnector/aws/s3/source/CamelSourceAWSS3ITCase.java
+++
b/tests/itests-aws/src/test/java/org/apache/camel/kafkaconnector/aws/s3/source/CamelSourceAWSS3ITCase.java
@@ -170,7 +170,7 @@ public class CamelSourceAWSS3ITCase extends
AbstractKafkaTest {
.withUrl(AWSCommon.DEFAULT_S3_BUCKET)
.append("accessKey",
amazonProperties.getProperty(AWSConfigs.ACCESS_KEY))
.append("secretKey",
amazonProperties.getProperty(AWSConfigs.SECRET_KEY))
- .append("proxyProtocol",
amazonProperties.getProperty(AWSConfigs.PROTOCOL))
+ .appendIfAvailable("proxyProtocol",
amazonProperties.getProperty(AWSConfigs.PROTOCOL))
.append("region",
amazonProperties.getProperty(AWSConfigs.REGION, Regions.US_EAST_1.name()))
.buildUrl();
diff --git
a/tests/itests-aws/src/test/java/org/apache/camel/kafkaconnector/aws/sqs/sink/CamelSinkAWSSQSITCase.java
b/tests/itests-aws/src/test/java/org/apache/camel/kafkaconnector/aws/sqs/sink/CamelSinkAWSSQSITCase.java
index d3cb6c7..931b4a1 100644
---
a/tests/itests-aws/src/test/java/org/apache/camel/kafkaconnector/aws/sqs/sink/CamelSinkAWSSQSITCase.java
+++
b/tests/itests-aws/src/test/java/org/apache/camel/kafkaconnector/aws/sqs/sink/CamelSinkAWSSQSITCase.java
@@ -40,6 +40,7 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -56,6 +57,7 @@ public class CamelSinkAWSSQSITCase extends AbstractKafkaTest {
private static final Logger LOG =
LoggerFactory.getLogger(CamelSinkAWSSQSITCase.class);
private AWSSQSClient awssqsClient;
+ private String queueName;
private volatile int received;
private final int expect = 10;
@@ -69,7 +71,9 @@ public class CamelSinkAWSSQSITCase extends AbstractKafkaTest {
public void setUp() {
awssqsClient = awsService.getClient();
- String queueUrl = awssqsClient.getQueue(AWSCommon.DEFAULT_SQS_QUEUE);
+ queueName = AWSCommon.DEFAULT_SQS_QUEUE + "-" +
TestUtils.randomWithRange(0, 1000);
+ String queueUrl = awssqsClient.getQueue(queueName);
+
LOG.debug("Using queue {} for the test", queueUrl);
received = 0;
@@ -78,7 +82,7 @@ public class CamelSinkAWSSQSITCase extends AbstractKafkaTest {
@AfterEach
public void tearDown() {
deleteKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()));
- if (!awssqsClient.deleteQueue(AWSCommon.DEFAULT_SQS_QUEUE)) {
+ if (!awssqsClient.deleteQueue(queueName)) {
fail("Failed to delete queue");
}
}
@@ -100,7 +104,7 @@ public class CamelSinkAWSSQSITCase extends
AbstractKafkaTest {
private void consumeMessages(CountDownLatch latch) {
try {
- awssqsClient.receive(AWSCommon.DEFAULT_SQS_QUEUE,
this::checkMessages);
+ awssqsClient.receive(queueName, this::checkMessages);
} catch (Throwable t) {
LOG.error("Failed to consume messages: {}", t.getMessage(), t);
} finally {
@@ -147,7 +151,6 @@ public class CamelSinkAWSSQSITCase extends
AbstractKafkaTest {
@Test
@Timeout(value = 120)
- @RepeatedTest(3)
public void testBasicSendReceive() {
try {
Properties amazonProperties = awsService.getConnectionProperties();
@@ -157,7 +160,7 @@ public class CamelSinkAWSSQSITCase extends
AbstractKafkaTest {
.withName("CamelAwssqsSinkConnectorSpringBootStyle")
.withTopics(TestUtils.getDefaultTestTopic(this.getClass()))
.withAmazonConfig(amazonProperties)
- .withQueueNameOrArn(AWSCommon.DEFAULT_SQS_QUEUE);
+ .withQueueNameOrArn(queueName);
runTest(testProperties);
@@ -167,6 +170,7 @@ public class CamelSinkAWSSQSITCase extends
AbstractKafkaTest {
}
}
+ @DisabledIfSystemProperty(named = "aws-service.instance.type", matches =
"remote")
@Test
@Timeout(value = 120)
@RepeatedTest(3)
@@ -179,7 +183,7 @@ public class CamelSinkAWSSQSITCase extends
AbstractKafkaTest {
.withName("CamelAwssqsSinkConnectorKafkaStyle")
.withTopics(TestUtils.getDefaultTestTopic(this.getClass()))
.withAmazonConfig(amazonProperties,
CamelAWSSQSPropertyFactory.KAFKA_STYLE)
- .withQueueNameOrArn(AWSCommon.DEFAULT_SQS_QUEUE);
+ .withQueueNameOrArn(queueName);
runTest(testProperties);
@@ -189,6 +193,7 @@ public class CamelSinkAWSSQSITCase extends
AbstractKafkaTest {
}
}
+ @DisabledIfSystemProperty(named = "aws-service.instance.type", matches =
"remote")
@Test
@Timeout(value = 120)
@RepeatedTest(3)
@@ -200,7 +205,7 @@ public class CamelSinkAWSSQSITCase extends
AbstractKafkaTest {
.basic()
.withName("CamelAwssqsSinkConnectorUsingUrl")
.withTopics(TestUtils.getDefaultTestTopic(this.getClass()))
- .withUrl(AWSCommon.DEFAULT_SQS_QUEUE)
+ .withUrl(queueName)
.append("autoCreateQueue", "true")
.append("accessKey",
amazonProperties.getProperty(AWSConfigs.ACCESS_KEY))
.append("secretKey",
amazonProperties.getProperty(AWSConfigs.SECRET_KEY))
diff --git
a/tests/itests-aws/src/test/java/org/apache/camel/kafkaconnector/aws/sqs/source/CamelSourceAWSSQSITCase.java
b/tests/itests-aws/src/test/java/org/apache/camel/kafkaconnector/aws/sqs/source/CamelSourceAWSSQSITCase.java
index 634a6e7..b866ee7 100644
---
a/tests/itests-aws/src/test/java/org/apache/camel/kafkaconnector/aws/sqs/source/CamelSourceAWSSQSITCase.java
+++
b/tests/itests-aws/src/test/java/org/apache/camel/kafkaconnector/aws/sqs/source/CamelSourceAWSSQSITCase.java
@@ -52,6 +52,7 @@ public class CamelSourceAWSSQSITCase extends
AbstractKafkaTest {
private static final Logger LOG =
LoggerFactory.getLogger(CamelSourceAWSSQSITCase.class);
private AWSSQSClient awssqsClient;
+ private String queueName;
private volatile int received;
private final int expect = 10;
@@ -64,6 +65,7 @@ public class CamelSourceAWSSQSITCase extends
AbstractKafkaTest {
@BeforeEach
public void setUp() {
awssqsClient = service.getClient();
+ queueName = AWSCommon.DEFAULT_SQS_QUEUE + "-" +
TestUtils.randomWithRange(0, 1000);
received = 0;
}
@@ -71,7 +73,7 @@ public class CamelSourceAWSSQSITCase extends
AbstractKafkaTest {
public void tearDown() {
deleteKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()));
- if (!awssqsClient.deleteQueue(AWSCommon.DEFAULT_SQS_QUEUE)) {
+ if (!awssqsClient.deleteQueue(queueName)) {
fail("Failed to delete queue");
}
}
@@ -93,7 +95,7 @@ public class CamelSourceAWSSQSITCase extends
AbstractKafkaTest {
LOG.debug("Sending SQS messages");
for (int i = 0; i < expect; i++) {
- awssqsClient.send(AWSCommon.DEFAULT_SQS_QUEUE, "Source test
message " + i);
+ awssqsClient.send(queueName, "Source test message " + i);
}
LOG.debug("Done sending SQS messages");
@@ -111,7 +113,7 @@ public class CamelSourceAWSSQSITCase extends
AbstractKafkaTest {
ConnectorPropertyFactory connectorPropertyFactory =
CamelAWSSQSPropertyFactory
.basic()
.withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
- .withQueueOrArn(AWSCommon.DEFAULT_SQS_QUEUE)
+ .withQueueOrArn(queueName)
.withAmazonConfig(service.getConnectionProperties());
runTest(connectorPropertyFactory);
@@ -126,7 +128,7 @@ public class CamelSourceAWSSQSITCase extends
AbstractKafkaTest {
ConnectorPropertyFactory connectorPropertyFactory =
CamelAWSSQSPropertyFactory
.basic()
.withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
- .withQueueOrArn(AWSCommon.DEFAULT_SQS_QUEUE)
+ .withQueueOrArn(queueName)
.withAmazonConfig(service.getConnectionProperties(),
CamelAWSSQSPropertyFactory.KAFKA_STYLE);
runTest(connectorPropertyFactory);
@@ -143,7 +145,7 @@ public class CamelSourceAWSSQSITCase extends
AbstractKafkaTest {
ConnectorPropertyFactory connectorPropertyFactory =
CamelAWSSQSPropertyFactory
.basic()
.withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
- .withUrl(AWSCommon.DEFAULT_SQS_QUEUE)
+ .withUrl(queueName)
.append("accessKey",
amazonProperties.getProperty(AWSConfigs.ACCESS_KEY))
.append("secretKey",
amazonProperties.getProperty(AWSConfigs.SECRET_KEY))
.append("protocol",
amazonProperties.getProperty(AWSConfigs.PROTOCOL))