This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new cce09f4  Added AWS v2 Kinesis sink test case
cce09f4 is described below

commit cce09f4d6265b1b2a2a369ba6098a3ddb4dd950e
Author: Otavio Rodolfo Piske <[email protected]>
AuthorDate: Tue Jan 26 12:26:02 2021 +0100

    Added AWS v2 Kinesis sink test case
---
 .../KinesisUtils.java}                             | 167 ++++++---------------
 .../TestKinesisConfiguration.java                  |   2 +-
 .../sink/CamelAWSKinesisPropertyFactory.java       |  77 ++++++++++
 .../v2/kinesis/sink/CamelSinkAWSKinesisITCase.java | 148 ++++++++++++++++++
 .../source/CamelSourceAWSKinesisITCase.java        | 164 +-------------------
 5 files changed, 276 insertions(+), 282 deletions(-)

diff --git 
a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/source/CamelSourceAWSKinesisITCase.java
 
b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/common/KinesisUtils.java
similarity index 55%
copy from 
tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/source/CamelSourceAWSKinesisITCase.java
copy to 
tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/common/KinesisUtils.java
index e294f3f..2e5a7d1 100644
--- 
a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/source/CamelSourceAWSKinesisITCase.java
+++ 
b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/common/KinesisUtils.java
@@ -15,29 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.camel.kafkaconnector.aws.v2.kinesis.source;
+package org.apache.camel.kafkaconnector.aws.v2.kinesis.common;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
-import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
-import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
-import org.apache.camel.kafkaconnector.common.utils.TestUtils;
-import org.apache.camel.test.infra.aws.common.AWSCommon;
-import org.apache.camel.test.infra.aws.common.services.AWSService;
-import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils;
-import org.apache.camel.test.infra.aws2.services.AWSServiceFactory;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.TestInstance;
-import org.junit.jupiter.api.Timeout;
-import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
-import org.junit.jupiter.api.extension.RegisterExtension;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import software.amazon.awssdk.awscore.exception.AwsServiceException;
@@ -49,36 +32,27 @@ import 
software.amazon.awssdk.services.kinesis.model.DeleteStreamRequest;
 import software.amazon.awssdk.services.kinesis.model.DeleteStreamResponse;
 import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
 import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse;
+import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
+import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
+import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse;
 import software.amazon.awssdk.services.kinesis.model.KinesisException;
 import software.amazon.awssdk.services.kinesis.model.PutRecordsRequest;
 import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry;
 import software.amazon.awssdk.services.kinesis.model.PutRecordsResponse;
 import software.amazon.awssdk.services.kinesis.model.ResourceInUseException;
 import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
+import software.amazon.awssdk.services.kinesis.model.Shard;
 
-import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.fail;
 
