This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push:
new cce09f4 Added AWS v2 Kinesis sink test case
cce09f4 is described below
commit cce09f4d6265b1b2a2a369ba6098a3ddb4dd950e
Author: Otavio Rodolfo Piske <[email protected]>
AuthorDate: Tue Jan 26 12:26:02 2021 +0100
Added AWS v2 Kinesis sink test case
---
.../KinesisUtils.java} | 167 ++++++---------------
.../TestKinesisConfiguration.java | 2 +-
.../sink/CamelAWSKinesisPropertyFactory.java | 77 ++++++++++
.../v2/kinesis/sink/CamelSinkAWSKinesisITCase.java | 148 ++++++++++++++++++
.../source/CamelSourceAWSKinesisITCase.java | 164 +-------------------
5 files changed, 276 insertions(+), 282 deletions(-)
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/source/CamelSourceAWSKinesisITCase.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/common/KinesisUtils.java
similarity index 55%
copy from tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/source/CamelSourceAWSKinesisITCase.java
copy to tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/common/KinesisUtils.java
index e294f3f..2e5a7d1 100644
--- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/source/CamelSourceAWSKinesisITCase.java
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/common/KinesisUtils.java
@@ -15,29 +15,12 @@
* limitations under the License.
*/
-package org.apache.camel.kafkaconnector.aws.v2.kinesis.source;
+package org.apache.camel.kafkaconnector.aws.v2.kinesis.common;
import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
-import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
-import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
-import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
-import org.apache.camel.kafkaconnector.common.utils.TestUtils;
-import org.apache.camel.test.infra.aws.common.AWSCommon;
-import org.apache.camel.test.infra.aws.common.services.AWSService;
-import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils;
-import org.apache.camel.test.infra.aws2.services.AWSServiceFactory;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.TestInstance;
-import org.junit.jupiter.api.Timeout;
-import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
-import org.junit.jupiter.api.extension.RegisterExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.awscore.exception.AwsServiceException;
@@ -49,36 +32,27 @@ import software.amazon.awssdk.services.kinesis.model.DeleteStreamRequest;
import software.amazon.awssdk.services.kinesis.model.DeleteStreamResponse;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse;
+import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
+import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
+import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse;
import software.amazon.awssdk.services.kinesis.model.KinesisException;
import software.amazon.awssdk.services.kinesis.model.PutRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry;
import software.amazon.awssdk.services.kinesis.model.PutRecordsResponse;
import software.amazon.awssdk.services.kinesis.model.ResourceInUseException;
import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
+import software.amazon.awssdk.services.kinesis.model.Shard;
-import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;
-@TestInstance(TestInstance.Lifecycle.PER_CLASS)
-@EnabledIfSystemProperty(named = "enable.slow.tests", matches = "true")
-public class CamelSourceAWSKinesisITCase extends AbstractKafkaTest {
+public final class KinesisUtils {
+ private static final Logger LOG = LoggerFactory.getLogger(KinesisUtils.class);
- @RegisterExtension
- public static AWSService awsService = AWSServiceFactory.createKinesisService();
- private static final Logger LOG = LoggerFactory.getLogger(CamelSourceAWSKinesisITCase.class);
+ private KinesisUtils() {
- private String streamName;
- private KinesisClient kinesisClient;
-
- private volatile int received;
- private final int expect = 10;
-
- @Override
- protected String[] getConnectorsInTest() {
- return new String[] {"camel-aws2-kinesis-kafka-connector"};
}
- private void doCreateStream() {
+ private static void doCreateStream(KinesisClient kinesisClient, String streamName) {
CreateStreamRequest request = CreateStreamRequest.builder()
.streamName(streamName)
.shardCount(1)
@@ -98,7 +72,7 @@ public class CamelSourceAWSKinesisITCase extends AbstractKafkaTest {
}
}
- private void createStream() {
+ public static void createStream(KinesisClient kinesisClient, String streamName) {
try {
LOG.info("Checking whether the stream exists already");
DescribeStreamRequest request = DescribeStreamRequest.builder()
@@ -110,12 +84,17 @@ public class CamelSourceAWSKinesisITCase extends AbstractKafkaTest {
int status = response.sdkHttpResponse().statusCode();
LOG.info("Kinesis stream check result: {}", status);
} catch (KinesisException e) {
- LOG.info("The stream does not exist, auto creating it: {}",
e.getMessage(), e);
- doCreateStream();
+ if (LOG.isTraceEnabled()) {
+ LOG.info("The stream does not exist, auto creating it: {}",
e.getMessage(), e);
+ } else {
+ LOG.info("The stream does not exist, auto creating it: {}",
e.getMessage());
+ }
+
+ doCreateStream(kinesisClient, streamName);
}
}
- private void doDeleteStream() {
+ public static void doDeleteStream(KinesisClient kinesisClient, String streamName) {
DeleteStreamRequest request = DeleteStreamRequest.builder()
.streamName(streamName)
.build();
@@ -129,7 +108,7 @@ public class CamelSourceAWSKinesisITCase extends AbstractKafkaTest {
}
}
- private void deleteStream() {
+ public static void deleteStream(KinesisClient kinesisClient, String streamName) {
try {
LOG.info("Checking whether the stream exists already");
@@ -142,49 +121,21 @@ public class CamelSourceAWSKinesisITCase extends AbstractKafkaTest {
if (response.sdkHttpResponse().isSuccessful()) {
LOG.info("Kinesis stream check result");
- doDeleteStream();
+ doDeleteStream(kinesisClient, streamName);
}
} catch (ResourceNotFoundException e) {
LOG.info("The stream does not exist, skipping deletion");
} catch (ResourceInUseException e) {
LOG.info("The stream exist but cannot be deleted because it's in
use");
- doDeleteStream();
+ doDeleteStream(kinesisClient, streamName);
}
}
-
- @BeforeEach
- public void setUp() {
- streamName = AWSCommon.KINESIS_STREAM_BASE_NAME + "-" + TestUtils.randomWithRange(0, 100);
-
- kinesisClient = AWSSDKClientUtils.newKinesisClient();
- received = 0;
-
- createStream();
- }
-
-
- @AfterEach
- public void tearDown() {
- deleteStream();
- }
-
- private boolean checkRecord(ConsumerRecord<String, String> record) {
- LOG.debug("Received: {}", record.value());
- received++;
-
- if (received == expect) {
- return false;
- }
-
- return true;
- }
-
- private void putRecords() {
+ public static void putRecords(KinesisClient kinesisClient, String streamName, int count) {
List<PutRecordsRequestEntry> putRecordsRequestEntryList = new ArrayList<>();
LOG.debug("Adding data to the Kinesis stream");
- for (int i = 0; i < expect; i++) {
+ for (int i = 0; i < count; i++) {
String partition = String.format("partitionKey-%d", i);
PutRecordsRequestEntry putRecordsRequestEntry = PutRecordsRequestEntry.builder()
@@ -231,7 +182,6 @@ public class CamelSourceAWSKinesisITCase extends AbstractKafkaTest {
throw e;
}
-
try {
Thread.sleep(TimeUnit.SECONDS.toMillis(2));
} catch (InterruptedException ex) {
@@ -239,63 +189,32 @@ public class CamelSourceAWSKinesisITCase extends AbstractKafkaTest {
}
}
} while (retries > 0);
-
-
}
- public void runtTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException {
- connectorPropertyFactory.log();
- getKafkaConnectService().initializeConnector(connectorPropertyFactory);
-
- putRecords();
- LOG.debug("Initialized the connector and put the data for the test
execution");
+ public static GetRecordsRequest getGetRecordsRequest(KinesisClient
kinesisClient, String streamName) {
+ DescribeStreamRequest describeStreamRequest =
DescribeStreamRequest.builder()
+ .streamName(streamName)
+ .build();
+ List<Shard> shards = new ArrayList<>();
- LOG.debug("Creating the consumer ...");
- KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
- kafkaClient.consume(TestUtils.getDefaultTestTopic(this.getClass()), this::checkRecord);
- LOG.debug("Created the consumer ...");
+ DescribeStreamResponse streamRes;
+ do {
+ streamRes = kinesisClient.describeStream(describeStreamRequest);
+ shards.addAll(streamRes.streamDescription().shards());
+ } while (streamRes.streamDescription().hasMoreShards());
- assertEquals(received, expect, "Didn't process the expected amount of messages");
- }
- @Test
- @Timeout(120)
- public void testBasicSendReceive() throws ExecutionException, InterruptedException {
- ConnectorPropertyFactory connectorPropertyFactory = CamelAWSKinesisPropertyFactory
- .basic()
- .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
- .withAmazonConfig(awsService.getConnectionProperties())
- .withConfiguration(TestKinesisConfiguration.class.getName())
- .withStreamName(streamName);
-
- runtTest(connectorPropertyFactory);
- }
+ GetShardIteratorRequest iteratorRequest = GetShardIteratorRequest.builder()
+ .streamName(streamName)
+ .shardId(shards.get(0).shardId())
+ .shardIteratorType("TRIM_HORIZON")
+ .build();
- @Test
- @Timeout(120)
- public void testBasicSendReceiveWithKafkaStyle() throws ExecutionException, InterruptedException {
- ConnectorPropertyFactory connectorPropertyFactory = CamelAWSKinesisPropertyFactory
- .basic()
- .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
- .withAmazonConfig(awsService.getConnectionProperties(), CamelAWSKinesisPropertyFactory.KAFKA_STYLE)
- .withConfiguration(TestKinesisConfiguration.class.getName())
- .withStreamName(streamName);
-
- runtTest(connectorPropertyFactory);
- }
+ GetShardIteratorResponse iteratorResponse = kinesisClient.getShardIterator(iteratorRequest);
- @Test
- @Timeout(120)
- public void testBasicSendReceiveUsingUrl() throws ExecutionException, InterruptedException {
- ConnectorPropertyFactory connectorPropertyFactory = CamelAWSKinesisPropertyFactory
- .basic()
- .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
- .withAmazonConfig(awsService.getConnectionProperties())
- .withConfiguration(TestKinesisConfiguration.class.getName())
- .withUrl(streamName)
- .buildUrl();
-
- runtTest(connectorPropertyFactory);
+ return GetRecordsRequest
+ .builder()
+ .shardIterator(iteratorResponse.shardIterator())
+ .build();
}
-
}
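For reference, the helpers extracted above are plain static methods that take the KinesisClient and stream name as parameters, so any test can drive a stream without subclassing the source IT case. A minimal, illustrative sketch of calling them outside the test harness follows; the class name, stream name and record count are example values, not part of this commit.

// Illustrative usage of the new KinesisUtils helpers (not part of the commit).
import org.apache.camel.kafkaconnector.aws.v2.kinesis.common.KinesisUtils;
import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;

public class KinesisUtilsUsageExample {
    public static void main(String[] args) {
        KinesisClient client = AWSSDKClientUtils.newKinesisClient();
        String streamName = "example-stream";

        // Create the stream if it does not exist, push 10 records, then read them back.
        KinesisUtils.createStream(client, streamName);
        KinesisUtils.putRecords(client, streamName, 10);

        GetRecordsRequest request = KinesisUtils.getGetRecordsRequest(client, streamName);
        GetRecordsResponse response = client.getRecords(request);
        response.records().forEach(record -> System.out.println(record.data()));

        KinesisUtils.deleteStream(client, streamName);
    }
}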
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/source/TestKinesisConfiguration.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/common/TestKinesisConfiguration.java
similarity index 95%
rename from tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/source/TestKinesisConfiguration.java
rename to tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/common/TestKinesisConfiguration.java
index 20ef00a..ddacd30 100644
--- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/source/TestKinesisConfiguration.java
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/common/TestKinesisConfiguration.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.camel.kafkaconnector.aws.v2.kinesis.source;
+package org.apache.camel.kafkaconnector.aws.v2.kinesis.common;
import org.apache.camel.component.aws2.kinesis.Kinesis2Configuration;
import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils;
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/sink/CamelAWSKinesisPropertyFactory.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/sink/CamelAWSKinesisPropertyFactory.java
new file mode 100644
index 0000000..fb250a1
--- /dev/null
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/sink/CamelAWSKinesisPropertyFactory.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.aws.v2.kinesis.sink;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.camel.kafkaconnector.aws.v2.common.AWSPropertiesUtils;
+import org.apache.camel.kafkaconnector.common.SinkConnectorPropertyFactory;
+import org.apache.camel.test.infra.aws.common.AWSConfigs;
+
+
+/**
+ * Creates the set of properties used by a Camel Kinesis Sink Connector
+ */
+final class CamelAWSKinesisPropertyFactory extends SinkConnectorPropertyFactory<CamelAWSKinesisPropertyFactory> {
+ public static final Map<String, String> SPRING_STYLE = new HashMap<>();
+ public static final Map<String, String> KAFKA_STYLE = new HashMap<>();
+
+ static {
+ SPRING_STYLE.put(AWSConfigs.ACCESS_KEY, "camel.component.aws2-kinesis.accessKey");
+ SPRING_STYLE.put(AWSConfigs.SECRET_KEY, "camel.component.aws2-kinesis.secretKey");
+ SPRING_STYLE.put(AWSConfigs.REGION, "camel.component.aws2-kinesis.region");
+
+ KAFKA_STYLE.put(AWSConfigs.ACCESS_KEY, "camel.component.aws2-kinesis.access-key");
+ KAFKA_STYLE.put(AWSConfigs.SECRET_KEY, "camel.component.aws2-kinesis.secret-key");
+ KAFKA_STYLE.put(AWSConfigs.REGION, "camel.component.aws2-kinesis.region");
+ }
+
+ private CamelAWSKinesisPropertyFactory() {
+
+ }
+
+ public CamelAWSKinesisPropertyFactory withAmazonConfig(Properties amazonConfigs) {
+ return withAmazonConfig(amazonConfigs, this.SPRING_STYLE);
+ }
+
+ public CamelAWSKinesisPropertyFactory withAmazonConfig(Properties amazonConfigs, Map<String, String> style) {
+ AWSPropertiesUtils.setCommonProperties(amazonConfigs, style, this);
+
+ return this;
+ }
+
+ public CamelAWSKinesisPropertyFactory withStreamName(String streamName) {
+ return setProperty("camel.sink.path.streamName", streamName);
+ }
+
+ public CamelAWSKinesisPropertyFactory withConfiguration(String configurationClass) {
+ return setProperty("camel.component.aws2-kinesis.configuration", classRef(configurationClass));
+ }
+
+ public static CamelAWSKinesisPropertyFactory basic() {
+ return new CamelAWSKinesisPropertyFactory()
+ .withName("CamelAwsKinesisSinkConnector")
+ .withTasksMax(1)
+ .withConnectorClass("org.apache.camel.kafkaconnector.aws2kinesis.CamelAws2kinesisSinkConnector")
+ .withKeyConverterClass("org.apache.kafka.connect.storage.StringConverter")
+ .withValueConverterClass("org.apache.kafka.connect.storage.StringConverter");
+ }
+}
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/sink/CamelSinkAWSKinesisITCase.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/sink/CamelSinkAWSKinesisITCase.java
new file mode 100644
index 0000000..8d9e6c6
--- /dev/null
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/sink/CamelSinkAWSKinesisITCase.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.aws.v2.kinesis.sink;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.kafkaconnector.CamelSinkTask;
+import org.apache.camel.kafkaconnector.aws.v2.common.CamelSinkAWSTestSupport;
+import org.apache.camel.kafkaconnector.aws.v2.kinesis.common.KinesisUtils;
+import org.apache.camel.kafkaconnector.aws.v2.kinesis.common.TestKinesisConfiguration;
+import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
+import org.apache.camel.kafkaconnector.common.utils.TestUtils;
+import org.apache.camel.test.infra.aws.common.AWSCommon;
+import org.apache.camel.test.infra.aws.common.services.AWSService;
+import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils;
+import org.apache.camel.test.infra.aws2.services.AWSServiceFactory;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.kinesis.KinesisClient;
+import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
+import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
+import software.amazon.awssdk.services.kinesis.model.Record;
+
+import static org.apache.camel.kafkaconnector.aws.v2.kinesis.common.KinesisUtils.createStream;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;
+
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+@EnabledIfSystemProperty(named = "enable.slow.tests", matches = "true")
+public class CamelSinkAWSKinesisITCase extends CamelSinkAWSTestSupport {
+ @RegisterExtension
+ public static AWSService awsService = AWSServiceFactory.createKinesisService();
+ private static final Logger LOG = LoggerFactory.getLogger(CamelSinkAWSKinesisITCase.class);
+
+ private String streamName;
+ private KinesisClient kinesisClient;
+
+ private volatile int received;
+ private final int expect = 10;
+
+ @Override
+ protected Map<String, String> messageHeaders(String text, int current) {
+ Map<String, String> headers = new HashMap<>();
+
+ headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsKinesisPartitionKey",
+ "partition-" + current);
+
+ return headers;
+ }
+
+ @Override
+ protected void consumeMessages(CountDownLatch latch) {
+ try {
+ GetRecordsRequest getRecordsRequest = KinesisUtils.getGetRecordsRequest(kinesisClient, streamName);
+
+ while (true) {
+ GetRecordsResponse response = kinesisClient.getRecords(getRecordsRequest);
+
+ List<Record> recordList = response.records();
+ received = recordList.size();
+ for (Record record : recordList) {
+ LOG.info("Received record: {}", record.data());
+
+ if (received >= expect) {
+ return;
+ }
+ }
+
+ if (!waitForData()) {
+ return;
+ }
+ }
+ } catch (Exception e) {
+ LOG.error("Error consuming records: {}", e.getMessage(), e);
+ } finally {
+ latch.countDown();
+ }
+ }
+
+
+
+ @Override
+ protected void verifyMessages(CountDownLatch latch) throws InterruptedException {
+ if (latch.await(110, TimeUnit.SECONDS)) {
+ assertEquals(expect, received, "Didn't process the expected amount of messages: " + received + " != " + expect);
+ } else {
+ fail(String.format("Failed to receive the messages within the specified time: received %d of %d",
+ received, expect));
+ }
+ }
+
+ @Override
+ protected String[] getConnectorsInTest() {
+ return new String[] {"camel-aws2-kinesis-kafka-connector"};
+ }
+
+ @BeforeEach
+ public void setUp() {
+ streamName = AWSCommon.KINESIS_STREAM_BASE_NAME + "-" + TestUtils.randomWithRange(0, 100);
+
+ kinesisClient = AWSSDKClientUtils.newKinesisClient();
+ received = 0;
+
+ createStream(kinesisClient, streamName);
+ }
+
+ @Test
+ @Timeout(120)
+ public void testBasicSendReceive() throws Exception {
+ Properties amazonProperties = awsService.getConnectionProperties();
+ String topicName = TestUtils.getDefaultTestTopic(this.getClass());
+
+ ConnectorPropertyFactory connectorPropertyFactory = CamelAWSKinesisPropertyFactory
+ .basic()
+ .withTopics(topicName)
+ .withAmazonConfig(amazonProperties)
+ .withConfiguration(TestKinesisConfiguration.class.getName())
+ .withStreamName(streamName);
+
+ runTest(connectorPropertyFactory, topicName, expect);
+ }
+}
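The sink IT case above delegates the Kafka-side production to runTest(...) from CamelSinkAWSTestSupport, which is not part of this diff. A rough, illustrative approximation of that production step, assuming a plain Kafka producer and reusing the same partition-key header that messageHeaders() sets, might look like this; the broker address, topic and count are example values, not taken from the commit.

// Illustrative only: approximates the Kafka-side production the test harness performs.
import java.nio.charset.StandardCharsets;
import java.util.Properties;

import org.apache.camel.kafkaconnector.CamelSinkTask;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class SinkTestProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 10; i++) {
                ProducerRecord<String, String> record =
                        new ProducerRecord<>("example-topic", "Sink test message " + i);
                // Same header the IT case sets in messageHeaders(); the connector maps it
                // to the Kinesis partition key of the outgoing record.
                record.headers().add(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsKinesisPartitionKey",
                        ("partition-" + i).getBytes(StandardCharsets.UTF_8));
                producer.send(record);
            }
        }
    }
}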
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/source/CamelSourceAWSKinesisITCase.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/source/CamelSourceAWSKinesisITCase.java
index e294f3f..e19d8bd 100644
--- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/source/CamelSourceAWSKinesisITCase.java
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/source/CamelSourceAWSKinesisITCase.java
@@ -17,11 +17,9 @@
package org.apache.camel.kafkaconnector.aws.v2.kinesis.source;
-import java.util.ArrayList;
-import java.util.List;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
+import org.apache.camel.kafkaconnector.aws.v2.kinesis.common.TestKinesisConfiguration;
import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
@@ -40,24 +38,12 @@ import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import software.amazon.awssdk.awscore.exception.AwsServiceException;
-import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.services.kinesis.KinesisClient;
-import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest;
-import software.amazon.awssdk.services.kinesis.model.CreateStreamResponse;
-import software.amazon.awssdk.services.kinesis.model.DeleteStreamRequest;
-import software.amazon.awssdk.services.kinesis.model.DeleteStreamResponse;
-import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
-import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse;
-import software.amazon.awssdk.services.kinesis.model.KinesisException;
-import software.amazon.awssdk.services.kinesis.model.PutRecordsRequest;
-import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry;
-import software.amazon.awssdk.services.kinesis.model.PutRecordsResponse;
-import software.amazon.awssdk.services.kinesis.model.ResourceInUseException;
-import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
+import static org.apache.camel.kafkaconnector.aws.v2.kinesis.common.KinesisUtils.createStream;
+import static org.apache.camel.kafkaconnector.aws.v2.kinesis.common.KinesisUtils.deleteStream;
+import static org.apache.camel.kafkaconnector.aws.v2.kinesis.common.KinesisUtils.putRecords;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.fail;
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
@EnabledIfSystemProperty(named = "enable.slow.tests", matches = "true")
@@ -78,81 +64,6 @@ public class CamelSourceAWSKinesisITCase extends AbstractKafkaTest {
return new String[] {"camel-aws2-kinesis-kafka-connector"};
}
- private void doCreateStream() {
- CreateStreamRequest request = CreateStreamRequest.builder()
- .streamName(streamName)
- .shardCount(1)
- .build();
-
- try {
- CreateStreamResponse response = kinesisClient.createStream(request);
-
- if (response.sdkHttpResponse().isSuccessful()) {
- LOG.info("Stream created successfully");
- } else {
- fail("Failed to create the stream");
- }
- } catch (KinesisException e) {
- LOG.error("Unable to create stream: {}", e.getMessage(), e);
- fail("Unable to create stream");
- }
- }
-
- private void createStream() {
- try {
- LOG.info("Checking whether the stream exists already");
- DescribeStreamRequest request = DescribeStreamRequest.builder()
- .streamName(streamName)
- .build();
-
- DescribeStreamResponse response = kinesisClient.describeStream(request);
-
- int status = response.sdkHttpResponse().statusCode();
- LOG.info("Kinesis stream check result: {}", status);
- } catch (KinesisException e) {
- LOG.info("The stream does not exist, auto creating it: {}",
e.getMessage(), e);
- doCreateStream();
- }
- }
-
- private void doDeleteStream() {
- DeleteStreamRequest request = DeleteStreamRequest.builder()
- .streamName(streamName)
- .build();
-
- DeleteStreamResponse response = kinesisClient.deleteStream(request);
-
- if (response.sdkHttpResponse().isSuccessful()) {
- LOG.info("Stream deleted successfully");
- } else {
- fail("Failed to delete the stream");
- }
- }
-
- private void deleteStream() {
- try {
- LOG.info("Checking whether the stream exists already");
-
-
- DescribeStreamRequest request = DescribeStreamRequest.builder()
- .streamName(streamName)
- .build();
-
- DescribeStreamResponse response = kinesisClient.describeStream(request);
-
- if (response.sdkHttpResponse().isSuccessful()) {
- LOG.info("Kinesis stream check result");
- doDeleteStream();
- }
- } catch (ResourceNotFoundException e) {
- LOG.info("The stream does not exist, skipping deletion");
- } catch (ResourceInUseException e) {
- LOG.info("The stream exist but cannot be deleted because it's in
use");
- doDeleteStream();
- }
- }
-
-
@BeforeEach
public void setUp() {
streamName = AWSCommon.KINESIS_STREAM_BASE_NAME + "-" + TestUtils.randomWithRange(0, 100);
@@ -160,13 +71,13 @@ public class CamelSourceAWSKinesisITCase extends AbstractKafkaTest {
kinesisClient = AWSSDKClientUtils.newKinesisClient();
received = 0;
- createStream();
+ createStream(kinesisClient, streamName);
}
@AfterEach
public void tearDown() {
- deleteStream();
+ deleteStream(kinesisClient, streamName);
}
private boolean checkRecord(ConsumerRecord<String, String> record) {
@@ -180,74 +91,13 @@ public class CamelSourceAWSKinesisITCase extends AbstractKafkaTest {
return true;
}
- private void putRecords() {
- List<PutRecordsRequestEntry> putRecordsRequestEntryList = new ArrayList<>();
-
- LOG.debug("Adding data to the Kinesis stream");
- for (int i = 0; i < expect; i++) {
- String partition = String.format("partitionKey-%d", i);
-
- PutRecordsRequestEntry putRecordsRequestEntry = PutRecordsRequestEntry.builder()
- .data(SdkBytes.fromByteArray(String.valueOf(i).getBytes()))
- .partitionKey(partition)
- .build();
-
- LOG.debug("Added data {} (as bytes) to partition {}", i,
partition);
- putRecordsRequestEntryList.add(putRecordsRequestEntry);
- }
-
- LOG.debug("Done creating the data records");
-
- PutRecordsRequest putRecordsRequest = PutRecordsRequest
- .builder()
- .streamName(streamName)
- .records(putRecordsRequestEntryList)
- .build();
- int retries = 5;
- do {
- try {
- PutRecordsResponse response = kinesisClient.putRecords(putRecordsRequest);
-
- if (response.sdkHttpResponse().isSuccessful()) {
- LOG.debug("Done putting the data records into the stream");
- } else {
- fail("Unable to put all the records into the stream");
- }
-
- break;
- } catch (AwsServiceException e) {
- retries--;
-
- /*
- This works around the "... Cannot deserialize instance of `...AmazonKinesisException` out of NOT_AVAILABLE token
-
- It may take some time for the local Kinesis backend to be fully up - even though the container is
- reportedly up and running. Therefore, it tries a few more times
- */
- LOG.trace("Failed to put the records: {}. Retrying in 2
seconds ...", e.getMessage());
- if (retries == 0) {
- LOG.error("Failed to put the records: {}", e.getMessage(),
e);
- throw e;
- }
-
-
- try {
- Thread.sleep(TimeUnit.SECONDS.toMillis(2));
- } catch (InterruptedException ex) {
- break;
- }
- }
- } while (retries > 0);
-
-
- }
public void runtTest(ConnectorPropertyFactory connectorPropertyFactory) throws ExecutionException, InterruptedException {
connectorPropertyFactory.log();
getKafkaConnectService().initializeConnector(connectorPropertyFactory);
- putRecords();
+ putRecords(kinesisClient, streamName, expect);
LOG.debug("Initialized the connector and put the data for the test
execution");
LOG.debug("Creating the consumer ...");