This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit c7a6b0f82acb257905ec844565edfd028ce9cd61 Author: Otavio Rodolfo Piske <opi...@redhat.com> AuthorDate: Fri Jan 22 11:23:06 2021 +0100 Added new integration test for AWS 2 Cloud Watch --- tests/itests-aws-v2/pom.xml | 5 + .../aws/v2/common/CamelSinkAWSTestSupport.java | 18 ++- .../aws/v2/cw/sink/CamelAWSCWPropertyFactory.java | 73 ++++++++++ .../aws/v2/cw/sink/CamelSinkAWSCWITCase.java | 161 +++++++++++++++++++++ .../v2/cw/sink/TestCloudWatchConfiguration.java | 35 +++++ .../aws/v2/sqs/sink/CamelSinkAWSSQSITCase.java | 21 ++- 6 files changed, 302 insertions(+), 11 deletions(-) diff --git a/tests/itests-aws-v2/pom.xml b/tests/itests-aws-v2/pom.xml index cd573f3..caf3c28 100644 --- a/tests/itests-aws-v2/pom.xml +++ b/tests/itests-aws-v2/pom.xml @@ -67,6 +67,11 @@ <groupId>org.apache.camel</groupId> <artifactId>camel-aws2-s3</artifactId> </dependency> + + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-aws2-cw</artifactId> + </dependency> </dependencies> <build> diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/common/CamelSinkAWSTestSupport.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/common/CamelSinkAWSTestSupport.java index c42cb36..a66a474 100644 --- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/common/CamelSinkAWSTestSupport.java +++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/common/CamelSinkAWSTestSupport.java @@ -17,6 +17,7 @@ package org.apache.camel.kafkaconnector.aws.v2.common; +import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -24,7 +25,6 @@ import java.util.concurrent.Executors; import org.apache.camel.kafkaconnector.common.AbstractKafkaTest; import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory; import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient; -import 
org.apache.camel.kafkaconnector.common.utils.TestUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,13 +33,21 @@ import static org.junit.jupiter.api.Assertions.fail; public abstract class CamelSinkAWSTestSupport extends AbstractKafkaTest { private static final Logger LOG = LoggerFactory.getLogger(CamelSinkAWSTestSupport.class); + protected abstract Map<String, String> messageHeaders(String text, int current); - protected void produceMessages(int count) { + protected void produceMessages(String topicName, int count) { try { KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers()); for (int i = 0; i < count; i++) { - kafkaClient.produce(TestUtils.getDefaultTestTopic(this.getClass()), "Sink test message " + i); + String message = "Sink test message " + i; + Map<String, String> headers = messageHeaders(message, i); + + if (headers == null) { + kafkaClient.produce(topicName, message); + } else { + kafkaClient.produce(topicName, message, headers); + } } } catch (Throwable t) { LOG.error("Unable to publish messages to the broker: {}", t.getMessage(), t); @@ -47,7 +55,7 @@ public abstract class CamelSinkAWSTestSupport extends AbstractKafkaTest { } } - public void runTest(ConnectorPropertyFactory connectorPropertyFactory, int count) throws Exception { + public void runTest(ConnectorPropertyFactory connectorPropertyFactory, String topic, int count) throws Exception { connectorPropertyFactory.log(); getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 1); @@ -58,7 +66,7 @@ public abstract class CamelSinkAWSTestSupport extends AbstractKafkaTest { service.submit(() -> consumeMessages(latch)); LOG.debug("Creating the producer and sending messages ..."); - produceMessages(count); + produceMessages(topic, count); LOG.debug("Waiting for the test to complete"); verifyMessages(latch); diff --git 
a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/cw/sink/CamelAWSCWPropertyFactory.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/cw/sink/CamelAWSCWPropertyFactory.java new file mode 100644 index 0000000..cd7f638 --- /dev/null +++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/cw/sink/CamelAWSCWPropertyFactory.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.camel.kafkaconnector.aws.v2.cw.sink; + +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +import org.apache.camel.kafkaconnector.aws.v2.common.AWSPropertiesUtils; +import org.apache.camel.kafkaconnector.common.SinkConnectorPropertyFactory; +import org.apache.camel.test.infra.aws.common.AWSConfigs; + +public class CamelAWSCWPropertyFactory extends SinkConnectorPropertyFactory<CamelAWSCWPropertyFactory> { + public static final Map<String, String> SPRING_STYLE = new HashMap<>(); + public static final Map<String, String> KAFKA_STYLE = new HashMap<>(); + + static { + SPRING_STYLE.put(AWSConfigs.ACCESS_KEY, "camel.component.aws2-cw.accessKey"); + SPRING_STYLE.put(AWSConfigs.SECRET_KEY, "camel.component.aws2-cw.secretKey"); + SPRING_STYLE.put(AWSConfigs.REGION, "camel.component.aws2-cw.region"); + + KAFKA_STYLE.put(AWSConfigs.ACCESS_KEY, "camel.component.aws2-cw.access-key"); + KAFKA_STYLE.put(AWSConfigs.SECRET_KEY, "camel.component.aws2-cw.secret-key"); + KAFKA_STYLE.put(AWSConfigs.REGION, "camel.component.aws2-cw.region"); + } + + public CamelAWSCWPropertyFactory withSinkPathNamespace(String value) { + return setProperty("camel.sink.path.namespace", value); + } + + public CamelAWSCWPropertyFactory withName(String value) { + return setProperty("camel.sink.endpoint.name", value); + } + + public CamelAWSCWPropertyFactory withConfiguration(String value) { + return setProperty("camel.component.aws2-cw.configuration", classRef(value)); + } + + public CamelAWSCWPropertyFactory withAmazonConfig(Properties amazonConfigs) { + return withAmazonConfig(amazonConfigs, this.SPRING_STYLE); + } + + public CamelAWSCWPropertyFactory withAmazonConfig(Properties amazonConfigs, Map<String, String> style) { + AWSPropertiesUtils.setCommonProperties(amazonConfigs, style, this); + + return this; + } + + public static CamelAWSCWPropertyFactory basic() { + return new CamelAWSCWPropertyFactory() + .withTasksMax(1) + 
.withName("CamelAWSCWConnector") + .withConnectorClass("org.apache.camel.kafkaconnector.aws2cw.CamelAws2cwSinkConnector") + .withKeyConverterClass("org.apache.kafka.connect.storage.StringConverter") + .withValueConverterClass("org.apache.kafka.connect.storage.StringConverter"); + + } +} diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/cw/sink/CamelSinkAWSCWITCase.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/cw/sink/CamelSinkAWSCWITCase.java new file mode 100644 index 0000000..62c7122 --- /dev/null +++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/cw/sink/CamelSinkAWSCWITCase.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.camel.kafkaconnector.aws.v2.cw.sink; + +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.camel.kafkaconnector.CamelSinkTask; +import org.apache.camel.kafkaconnector.aws.v2.common.CamelSinkAWSTestSupport; +import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory; +import org.apache.camel.kafkaconnector.common.utils.TestUtils; +import org.apache.camel.test.infra.aws.common.services.AWSService; +import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils; +import org.apache.camel.test.infra.aws2.services.AWSServiceFactory; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.api.condition.EnabledIfSystemProperty; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.cloudwatch.CloudWatchClient; +import software.amazon.awssdk.services.cloudwatch.model.Dimension; +import software.amazon.awssdk.services.cloudwatch.model.ListMetricsRequest; +import software.amazon.awssdk.services.cloudwatch.model.ListMetricsResponse; +import software.amazon.awssdk.services.cloudwatch.model.Metric; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.fail; + +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +@EnabledIfSystemProperty(named = "enable.slow.tests", matches = "true") +public class CamelSinkAWSCWITCase extends CamelSinkAWSTestSupport { + + @RegisterExtension + public static AWSService awsService = AWSServiceFactory.createCloudWatchService(); + private static final Logger LOG = LoggerFactory.getLogger(CamelSinkAWSCWITCase.class); + + private CloudWatchClient client; + private String 
namespace; + private String metricName = "test-metric"; + + private volatile int received; + private final int expect = 10; + + @Override + protected String[] getConnectorsInTest() { + return new String[] {"camel-aws2-cw-kafka-connector"}; + } + + @BeforeEach + public void setUp() { + client = AWSSDKClientUtils.newCloudWatchClient(); + + namespace = "cw-" + TestUtils.randomWithRange(0, 1000); + LOG.debug("Using namespace {} for the test", namespace); + + received = 0; + } + + @Override + protected Map<String, String> messageHeaders(String text, int current) { + Map<String, String> headers = new HashMap<>(); + + headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsCwMetricDimensionName", + "test-dimension-" + current); + headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsCwMetricDimensionValue", String.valueOf(current)); + + return headers; + } + + @Override + protected void consumeMessages(CountDownLatch latch) { + try { + ListMetricsRequest request = ListMetricsRequest.builder() + .namespace(namespace) + .metricName(metricName) + .build(); + + while (true) { + ListMetricsResponse response = client.listMetrics(request); + + for (Metric metric : response.metrics()) { + LOG.info("Retrieved metric {}", metric.metricName()); + + for (Dimension dimension : metric.dimensions()) { + LOG.info("Dimension {} value: {}", dimension.name(), dimension.value()); + received++; + + if (received == expect) { + return; + } + } + } + + try { + Thread.sleep(Duration.ofSeconds(1).toMillis()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + + break; + } + } + } finally { + latch.countDown(); + } + } + + @Override + protected void verifyMessages(CountDownLatch latch) throws InterruptedException { + if (latch.await(110, TimeUnit.SECONDS)) { + assertEquals(expect, received, "Didn't process the expected amount of messages: " + received + " != " + expect); + } else { + fail(String.format("Failed to receive the messages within the specified time: received 
%d of %d", + received, expect)); + } + } + + + @Test + @Timeout(value = 120) + public void testBasicSendReceive() { + try { + Properties amazonProperties = awsService.getConnectionProperties(); + String topicName = TestUtils.getDefaultTestTopic(this.getClass()); + + ConnectorPropertyFactory testProperties = CamelAWSCWPropertyFactory + .basic() + .withTopics(topicName) + .withConfiguration(TestCloudWatchConfiguration.class.getName()) + .withAmazonConfig(amazonProperties) + .withName(metricName) + .withSinkPathNamespace(namespace); + + runTest(testProperties, topicName, expect); + } catch (Exception e) { + LOG.error("Amazon SQS test failed: {}", e.getMessage(), e); + fail(e.getMessage()); + } + } +} diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/cw/sink/TestCloudWatchConfiguration.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/cw/sink/TestCloudWatchConfiguration.java new file mode 100644 index 0000000..e214289 --- /dev/null +++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/cw/sink/TestCloudWatchConfiguration.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.camel.kafkaconnector.aws.v2.cw.sink; + +import org.apache.camel.component.aws2.cw.Cw2Configuration; +import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils; +import software.amazon.awssdk.services.cloudwatch.CloudWatchClient; + +public class TestCloudWatchConfiguration extends Cw2Configuration { + private CloudWatchClient client; + + @Override + public CloudWatchClient getAmazonCwClient() { + if (client == null) { + client = AWSSDKClientUtils.newCloudWatchClient(); + } + + return client; + } +} diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sqs/sink/CamelSinkAWSSQSITCase.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sqs/sink/CamelSinkAWSSQSITCase.java index 6cc9b79..5b17a70 100644 --- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sqs/sink/CamelSinkAWSSQSITCase.java +++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sqs/sink/CamelSinkAWSSQSITCase.java @@ -18,6 +18,7 @@ package org.apache.camel.kafkaconnector.aws.v2.sqs.sink; import java.util.List; +import java.util.Map; import java.util.Properties; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -88,6 +89,11 @@ public class CamelSinkAWSSQSITCase extends CamelSinkAWSTestSupport { } @Override + protected Map<String, String> messageHeaders(String text, int current) { + return null; + } + + @Override protected void verifyMessages(CountDownLatch latch) throws InterruptedException { if (latch.await(110, TimeUnit.SECONDS)) { assertEquals(expect, received, "Didn't process the expected amount of messages: " + received + " != " + expect); @@ -127,15 +133,16 @@ public class CamelSinkAWSSQSITCase extends CamelSinkAWSTestSupport { public void testBasicSendReceive() { try { Properties amazonProperties = awsService.getConnectionProperties(); + String topicName = TestUtils.getDefaultTestTopic(this.getClass()); 
ConnectorPropertyFactory testProperties = CamelAWSSQSPropertyFactory .basic() .withName("CamelAwssqsSinkConnectorSpringBootStyle") - .withTopics(TestUtils.getDefaultTestTopic(this.getClass())) + .withTopics(topicName) .withAmazonConfig(amazonProperties) .withQueueNameOrArn(queueName); - runTest(testProperties, expect); + runTest(testProperties, topicName, expect); } catch (Exception e) { LOG.error("Amazon SQS test failed: {}", e.getMessage(), e); fail(e.getMessage()); @@ -148,15 +155,16 @@ public class CamelSinkAWSSQSITCase extends CamelSinkAWSTestSupport { public void testBasicSendReceiveUsingKafkaStyle() { try { Properties amazonProperties = awsService.getConnectionProperties(); + String topicName = TestUtils.getDefaultTestTopic(this.getClass()); ConnectorPropertyFactory testProperties = CamelAWSSQSPropertyFactory .basic() .withName("CamelAwssqsSinkConnectorKafkaStyle") - .withTopics(TestUtils.getDefaultTestTopic(this.getClass())) + .withTopics(topicName) .withAmazonConfig(amazonProperties, CamelAWSSQSPropertyFactory.KAFKA_STYLE) .withQueueNameOrArn(queueName); - runTest(testProperties, expect); + runTest(testProperties, topicName, expect); } catch (Exception e) { LOG.error("Amazon SQS test failed: {}", e.getMessage(), e); @@ -170,11 +178,12 @@ public class CamelSinkAWSSQSITCase extends CamelSinkAWSTestSupport { public void testBasicSendReceiveUsingUrl() { try { Properties amazonProperties = awsService.getConnectionProperties(); + String topicName = TestUtils.getDefaultTestTopic(this.getClass()); ConnectorPropertyFactory testProperties = CamelAWSSQSPropertyFactory .basic() .withName("CamelAwssqsSinkConnectorUsingUrl") - .withTopics(TestUtils.getDefaultTestTopic(this.getClass())) + .withTopics(topicName) .withUrl(queueName) .append("autoCreateQueue", "true") .append("accessKey", amazonProperties.getProperty(AWSConfigs.ACCESS_KEY)) @@ -184,7 +193,7 @@ public class CamelSinkAWSSQSITCase extends CamelSinkAWSTestSupport { .append("amazonAWSHost", 
amazonProperties.getProperty(AWSConfigs.AMAZON_AWS_HOST)) .buildUrl(); - runTest(testProperties, expect); + runTest(testProperties, topicName, expect); } catch (Exception e) { LOG.error("Amazon SQS test failed: {}", e.getMessage(), e);