-@TestInstance(TestInstance.Lifecycle.PER_CLASS)
-@EnabledIfSystemProperty(named = "enable.slow.tests", matches = "true")
-public class CamelSourceAWSKinesisITCase extends AbstractKafkaTest {
+public final class KinesisUtils {
+    private static final Logger LOG = 
LoggerFactory.getLogger(KinesisUtils.class);
 
-    @RegisterExtension
-    public static AWSService awsService = 
AWSServiceFactory.createKinesisService();
-    private static final Logger LOG = 
LoggerFactory.getLogger(CamelSourceAWSKinesisITCase.class);
+    private KinesisUtils() {
 
-    private String streamName;
-    private KinesisClient kinesisClient;
-
-    private volatile int received;
-    private final int expect = 10;
-
-    @Override
-    protected String[] getConnectorsInTest() {
-        return new String[] {"camel-aws2-kinesis-kafka-connector"};
     }
 
-    private void doCreateStream() {
+    private static void doCreateStream(KinesisClient kinesisClient, String 
streamName) {
         CreateStreamRequest request = CreateStreamRequest.builder()
                 .streamName(streamName)
                 .shardCount(1)
@@ -98,7 +72,7 @@ public class CamelSourceAWSKinesisITCase extends 
AbstractKafkaTest {
         }
     }
 
-    private void createStream() {
+    public static void createStream(KinesisClient kinesisClient, String 
streamName) {
         try {
             LOG.info("Checking whether the stream exists already");
             DescribeStreamRequest request = DescribeStreamRequest.builder()
@@ -110,12 +84,17 @@ public class CamelSourceAWSKinesisITCase extends 
AbstractKafkaTest {
             int status = response.sdkHttpResponse().statusCode();
             LOG.info("Kinesis stream check result: {}", status);
         } catch (KinesisException e) {
-            LOG.info("The stream does not exist, auto creating it: {}", 
e.getMessage(), e);
-            doCreateStream();
+            if (LOG.isTraceEnabled()) {
+                LOG.info("The stream does not exist, auto creating it: {}", 
e.getMessage(), e);
+            } else {
+                LOG.info("The stream does not exist, auto creating it: {}", 
e.getMessage());
+            }
+
+            doCreateStream(kinesisClient, streamName);
         }
     }
 
-    private void doDeleteStream() {
+    public static void doDeleteStream(KinesisClient kinesisClient, String 
streamName) {
         DeleteStreamRequest request = DeleteStreamRequest.builder()
                 .streamName(streamName)
                 .build();
@@ -129,7 +108,7 @@ public class CamelSourceAWSKinesisITCase extends 
AbstractKafkaTest {
         }
     }
 
-    private void deleteStream() {
+    public static void deleteStream(KinesisClient kinesisClient, String 
streamName) {
         try {
             LOG.info("Checking whether the stream exists already");
 
@@ -142,49 +121,21 @@ public class CamelSourceAWSKinesisITCase extends 
AbstractKafkaTest {
 
             if (response.sdkHttpResponse().isSuccessful()) {
                 LOG.info("Kinesis stream check result");
-                doDeleteStream();
+                doDeleteStream(kinesisClient, streamName);
             }
         } catch (ResourceNotFoundException e) {
             LOG.info("The stream does not exist, skipping deletion");
         } catch (ResourceInUseException e) {
             LOG.info("The stream exist but cannot be deleted because it's in 
use");
-            doDeleteStream();
+            doDeleteStream(kinesisClient, streamName);
         }
     }
 
-
-    @BeforeEach
-    public void setUp() {
-        streamName = AWSCommon.KINESIS_STREAM_BASE_NAME + "-" + 
TestUtils.randomWithRange(0, 100);
-
-        kinesisClient = AWSSDKClientUtils.newKinesisClient();
-        received = 0;
-
-        createStream();
-    }
-
-
-    @AfterEach
-    public void tearDown() {
-        deleteStream();
-    }
-
-    private boolean checkRecord(ConsumerRecord<String, String> record) {
-        LOG.debug("Received: {}", record.value());
-        received++;
-
-        if (received == expect) {
-            return false;
-        }
-
-        return true;
-    }
-
-    private void putRecords() {
+    public static void putRecords(KinesisClient kinesisClient, String 
streamName, int count) {
         List<PutRecordsRequestEntry> putRecordsRequestEntryList = new 
ArrayList<>();
 
         LOG.debug("Adding data to the Kinesis stream");
-        for (int i = 0; i < expect; i++) {
+        for (int i = 0; i < count; i++) {
             String partition = String.format("partitionKey-%d", i);
 
             PutRecordsRequestEntry putRecordsRequestEntry = 
PutRecordsRequestEntry.builder()
@@ -231,7 +182,6 @@ public class CamelSourceAWSKinesisITCase extends 
AbstractKafkaTest {
                     throw e;
                 }
 
-
                 try {
                     Thread.sleep(TimeUnit.SECONDS.toMillis(2));
                 } catch (InterruptedException ex) {
@@ -239,63 +189,32 @@ public class CamelSourceAWSKinesisITCase extends 
AbstractKafkaTest {
                 }
             }
         } while (retries > 0);
-
-
     }
 
-    public void runtTest(ConnectorPropertyFactory connectorPropertyFactory) 
throws ExecutionException, InterruptedException {
-        connectorPropertyFactory.log();
-        getKafkaConnectService().initializeConnector(connectorPropertyFactory);
-
-        putRecords();
-        LOG.debug("Initialized the connector and put the data for the test 
execution");
+    public static GetRecordsRequest getGetRecordsRequest(KinesisClient 
kinesisClient, String streamName) {
+        DescribeStreamRequest describeStreamRequest = 
DescribeStreamRequest.builder()
+                .streamName(streamName)
+                .build();
+        List<Shard> shards = new ArrayList<>();
 
-        LOG.debug("Creating the consumer ...");
-        KafkaClient<String, String> kafkaClient = new 
KafkaClient<>(getKafkaService().getBootstrapServers());
-        kafkaClient.consume(TestUtils.getDefaultTestTopic(this.getClass()), 
this::checkRecord);
-        LOG.debug("Created the consumer ...");
+        DescribeStreamResponse streamRes;
+        do {
+            streamRes = kinesisClient.describeStream(describeStreamRequest);
+            shards.addAll(streamRes.streamDescription().shards());
+        } while (streamRes.streamDescription().hasMoreShards());
 
-        assertEquals(received, expect, "Didn't process the expected amount of 
messages");
-    }
 
-    @Test
-    @Timeout(120)
-    public void testBasicSendReceive() throws ExecutionException, 
InterruptedException {
-        ConnectorPropertyFactory connectorPropertyFactory = 
CamelAWSKinesisPropertyFactory
-                .basic()
-                .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
-                .withAmazonConfig(awsService.getConnectionProperties())
-                .withConfiguration(TestKinesisConfiguration.class.getName())
-                .withStreamName(streamName);
-
-        runtTest(connectorPropertyFactory);
-    }
+        GetShardIteratorRequest iteratorRequest = 
GetShardIteratorRequest.builder()
+                .streamName(streamName)
+                .shardId(shards.get(0).shardId())
+                .shardIteratorType("TRIM_HORIZON")
+                .build();
 
-    @Test
-    @Timeout(120)
-    public void testBasicSendReceiveWithKafkaStyle() throws 
ExecutionException, InterruptedException {
-        ConnectorPropertyFactory connectorPropertyFactory = 
CamelAWSKinesisPropertyFactory
-                .basic()
-                .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
-                .withAmazonConfig(awsService.getConnectionProperties(), 
CamelAWSKinesisPropertyFactory.KAFKA_STYLE)
-                .withConfiguration(TestKinesisConfiguration.class.getName())
-                .withStreamName(streamName);
-
-        runtTest(connectorPropertyFactory);
-    }
+        GetShardIteratorResponse iteratorResponse = 
kinesisClient.getShardIterator(iteratorRequest);
 
-    @Test
-    @Timeout(120)
-    public void testBasicSendReceiveUsingUrl() throws ExecutionException, 
InterruptedException {
-        ConnectorPropertyFactory connectorPropertyFactory = 
CamelAWSKinesisPropertyFactory
-                .basic()
-                .withKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()))
-                .withAmazonConfig(awsService.getConnectionProperties())
-                .withConfiguration(TestKinesisConfiguration.class.getName())
-                .withUrl(streamName)
-                .buildUrl();
-
-        runtTest(connectorPropertyFactory);
+        return GetRecordsRequest
+                .builder()
+                .shardIterator(iteratorResponse.shardIterator())
+                .build();
     }
-
 }
diff --git 
a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/source/TestKinesisConfiguration.java
 
b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/common/TestKinesisConfiguration.java
similarity index 95%
rename from 
tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/source/TestKinesisConfiguration.java
rename to 
tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/common/TestKinesisConfiguration.java
index 20ef00a..ddacd30 100644
--- 
a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/source/TestKinesisConfiguration.java
+++ 
b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/common/TestKinesisConfiguration.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.camel.kafkaconnector.aws.v2.kinesis.source;
+package org.apache.camel.kafkaconnector.aws.v2.kinesis.common;
 
 import org.apache.camel.component.aws2.kinesis.Kinesis2Configuration;
 import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils;
diff --git 
a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/sink/CamelAWSKinesisPropertyFactory.java
 
b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/sink/CamelAWSKinesisPropertyFactory.java
new file mode 100644
index 0000000..fb250a1
--- /dev/null
+++ 
b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/sink/CamelAWSKinesisPropertyFactory.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.aws.v2.kinesis.sink;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.camel.kafkaconnector.aws.v2.common.AWSPropertiesUtils;
+import org.apache.camel.kafkaconnector.common.SinkConnectorPropertyFactory;
+import org.apache.camel.test.infra.aws.common.AWSConfigs;
+
+
+/**
+ * Creates the set of properties used by a Camel Kinesis Sink Connector
+ */
+final class CamelAWSKinesisPropertyFactory extends 
SinkConnectorPropertyFactory<CamelAWSKinesisPropertyFactory> {
+    public static final Map<String, String> SPRING_STYLE = new HashMap<>();
+    public static final Map<String, String> KAFKA_STYLE = new HashMap<>();
+
+    static {
+        SPRING_STYLE.put(AWSConfigs.ACCESS_KEY, 
"camel.component.aws2-kinesis.accessKey");
+        SPRING_STYLE.put(AWSConfigs.SECRET_KEY, 
"camel.component.aws2-kinesis.secretKey");
+        SPRING_STYLE.put(AWSConfigs.REGION, 
"camel.component.aws2-kinesis.region");
+
+        KAFKA_STYLE.put(AWSConfigs.ACCESS_KEY, 
"camel.component.aws2-kinesis.access-key");
+        KAFKA_STYLE.put(AWSConfigs.SECRET_KEY, 
"camel.component.aws2-kinesis.secret-key");
+        KAFKA_STYLE.put(AWSConfigs.REGION, 
"camel.component.aws2-kinesis.region");
+    }
+
+    private CamelAWSKinesisPropertyFactory() {
+
+    }
+
+    public CamelAWSKinesisPropertyFactory withAmazonConfig(Properties 
amazonConfigs) {
+        return withAmazonConfig(amazonConfigs, this.SPRING_STYLE);
+    }
+
+    public CamelAWSKinesisPropertyFactory withAmazonConfig(Properties 
amazonConfigs, Map<String, String> style) {
+        AWSPropertiesUtils.setCommonProperties(amazonConfigs, style, this);
+
+        return this;
+    }
+
+    public CamelAWSKinesisPropertyFactory withStreamName(String streamName) {
+        return setProperty("camel.sink.path.streamName", streamName);
+    }
+
+    public CamelAWSKinesisPropertyFactory withConfiguration(String 
configurationClass) {
+        return setProperty("camel.component.aws2-kinesis.configuration",
+                classRef(configurationClass));
+    }
+
+    public static CamelAWSKinesisPropertyFactory basic() {
+        return new CamelAWSKinesisPropertyFactory()
+                .withName("CamelAwsKinesisSinkConnector")
+                .withTasksMax(1)
+                
.withConnectorClass("org.apache.camel.kafkaconnector.aws2kinesis.CamelAws2kinesisSinkConnector")
+                
.withKeyConverterClass("org.apache.kafka.connect.storage.StringConverter")
+                
.withValueConverterClass("org.apache.kafka.connect.storage.StringConverter");
+    }
+}
diff --git 
a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/sink/CamelSinkAWSKinesisITCase.java
 
b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/sink/CamelSinkAWSKinesisITCase.java
new file mode 100644
index 0000000..8d9e6c6
--- /dev/null
+++ 
b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/sink/CamelSinkAWSKinesisITCase.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.aws.v2.kinesis.sink;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.kafkaconnector.CamelSinkTask;
+import org.apache.camel.kafkaconnector.aws.v2.common.CamelSinkAWSTestSupport;
+import org.apache.camel.kafkaconnector.aws.v2.kinesis.common.KinesisUtils;
+import 
org.apache.camel.kafkaconnector.aws.v2.kinesis.common.TestKinesisConfiguration;
+import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
+import org.apache.camel.kafkaconnector.common.utils.TestUtils;
+import org.apache.camel.test.infra.aws.common.AWSCommon;
+import org.apache.camel.test.infra.aws.common.services.AWSService;
+import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils;
+import org.apache.camel.test.infra.aws2.services.AWSServiceFactory;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.kinesis.KinesisClient;
+import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
+import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
+import software.amazon.awssdk.services.kinesis.model.Record;
+
+import static 
org.apache.camel.kafkaconnector.aws.v2.kinesis.common.KinesisUtils.createStream;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;
+
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+@EnabledIfSystemProperty(named = "enable.slow.tests", matches = "true")
+public class CamelSinkAWSKinesisITCase  extends CamelSinkAWSTestSupport {
+    @RegisterExtension
+    public static AWSService awsService = 
AWSServiceFactory.createKinesisService();
+    private static final Logger LOG = 
LoggerFactory.getLogger(CamelSinkAWSKinesisITCase.class);
+
+    private String streamName;
+    private KinesisClient kinesisClient;
+
+    private volatile int received;
+    private final int expect = 10;
+
+    @Override
+    protected Map<String, String> messageHeaders(String text, int current) {
+        Map<String, String> headers = new HashMap<>();
+
+        headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + 
"CamelAwsKinesisPartitionKey",
+                "partition-" + current);
+
+        return headers;
+    }
+
+    @Override
+    protected void consumeMessages(CountDownLatch latch) {
+        try {
+            GetRecordsRequest getRecordsRequest = 
KinesisUtils.getGetRecordsRequest(kinesisClient, streamName);
+
+            while (true) {
+                GetRecordsResponse response = 
kinesisClient.getRecords(getRecordsRequest);
+
+                List<Record> recordList = response.records();
+                received = recordList.size();
+                for (Record record : recordList) {
+                    LOG.info("Received record: {}", record.data());
+
+                    if (received >= expect) {
+                        return;
+                    }
+                }
+
+                if (!waitForData()) {
+                    return;
+                }
+            }
+        } catch  (Exception e) {
+            LOG.error("Error consuming records: {}", e.getMessage(), e);
+        } finally {
+            latch.countDown();
+        }
+    }
+
+
+
+    @Override
+    protected void verifyMessages(CountDownLatch latch) throws 
InterruptedException {
+        if (latch.await(110, TimeUnit.SECONDS)) {
+            assertEquals(expect, received, "Didn't process the expected amount 
of messages: " + received + " != " + expect);
+        } else {
+            fail(String.format("Failed to receive the messages within the 
specified time: received %d of %d",
+                    received, expect));
+        }
+    }
+
+    @Override
+    protected String[] getConnectorsInTest() {
+        return new String[] {"camel-aws2-kinesis-kafka-connector"};
+    }
+
+    @BeforeEach
+    public void setUp() {
+        streamName = AWSCommon.KINESIS_STREAM_BASE_NAME + "-" + 
TestUtils.randomWithRange(0, 100);
+
+        kinesisClient = AWSSDKClientUtils.newKinesisClient();
+        received = 0;
+
+        createStream(kinesisClient, streamName);
+    }
+
+    @Test
+    @Timeout(120)
+    public void testBasicSendReceive() throws Exception {
+        Properties amazonProperties = awsService.getConnectionProperties();
+        String topicName = TestUtils.getDefaultTestTopic(this.getClass());
+
+        ConnectorPropertyFactory connectorPropertyFactory = 
CamelAWSKinesisPropertyFactory
+                .basic()
+                .withTopics(topicName)
+                .withAmazonConfig(amazonProperties)
+                .withConfiguration(TestKinesisConfiguration.class.getName())
+                .withStreamName(streamName);
+
+        runTest(connectorPropertyFactory, topicName, expect);
+    }
+}
diff --git 
a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/source/CamelSourceAWSKinesisITCase.java
 
b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/source/CamelSourceAWSKinesisITCase.java
index e294f3f..e19d8bd 100644
--- 
a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/source/CamelSourceAWSKinesisITCase.java
+++ 
b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/source/CamelSourceAWSKinesisITCase.java
@@ -17,11 +17,9 @@
 
 package org.apache.camel.kafkaconnector.aws.v2.kinesis.source;
 
-import java.util.ArrayList;
-import java.util.List;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
 
+import 
org.apache.camel.kafkaconnector.aws.v2.kinesis.common.TestKinesisConfiguration;
 import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
 import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
@@ -40,24 +38,12 @@ import 
org.junit.jupiter.api.condition.EnabledIfSystemProperty;
 import org.junit.jupiter.api.extension.RegisterExtension;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import software.amazon.awssdk.awscore.exception.AwsServiceException;
-import software.amazon.awssdk.core.SdkBytes;
 import software.amazon.awssdk.services.kinesis.KinesisClient;
-import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest;
-import software.amazon.awssdk.services.kinesis.model.CreateStreamResponse;
-import software.amazon.awssdk.services.kinesis.model.DeleteStreamRequest;
-import software.amazon.awssdk.services.kinesis.model.DeleteStreamResponse;
-import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
-import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse;
-import software.amazon.awssdk.services.kinesis.model.KinesisException;
-import software.amazon.awssdk.services.kinesis.model.PutRecordsRequest;
-import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry;
-import software.amazon.awssdk.services.kinesis.model.PutRecordsResponse;
-import software.amazon.awssdk.services.kinesis.model.ResourceInUseException;
-import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
 
+import static 
org.apache.camel.kafkaconnector.aws.v2.kinesis.common.KinesisUtils.createStream;
+import static 
org.apache.camel.kafkaconnector.aws.v2.kinesis.common.KinesisUtils.deleteStream;
+import static 
org.apache.camel.kafkaconnector.aws.v2.kinesis.common.KinesisUtils.putRecords;
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.fail;
 
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
 @EnabledIfSystemProperty(named = "enable.slow.tests", matches = "true")
@@ -78,81 +64,6 @@ public class CamelSourceAWSKinesisITCase extends 
AbstractKafkaTest {
         return new String[] {"camel-aws2-kinesis-kafka-connector"};
     }
 
-    private void doCreateStream() {
-        CreateStreamRequest request = CreateStreamRequest.builder()
-                .streamName(streamName)
-                .shardCount(1)
-                .build();
-
-        try {
-            CreateStreamResponse response = 
kinesisClient.createStream(request);
-
-            if (response.sdkHttpResponse().isSuccessful()) {
-                LOG.info("Stream created successfully");
-            } else {
-                fail("Failed to create the stream");
-            }
-        } catch (KinesisException e) {
-            LOG.error("Unable to create stream: {}", e.getMessage(), e);
-            fail("Unable to create stream");
-        }
-    }
-
-    private void createStream() {
-        try {
-            LOG.info("Checking whether the stream exists already");
-            DescribeStreamRequest request = DescribeStreamRequest.builder()
-                    .streamName(streamName)
-                    .build();
-
-            DescribeStreamResponse response = 
kinesisClient.describeStream(request);
-
-            int status = response.sdkHttpResponse().statusCode();
-            LOG.info("Kinesis stream check result: {}", status);
-        } catch (KinesisException e) {
-            LOG.info("The stream does not exist, auto creating it: {}", 
e.getMessage(), e);
-            doCreateStream();
-        }
-    }
-
-    private void doDeleteStream() {
-        DeleteStreamRequest request = DeleteStreamRequest.builder()
-                .streamName(streamName)
-                .build();
-
-        DeleteStreamResponse response = kinesisClient.deleteStream(request);
-
-        if (response.sdkHttpResponse().isSuccessful()) {
-            LOG.info("Stream deleted successfully");
-        } else {
-            fail("Failed to delete the stream");
-        }
-    }
-
-    private void deleteStream() {
-        try {
-            LOG.info("Checking whether the stream exists already");
-
-
-            DescribeStreamRequest request = DescribeStreamRequest.builder()
-                    .streamName(streamName)
-                    .build();
-
-            DescribeStreamResponse response = 
kinesisClient.describeStream(request);
-
-            if (response.sdkHttpResponse().isSuccessful()) {
-                LOG.info("Kinesis stream check result");
-                doDeleteStream();
-            }
-        } catch (ResourceNotFoundException e) {
-            LOG.info("The stream does not exist, skipping deletion");
-        } catch (ResourceInUseException e) {
-            LOG.info("The stream exist but cannot be deleted because it's in 
use");
-            doDeleteStream();
-        }
-    }
-
-
     @BeforeEach
     public void setUp() {
         streamName = AWSCommon.KINESIS_STREAM_BASE_NAME + "-" + 
TestUtils.randomWithRange(0, 100);
@@ -160,13 +71,13 @@ public class CamelSourceAWSKinesisITCase extends 
AbstractKafkaTest {
         kinesisClient = AWSSDKClientUtils.newKinesisClient();
         received = 0;
 
-        createStream();
+        createStream(kinesisClient, streamName);
     }
 
 
     @AfterEach
     public void tearDown() {
-        deleteStream();
+        deleteStream(kinesisClient, streamName);
     }
 
     private boolean checkRecord(ConsumerRecord<String, String> record) {
@@ -180,74 +91,13 @@ public class CamelSourceAWSKinesisITCase extends 
AbstractKafkaTest {
         return true;
     }
 
-    private void putRecords() {
-        List<PutRecordsRequestEntry> putRecordsRequestEntryList = new 
ArrayList<>();
-
-        LOG.debug("Adding data to the Kinesis stream");
-        for (int i = 0; i < expect; i++) {
-            String partition = String.format("partitionKey-%d", i);
-
-            PutRecordsRequestEntry putRecordsRequestEntry = 
PutRecordsRequestEntry.builder()
-                    .data(SdkBytes.fromByteArray(String.valueOf(i).getBytes()))
-                    .partitionKey(partition)
-                    .build();
-
-            LOG.debug("Added data {} (as bytes) to partition {}", i, 
partition);
-            putRecordsRequestEntryList.add(putRecordsRequestEntry);
-        }
-
-        LOG.debug("Done creating the data records");
-
-        PutRecordsRequest putRecordsRequest = PutRecordsRequest
-                .builder()
-                .streamName(streamName)
-                .records(putRecordsRequestEntryList)
-                .build();
 
-        int retries = 5;
-        do {
-            try {
-                PutRecordsResponse response = 
kinesisClient.putRecords(putRecordsRequest);
-
-                if (response.sdkHttpResponse().isSuccessful()) {
-                    LOG.debug("Done putting the data records into the stream");
-                } else {
-                    fail("Unable to put all the records into the stream");
-                }
-
-                break;
-            } catch (AwsServiceException e) {
-                retries--;
-
-                /*
-                 This works around the "... Cannot deserialize instance of 
`...AmazonKinesisException` out of NOT_AVAILABLE token
-
-                 It may take some time for the local Kinesis backend to be 
fully up - even though the container is
-                 reportedly up and running. Therefore, it tries a few more 
times
-                 */
-                LOG.trace("Failed to put the records: {}. Retrying in 2 
seconds ...", e.getMessage());
-                if (retries == 0) {
-                    LOG.error("Failed to put the records: {}", e.getMessage(), 
e);
-                    throw e;
-                }
-
-
-                try {
-                    Thread.sleep(TimeUnit.SECONDS.toMillis(2));
-                } catch (InterruptedException ex) {
-                    break;
-                }
-            }
-        } while (retries > 0);
-
-
-    }
 
     public void runtTest(ConnectorPropertyFactory connectorPropertyFactory) 
throws ExecutionException, InterruptedException {
         connectorPropertyFactory.log();
         getKafkaConnectService().initializeConnector(connectorPropertyFactory);
 
-        putRecords();
+        putRecords(kinesisClient, streamName, expect);
         LOG.debug("Initialized the connector and put the data for the test 
execution");
 
         LOG.debug("Creating the consumer ...");

Reply via email to