This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit de9baccbaf5e04d657745949a7cb6641184ac19c Author: Andrea Cosentino <[email protected]> AuthorDate: Mon Mar 29 07:36:38 2021 +0200 Align to 3.9.0 --- tests/itests-aws-v1/pom.xml | 92 -------- .../aws/v1/clients/AWSSQSClient.java | 125 ---------- .../source/CamelAWSKinesisPropertyFactory.java | 84 ------- .../source/CamelSourceAWSKinesisITCase.java | 260 --------------------- .../kinesis/source/TestKinesisConfiguration.java | 39 ---- .../v1/s3/source/CamelAWSS3PropertyFactory.java | 88 ------- .../aws/v1/s3/source/CamelSourceAWSS3ITCase.java | 230 ------------------ .../aws/v1/s3/source/TestS3Configuration.java | 39 ---- .../v1/sns/sink/CamelAWSSNSPropertyFactory.java | 88 ------- .../aws/v1/sns/sink/CamelSinkAWSSNSITCase.java | 167 ------------- .../aws/v1/sns/sink/TestSNSConfiguration.java | 36 --- .../v1/sqs/sink/CamelAWSSQSPropertyFactory.java | 103 -------- .../aws/v1/sqs/sink/CamelSinkAWSSQSITCase.java | 179 -------------- .../v1/sqs/source/CamelAWSSQSPropertyFactory.java | 98 -------- .../aws/v1/sqs/source/CamelSourceAWSSQSITCase.java | 157 ------------- .../kafkaconnector/aws/v1/s3/source/file0.test | 14 -- .../kafkaconnector/aws/v1/s3/source/file1.test | 14 -- .../kafkaconnector/aws/v1/s3/source/file2.test | 14 -- .../kafkaconnector/aws/v1/s3/source/file3.test | 14 -- .../kafkaconnector/aws/v1/s3/source/file4.test | 14 -- .../kafkaconnector/aws/v1/s3/source/file5.test | 14 -- .../kafkaconnector/aws/v1/s3/source/file6.test | 14 -- .../kafkaconnector/aws/v1/s3/source/file7.test | 14 -- .../kafkaconnector/aws/v1/s3/source/file8.test | 14 -- .../kafkaconnector/aws/v1/s3/source/file9.test | 14 -- tests/pom.xml | 1 - 26 files changed, 1926 deletions(-) diff --git a/tests/itests-aws-v1/pom.xml b/tests/itests-aws-v1/pom.xml deleted file mode 100644 index 1db9aff..0000000 --- a/tests/itests-aws-v1/pom.xml +++ /dev/null @@ -1,92 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - ~ Licensed to the Apache Software Foundation (ASF) under one or more - ~ contributor license agreements. See the NOTICE file distributed with - ~ this work for additional information regarding copyright ownership. - ~ The ASF licenses this file to You under the Apache License, Version 2.0 - ~ (the "License"); you may not use this file except in compliance with - ~ the License. You may obtain a copy of the License at - ~ - ~ http://www.apache.org/licenses/LICENSE-2.0 - ~ - ~ Unless required by applicable law or agreed to in writing, software - ~ distributed under the License is distributed on an "AS IS" BASIS, - ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - ~ See the License for the specific language governing permissions and - ~ limitations under the License. - --> - -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <parent> - <groupId>org.apache.camel.kafkaconnector</groupId> - <artifactId>itests-parent</artifactId> - <version>0.9.0-SNAPSHOT</version> - <relativePath>../itests-parent/pom.xml</relativePath> - </parent> - - <modelVersion>4.0.0</modelVersion> - - <artifactId>itests-aws-v1</artifactId> - <name>Camel-Kafka-Connector :: Tests :: AWS v1</name> - - <dependencies> - <dependency> - <groupId>org.apache.camel.kafkaconnector</groupId> - <artifactId>itests-common</artifactId> - <version>${project.version}</version> - <type>test-jar</type> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.apache.camel</groupId> - <artifactId>camel-test-infra-aws-common</artifactId> - <version>${camel.version}</version> - <type>test-jar</type> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.apache.camel</groupId> - <artifactId>camel-test-infra-aws-v1</artifactId> - <version>${camel.version}</version> - <type>test-jar</type> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.apache.camel</groupId> - <artifactId>camel-aws-sqs</artifactId> - </dependency> - <dependency> - <groupId>org.apache.camel</groupId> - <artifactId>camel-aws-s3</artifactId> - </dependency> - <dependency> - <groupId>org.apache.camel</groupId> - <artifactId>camel-aws-sns</artifactId> - </dependency> - <dependency> - <groupId>org.apache.camel</groupId> - <artifactId>camel-aws-kinesis</artifactId> - </dependency> - </dependencies> - - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-failsafe-plugin</artifactId> - <configuration> - <!-- Properties needed for the test: - - com.amazonaws.sdk.disableCbor is used to ensure the AWS Kinesis reply is parseable - --> - <argLine>${common.failsafe.args} -Dcom.amazonaws.sdk.disableCbor=true</argLine> - <skipTests>${skipIntegrationTests}</skipTests> - </configuration> - </plugin> - </plugins> - </build> - - -</project> \ No newline at end of file diff --git a/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/clients/AWSSQSClient.java b/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/clients/AWSSQSClient.java deleted file mode 100644 index 10a16cd..0000000 --- a/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/clients/AWSSQSClient.java +++ /dev/null @@ -1,125 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.camel.kafkaconnector.aws.v1.clients; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.function.Predicate; - -import com.amazonaws.services.sqs.AmazonSQS; -import com.amazonaws.services.sqs.model.CreateQueueRequest; -import com.amazonaws.services.sqs.model.DeleteQueueResult; -import com.amazonaws.services.sqs.model.GetQueueUrlResult; -import com.amazonaws.services.sqs.model.Message; -import com.amazonaws.services.sqs.model.QueueDoesNotExistException; -import com.amazonaws.services.sqs.model.ReceiveMessageRequest; -import com.amazonaws.services.sqs.model.ReceiveMessageResult; -import com.amazonaws.services.sqs.model.SendMessageRequest; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class AWSSQSClient { - private static final Logger LOG = LoggerFactory.getLogger(AWSSQSClient.class); - - private final AmazonSQS sqs; - private int maxWaitTime = 10; - private int maxNumberOfMessages = 1; - - public AWSSQSClient(AmazonSQS sqs) { - this.sqs = sqs; - } - - public String createQueue(String queue) { - final Map<String, String> queueAttributes = new HashMap<>(); - - final CreateQueueRequest createFifoQueueRequest = new CreateQueueRequest(queue).withAttributes(queueAttributes); - - return sqs.createQueue(createFifoQueueRequest).getQueueUrl(); - } - - public synchronized String getQueue(String queue) { - try { - GetQueueUrlResult getQueueUrlResult = sqs.getQueueUrl(queue); - - return getQueueUrlResult.getQueueUrl(); - } catch (QueueDoesNotExistException e) { - return createQueue(queue); - } - } - - public void receive(String queue, Predicate<List<Message>> predicate) { - final String queueUrl = getQueue(queue); - - receiveFrom(queueUrl, predicate); - } - - public void receiveFrom(String queueUrl, Predicate<List<Message>> predicate) { - LOG.debug("Consuming messages from {}", queueUrl); - - final ReceiveMessageRequest request = new ReceiveMessageRequest(queueUrl).withWaitTimeSeconds(maxWaitTime) - .withMaxNumberOfMessages(maxNumberOfMessages); - - while (true) { - ReceiveMessageResult result = sqs.receiveMessage(request); - - List<Message> messages = result.getMessages(); - - if (!predicate.test(messages)) { - return; - } - } - } - - public void send(String queue, String body) { - final String queueUrl = getQueue(queue); - - sendTo(queueUrl, body); - } - - public void sendTo(String queueUrl, String body) { - LOG.debug("Sending messages to {}", queueUrl); - - SendMessageRequest request = new SendMessageRequest().withQueueUrl(queueUrl).withMessageBody(body); - - sqs.sendMessage(request); - } - - public boolean deleteQueue(String queue) { - GetQueueUrlResult getQueueUrlResult = sqs.getQueueUrl(queue); - - if (getQueueUrlResult.getSdkHttpMetadata().getHttpStatusCode() == 404) { - return true; - } else { - if (getQueueUrlResult.getSdkHttpMetadata().getHttpStatusCode() != 200) { - LOG.warn("Unable to get queue {} for deletion", queue); - - return false; - } - } - - DeleteQueueResult result = sqs.deleteQueue(getQueueUrlResult.getQueueUrl()); - - if (result.getSdkHttpMetadata().getHttpStatusCode() != 200) { - LOG.warn("Unable to delete queue {}", queue); - return false; - } - - return true; - } -} diff --git a/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/kinesis/source/CamelAWSKinesisPropertyFactory.java b/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/kinesis/source/CamelAWSKinesisPropertyFactory.java deleted file mode 100644 index 014c706..0000000 --- a/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/kinesis/source/CamelAWSKinesisPropertyFactory.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.camel.kafkaconnector.aws.v1.kinesis.source; - -import java.util.HashMap; -import java.util.Map; -import java.util.Properties; - -import com.amazonaws.regions.Regions; -import org.apache.camel.kafkaconnector.common.EndpointUrlBuilder; -import org.apache.camel.kafkaconnector.common.SourceConnectorPropertyFactory; -import org.apache.camel.test.infra.aws.common.AWSConfigs; - -/** - * Creates the set of properties used by a Camel Kinesis Source Connector - */ -final class CamelAWSKinesisPropertyFactory extends SourceConnectorPropertyFactory<CamelAWSKinesisPropertyFactory> { - public static final Map<String, String> SPRING_STYLE = new HashMap<>(); - public static final Map<String, String> KAFKA_STYLE = new HashMap<>(); - - static { - SPRING_STYLE.put(AWSConfigs.ACCESS_KEY, "camel.component.aws-kinesis.accessKey"); - SPRING_STYLE.put(AWSConfigs.SECRET_KEY, "camel.component.aws-kinesis.secretKey"); - SPRING_STYLE.put(AWSConfigs.REGION, "camel.component.aws-kinesis.region"); - - KAFKA_STYLE.put(AWSConfigs.ACCESS_KEY, "camel.component.aws-kinesis.access-key"); - KAFKA_STYLE.put(AWSConfigs.SECRET_KEY, "camel.component.aws-kinesis.secret-key"); - KAFKA_STYLE.put(AWSConfigs.REGION, "camel.component.aws-kinesis.region"); - } - - private CamelAWSKinesisPropertyFactory() { - - } - - public CamelAWSKinesisPropertyFactory withAmazonConfig(Properties amazonConfigs) { - return withAmazonConfig(amazonConfigs, this.SPRING_STYLE); - } - - public CamelAWSKinesisPropertyFactory withAmazonConfig(Properties amazonConfigs, Map<String, String> style) { - String accessKeyKey = style.get(AWSConfigs.ACCESS_KEY); - String secretKeyKey = style.get(AWSConfigs.SECRET_KEY); - String regionKey = style.get(AWSConfigs.REGION); - - setProperty(accessKeyKey, amazonConfigs.getProperty(AWSConfigs.ACCESS_KEY, "")); - setProperty(secretKeyKey, amazonConfigs.getProperty(AWSConfigs.SECRET_KEY, "")); - return setProperty(regionKey, amazonConfigs.getProperty(AWSConfigs.REGION, Regions.US_EAST_1.name())); - } - - public CamelAWSKinesisPropertyFactory withStreamName(String streamName) { - return setProperty("camel.source.path.streamName", streamName); - } - - public EndpointUrlBuilder<CamelAWSKinesisPropertyFactory> withUrl(String streamName) { - String sourceUrl = String.format("aws-kinesis://%s", streamName); - - return new EndpointUrlBuilder<>(this::withSourceUrl, sourceUrl); - } - - public CamelAWSKinesisPropertyFactory withConfiguration(String configurationClass) { - return setProperty("camel.component.aws-kinesis.configuration", classRef(configurationClass)); - } - - public static CamelAWSKinesisPropertyFactory basic() { - return new CamelAWSKinesisPropertyFactory().withName("CamelAwskinesisSourceConnector").withTasksMax(1) - .withConnectorClass("org.apache.camel.kafkaconnector.awskinesis.CamelAwskinesisSourceConnector") - .withKeyConverterClass("org.apache.kafka.connect.storage.StringConverter") - .withValueConverterClass("org.apache.kafka.connect.storage.StringConverter"); - } -} diff --git a/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/kinesis/source/CamelSourceAWSKinesisITCase.java b/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/kinesis/source/CamelSourceAWSKinesisITCase.java deleted file mode 100644 index da6ab70..0000000 --- a/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/kinesis/source/CamelSourceAWSKinesisITCase.java +++ /dev/null @@ -1,260 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.camel.kafkaconnector.aws.v1.kinesis.source; - -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; - -import com.amazonaws.AmazonServiceException; -import com.amazonaws.services.kinesis.AmazonKinesis; -import com.amazonaws.services.kinesis.model.CreateStreamResult; -import com.amazonaws.services.kinesis.model.DeleteStreamResult; -import com.amazonaws.services.kinesis.model.DescribeStreamResult; -import com.amazonaws.services.kinesis.model.PutRecordsRequest; -import com.amazonaws.services.kinesis.model.PutRecordsRequestEntry; -import com.amazonaws.services.kinesis.model.PutRecordsResult; -import com.amazonaws.services.kinesis.model.ResourceInUseException; -import com.amazonaws.services.kinesis.model.ResourceNotFoundException; -import org.apache.camel.kafkaconnector.common.AbstractKafkaTest; -import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory; -import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient; -import org.apache.camel.kafkaconnector.common.utils.TestUtils; -import org.apache.camel.test.infra.aws.clients.AWSClientUtils; -import org.apache.camel.test.infra.aws.common.AWSCommon; -import org.apache.camel.test.infra.aws.common.services.AWSService; -import org.apache.camel.test.infra.aws.services.AWSServiceFactory; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.TestInstance; -import org.junit.jupiter.api.Timeout; -import org.junit.jupiter.api.condition.EnabledIfSystemProperty; -import org.junit.jupiter.api.extension.RegisterExtension; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.fail; - -@TestInstance(TestInstance.Lifecycle.PER_CLASS) -@EnabledIfSystemProperty(named = "enable.slow.tests", matches = "true") -public class CamelSourceAWSKinesisITCase extends AbstractKafkaTest { - - @RegisterExtension - public static AWSService service = AWSServiceFactory.createKinesisService(); - private static final Logger LOG = LoggerFactory.getLogger(CamelSourceAWSKinesisITCase.class); - - private String streamName; - private AmazonKinesis awsKinesisClient; - private volatile int received; - private final int expect = 10; - - @Override - protected String[] getConnectorsInTest() { - return new String[] {"camel-aws-kinesis-kafka-connector"}; - } - - private void doCreateStream() { - CreateStreamResult result = awsKinesisClient.createStream(streamName, 1); - if (result.getSdkHttpMetadata().getHttpStatusCode() != 200) { - fail("Failed to create the stream"); - } else { - LOG.info("Stream created successfully"); - } - } - - private void createStream() { - try { - LOG.info("Checking whether the stream exists already"); - DescribeStreamResult describeStreamResult = awsKinesisClient.describeStream(streamName); - - int status = describeStreamResult.getSdkHttpMetadata().getHttpStatusCode(); - LOG.info("Kinesis stream check result: {}", status); - } catch (ResourceNotFoundException e) { - LOG.info("The stream does not exist, auto creating it ..."); - doCreateStream(); - } - } - - private void doDeleteStream() { - DeleteStreamResult result = awsKinesisClient.deleteStream(streamName); - - if (result.getSdkHttpMetadata().getHttpStatusCode() != 200) { - fail("Failed to delete the stream"); - } else { - LOG.info("Stream deleted successfully"); - } - } - - private void deleteStream() { - try { - LOG.info("Checking whether the stream exists already"); - DescribeStreamResult describeStreamResult = awsKinesisClient.describeStream(streamName); - - int status = describeStreamResult.getSdkHttpMetadata().getHttpStatusCode(); - LOG.info("Kinesis stream check result: {}", status); - doDeleteStream(); - } catch (ResourceNotFoundException e) { - LOG.info("The stream does not exist, skipping deletion"); - } catch (ResourceInUseException e) { - LOG.info("The stream exist but cannot be deleted because it's in use"); - doDeleteStream(); - } - } - - @BeforeEach - public void setUp() { - streamName = AWSCommon.KINESIS_STREAM_BASE_NAME + "-" + TestUtils.randomWithRange(0, 100); - - awsKinesisClient = AWSClientUtils.newKinesisClient(); - received = 0; - - createStream(); - } - - @AfterEach - public void tearDown() { - deleteStream(); - - awsKinesisClient.shutdown(); - } - - private boolean checkRecord(ConsumerRecord<String, String> record) { - LOG.debug("Received: {}", record.value()); - received++; - - if (received == expect) { - return false; - } - - return true; - } - - public void runtTest(ConnectorPropertyFactory connectorPropertyFactory) - throws ExecutionException, InterruptedException { - connectorPropertyFactory.log(); - getKafkaConnectService().initializeConnector(connectorPropertyFactory); - - putRecords(); - LOG.debug("Initialized the connector and put the data for the test execution"); - - LOG.debug("Creating the consumer ..."); - KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers()); - kafkaClient.consume(TestUtils.getDefaultTestTopic(this.getClass()), this::checkRecord); - LOG.debug("Created the consumer ..."); - - assertEquals(received, expect, "Didn't process the expected amount of messages"); - } - - @Test - @Timeout(120) - public void testBasicSendReceive() throws ExecutionException, InterruptedException { - ConnectorPropertyFactory connectorPropertyFactory = CamelAWSKinesisPropertyFactory.basic() - .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass())) - .withAmazonConfig(service.getConnectionProperties()) - .withConfiguration(TestKinesisConfiguration.class.getName()).withStreamName(streamName); - - runtTest(connectorPropertyFactory); - } - - @Test - @Timeout(120) - public void testBasicSendReceiveWithKafkaStyle() throws ExecutionException, InterruptedException { - ConnectorPropertyFactory connectorPropertyFactory = CamelAWSKinesisPropertyFactory.basic() - .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass())) - .withAmazonConfig(service.getConnectionProperties(), CamelAWSKinesisPropertyFactory.KAFKA_STYLE) - .withConfiguration(TestKinesisConfiguration.class.getName()).withStreamName(streamName); - - runtTest(connectorPropertyFactory); - } - - @Test - @Timeout(120) - public void testBasicSendReceiveUsingUrl() throws ExecutionException, InterruptedException { - ConnectorPropertyFactory connectorPropertyFactory = CamelAWSKinesisPropertyFactory.basic() - .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass())) - .withAmazonConfig(service.getConnectionProperties()) - .withConfiguration(TestKinesisConfiguration.class.getName()).withUrl(streamName).buildUrl(); - - runtTest(connectorPropertyFactory); - } - - private void putRecords() { - PutRecordsRequest putRecordsRequest = new PutRecordsRequest(); - putRecordsRequest.setStreamName(streamName); - - List<PutRecordsRequestEntry> putRecordsRequestEntryList = new ArrayList<>(); - - LOG.debug("Adding data to the Kinesis stream"); - for (int i = 0; i < expect; i++) { - PutRecordsRequestEntry putRecordsRequestEntry = new PutRecordsRequestEntry(); - putRecordsRequestEntry.setData(ByteBuffer.wrap(String.valueOf(i).getBytes())); - - String partition = String.format("partitionKey-%d", i); - putRecordsRequestEntry.setPartitionKey(partition); - - LOG.debug("Added data {} (as bytes) to partition {}", i, partition); - - putRecordsRequestEntryList.add(putRecordsRequestEntry); - } - - LOG.debug("Done creating the data records"); - - int retries = 5; - do { - try { - putRecordsRequest.setRecords(putRecordsRequestEntryList); - PutRecordsResult putRecordsResult = awsKinesisClient.putRecords(putRecordsRequest); - - if (putRecordsResult.getFailedRecordCount() == 0) { - LOG.debug("Done putting the data records into the stream"); - } else { - fail("Unable to put all the records into the stream"); - } - - break; - } catch (AmazonServiceException e) { - retries--; - - /* - * This works around the "... Cannot deserialize instance of `...AmazonKinesisException` out of - * NOT_AVAILABLE token - * - * It may take some time for the local Kinesis backend to be fully up - even though the container is - * reportedly up and running. Therefore, it tries a few more times - */ - LOG.trace("Failed to put the records: {}. Retrying in 2 seconds ...", e.getMessage()); - if (retries == 0) { - LOG.error("Failed to put the records: {}", e.getMessage(), e); - throw e; - } - - try { - Thread.sleep(TimeUnit.SECONDS.toMillis(2)); - } catch (InterruptedException ex) { - break; - } - } - } while (retries > 0); - - } -} diff --git a/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/kinesis/source/TestKinesisConfiguration.java b/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/kinesis/source/TestKinesisConfiguration.java deleted file mode 100644 index 6d84f4c..0000000 --- a/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/kinesis/source/TestKinesisConfiguration.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.camel.kafkaconnector.aws.v1.kinesis.source; - -import com.amazonaws.services.kinesis.AmazonKinesis; -import org.apache.camel.component.aws.kinesis.KinesisConfiguration; -import org.apache.camel.test.infra.aws.clients.AWSClientUtils; - -public class TestKinesisConfiguration extends KinesisConfiguration { - private AmazonKinesis amazonKinesis; - - private AmazonKinesis buildClient() { - return AWSClientUtils.newKinesisClient(); - } - - @Override - public AmazonKinesis getAmazonKinesisClient() { - if (amazonKinesis == null) { - amazonKinesis = buildClient(); - } - - return amazonKinesis; - } -} diff --git a/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/s3/source/CamelAWSS3PropertyFactory.java b/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/s3/source/CamelAWSS3PropertyFactory.java deleted file mode 100644 index e45cffe..0000000 --- a/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/s3/source/CamelAWSS3PropertyFactory.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.camel.kafkaconnector.aws.v1.s3.source; - -import java.util.HashMap; -import java.util.Map; -import java.util.Properties; - -import com.amazonaws.regions.Regions; -import org.apache.camel.kafkaconnector.common.EndpointUrlBuilder; -import org.apache.camel.kafkaconnector.common.SourceConnectorPropertyFactory; -import org.apache.camel.test.infra.aws.common.AWSConfigs; - -/** - * Creates the set of properties used by a Camel JMS Sink Connector - */ -final class CamelAWSS3PropertyFactory extends SourceConnectorPropertyFactory<CamelAWSS3PropertyFactory> { - public static final Map<String, String> SPRING_STYLE = new HashMap<>(); - public static final Map<String, String> KAFKA_STYLE = new HashMap<>(); - - static { - SPRING_STYLE.put(AWSConfigs.ACCESS_KEY, "camel.component.aws-s3.accessKey"); - SPRING_STYLE.put(AWSConfigs.SECRET_KEY, "camel.component.aws-s3.secretKey"); - SPRING_STYLE.put(AWSConfigs.REGION, "camel.component.aws-s3.region"); - - KAFKA_STYLE.put(AWSConfigs.ACCESS_KEY, "camel.component.aws-s3.access-key"); - KAFKA_STYLE.put(AWSConfigs.SECRET_KEY, "camel.component.aws-s3.secret-key"); - KAFKA_STYLE.put(AWSConfigs.REGION, "camel.component.aws-s3.region"); - } - - private CamelAWSS3PropertyFactory() { - - } - - public CamelAWSS3PropertyFactory withAmazonConfig(Properties amazonConfigs) { - return withAmazonConfig(amazonConfigs, this.SPRING_STYLE); - } - - public CamelAWSS3PropertyFactory withAmazonConfig(Properties amazonConfigs, Map<String, String> style) { - String accessKeyKey = style.get(AWSConfigs.ACCESS_KEY); - String secretKeyKey = style.get(AWSConfigs.SECRET_KEY); - String regionKey = style.get(AWSConfigs.REGION); - - setProperty(accessKeyKey, amazonConfigs.getProperty(AWSConfigs.ACCESS_KEY, "")); - setProperty(secretKeyKey, amazonConfigs.getProperty(AWSConfigs.SECRET_KEY, "")); - return setProperty(regionKey, amazonConfigs.getProperty(AWSConfigs.REGION, Regions.US_EAST_1.name())); - } - - public EndpointUrlBuilder<CamelAWSS3PropertyFactory> withUrl(String bucket) { - String queueUrl = String.format("aws-s3://%s", bucket); - - return new EndpointUrlBuilder<>(this::withSourceUrl, queueUrl); - } - - public CamelAWSS3PropertyFactory withMaxMessagesPerPoll(int value) { - return setProperty("camel.source.endpoint.maxMessagesPerPoll", Integer.toString(value)); - } - - public CamelAWSS3PropertyFactory withBucketNameOrArn(String bucketNameOrArn) { - return setProperty("camel.source.path.bucketNameOrArn", bucketNameOrArn); - } - - public CamelAWSS3PropertyFactory withConfiguration(String configurationClass) { - return setProperty("camel.component.aws-s3.configuration", classRef(configurationClass)); - } - - public static CamelAWSS3PropertyFactory basic() { - return new CamelAWSS3PropertyFactory().withName("CamelAwss3SourceConnector").withTasksMax(1) - .withConnectorClass("org.apache.camel.kafkaconnector.awss3.CamelAwss3SourceConnector") - .withKeyConverterClass("org.apache.kafka.connect.storage.StringConverter") - .withValueConverterClass("org.apache.kafka.connect.storage.StringConverter"); - } -} diff --git a/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/s3/source/CamelSourceAWSS3ITCase.java b/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/s3/source/CamelSourceAWSS3ITCase.java deleted file mode 100644 index 87fa9ad..0000000 --- a/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/s3/source/CamelSourceAWSS3ITCase.java +++ /dev/null @@ -1,230 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.camel.kafkaconnector.aws.v1.s3.source; - -import java.io.File; -import java.util.Iterator; -import java.util.Properties; -import java.util.concurrent.ExecutionException; - -import com.amazonaws.regions.Regions; -import com.amazonaws.services.s3.AmazonS3; -import com.amazonaws.services.s3.model.ListVersionsRequest; -import com.amazonaws.services.s3.model.ObjectListing; -import com.amazonaws.services.s3.model.S3ObjectSummary; -import com.amazonaws.services.s3.model.S3VersionSummary; -import com.amazonaws.services.s3.model.VersionListing; -import org.apache.camel.kafkaconnector.common.AbstractKafkaTest; -import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory; -import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient; -import org.apache.camel.kafkaconnector.common.utils.TestUtils; -import org.apache.camel.test.infra.aws.clients.AWSClientUtils; -import org.apache.camel.test.infra.aws.common.AWSCommon; -import org.apache.camel.test.infra.aws.common.AWSConfigs; -import org.apache.camel.test.infra.aws.common.services.AWSService; -import org.apache.camel.test.infra.aws.services.AWSServiceFactory; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.TestInstance; -import org.junit.jupiter.api.Timeout; -import org.junit.jupiter.api.condition.EnabledIfSystemProperty; -import org.junit.jupiter.api.extension.RegisterExtension; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.fail; - -@TestInstance(TestInstance.Lifecycle.PER_CLASS) -@EnabledIfSystemProperty(named = "enable.slow.tests", matches = "true") -public class CamelSourceAWSS3ITCase extends AbstractKafkaTest { - - @RegisterExtension - public static AWSService service = AWSServiceFactory.createS3Service(); - private static final Logger LOG = LoggerFactory.getLogger(CamelSourceAWSS3ITCase.class); - - private AmazonS3 awsS3Client; - private volatile int received; - private final int expect = 10; - - /** - * Delete an S3 bucket using the provided client. Coming from AWS documentation: - * https://docs.aws.amazon.com/AmazonS3/latest/dev/delete-or-empty-bucket.html#delete-bucket-sdk-java - * - * @param s3Client - * the AmazonS3 client instance used to delete the bucket - * @param bucketName - * a String containing the bucket name - */ - public static void deleteBucket(AmazonS3 s3Client, String bucketName) { - // Delete all objects from the bucket. This is sufficient - // for non versioned buckets. For versioned buckets, when you attempt to delete objects, Amazon S3 inserts - // delete markers for all objects, but doesn't delete the object versions. - // To delete objects from versioned buckets, delete all of the object versions before deleting - // the bucket (see below for an example). - ObjectListing objectListing = s3Client.listObjects(bucketName); - while (true) { - Iterator<S3ObjectSummary> objIter = objectListing.getObjectSummaries().iterator(); - while (objIter.hasNext()) { - s3Client.deleteObject(bucketName, objIter.next().getKey()); - } - - // If the bucket contains many objects, the listObjects() call - // might not return all of the objects in the first listing. Check to - // see whether the listing was truncated. If so, retrieve the next page of objects - // and delete them. - if (objectListing.isTruncated()) { - objectListing = s3Client.listNextBatchOfObjects(objectListing); - } else { - break; - } - } - - // Delete all object versions (required for versioned buckets). - VersionListing versionList = s3Client.listVersions(new ListVersionsRequest().withBucketName(bucketName)); - while (true) { - Iterator<S3VersionSummary> versionIter = versionList.getVersionSummaries().iterator(); - while (versionIter.hasNext()) { - S3VersionSummary vs = versionIter.next(); - s3Client.deleteVersion(bucketName, vs.getKey(), vs.getVersionId()); - } - - if (versionList.isTruncated()) { - versionList = s3Client.listNextBatchOfVersions(versionList); - } else { - break; - } - } - - // After all objects and object versions are deleted, delete the bucket. - s3Client.deleteBucket(bucketName); - } - - @Override - protected String[] getConnectorsInTest() { - return new String[] {"camel-aws-s3-kafka-connector"}; - } - - @BeforeEach - public void setUp() { - awsS3Client = AWSClientUtils.newS3Client(); - received = 0; - - try { - awsS3Client.createBucket(AWSCommon.DEFAULT_S3_BUCKET); - } catch (Exception e) { - LOG.error("Unable to create bucket: {}", e.getMessage(), e); - fail("Unable to create bucket"); - } - } - - @AfterEach - public void tearDown() { - try { - deleteBucket(awsS3Client, AWSCommon.DEFAULT_S3_BUCKET); - } catch (Exception e) { - LOG.warn("Unable to delete bucked: {}", e.getMessage(), e); - } - } - - private boolean checkRecord(ConsumerRecord<String, String> record) { - LOG.debug("Received: {}", record.value()); - received++; - - if (received == expect) { - return false; - } - - return true; - } - - public void runTest(ConnectorPropertyFactory connectorPropertyFactory) - throws ExecutionException, InterruptedException { - connectorPropertyFactory.log(); - getKafkaConnectService().initializeConnector(connectorPropertyFactory); - - LOG.debug("Putting S3 objects"); - for (int i = 0; i < expect; i++) { - String name = "file" + i + ".test"; - String file = this.getClass().getResource(name).getFile(); - - LOG.trace("Putting file {}", file); - awsS3Client.putObject(AWSCommon.DEFAULT_S3_BUCKET, name, new File(file)); - } - LOG.debug("Done putting S3S objects"); - - LOG.debug("Creating the consumer ..."); - KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers()); - kafkaClient.consume(TestUtils.getDefaultTestTopic(this.getClass()), this::checkRecord); - LOG.debug("Created the consumer ..."); - - assertEquals(received, expect, "Didn't process the expected amount of messages"); - } - - @Test - @Timeout(180) - public void testBasicSendReceive() throws ExecutionException, InterruptedException { - ConnectorPropertyFactory connectorPropertyFactory = CamelAWSS3PropertyFactory.basic() - .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass())) - .withConfiguration(TestS3Configuration.class.getName()).withBucketNameOrArn(AWSCommon.DEFAULT_S3_BUCKET) - .withAmazonConfig(service.getConnectionProperties()); - - runTest(connectorPropertyFactory); - } - - @Test - @Timeout(180) - public void testBasicSendReceiveWithMaxMessagesPerPoll() throws ExecutionException, InterruptedException { - ConnectorPropertyFactory connectorPropertyFactory = CamelAWSS3PropertyFactory.basic() - .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass())) - .withConfiguration(TestS3Configuration.class.getName()).withMaxMessagesPerPoll(5) - .withBucketNameOrArn(AWSCommon.DEFAULT_S3_BUCKET).withAmazonConfig(service.getConnectionProperties()); - - runTest(connectorPropertyFactory); - } - - @Test - @Timeout(180) - public void testBasicSendReceiveWithKafkaStyle() throws ExecutionException, InterruptedException { - ConnectorPropertyFactory connectorPropertyFactory = CamelAWSS3PropertyFactory.basic() - .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass())) - .withConfiguration(TestS3Configuration.class.getName()).withBucketNameOrArn(AWSCommon.DEFAULT_S3_BUCKET) - .withAmazonConfig(service.getConnectionProperties(), CamelAWSS3PropertyFactory.KAFKA_STYLE); - - runTest(connectorPropertyFactory); - } - - @Test - @Timeout(180) - public void testBasicSendReceiveUsingUrl() throws ExecutionException, InterruptedException { - Properties amazonProperties = service.getConnectionProperties(); - - ConnectorPropertyFactory connectorPropertyFactory = CamelAWSS3PropertyFactory.basic() - .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass())) - .withConfiguration(TestS3Configuration.class.getName()).withUrl(AWSCommon.DEFAULT_S3_BUCKET) - .append("accessKey", amazonProperties.getProperty(AWSConfigs.ACCESS_KEY)) - .append("secretKey", amazonProperties.getProperty(AWSConfigs.SECRET_KEY)) - .appendIfAvailable("proxyProtocol", amazonProperties.getProperty(AWSConfigs.PROTOCOL)) - .append("region", amazonProperties.getProperty(AWSConfigs.REGION, Regions.US_EAST_1.name())).buildUrl(); - - runTest(connectorPropertyFactory); - } - -} diff --git a/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/s3/source/TestS3Configuration.java b/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/s3/source/TestS3Configuration.java deleted file mode 100644 index a0ae33e..0000000 --- a/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/s3/source/TestS3Configuration.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.camel.kafkaconnector.aws.v1.s3.source; - -import com.amazonaws.services.s3.AmazonS3; -import org.apache.camel.component.aws.s3.S3Configuration; -import org.apache.camel.test.infra.aws.clients.AWSClientUtils; - -public class TestS3Configuration extends S3Configuration { - private AmazonS3 amazonS3; - - private AmazonS3 buildClient() { - return AWSClientUtils.newS3Client(); - } - - @Override - public AmazonS3 getAmazonS3Client() { - if (amazonS3 == null) { - amazonS3 = buildClient(); - } - - return amazonS3; - } -} diff --git a/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sns/sink/CamelAWSSNSPropertyFactory.java b/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sns/sink/CamelAWSSNSPropertyFactory.java deleted file mode 100644 index 7bab323..0000000 --- a/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sns/sink/CamelAWSSNSPropertyFactory.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.camel.kafkaconnector.aws.v1.sns.sink; - -import java.util.HashMap; -import java.util.Map; -import java.util.Properties; - -import com.amazonaws.regions.Regions; -import org.apache.camel.kafkaconnector.common.EndpointUrlBuilder; -import org.apache.camel.kafkaconnector.common.SinkConnectorPropertyFactory; -import org.apache.camel.test.infra.aws.common.AWSConfigs; - -/** - * Creates the set of properties used by a Camel JMS Sink Connector - */ -final class CamelAWSSNSPropertyFactory extends SinkConnectorPropertyFactory<CamelAWSSNSPropertyFactory> { - public static final Map<String, String> SPRING_STYLE = new HashMap<>(); - public static final Map<String, String> KAFKA_STYLE = new HashMap<>(); - - static { - SPRING_STYLE.put(AWSConfigs.ACCESS_KEY, "camel.component.aws-sns.accessKey"); - SPRING_STYLE.put(AWSConfigs.SECRET_KEY, "camel.component.aws-sns.secretKey"); - SPRING_STYLE.put(AWSConfigs.REGION, "camel.component.aws-sns.region"); - - KAFKA_STYLE.put(AWSConfigs.ACCESS_KEY, "camel.component.aws-sns.access-key"); - KAFKA_STYLE.put(AWSConfigs.SECRET_KEY, "camel.component.aws-sns.secret-key"); - KAFKA_STYLE.put(AWSConfigs.REGION, "camel.component.aws-sns.region"); - } - - private CamelAWSSNSPropertyFactory() { - } - - public EndpointUrlBuilder<CamelAWSSNSPropertyFactory> withUrl(String topicOrArn) { - String sinkUrl = String.format("aws-sns:%s", topicOrArn); - - return new EndpointUrlBuilder<>(this::withSinkUrl, sinkUrl); - } - - public CamelAWSSNSPropertyFactory withTopicOrArn(String topicOrArn) { - return setProperty("camel.sink.path.topicNameOrArn", topicOrArn); - } - - public CamelAWSSNSPropertyFactory withSubscribeSNStoSQS(String queue) { - return setProperty("camel.sink.endpoint.subscribeSNStoSQS", "true").setProperty("camel.sink.endpoint.queueUrl", - queue); - } - - public CamelAWSSNSPropertyFactory withAmazonConfig(Properties amazonConfigs) { - return withAmazonConfig(amazonConfigs, this.SPRING_STYLE); - } - - public CamelAWSSNSPropertyFactory withAmazonConfig(Properties amazonConfigs, Map<String, String> style) { - String accessKeyKey = style.get(AWSConfigs.ACCESS_KEY); - String secretKeyKey = style.get(AWSConfigs.SECRET_KEY); - String regionKey = style.get(AWSConfigs.REGION); - - setProperty(accessKeyKey, amazonConfigs.getProperty(AWSConfigs.ACCESS_KEY, "")); - setProperty(secretKeyKey, amazonConfigs.getProperty(AWSConfigs.SECRET_KEY, "")); - return setProperty(regionKey, amazonConfigs.getProperty(AWSConfigs.REGION, Regions.US_EAST_1.name())); - } - - public CamelAWSSNSPropertyFactory withConfiguration(String configurationClass) { - return setProperty("camel.component.aws-sns.configuration", classRef(configurationClass)); - } - - public static CamelAWSSNSPropertyFactory basic() { - return new CamelAWSSNSPropertyFactory().withName("CamelAWSSNSSinkConnector").withTasksMax(1) - .withConnectorClass("org.apache.camel.kafkaconnector.awssns.CamelAwssnsSinkConnector") - .withKeyConverterClass("org.apache.kafka.connect.storage.StringConverter") - .withValueConverterClass("org.apache.kafka.connect.storage.StringConverter"); - } -} diff --git a/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sns/sink/CamelSinkAWSSNSITCase.java b/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sns/sink/CamelSinkAWSSNSITCase.java deleted file mode 100644 index aea8c76..0000000 --- a/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sns/sink/CamelSinkAWSSNSITCase.java +++ /dev/null @@ -1,167 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.camel.kafkaconnector.aws.v1.sns.sink; - -import java.util.List; -import java.util.Properties; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -import com.amazonaws.regions.Regions; -import com.amazonaws.services.sqs.model.Message; -import org.apache.camel.kafkaconnector.aws.v1.clients.AWSSQSClient; -import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory; -import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport; -import org.apache.camel.kafkaconnector.common.utils.TestUtils; -import org.apache.camel.test.infra.aws.clients.AWSClientUtils; -import org.apache.camel.test.infra.aws.common.AWSCommon; -import org.apache.camel.test.infra.aws.common.AWSConfigs; -import org.apache.camel.test.infra.aws.common.services.AWSService; -import org.apache.camel.test.infra.aws.services.AWSServiceFactory; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.TestInstance; -import org.junit.jupiter.api.Timeout; -import org.junit.jupiter.api.condition.EnabledIfSystemProperty; -import org.junit.jupiter.api.extension.RegisterExtension; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.fail; - -@TestInstance(TestInstance.Lifecycle.PER_CLASS) -@EnabledIfSystemProperty(named = "enable.slow.tests", matches = "true") -public class CamelSinkAWSSNSITCase extends CamelSinkTestSupport { - - @RegisterExtension - public static AWSService service = AWSServiceFactory.createSNSService(); - private static final Logger LOG = LoggerFactory.getLogger(CamelSinkAWSSNSITCase.class); - - private AWSSQSClient awsSqsClient; - private String sqsQueueUrl; - private String queueName; - private String topicName; - - private volatile int received; - private final int expect = 10; - - @Override - protected String[] getConnectorsInTest() { - return new String[] {"camel-aws-sns-kafka-connector"}; - } - - @BeforeEach - public void setUp() { - topicName = getTopicForTest(this); - - awsSqsClient = new AWSSQSClient(AWSClientUtils.newSQSClient()); - queueName = AWSCommon.DEFAULT_SQS_QUEUE_FOR_SNS + "-" + TestUtils.randomWithRange(0, 1000); - sqsQueueUrl = awsSqsClient.getQueue(queueName); - - LOG.info("Created SQS queue {}", sqsQueueUrl); - received = 0; - } - - @Override - protected void verifyMessages(CountDownLatch latch) throws InterruptedException { - if (latch.await(120, TimeUnit.SECONDS)) { - assertEquals(expect, received, - "Didn't process the expected amount of messages: " + received + " != " + expect); - } else { - fail("Failed to receive the messages within the specified time"); - } - } - - private boolean checkMessages(List<Message> messages) { - for (Message message : messages) { - LOG.info("Received: {}", message.getBody()); - - received++; - } - - if (received == expect) { - return false; - } - - return true; - } - - @Override - protected void consumeMessages(CountDownLatch latch) { - try { - awsSqsClient.receiveFrom(sqsQueueUrl, this::checkMessages); - } catch (Throwable t) { - LOG.error("Failed to consume messages: {}", t.getMessage(), t); - fail(t.getMessage()); - } finally { - latch.countDown(); - } - } - - @Test - @Timeout(value = 90) - public void testBasicSendReceive() throws Exception { - Properties amazonProperties = service.getConnectionProperties(); - - ConnectorPropertyFactory connectorPropertyFactory = CamelAWSSNSPropertyFactory.basic() - .withName("CamelAWSSNSSinkConnectorDefault") - .withTopics(topicName) - .withTopicOrArn(queueName) - .withSubscribeSNStoSQS(sqsQueueUrl).withConfiguration(TestSNSConfiguration.class.getName()) - .withAmazonConfig(amazonProperties); - - runTest(connectorPropertyFactory, topicName, expect); - } - - @Test - @Timeout(value = 90) - public void testBasicSendReceiveUsingKafkaStyle() throws Exception { - Properties amazonProperties = service.getConnectionProperties(); - - ConnectorPropertyFactory connectorPropertyFactory = CamelAWSSNSPropertyFactory.basic() - .withName("CamelAWSSNSSinkKafkaStyleConnector") - .withTopics(topicName) - .withTopicOrArn(queueName) - .withSubscribeSNStoSQS(sqsQueueUrl).withConfiguration(TestSNSConfiguration.class.getName()) - .withAmazonConfig(amazonProperties, CamelAWSSNSPropertyFactory.KAFKA_STYLE); - - runTest(connectorPropertyFactory, topicName, expect); - } - - @Disabled("AWS SNS component is failing to parse the sink URL for this one") - @Test - @Timeout(value = 90) - public void testBasicSendReceiveUsingUrl() throws Exception { - Properties amazonProperties = service.getConnectionProperties(); - - ConnectorPropertyFactory connectorPropertyFactory = CamelAWSSNSPropertyFactory.basic() - .withName("CamelAWSSNSSinkKafkaStyleConnector") - .withTopics(topicName) - .withUrl(queueName) - .append("queueUrl", sqsQueueUrl).append("subscribeSNStoSQS", "true") - .append("accessKey", amazonProperties.getProperty(AWSConfigs.ACCESS_KEY)) - .append("secretKey", amazonProperties.getProperty(AWSConfigs.SECRET_KEY)) - .append("region", amazonProperties.getProperty(AWSConfigs.REGION, Regions.US_EAST_1.name())) - .append("configuration", "#class:" + TestSNSConfiguration.class.getName()) - .buildUrl(); - - runTest(connectorPropertyFactory, topicName, expect); - } -} diff --git a/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sns/sink/TestSNSConfiguration.java b/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sns/sink/TestSNSConfiguration.java deleted file mode 100644 index 98866ee..0000000 --- a/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sns/sink/TestSNSConfiguration.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.camel.kafkaconnector.aws.v1.sns.sink; - -import com.amazonaws.services.sns.AmazonSNS; -import com.amazonaws.services.sqs.AmazonSQS; -import org.apache.camel.component.aws.sns.SnsConfiguration; -import org.apache.camel.test.infra.aws.clients.AWSClientUtils; - -public class TestSNSConfiguration extends SnsConfiguration { - - @Override - public AmazonSNS getAmazonSNSClient() { - return AWSClientUtils.newSNSClient(); - } - - @Override - public AmazonSQS getAmazonSQSClient() { - return AWSClientUtils.newSQSClient(); - } -} diff --git a/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sqs/sink/CamelAWSSQSPropertyFactory.java b/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sqs/sink/CamelAWSSQSPropertyFactory.java deleted file mode 100644 index ea5a7cf..0000000 --- a/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sqs/sink/CamelAWSSQSPropertyFactory.java +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.camel.kafkaconnector.aws.v1.sqs.sink; - -import java.util.HashMap; -import java.util.Map; -import java.util.Properties; - -import com.amazonaws.regions.Regions; -import org.apache.camel.kafkaconnector.common.EndpointUrlBuilder; -import org.apache.camel.kafkaconnector.common.SinkConnectorPropertyFactory; -import org.apache.camel.test.infra.aws.common.AWSConfigs; - -/** - * Creates the set of properties used by a Camel JMS Sink Connector - */ -final class CamelAWSSQSPropertyFactory extends SinkConnectorPropertyFactory<CamelAWSSQSPropertyFactory> { - public static final Map<String, String> SPRING_STYLE = new HashMap<>(); - public static final Map<String, String> KAFKA_STYLE = new HashMap<>(); - - static { - SPRING_STYLE.put(AWSConfigs.ACCESS_KEY, "camel.component.aws-sqs.accessKey"); - SPRING_STYLE.put(AWSConfigs.SECRET_KEY, "camel.component.aws-sqs.secretKey"); - SPRING_STYLE.put(AWSConfigs.REGION, "camel.component.aws-sqs.region"); - SPRING_STYLE.put(AWSConfigs.PROTOCOL, "camel.sink.endpoint.protocol"); - SPRING_STYLE.put(AWSConfigs.AMAZON_AWS_HOST, "camel.sink.endpoint.amazonAWSHost"); - - KAFKA_STYLE.put(AWSConfigs.ACCESS_KEY, "camel.component.aws-sqs.access-key"); - KAFKA_STYLE.put(AWSConfigs.SECRET_KEY, "camel.component.aws-sqs.secret-key"); - KAFKA_STYLE.put(AWSConfigs.REGION, "camel.component.aws-sqs.region"); - KAFKA_STYLE.put(AWSConfigs.PROTOCOL, "camel.sink.endpoint.protocol"); - KAFKA_STYLE.put(AWSConfigs.AMAZON_AWS_HOST, "camel.sink.endpoint.amazonAWSHost"); - } - - private CamelAWSSQSPropertyFactory() { - } - - public CamelAWSSQSPropertyFactory withAmazonConfig(Properties amazonConfigs) { - return withAmazonConfig(amazonConfigs, this.SPRING_STYLE); - } - - public CamelAWSSQSPropertyFactory withAmazonConfig(Properties amazonConfigs, Map<String, String> style) { - String accessKeyKey = style.get(AWSConfigs.ACCESS_KEY); - String secretKeyKey = style.get(AWSConfigs.SECRET_KEY); - String regionKey = style.get(AWSConfigs.REGION); - String protocolKey = style.get(AWSConfigs.PROTOCOL); - String hostKey = style.get(AWSConfigs.AMAZON_AWS_HOST); - - setProperty(accessKeyKey, amazonConfigs.getProperty(AWSConfigs.ACCESS_KEY, "")); - setProperty(secretKeyKey, amazonConfigs.getProperty(AWSConfigs.SECRET_KEY, "")); - setProperty(regionKey, amazonConfigs.getProperty(AWSConfigs.REGION, Regions.US_EAST_1.name())); - - String protocol = amazonConfigs.getProperty(AWSConfigs.PROTOCOL, ""); - - if (protocol != null && !protocol.isEmpty()) { - setProperty(protocolKey, protocol); - } - - String amazonAwsHost = amazonConfigs.getProperty(AWSConfigs.AMAZON_AWS_HOST, ""); - if (amazonAwsHost != null && !amazonAwsHost.isEmpty()) { - setProperty(hostKey, amazonAwsHost); - } - - return this; - } - - public CamelAWSSQSPropertyFactory withConfiguration(String configurationClass) { - return setProperty("camel.component.aws-sqs.configuration", classRef(configurationClass)); - } - - public EndpointUrlBuilder<CamelAWSSQSPropertyFactory> withUrl(String queueNameOrArn) { - String queueUrl = String.format("aws-sqs://%s", queueNameOrArn); - - return new EndpointUrlBuilder<>(this::withSinkUrl, queueUrl); - } - - public CamelAWSSQSPropertyFactory withQueueNameOrArn(String queueNameOrArn) { - return setProperty("camel.sink.path.queueNameOrArn", queueNameOrArn); - } - - public static CamelAWSSQSPropertyFactory basic() { - return new CamelAWSSQSPropertyFactory().withName("CamelAwssqsSinkConnector").withTasksMax(1) - .withConnectorClass("org.apache.camel.kafkaconnector.awssqs.CamelAwssqsSinkConnector") - .withKeyConverterClass("org.apache.kafka.connect.storage.StringConverter") - .withValueConverterClass("org.apache.kafka.connect.storage.StringConverter"); - } - -} diff --git a/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sqs/sink/CamelSinkAWSSQSITCase.java b/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sqs/sink/CamelSinkAWSSQSITCase.java deleted file mode 100644 index 894114f..0000000 --- a/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sqs/sink/CamelSinkAWSSQSITCase.java +++ /dev/null @@ -1,179 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.camel.kafkaconnector.aws.v1.sqs.sink; - -import java.util.List; -import java.util.Properties; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -import com.amazonaws.regions.Regions; -import com.amazonaws.services.sqs.model.Message; -import org.apache.camel.kafkaconnector.aws.v1.clients.AWSSQSClient; -import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory; -import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport; -import org.apache.camel.kafkaconnector.common.utils.TestUtils; -import org.apache.camel.test.infra.aws.clients.AWSClientUtils; -import org.apache.camel.test.infra.aws.common.AWSCommon; -import org.apache.camel.test.infra.aws.common.AWSConfigs; -import org.apache.camel.test.infra.aws.common.services.AWSService; -import org.apache.camel.test.infra.aws.services.AWSServiceFactory; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.RepeatedTest; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.TestInstance; -import org.junit.jupiter.api.Timeout; -import org.junit.jupiter.api.condition.DisabledIfSystemProperty; -import org.junit.jupiter.api.condition.EnabledIfSystemProperty; -import org.junit.jupiter.api.extension.RegisterExtension; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.fail; - -@TestInstance(TestInstance.Lifecycle.PER_CLASS) -@EnabledIfSystemProperty(named = "enable.slow.tests", matches = "true") -public class CamelSinkAWSSQSITCase extends CamelSinkTestSupport { - - @RegisterExtension - public static AWSService awsService = AWSServiceFactory.createSQSService(); - private static final Logger LOG = LoggerFactory.getLogger(CamelSinkAWSSQSITCase.class); - - private AWSSQSClient awssqsClient; - private String queueName; - private String queueUrl; - private String topicName; - - private volatile int received; - private final int expect = 10; - - @Override - protected String[] getConnectorsInTest() { - return new String[] {"camel-aws-sqs-kafka-connector"}; - } - - @BeforeEach - public void setUp() { - topicName = getTopicForTest(this); - awssqsClient = new AWSSQSClient(AWSClientUtils.newSQSClient()); - - queueName = AWSCommon.BASE_SQS_QUEUE_NAME + "-" + TestUtils.randomWithRange(0, 1000); - queueUrl = awssqsClient.getQueue(queueName); - - LOG.debug("Using queue {} for the test", queueUrl); - - received = 0; - } - - @AfterEach - public void tearDown() { - if (!awssqsClient.deleteQueue(queueName)) { - fail("Failed to delete queue"); - } - } - - @Override - protected void verifyMessages(CountDownLatch latch) throws InterruptedException { - if (latch.await(110, TimeUnit.SECONDS)) { - assertEquals(expect, received, - "Didn't process the expected amount of messages: " + received + " != " + expect); - } else { - fail(String.format("Failed to receive the messages within the specified time: received %d of %d", received, - expect)); - } - } - - private boolean checkMessages(List<Message> messages) { - for (Message message : messages) { - LOG.info("Received: {}", message.getBody()); - - received++; - } - - if (received == expect) { - return false; - } - - return true; - } - - @Override - protected void consumeMessages(CountDownLatch latch) { - try { - awssqsClient.receiveFrom(queueUrl, this::checkMessages); - } catch (Throwable t) { - LOG.error("Failed to consume messages: {}", t.getMessage(), t); - } finally { - latch.countDown(); - } - } - - @Test - @Timeout(value = 120) - public void testBasicSendReceive() throws Exception { - Properties amazonProperties = awsService.getConnectionProperties(); - - ConnectorPropertyFactory testProperties = CamelAWSSQSPropertyFactory.basic() - .withName("CamelAwssqsSinkConnectorSpringBootStyle") - .withTopics(topicName) - .withAmazonConfig(amazonProperties) - .withQueueNameOrArn(queueName); - - runTest(testProperties, topicName, expect); - - } - - @DisabledIfSystemProperty(named = "aws-service.instance.type", matches = "remote") - @Timeout(value = 120) - @RepeatedTest(3) - public void testBasicSendReceiveUsingKafkaStyle() throws Exception { - Properties amazonProperties = awsService.getConnectionProperties(); - - ConnectorPropertyFactory testProperties = CamelAWSSQSPropertyFactory.basic() - .withName("CamelAwssqsSinkConnectorKafkaStyle") - .withTopics(topicName) - .withAmazonConfig(amazonProperties, CamelAWSSQSPropertyFactory.KAFKA_STYLE) - .withQueueNameOrArn(queueName); - - runTest(testProperties, topicName, expect); - } - - @DisabledIfSystemProperty(named = "aws-service.instance.type", matches = "remote") - @Timeout(value = 120) - @RepeatedTest(3) - public void testBasicSendReceiveUsingUrl() throws Exception { - Properties amazonProperties = awsService.getConnectionProperties(); - - ConnectorPropertyFactory testProperties = CamelAWSSQSPropertyFactory.basic() - .withName("CamelAwssqsSinkConnectorUsingUrl") - .withTopics(topicName) - .withUrl(queueName) - .append("autoCreateQueue", "true") - .append("accessKey", amazonProperties.getProperty(AWSConfigs.ACCESS_KEY)) - .append("secretKey", amazonProperties.getProperty(AWSConfigs.SECRET_KEY)) - .append("protocol", amazonProperties.getProperty(AWSConfigs.PROTOCOL)) - .append("region", amazonProperties.getProperty(AWSConfigs.REGION, Regions.US_EAST_1.name())) - .append("amazonAWSHost", amazonProperties.getProperty(AWSConfigs.AMAZON_AWS_HOST)) - .buildUrl(); - - runTest(testProperties, topicName, expect); - } - -} diff --git a/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sqs/source/CamelAWSSQSPropertyFactory.java b/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sqs/source/CamelAWSSQSPropertyFactory.java deleted file mode 100644 index 930dc1a..0000000 --- a/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sqs/source/CamelAWSSQSPropertyFactory.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.camel.kafkaconnector.aws.v1.sqs.source; - -import java.util.HashMap; -import java.util.Map; -import java.util.Properties; - -import com.amazonaws.regions.Regions; -import org.apache.camel.kafkaconnector.common.EndpointUrlBuilder; -import org.apache.camel.kafkaconnector.common.SourceConnectorPropertyFactory; -import org.apache.camel.test.infra.aws.common.AWSConfigs; - -/** - * Creates the set of properties used by a Camel JMS Sink Connector - */ -final class CamelAWSSQSPropertyFactory extends SourceConnectorPropertyFactory<CamelAWSSQSPropertyFactory> { - public static final Map<String, String> SPRING_STYLE = new HashMap<>(); - public static final Map<String, String> KAFKA_STYLE = new HashMap<>(); - - static { - SPRING_STYLE.put(AWSConfigs.ACCESS_KEY, "camel.component.aws-sqs.accessKey"); - SPRING_STYLE.put(AWSConfigs.SECRET_KEY, "camel.component.aws-sqs.secretKey"); - SPRING_STYLE.put(AWSConfigs.REGION, "camel.component.aws-sqs.region"); - SPRING_STYLE.put(AWSConfigs.PROTOCOL, "camel.source.endpoint.protocol"); - SPRING_STYLE.put(AWSConfigs.AMAZON_AWS_HOST, "camel.source.endpoint.amazonAWSHost"); - - KAFKA_STYLE.put(AWSConfigs.ACCESS_KEY, "camel.component.aws-sqs.access-key"); - KAFKA_STYLE.put(AWSConfigs.SECRET_KEY, "camel.component.aws-sqs.secret-key"); - KAFKA_STYLE.put(AWSConfigs.REGION, "camel.component.aws-sqs.region"); - KAFKA_STYLE.put(AWSConfigs.PROTOCOL, "camel.source.endpoint.protocol"); - KAFKA_STYLE.put(AWSConfigs.AMAZON_AWS_HOST, "camel.source.endpoint.amazonAWSHost"); - } - - private CamelAWSSQSPropertyFactory() { - - } - - public CamelAWSSQSPropertyFactory withAmazonConfig(Properties amazonConfigs) { - return withAmazonConfig(amazonConfigs, this.SPRING_STYLE); - } - - public CamelAWSSQSPropertyFactory withAmazonConfig(Properties amazonConfigs, Map<String, String> style) { - String accessKeyKey = style.get(AWSConfigs.ACCESS_KEY); - String secretKeyKey = style.get(AWSConfigs.SECRET_KEY); - String regionKey = style.get(AWSConfigs.REGION); - String protocolKey = style.get(AWSConfigs.PROTOCOL); - String hostKey = style.get(AWSConfigs.AMAZON_AWS_HOST); - - setProperty(accessKeyKey, amazonConfigs.getProperty(AWSConfigs.ACCESS_KEY, "")); - setProperty(secretKeyKey, amazonConfigs.getProperty(AWSConfigs.SECRET_KEY, "")); - setProperty(regionKey, amazonConfigs.getProperty(AWSConfigs.REGION, Regions.US_EAST_1.name())); - - String protocol = amazonConfigs.getProperty(AWSConfigs.PROTOCOL, ""); - if (protocol != null && !protocol.isEmpty()) { - setProperty(protocolKey, protocol); - } - - String amazonAwsHost = amazonConfigs.getProperty(AWSConfigs.AMAZON_AWS_HOST, ""); - if (amazonAwsHost != null && !amazonAwsHost.isEmpty()) { - setProperty(hostKey, amazonAwsHost); - } - - return this; - } - - public CamelAWSSQSPropertyFactory withQueueOrArn(String queueOrArn) { - return setProperty("camel.source.path.queueNameOrArn", queueOrArn); - } - - public EndpointUrlBuilder<CamelAWSSQSPropertyFactory> withUrl(String queueOrArn) { - String queueUrl = String.format("aws-sqs://%s", queueOrArn); - - return new EndpointUrlBuilder<>(this::withSourceUrl, queueUrl); - } - - public static CamelAWSSQSPropertyFactory basic() { - return new CamelAWSSQSPropertyFactory().withName("CamelAwssqsSourceConnector").withTasksMax(1) - .withConnectorClass("org.apache.camel.kafkaconnector.awssqs.CamelAwssqsSourceConnector") - .withKeyConverterClass("org.apache.kafka.connect.storage.StringConverter") - .withValueConverterClass("org.apache.kafka.connect.storage.StringConverter"); - } -} diff --git a/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sqs/source/CamelSourceAWSSQSITCase.java b/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sqs/source/CamelSourceAWSSQSITCase.java deleted file mode 100644 index 608c12d..0000000 --- a/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sqs/source/CamelSourceAWSSQSITCase.java +++ /dev/null @@ -1,157 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.camel.kafkaconnector.aws.v1.sqs.source; - -import java.util.Properties; -import java.util.concurrent.ExecutionException; - -import com.amazonaws.regions.Regions; -import org.apache.camel.kafkaconnector.aws.v1.clients.AWSSQSClient; -import org.apache.camel.kafkaconnector.common.AbstractKafkaTest; -import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory; -import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient; -import org.apache.camel.kafkaconnector.common.utils.TestUtils; -import org.apache.camel.test.infra.aws.clients.AWSClientUtils; -import org.apache.camel.test.infra.aws.common.AWSCommon; -import org.apache.camel.test.infra.aws.common.AWSConfigs; -import org.apache.camel.test.infra.aws.common.services.AWSService; -import org.apache.camel.test.infra.aws.services.AWSServiceFactory; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.TestInstance; -import org.junit.jupiter.api.Timeout; -import org.junit.jupiter.api.condition.DisabledIfSystemProperty; -import org.junit.jupiter.api.condition.EnabledIfSystemProperty; -import org.junit.jupiter.api.extension.RegisterExtension; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.fail; - -@TestInstance(TestInstance.Lifecycle.PER_CLASS) -@EnabledIfSystemProperty(named = "enable.slow.tests", matches = "true") -public class CamelSourceAWSSQSITCase extends AbstractKafkaTest { - - @RegisterExtension - public static AWSService service = AWSServiceFactory.createSQSService(); - private static final Logger LOG = LoggerFactory.getLogger(CamelSourceAWSSQSITCase.class); - - - private AWSSQSClient awssqsClient; - private String queueName; - private String queueUrl; - - private volatile int received; - private final int expect = 10; - - @Override - protected String[] getConnectorsInTest() { - return new String[] {"camel-aws-sqs-kafka-connector"}; - } - - @BeforeEach - public void setUp() { - awssqsClient = new AWSSQSClient(AWSClientUtils.newSQSClient()); - queueName = AWSCommon.BASE_SQS_QUEUE_NAME + "-" + TestUtils.randomWithRange(0, 1000); - - queueUrl = awssqsClient.getQueue(queueName); - received = 0; - } - - @AfterEach - public void tearDown() { - if (!awssqsClient.deleteQueue(queueName)) { - fail("Failed to delete queue"); - } - } - - private boolean checkRecord(ConsumerRecord<String, String> record) { - LOG.debug("Received: {}", record.value()); - received++; - - if (received == expect) { - return false; - } - - return true; - } - - public void runTest(ConnectorPropertyFactory connectorPropertyFactory) - throws ExecutionException, InterruptedException { - connectorPropertyFactory.log(); - getKafkaConnectService().initializeConnector(connectorPropertyFactory); - - LOG.debug("Sending SQS messages"); - for (int i = 0; i < expect; i++) { - awssqsClient.sendTo(queueUrl, "Source test message " + i); - } - LOG.debug("Done sending SQS messages"); - - LOG.debug("Creating the consumer ..."); - KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers()); - kafkaClient.consume(TestUtils.getDefaultTestTopic(this.getClass()), this::checkRecord); - LOG.debug("Created the consumer ..."); - - assertEquals(received, expect, "Didn't process the expected amount of messages"); - } - - @Test - @Timeout(90) - public void testBasicSendReceive() throws ExecutionException, InterruptedException { - ConnectorPropertyFactory connectorPropertyFactory = CamelAWSSQSPropertyFactory.basic() - .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass())).withQueueOrArn(queueName) - .withAmazonConfig(service.getConnectionProperties()); - - runTest(connectorPropertyFactory); - } - - // This test does not run remotely because SQS has a cool down period for - // creating and removing the SQS queue - @DisabledIfSystemProperty(named = "aws-service.instance.type", matches = "remote") - @Test - @Timeout(90) - public void testBasicSendReceiveWithKafkaStyle() throws ExecutionException, InterruptedException { - ConnectorPropertyFactory connectorPropertyFactory = CamelAWSSQSPropertyFactory.basic() - .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass())).withQueueOrArn(queueName) - .withAmazonConfig(service.getConnectionProperties(), CamelAWSSQSPropertyFactory.KAFKA_STYLE); - - runTest(connectorPropertyFactory); - } - - // This test does not run remotely because SQS has a cool down period for - // creating and removing the SQS queue - @DisabledIfSystemProperty(named = "aws-service.instance.type", matches = "remote") - @Test - @Timeout(90) - public void testBasicSendReceiveUsingUrl() throws ExecutionException, InterruptedException { - Properties amazonProperties = service.getConnectionProperties(); - - ConnectorPropertyFactory connectorPropertyFactory = CamelAWSSQSPropertyFactory.basic() - .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass())).withUrl(queueName) - .append("accessKey", amazonProperties.getProperty(AWSConfigs.ACCESS_KEY)) - .append("secretKey", amazonProperties.getProperty(AWSConfigs.SECRET_KEY)) - .append("protocol", amazonProperties.getProperty(AWSConfigs.PROTOCOL)) - .appendIfAvailable("amazonAWSHost", amazonProperties.getProperty(AWSConfigs.AMAZON_AWS_HOST)) - .append("region", amazonProperties.getProperty(AWSConfigs.REGION, Regions.US_EAST_1.name())).buildUrl(); - - runTest(connectorPropertyFactory); - } -} diff --git a/tests/itests-aws-v1/src/test/resources/org/apache/camel/kafkaconnector/aws/v1/s3/source/file0.test b/tests/itests-aws-v1/src/test/resources/org/apache/camel/kafkaconnector/aws/v1/s3/source/file0.test deleted file mode 100644 index fc590f9..0000000 --- a/tests/itests-aws-v1/src/test/resources/org/apache/camel/kafkaconnector/aws/v1/s3/source/file0.test +++ /dev/null @@ -1,14 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. \ No newline at end of file diff --git a/tests/itests-aws-v1/src/test/resources/org/apache/camel/kafkaconnector/aws/v1/s3/source/file1.test b/tests/itests-aws-v1/src/test/resources/org/apache/camel/kafkaconnector/aws/v1/s3/source/file1.test deleted file mode 100644 index fc590f9..0000000 --- a/tests/itests-aws-v1/src/test/resources/org/apache/camel/kafkaconnector/aws/v1/s3/source/file1.test +++ /dev/null @@ -1,14 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. \ No newline at end of file diff --git a/tests/itests-aws-v1/src/test/resources/org/apache/camel/kafkaconnector/aws/v1/s3/source/file2.test b/tests/itests-aws-v1/src/test/resources/org/apache/camel/kafkaconnector/aws/v1/s3/source/file2.test deleted file mode 100644 index fc590f9..0000000 --- a/tests/itests-aws-v1/src/test/resources/org/apache/camel/kafkaconnector/aws/v1/s3/source/file2.test +++ /dev/null @@ -1,14 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. \ No newline at end of file diff --git a/tests/itests-aws-v1/src/test/resources/org/apache/camel/kafkaconnector/aws/v1/s3/source/file3.test b/tests/itests-aws-v1/src/test/resources/org/apache/camel/kafkaconnector/aws/v1/s3/source/file3.test deleted file mode 100644 index fc590f9..0000000 --- a/tests/itests-aws-v1/src/test/resources/org/apache/camel/kafkaconnector/aws/v1/s3/source/file3.test +++ /dev/null @@ -1,14 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. \ No newline at end of file diff --git a/tests/itests-aws-v1/src/test/resources/org/apache/camel/kafkaconnector/aws/v1/s3/source/file4.test b/tests/itests-aws-v1/src/test/resources/org/apache/camel/kafkaconnector/aws/v1/s3/source/file4.test deleted file mode 100644 index fc590f9..0000000 --- a/tests/itests-aws-v1/src/test/resources/org/apache/camel/kafkaconnector/aws/v1/s3/source/file4.test +++ /dev/null @@ -1,14 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. \ No newline at end of file diff --git a/tests/itests-aws-v1/src/test/resources/org/apache/camel/kafkaconnector/aws/v1/s3/source/file5.test b/tests/itests-aws-v1/src/test/resources/org/apache/camel/kafkaconnector/aws/v1/s3/source/file5.test deleted file mode 100644 index fc590f9..0000000 --- a/tests/itests-aws-v1/src/test/resources/org/apache/camel/kafkaconnector/aws/v1/s3/source/file5.test +++ /dev/null @@ -1,14 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. \ No newline at end of file diff --git a/tests/itests-aws-v1/src/test/resources/org/apache/camel/kafkaconnector/aws/v1/s3/source/file6.test b/tests/itests-aws-v1/src/test/resources/org/apache/camel/kafkaconnector/aws/v1/s3/source/file6.test deleted file mode 100644 index fc590f9..0000000 --- a/tests/itests-aws-v1/src/test/resources/org/apache/camel/kafkaconnector/aws/v1/s3/source/file6.test +++ /dev/null @@ -1,14 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. \ No newline at end of file diff --git a/tests/itests-aws-v1/src/test/resources/org/apache/camel/kafkaconnector/aws/v1/s3/source/file7.test b/tests/itests-aws-v1/src/test/resources/org/apache/camel/kafkaconnector/aws/v1/s3/source/file7.test deleted file mode 100644 index fc590f9..0000000 --- a/tests/itests-aws-v1/src/test/resources/org/apache/camel/kafkaconnector/aws/v1/s3/source/file7.test +++ /dev/null @@ -1,14 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. \ No newline at end of file diff --git a/tests/itests-aws-v1/src/test/resources/org/apache/camel/kafkaconnector/aws/v1/s3/source/file8.test b/tests/itests-aws-v1/src/test/resources/org/apache/camel/kafkaconnector/aws/v1/s3/source/file8.test deleted file mode 100644 index fc590f9..0000000 --- a/tests/itests-aws-v1/src/test/resources/org/apache/camel/kafkaconnector/aws/v1/s3/source/file8.test +++ /dev/null @@ -1,14 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. \ No newline at end of file diff --git a/tests/itests-aws-v1/src/test/resources/org/apache/camel/kafkaconnector/aws/v1/s3/source/file9.test b/tests/itests-aws-v1/src/test/resources/org/apache/camel/kafkaconnector/aws/v1/s3/source/file9.test deleted file mode 100644 index fc590f9..0000000 --- a/tests/itests-aws-v1/src/test/resources/org/apache/camel/kafkaconnector/aws/v1/s3/source/file9.test +++ /dev/null @@ -1,14 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. \ No newline at end of file diff --git a/tests/pom.xml b/tests/pom.xml index 6da0d7c..5d1a6de 100644 --- a/tests/pom.xml +++ b/tests/pom.xml @@ -42,7 +42,6 @@ <module>itests-parent</module> <module>itests-common</module> <module>itests-common-http</module> - <module>itests-aws-v1</module> <module>itests-aws-v2</module> <module>itests-cassandra</module> <module>itests-elasticsearch</module>
